summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--crates/teleia/src/fig.rs39
1 files changed, 26 insertions, 13 deletions
diff --git a/crates/teleia/src/fig.rs b/crates/teleia/src/fig.rs
index c584b97..9dfbea8 100644
--- a/crates/teleia/src/fig.rs
+++ b/crates/teleia/src/fig.rs
@@ -3,6 +3,8 @@ use byteorder::{LE, ReadBytesExt, WriteBytesExt};
use crate::{Erm, WrapErr};
+const KEY: usize = 42;
+
pub fn read_length_prefixed_utf8<R>(r: &mut R) -> Erm<String> where R: std::io::Read {
let len = r.read_u32::<LE>()?;
let mut bs = vec![0; len as usize];
@@ -30,13 +32,13 @@ impl BinaryClient {
socket.write_u32::<byteorder::LE>(s.len() as u32).wrap_err("failed to send subscribe message length to bus")?;
socket.write_all(s).wrap_err("failed to send subscribe message to bus")?;
}
- // socket.set_nonblocking(false).wrap_err("failed to set message bus socket nonblocking")?;
+ socket.set_nonblocking(true).wrap_err("failed to set message bus socket nonblocking")?;
socket.flush().wrap_err("failed to flush bus connection")?;
- let in_buf = vec![0; 1024];
- let out_buf = vec![0; 1024];
+ let in_buf = Vec::new();
+ let out_buf = Vec::new();
let poller = polling::Poller::new()?;
unsafe {
- poller.add(&socket, polling::Event::all(0)).wrap_err("failed to add event to poll")?;
+ poller.add(&socket, polling::Event::all(KEY)).wrap_err("failed to add event to poll")?;
}
Ok(Self {
in_buf, out_buf,
@@ -64,23 +66,34 @@ impl BinaryClient {
let data_len = reader.read_u32::<byteorder::LE>().ok()?;
let mut data = vec![0 as u8; data_len as usize];
reader.read_exact(&mut data).ok()?;
+ let len = reader.position() as usize;
+ self.in_buf.drain(..len);
Some(BinaryMessage { event, data })
}
pub fn pump(&mut self) -> Erm<Option<BinaryMessage>> {
let mut events = polling::Events::new();
+ events.clear();
self.poller.wait(&mut events, Some(std::time::Duration::from_secs(0)))
.wrap_err("failed to poll message bus")?;
for ev in events.iter() {
- if ev.readable {
- let mut buf = [0; 1024];
- let sz = self.socket.read(&mut buf).wrap_err("failed to read from bus socket")?;
- self.in_buf.extend_from_slice(&buf[..sz]);
- }
- if ev.writable {
- let sz = self.socket.write(&self.out_buf).wrap_err("failed to write to bus socket")?;
- self.out_buf.drain(..sz);
+ if ev.key == KEY {
+ if ev.readable {
+ let mut buf = [0; 1024];
+ match self.socket.read(&mut buf) {
+ Ok(sz) => self.in_buf.extend_from_slice(&buf[..sz]),
+ Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {},
+ e => { e.wrap_err("failed to read from bus socket")?; },
+ }
+ }
+ if ev.writable && !self.out_buf.is_empty() {
+ match self.socket.write(&self.out_buf) {
+ Ok(sz) => { self.out_buf.drain(..sz); },
+ Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {},
+ e => { e.wrap_err("failed to write to bus socket")?; },
+ }
+ }
+ self.poller.modify(&self.socket, polling::Event::all(KEY)).wrap_err("failed to update event to poll")?;
}
- self.poller.modify(&self.socket, polling::Event::all(0)).wrap_err("failed to update event to poll")?;
}
Ok(self.pop_incoming_message())
}