diff options
| -rw-r--r-- | crates/teleia/src/fig.rs | 39 |
1 files changed, 26 insertions, 13 deletions
diff --git a/crates/teleia/src/fig.rs b/crates/teleia/src/fig.rs index c584b97..9dfbea8 100644 --- a/crates/teleia/src/fig.rs +++ b/crates/teleia/src/fig.rs @@ -3,6 +3,8 @@ use byteorder::{LE, ReadBytesExt, WriteBytesExt}; use crate::{Erm, WrapErr}; +const KEY: usize = 42; + pub fn read_length_prefixed_utf8<R>(r: &mut R) -> Erm<String> where R: std::io::Read { let len = r.read_u32::<LE>()?; let mut bs = vec![0; len as usize]; @@ -30,13 +32,13 @@ impl BinaryClient { socket.write_u32::<byteorder::LE>(s.len() as u32).wrap_err("failed to send subscribe message length to bus")?; socket.write_all(s).wrap_err("failed to send subscribe message to bus")?; } - // socket.set_nonblocking(false).wrap_err("failed to set message bus socket nonblocking")?; + socket.set_nonblocking(true).wrap_err("failed to set message bus socket nonblocking")?; socket.flush().wrap_err("failed to flush bus connection")?; - let in_buf = vec![0; 1024]; - let out_buf = vec![0; 1024]; + let in_buf = Vec::new(); + let out_buf = Vec::new(); let poller = polling::Poller::new()?; unsafe { - poller.add(&socket, polling::Event::all(0)).wrap_err("failed to add event to poll")?; + poller.add(&socket, polling::Event::all(KEY)).wrap_err("failed to add event to poll")?; } Ok(Self { in_buf, out_buf, @@ -64,23 +66,34 @@ impl BinaryClient { let data_len = reader.read_u32::<byteorder::LE>().ok()?; let mut data = vec![0 as u8; data_len as usize]; reader.read_exact(&mut data).ok()?; + let len = reader.position() as usize; + self.in_buf.drain(..len); Some(BinaryMessage { event, data }) } pub fn pump(&mut self) -> Erm<Option<BinaryMessage>> { let mut events = polling::Events::new(); + events.clear(); self.poller.wait(&mut events, Some(std::time::Duration::from_secs(0))) .wrap_err("failed to poll message bus")?; for ev in events.iter() { - if ev.readable { - let mut buf = [0; 1024]; - let sz = self.socket.read(&mut buf).wrap_err("failed to read from bus socket")?; - self.in_buf.extend_from_slice(&buf[..sz]); - } - if ev.writable { - let sz = self.socket.write(&self.out_buf).wrap_err("failed to write to bus socket")?; - self.out_buf.drain(..sz); + if ev.key == KEY { + if ev.readable { + let mut buf = [0; 1024]; + match self.socket.read(&mut buf) { + Ok(sz) => self.in_buf.extend_from_slice(&buf[..sz]), + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}, + e => { e.wrap_err("failed to read from bus socket")?; }, + } + } + if ev.writable && !self.out_buf.is_empty() { + match self.socket.write(&self.out_buf) { + Ok(sz) => { self.out_buf.drain(..sz); }, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}, + e => { e.wrap_err("failed to write to bus socket")?; }, + } + } + self.poller.modify(&self.socket, polling::Event::all(KEY)).wrap_err("failed to update event to poll")?; } - self.poller.modify(&self.socket, polling::Event::all(0)).wrap_err("failed to update event to poll")?; } Ok(self.pop_incoming_message()) } |
