summaryrefslogtreecommitdiff
path: root/crates/renderer/src/fig.rs
diff options
context:
space:
mode:
authorLLLL Colonq <llll@colonq>2025-07-25 20:35:51 -0400
committerLLLL Colonq <llll@colonq>2025-07-25 20:35:51 -0400
commitba1ff8e7e3ea915c6d673c05cdcdee10a9b5b9f0 (patch)
tree41de2e6d7a99ad8d8059ca9f482646be92f79d39 /crates/renderer/src/fig.rs
parent513a3dd4c5899063e80fd691600d9142910547c4 (diff)
WIP
Diffstat (limited to 'crates/renderer/src/fig.rs')
-rw-r--r--crates/renderer/src/fig.rs133
1 files changed, 132 insertions, 1 deletions
diff --git a/crates/renderer/src/fig.rs b/crates/renderer/src/fig.rs
index aa972a4..8023f01 100644
--- a/crates/renderer/src/fig.rs
+++ b/crates/renderer/src/fig.rs
@@ -1,4 +1,5 @@
-use std::io::{BufRead, Write};
+use std::io::{BufRead, Read, Write};
+use byteorder::WriteBytesExt;
#[derive(Debug, Clone)]
pub struct Message {
@@ -48,3 +49,133 @@ impl Client {
}
}
}
+
+#[derive(Debug, Clone)]
+pub struct BinaryMessage {
+ pub event: Vec<u8>,
+ pub data: Vec<u8>
+}
+#[derive(Debug, Clone)]
+pub enum BinaryClientState {
+ PartialEventLength { buf_len: usize, buf: [u8; 4] },
+ PartialEvent { len: usize, buf_len: usize, buf: Vec<u8> },
+ PartialDataLength { event: Vec<u8>, buf_len: usize, buf: [u8; 4] },
+ PartialData { event: Vec<u8>, len: usize, buf_len: usize, buf: Vec<u8> },
+ Message { event: Vec<u8>, data: Vec<u8> },
+}
+impl Default for BinaryClientState {
+ fn default() -> Self {
+ Self::PartialEventLength { buf_len: 0, buf: [0; 4] }
+ }
+}
+pub struct BinaryClient {
+ state: BinaryClientState,
+ reader: std::io::BufReader<std::net::TcpStream>,
+}
+impl BinaryClient {
+ pub fn new(addr: &str, subs: &[&[u8]]) -> Self {
+ let mut socket = std::net::TcpStream::connect(addr).expect("failed to connect to message bus");
+ socket.set_nonblocking(true).expect("failed to set message bus socket nonblocking");
+ for s in subs {
+ write!(socket, "s").expect("failed to send subscribe message to bus");
+ socket.write_u32::<byteorder::LE>(s.len() as u32).expect("failed to send subscribe message length to bus");
+ socket.write_all(s).expect("failed to send subscribe message to bus");
+ }
+ socket.flush().expect("failed to flush bus connection");
+ let reader = std::io::BufReader::new(socket);
+ Self { state: BinaryClientState::PartialEventLength { buf_len: 0, buf: [0; 4] }, reader }
+ }
+ fn read(reader: &mut std::io::BufReader<std::net::TcpStream>, buf: &mut [u8]) -> Option<usize> {
+ match reader.read(buf) {
+ Ok(sz) => Some(sz),
+ Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+ None
+ },
+ Err(e) => panic!("IO error on message bus: {}", e),
+ }
+ }
+ fn update_state(
+ reader: &mut std::io::BufReader<std::net::TcpStream>,
+ mut state: BinaryClientState,
+ ) -> BinaryClientState {
+ loop {
+ let new_state = match state {
+ BinaryClientState::PartialEventLength { mut buf_len, mut buf } => {
+ buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) {
+ x
+ } else { break; };
+ if buf_len == 4 {
+ let len = u32::from_le_bytes(buf) as usize;
+ BinaryClientState::PartialEvent {
+ len,
+ buf_len: 0,
+ buf: vec![0; len],
+ }
+ } else {
+ BinaryClientState::PartialEventLength { buf_len, buf }
+ }
+ },
+ BinaryClientState::PartialEvent { len, mut buf_len, mut buf } => {
+ buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) {
+ x
+ } else { break; };
+ if buf_len == len {
+ BinaryClientState::PartialDataLength {
+ event: buf.clone(),
+ buf_len: 0,
+ buf: [0; 4],
+ }
+ } else {
+ BinaryClientState::PartialEvent { len, buf_len, buf }
+ }
+ },
+ BinaryClientState::PartialDataLength { event, mut buf_len, mut buf } => {
+ buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) {
+ x
+ } else { break; };
+ if buf_len == 4 {
+ let len = u32::from_le_bytes(buf) as usize;
+ BinaryClientState::PartialData {
+ event,
+ len,
+ buf_len: 0,
+ buf: vec![0; len],
+ }
+ } else {
+ BinaryClientState::PartialDataLength { event, buf_len, buf }
+ }
+ },
+ BinaryClientState::PartialData { event, len, mut buf_len, mut buf } => {
+ buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) {
+ x
+ } else { break; };
+ if buf_len == len {
+ BinaryClientState::Message {
+ event,
+ data: buf.clone(),
+ }
+ } else {
+ BinaryClientState::PartialData { event, len, buf_len, buf }
+ }
+ },
+ BinaryClientState::Message{..} => break,
+ };
+ state = new_state;
+ };
+ state
+ }
+ pub fn pump(&mut self) -> Option<BinaryMessage> {
+ None
+ // loop {
+ // if let Some(new) = self.update_state(mem::take(&mut self.state)) {
+ // self.state = new;
+ // } else { break; }
+ // }
+ // if let BinaryClientState::Message { event, data } = self.state.clone() {
+ // self.state = BinaryClientState::PartialEventLength { buf_len: 0, buf: [0; 4] };
+ // Some(BinaryMessage { event, data })
+ // } else {
+ // None
+ // }
+ }
+}