diff options
| author | LLLL Colonq <llll@colonq> | 2025-07-25 20:35:51 -0400 |
|---|---|---|
| committer | LLLL Colonq <llll@colonq> | 2025-07-25 20:35:51 -0400 |
| commit | ba1ff8e7e3ea915c6d673c05cdcdee10a9b5b9f0 (patch) | |
| tree | 41de2e6d7a99ad8d8059ca9f482646be92f79d39 /crates/renderer/src/fig.rs | |
| parent | 513a3dd4c5899063e80fd691600d9142910547c4 (diff) | |
WIP
Diffstat (limited to 'crates/renderer/src/fig.rs')
| -rw-r--r-- | crates/renderer/src/fig.rs | 133 |
1 files changed, 132 insertions, 1 deletions
diff --git a/crates/renderer/src/fig.rs b/crates/renderer/src/fig.rs index aa972a4..8023f01 100644 --- a/crates/renderer/src/fig.rs +++ b/crates/renderer/src/fig.rs @@ -1,4 +1,5 @@ -use std::io::{BufRead, Write}; +use std::io::{BufRead, Read, Write}; +use byteorder::WriteBytesExt; #[derive(Debug, Clone)] pub struct Message { @@ -48,3 +49,133 @@ impl Client { } } } + +#[derive(Debug, Clone)] +pub struct BinaryMessage { + pub event: Vec<u8>, + pub data: Vec<u8> +} +#[derive(Debug, Clone)] +pub enum BinaryClientState { + PartialEventLength { buf_len: usize, buf: [u8; 4] }, + PartialEvent { len: usize, buf_len: usize, buf: Vec<u8> }, + PartialDataLength { event: Vec<u8>, buf_len: usize, buf: [u8; 4] }, + PartialData { event: Vec<u8>, len: usize, buf_len: usize, buf: Vec<u8> }, + Message { event: Vec<u8>, data: Vec<u8> }, +} +impl Default for BinaryClientState { + fn default() -> Self { + Self::PartialEventLength { buf_len: 0, buf: [0; 4] } + } +} +pub struct BinaryClient { + state: BinaryClientState, + reader: std::io::BufReader<std::net::TcpStream>, +} +impl BinaryClient { + pub fn new(addr: &str, subs: &[&[u8]]) -> Self { + let mut socket = std::net::TcpStream::connect(addr).expect("failed to connect to message bus"); + socket.set_nonblocking(true).expect("failed to set message bus socket nonblocking"); + for s in subs { + write!(socket, "s").expect("failed to send subscribe message to bus"); + socket.write_u32::<byteorder::LE>(s.len() as u32).expect("failed to send subscribe message length to bus"); + socket.write_all(s).expect("failed to send subscribe message to bus"); + } + socket.flush().expect("failed to flush bus connection"); + let reader = std::io::BufReader::new(socket); + Self { state: BinaryClientState::PartialEventLength { buf_len: 0, buf: [0; 4] }, reader } + } + fn read(reader: &mut std::io::BufReader<std::net::TcpStream>, buf: &mut [u8]) -> Option<usize> { + match reader.read(buf) { + Ok(sz) => Some(sz), + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + None + }, + Err(e) => panic!("IO error on message bus: {}", e), + } + } + fn update_state( + reader: &mut std::io::BufReader<std::net::TcpStream>, + mut state: BinaryClientState, + ) -> BinaryClientState { + loop { + let new_state = match state { + BinaryClientState::PartialEventLength { mut buf_len, mut buf } => { + buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) { + x + } else { break; }; + if buf_len == 4 { + let len = u32::from_le_bytes(buf) as usize; + BinaryClientState::PartialEvent { + len, + buf_len: 0, + buf: vec![0; len], + } + } else { + BinaryClientState::PartialEventLength { buf_len, buf } + } + }, + BinaryClientState::PartialEvent { len, mut buf_len, mut buf } => { + buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) { + x + } else { break; }; + if buf_len == len { + BinaryClientState::PartialDataLength { + event: buf.clone(), + buf_len: 0, + buf: [0; 4], + } + } else { + BinaryClientState::PartialEvent { len, buf_len, buf } + } + }, + BinaryClientState::PartialDataLength { event, mut buf_len, mut buf } => { + buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) { + x + } else { break; }; + if buf_len == 4 { + let len = u32::from_le_bytes(buf) as usize; + BinaryClientState::PartialData { + event, + len, + buf_len: 0, + buf: vec![0; len], + } + } else { + BinaryClientState::PartialDataLength { event, buf_len, buf } + } + }, + BinaryClientState::PartialData { event, len, mut buf_len, mut buf } => { + buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) { + x + } else { break; }; + if buf_len == len { + BinaryClientState::Message { + event, + data: buf.clone(), + } + } else { + BinaryClientState::PartialData { event, len, buf_len, buf } + } + }, + BinaryClientState::Message{..} => break, + }; + state = new_state; + }; + state + } + pub fn pump(&mut self) -> Option<BinaryMessage> { + None + // loop { + // if let Some(new) = self.update_state(mem::take(&mut self.state)) { + // self.state = new; + // } else { break; } + // } + // if let BinaryClientState::Message { event, data } = self.state.clone() { + // self.state = BinaryClientState::PartialEventLength { buf_len: 0, buf: [0; 4] }; + // Some(BinaryMessage { event, data }) + // } else { + // None + // } + } +} |
