From 2986b9f40734fac6aa40b0e6c70d2d4ed59686f5 Mon Sep 17 00:00:00 2001 From: LLLL Colonq Date: Fri, 14 Nov 2025 14:01:04 -0500 Subject: Fix binary bus --- crates/teleia/src/fig.rs | 39 ++++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/crates/teleia/src/fig.rs b/crates/teleia/src/fig.rs index c584b97..9dfbea8 100644 --- a/crates/teleia/src/fig.rs +++ b/crates/teleia/src/fig.rs @@ -3,6 +3,8 @@ use byteorder::{LE, ReadBytesExt, WriteBytesExt}; use crate::{Erm, WrapErr}; +const KEY: usize = 42; + pub fn read_length_prefixed_utf8(r: &mut R) -> Erm where R: std::io::Read { let len = r.read_u32::()?; let mut bs = vec![0; len as usize]; @@ -30,13 +32,13 @@ impl BinaryClient { socket.write_u32::(s.len() as u32).wrap_err("failed to send subscribe message length to bus")?; socket.write_all(s).wrap_err("failed to send subscribe message to bus")?; } - // socket.set_nonblocking(false).wrap_err("failed to set message bus socket nonblocking")?; + socket.set_nonblocking(true).wrap_err("failed to set message bus socket nonblocking")?; socket.flush().wrap_err("failed to flush bus connection")?; - let in_buf = vec![0; 1024]; - let out_buf = vec![0; 1024]; + let in_buf = Vec::new(); + let out_buf = Vec::new(); let poller = polling::Poller::new()?; unsafe { - poller.add(&socket, polling::Event::all(0)).wrap_err("failed to add event to poll")?; + poller.add(&socket, polling::Event::all(KEY)).wrap_err("failed to add event to poll")?; } Ok(Self { in_buf, out_buf, @@ -64,23 +66,34 @@ impl BinaryClient { let data_len = reader.read_u32::().ok()?; let mut data = vec![0 as u8; data_len as usize]; reader.read_exact(&mut data).ok()?; + let len = reader.position() as usize; + self.in_buf.drain(..len); Some(BinaryMessage { event, data }) } pub fn pump(&mut self) -> Erm> { let mut events = polling::Events::new(); + events.clear(); self.poller.wait(&mut events, Some(std::time::Duration::from_secs(0))) .wrap_err("failed to poll message bus")?; for ev in events.iter() { - if ev.readable { - let mut buf = [0; 1024]; - let sz = self.socket.read(&mut buf).wrap_err("failed to read from bus socket")?; - self.in_buf.extend_from_slice(&buf[..sz]); - } - if ev.writable { - let sz = self.socket.write(&self.out_buf).wrap_err("failed to write to bus socket")?; - self.out_buf.drain(..sz); + if ev.key == KEY { + if ev.readable { + let mut buf = [0; 1024]; + match self.socket.read(&mut buf) { + Ok(sz) => self.in_buf.extend_from_slice(&buf[..sz]), + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}, + e => { e.wrap_err("failed to read from bus socket")?; }, + } + } + if ev.writable && !self.out_buf.is_empty() { + match self.socket.write(&self.out_buf) { + Ok(sz) => { self.out_buf.drain(..sz); }, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}, + e => { e.wrap_err("failed to write to bus socket")?; }, + } + } + self.poller.modify(&self.socket, polling::Event::all(KEY)).wrap_err("failed to update event to poll")?; } - self.poller.modify(&self.socket, polling::Event::all(0)).wrap_err("failed to update event to poll")?; } Ok(self.pop_incoming_message()) } -- cgit v1.2.3