diff options
| author | LLLL Colonq <llll@colonq> | 2025-11-06 20:32:11 -0500 |
|---|---|---|
| committer | LLLL Colonq <llll@colonq> | 2025-11-06 20:32:25 -0500 |
| commit | 6a397916e0177c9e7c6b93fb9cc7510c14a659ff (patch) | |
| tree | a859364ea5ff5ff038b6171bba9d5d99d7771a61 /crates | |
| parent | 71b22b03b51aaba01df786f70becb03a30429f03 (diff) | |
Fix fig client
Diffstat (limited to 'crates')
| -rw-r--r-- | crates/teleia/Cargo.toml | 3 | ||||
| -rw-r--r-- | crates/teleia/src/fig.rs | 229 | ||||
| -rw-r--r-- | crates/teleia/src/utils.rs | 17 |
3 files changed, 77 insertions, 172 deletions
diff --git a/crates/teleia/Cargo.toml b/crates/teleia/Cargo.toml index 9c1037f..6de4df0 100644 --- a/crates/teleia/Cargo.toml +++ b/crates/teleia/Cargo.toml @@ -47,4 +47,5 @@ web-sys = { version = "*", features = ["Document", "Window", "Element", "HtmlCan env_logger = "*" # configurable logging to stdout glfw = { git = "https://github.com/lcolonq/glfw-rs", features = ["serde"] } # window management kira = { version = "=0.9.6", default-features = false, features = ["cpal", "ogg", "wav"] } # audio -directories = { git = "https://github.com/lcolonq/directories-rs" } # standard system directories
\ No newline at end of file +directories = { git = "https://github.com/lcolonq/directories-rs" } # standard system directories +polling = "*" # interface to epoll
\ No newline at end of file diff --git a/crates/teleia/src/fig.rs b/crates/teleia/src/fig.rs index 13b4773..0dd2f68 100644 --- a/crates/teleia/src/fig.rs +++ b/crates/teleia/src/fig.rs @@ -1,193 +1,80 @@ -use std::io::{BufRead, Read, Write}; -use byteorder::WriteBytesExt; +use std::io::{Read, Write}; +use byteorder::{ReadBytesExt, WriteBytesExt}; -#[derive(Debug, Clone)] -pub struct SexpMessage { - pub event: lexpr::Value, - pub data: lexpr::Value, -} - -pub struct SexpClient { - reader: std::io::BufReader<std::net::TcpStream>, - buf: String, -} -impl SexpClient { - pub fn new(addr: &str, subs: &[lexpr::Value]) -> Self { - let mut socket = std::net::TcpStream::connect(addr).expect("failed to connect to message bus"); - socket.set_nonblocking(true).expect("failed to set message bus socket nonblocking"); - for s in subs { - write!(socket, "(sub {})\n", s).expect("failed to send subscribe message to bus"); - } - let reader = std::io::BufReader::new(socket); - Self { reader, buf: String::new(), } - } - pub fn pump(&mut self) -> Option<SexpMessage> { - match self.reader.read_line(&mut self.buf) { - Ok(_l) => { - let mv = lexpr::from_str(&self.buf); - self.buf.clear(); - match mv { - Ok(v) => { - match v.as_cons() { - Some(cs) => { - Some(SexpMessage { event: cs.car().clone(), data: cs.cdr().clone() }) - }, - _ => { log::error!("malformed message bus input s-expression: {}", v); None }, - } - }, - Err(e) => { log::error!("malformed message bus input line: {}", e); None }, - } - }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - None - }, - Err(e) => panic!("IO error on message bus: {}", e), - } - } -} +use crate::{Erm, WrapErr}; #[derive(Debug, Clone)] pub struct BinaryMessage { pub event: Vec<u8>, pub data: Vec<u8> } -#[derive(Debug, Clone)] -pub enum BinaryClientState { - PartialEventLength { buf_len: usize, buf: [u8; 4] }, - PartialEvent { len: usize, buf_len: usize, buf: Vec<u8> }, - PartialDataLength { event: Vec<u8>, buf_len: usize, buf: [u8; 4] }, - PartialData { event: Vec<u8>, len: usize, buf_len: usize, buf: Vec<u8> }, - Message { event: Vec<u8>, data: Vec<u8> }, -} -impl Default for BinaryClientState { - fn default() -> Self { - Self::PartialEventLength { buf_len: 0, buf: [0; 4] } - } -} + pub struct BinaryClient { - state: BinaryClientState, - writer: std::net::TcpStream, - reader: std::io::BufReader<std::net::TcpStream>, - blocking: bool, + in_buf: Vec<u8>, + out_buf: Vec<u8>, + socket: std::net::TcpStream, + poller: polling::Poller, } impl BinaryClient { - pub fn new(addr: &str, blocking: bool, subs: &[&[u8]]) -> Self { - let mut socket = std::net::TcpStream::connect(addr).expect("failed to connect to message bus"); - socket.set_nonblocking(!blocking).expect("failed to set message bus socket nonblocking"); + pub fn new(addr: &str, subs: &[&[u8]]) -> Erm<Self> { + let mut socket = std::net::TcpStream::connect(addr).wrap_err("failed to connect to message bus")?; for s in subs { - write!(socket, "s").expect("failed to send subscribe message to bus"); - socket.write_u32::<byteorder::LE>(s.len() as u32).expect("failed to send subscribe message length to bus"); - socket.write_all(s).expect("failed to send subscribe message to bus"); + write!(socket, "s").wrap_err("failed to send subscribe message to bus")?; + socket.write_u32::<byteorder::LE>(s.len() as u32).wrap_err("failed to send subscribe message length to bus")?; + socket.write_all(s).wrap_err("failed to send subscribe message to bus")?; } - socket.flush().expect("failed to flush bus connection"); - let writer = socket.try_clone().expect("failed to clone socket"); - let reader = std::io::BufReader::new(socket); - Self { - state: BinaryClientState::PartialEventLength { buf_len: 0, buf: [0; 4] }, - writer, - reader, - blocking, + // socket.set_nonblocking(false).wrap_err("failed to set message bus socket nonblocking")?; + socket.flush().wrap_err("failed to flush bus connection")?; + let in_buf = vec![0; 1024]; + let out_buf = vec![0; 1024]; + let poller = polling::Poller::new()?; + unsafe { + poller.add(&socket, polling::Event::all(0)).wrap_err("failed to add event to poll")?; } + Ok(Self { + in_buf, out_buf, + socket, + poller, + }) } - fn write_length_prefixed(&mut self, buf: &[u8]) { - self.writer.write_u32::<byteorder::LE>(buf.len() as u32).expect("failed to send message"); - self.writer.write_all(buf).expect("failed to send message"); + fn write_length_prefixed(&mut self, buf: &[u8]) -> Erm<()> { + self.out_buf.write_u32::<byteorder::LE>(buf.len() as u32).wrap_err("failed to send message")?; + self.out_buf.write_all(buf).wrap_err("failed to send message")?; + Ok(()) } - pub fn publish(&mut self, ev: &[u8], data: &[u8]) { - if !self.blocking { - self.writer.set_nonblocking(false).expect("failed to set message bus socket nonblocking"); - } - write!(self.writer, "p").expect("failed to send publish message to bus"); - self.write_length_prefixed(ev); - self.write_length_prefixed(data); - if !self.blocking { - self.writer.set_nonblocking(true).expect("failed to set message bus socket nonblocking"); - } - } - fn read(reader: &mut std::io::BufReader<std::net::TcpStream>, buf: &mut [u8]) -> Option<usize> { - match reader.read(buf) { - Ok(sz) => Some(sz), - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - None - }, - Err(e) => panic!("IO error on message bus: {}", e), - } + pub fn publish(&mut self, ev: &[u8], data: &[u8]) -> Erm<()> { + write!(self.out_buf, "p").wrap_err("failed to send publish message to bus")?; + self.write_length_prefixed(ev)?; + self.write_length_prefixed(data)?; + Ok(()) } - fn update_state( - reader: &mut std::io::BufReader<std::net::TcpStream>, - mut state: BinaryClientState, - ) -> BinaryClientState { - loop { - state = match state { - BinaryClientState::PartialEventLength { mut buf_len, mut buf } => { - buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) { - x - } else { break BinaryClientState::PartialEventLength { buf_len, buf }; }; - if buf_len == 4 { - let len = u32::from_le_bytes(buf) as usize; - BinaryClientState::PartialEvent { - len, - buf_len: 0, - buf: vec![0; len], - } - } else { BinaryClientState::PartialEventLength { buf_len, buf } } - }, - BinaryClientState::PartialEvent { len, mut buf_len, mut buf } => { - buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) { - x - } else { break BinaryClientState::PartialEvent { len, buf_len, buf }; }; - if buf_len == len { - BinaryClientState::PartialDataLength { - event: buf.clone(), - buf_len: 0, - buf: [0; 4], - } - } else { BinaryClientState::PartialEvent { len, buf_len, buf } } - }, - BinaryClientState::PartialDataLength { event, mut buf_len, mut buf } => { - buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) { - x - } else { break BinaryClientState::PartialDataLength { event, buf_len, buf }; }; - if buf_len == 4 { - let len = u32::from_le_bytes(buf) as usize; - BinaryClientState::PartialData { - event, - len, - buf_len: 0, - buf: vec![0; len], - } - } else { BinaryClientState::PartialDataLength { event, buf_len, buf } } - }, - BinaryClientState::PartialData { event, len, mut buf_len, mut buf } => { - buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) { - x - } else { break BinaryClientState::PartialData { event, len, buf_len, buf }; }; - if buf_len == len { - BinaryClientState::Message { - event, - data: buf.clone(), - } - } else { BinaryClientState::PartialData { event, len, buf_len, buf } } - }, - st@BinaryClientState::Message{..} => break st, - }; - } + fn pop_incoming_message(&mut self) -> Option<BinaryMessage> { + let mut reader = std::io::Cursor::new(&self.in_buf); + let event_len = reader.read_u32::<byteorder::LE>().ok()?; + let mut event = vec![0 as u8; event_len as usize]; + reader.read_exact(&mut event).ok()?; + let data_len = reader.read_u32::<byteorder::LE>().ok()?; + let mut data = vec![0 as u8; data_len as usize]; + reader.read_exact(&mut data).ok()?; + Some(BinaryMessage { event, data }) } - pub fn pump(&mut self) -> Option<BinaryMessage> { - self.state = Self::update_state(&mut self.reader, std::mem::take(&mut self.state)); - match std::mem::take(&mut self.state) { - BinaryClientState::Message { event, data } => { - self.state = BinaryClientState::PartialEventLength { buf_len: 0, buf: [0; 4] }; - Some(BinaryMessage { - event: event, - data: data, - }) - }, - st => { - self.state = st; - None + pub fn pump(&mut self) -> Erm<Option<BinaryMessage>> { + let mut events = polling::Events::new(); + self.poller.wait(&mut events, Some(std::time::Duration::from_secs(0))) + .wrap_err("failed to poll message bus")?; + for ev in events.iter() { + if ev.readable { + let mut buf = [0; 1024]; + let sz = self.socket.read(&mut buf).wrap_err("failed to read from bus socket")?; + self.in_buf.extend_from_slice(&buf[..sz]); + } + if ev.writable { + let sz = self.socket.write(&self.out_buf).wrap_err("failed to write to bus socket")?; + self.out_buf.drain(..sz); } + self.poller.modify(&self.socket, polling::Event::all(0)).wrap_err("failed to update event to poll")?; } + Ok(self.pop_incoming_message()) } } diff --git a/crates/teleia/src/utils.rs b/crates/teleia/src/utils.rs index 05e9251..c479560 100644 --- a/crates/teleia/src/utils.rs +++ b/crates/teleia/src/utils.rs @@ -9,6 +9,23 @@ pub fn erm<E, T>(e: E) -> Erm<T> where E: std::error::Error + std::marker::Send Err(e.into()) } +#[derive(Debug)] +pub struct Error { + msg: String +} +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "teleia error: {}", self.msg) + } +} +impl std::error::Error for Error {} + +pub fn erm_msg<T>(msg: &str) -> Erm<T> { + Err(Error { + msg: msg.to_owned(), + }.into()) +} + pub struct ErrorHandler; impl color_eyre::eyre::EyreHandler for ErrorHandler { fn debug( |
