From 6a397916e0177c9e7c6b93fb9cc7510c14a659ff Mon Sep 17 00:00:00 2001 From: LLLL Colonq Date: Thu, 6 Nov 2025 20:32:11 -0500 Subject: Fix fig client --- .dir-locals.el | 2 +- Cargo.lock | 1 + crates/teleia/Cargo.toml | 3 +- crates/teleia/src/fig.rs | 229 ++++++++++++--------------------------------- crates/teleia/src/utils.rs | 17 ++++ 5 files changed, 79 insertions(+), 173 deletions(-) diff --git a/.dir-locals.el b/.dir-locals.el index 7d26985..9a84ba1 100644 --- a/.dir-locals.el +++ b/.dir-locals.el @@ -2,7 +2,7 @@ ((eglot-workspace-configuration . (:rust-analyzer ( :cargo - ( :target "wasm32-unknown-unknown" + ( ;; :target "wasm32-unknown-unknown" :targetDir t) :hover (:show (:fields 10)))))))) diff --git a/Cargo.lock b/Cargo.lock index eacf1a9..18468eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2724,6 +2724,7 @@ dependencies = [ "log", "nalgebra", "parry3d", + "polling", "rand", "rapier3d", "reqwest", diff --git a/crates/teleia/Cargo.toml b/crates/teleia/Cargo.toml index 9c1037f..6de4df0 100644 --- a/crates/teleia/Cargo.toml +++ b/crates/teleia/Cargo.toml @@ -47,4 +47,5 @@ web-sys = { version = "*", features = ["Document", "Window", "Element", "HtmlCan env_logger = "*" # configurable logging to stdout glfw = { git = "https://github.com/lcolonq/glfw-rs", features = ["serde"] } # window management kira = { version = "=0.9.6", default-features = false, features = ["cpal", "ogg", "wav"] } # audio -directories = { git = "https://github.com/lcolonq/directories-rs" } # standard system directories \ No newline at end of file +directories = { git = "https://github.com/lcolonq/directories-rs" } # standard system directories +polling = "*" # interface to epoll \ No newline at end of file diff --git a/crates/teleia/src/fig.rs b/crates/teleia/src/fig.rs index 13b4773..0dd2f68 100644 --- a/crates/teleia/src/fig.rs +++ b/crates/teleia/src/fig.rs @@ -1,193 +1,80 @@ -use std::io::{BufRead, Read, Write}; -use byteorder::WriteBytesExt; +use std::io::{Read, Write}; +use byteorder::{ReadBytesExt, WriteBytesExt}; -#[derive(Debug, Clone)] -pub struct SexpMessage { - pub event: lexpr::Value, - pub data: lexpr::Value, -} - -pub struct SexpClient { - reader: std::io::BufReader, - buf: String, -} -impl SexpClient { - pub fn new(addr: &str, subs: &[lexpr::Value]) -> Self { - let mut socket = std::net::TcpStream::connect(addr).expect("failed to connect to message bus"); - socket.set_nonblocking(true).expect("failed to set message bus socket nonblocking"); - for s in subs { - write!(socket, "(sub {})\n", s).expect("failed to send subscribe message to bus"); - } - let reader = std::io::BufReader::new(socket); - Self { reader, buf: String::new(), } - } - pub fn pump(&mut self) -> Option { - match self.reader.read_line(&mut self.buf) { - Ok(_l) => { - let mv = lexpr::from_str(&self.buf); - self.buf.clear(); - match mv { - Ok(v) => { - match v.as_cons() { - Some(cs) => { - Some(SexpMessage { event: cs.car().clone(), data: cs.cdr().clone() }) - }, - _ => { log::error!("malformed message bus input s-expression: {}", v); None }, - } - }, - Err(e) => { log::error!("malformed message bus input line: {}", e); None }, - } - }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - None - }, - Err(e) => panic!("IO error on message bus: {}", e), - } - } -} +use crate::{Erm, WrapErr}; #[derive(Debug, Clone)] pub struct BinaryMessage { pub event: Vec, pub data: Vec } -#[derive(Debug, Clone)] -pub enum BinaryClientState { - PartialEventLength { buf_len: usize, buf: [u8; 4] }, - PartialEvent { len: usize, buf_len: usize, buf: Vec }, - PartialDataLength { event: Vec, buf_len: usize, buf: [u8; 4] }, - PartialData { event: Vec, len: usize, buf_len: usize, buf: Vec }, - Message { event: Vec, data: Vec }, -} -impl Default for BinaryClientState { - fn default() -> Self { - Self::PartialEventLength { buf_len: 0, buf: [0; 4] } - } -} + pub struct BinaryClient { - state: BinaryClientState, - writer: std::net::TcpStream, - reader: std::io::BufReader, - blocking: bool, + in_buf: Vec, + out_buf: Vec, + socket: std::net::TcpStream, + poller: polling::Poller, } impl BinaryClient { - pub fn new(addr: &str, blocking: bool, subs: &[&[u8]]) -> Self { - let mut socket = std::net::TcpStream::connect(addr).expect("failed to connect to message bus"); - socket.set_nonblocking(!blocking).expect("failed to set message bus socket nonblocking"); + pub fn new(addr: &str, subs: &[&[u8]]) -> Erm { + let mut socket = std::net::TcpStream::connect(addr).wrap_err("failed to connect to message bus")?; for s in subs { - write!(socket, "s").expect("failed to send subscribe message to bus"); - socket.write_u32::(s.len() as u32).expect("failed to send subscribe message length to bus"); - socket.write_all(s).expect("failed to send subscribe message to bus"); + write!(socket, "s").wrap_err("failed to send subscribe message to bus")?; + socket.write_u32::(s.len() as u32).wrap_err("failed to send subscribe message length to bus")?; + socket.write_all(s).wrap_err("failed to send subscribe message to bus")?; } - socket.flush().expect("failed to flush bus connection"); - let writer = socket.try_clone().expect("failed to clone socket"); - let reader = std::io::BufReader::new(socket); - Self { - state: BinaryClientState::PartialEventLength { buf_len: 0, buf: [0; 4] }, - writer, - reader, - blocking, + // socket.set_nonblocking(false).wrap_err("failed to set message bus socket nonblocking")?; + socket.flush().wrap_err("failed to flush bus connection")?; + let in_buf = vec![0; 1024]; + let out_buf = vec![0; 1024]; + let poller = polling::Poller::new()?; + unsafe { + poller.add(&socket, polling::Event::all(0)).wrap_err("failed to add event to poll")?; } + Ok(Self { + in_buf, out_buf, + socket, + poller, + }) } - fn write_length_prefixed(&mut self, buf: &[u8]) { - self.writer.write_u32::(buf.len() as u32).expect("failed to send message"); - self.writer.write_all(buf).expect("failed to send message"); + fn write_length_prefixed(&mut self, buf: &[u8]) -> Erm<()> { + self.out_buf.write_u32::(buf.len() as u32).wrap_err("failed to send message")?; + self.out_buf.write_all(buf).wrap_err("failed to send message")?; + Ok(()) } - pub fn publish(&mut self, ev: &[u8], data: &[u8]) { - if !self.blocking { - self.writer.set_nonblocking(false).expect("failed to set message bus socket nonblocking"); - } - write!(self.writer, "p").expect("failed to send publish message to bus"); - self.write_length_prefixed(ev); - self.write_length_prefixed(data); - if !self.blocking { - self.writer.set_nonblocking(true).expect("failed to set message bus socket nonblocking"); - } - } - fn read(reader: &mut std::io::BufReader, buf: &mut [u8]) -> Option { - match reader.read(buf) { - Ok(sz) => Some(sz), - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - None - }, - Err(e) => panic!("IO error on message bus: {}", e), - } + pub fn publish(&mut self, ev: &[u8], data: &[u8]) -> Erm<()> { + write!(self.out_buf, "p").wrap_err("failed to send publish message to bus")?; + self.write_length_prefixed(ev)?; + self.write_length_prefixed(data)?; + Ok(()) } - fn update_state( - reader: &mut std::io::BufReader, - mut state: BinaryClientState, - ) -> BinaryClientState { - loop { - state = match state { - BinaryClientState::PartialEventLength { mut buf_len, mut buf } => { - buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) { - x - } else { break BinaryClientState::PartialEventLength { buf_len, buf }; }; - if buf_len == 4 { - let len = u32::from_le_bytes(buf) as usize; - BinaryClientState::PartialEvent { - len, - buf_len: 0, - buf: vec![0; len], - } - } else { BinaryClientState::PartialEventLength { buf_len, buf } } - }, - BinaryClientState::PartialEvent { len, mut buf_len, mut buf } => { - buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) { - x - } else { break BinaryClientState::PartialEvent { len, buf_len, buf }; }; - if buf_len == len { - BinaryClientState::PartialDataLength { - event: buf.clone(), - buf_len: 0, - buf: [0; 4], - } - } else { BinaryClientState::PartialEvent { len, buf_len, buf } } - }, - BinaryClientState::PartialDataLength { event, mut buf_len, mut buf } => { - buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) { - x - } else { break BinaryClientState::PartialDataLength { event, buf_len, buf }; }; - if buf_len == 4 { - let len = u32::from_le_bytes(buf) as usize; - BinaryClientState::PartialData { - event, - len, - buf_len: 0, - buf: vec![0; len], - } - } else { BinaryClientState::PartialDataLength { event, buf_len, buf } } - }, - BinaryClientState::PartialData { event, len, mut buf_len, mut buf } => { - buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) { - x - } else { break BinaryClientState::PartialData { event, len, buf_len, buf }; }; - if buf_len == len { - BinaryClientState::Message { - event, - data: buf.clone(), - } - } else { BinaryClientState::PartialData { event, len, buf_len, buf } } - }, - st@BinaryClientState::Message{..} => break st, - }; - } + fn pop_incoming_message(&mut self) -> Option { + let mut reader = std::io::Cursor::new(&self.in_buf); + let event_len = reader.read_u32::().ok()?; + let mut event = vec![0 as u8; event_len as usize]; + reader.read_exact(&mut event).ok()?; + let data_len = reader.read_u32::().ok()?; + let mut data = vec![0 as u8; data_len as usize]; + reader.read_exact(&mut data).ok()?; + Some(BinaryMessage { event, data }) } - pub fn pump(&mut self) -> Option { - self.state = Self::update_state(&mut self.reader, std::mem::take(&mut self.state)); - match std::mem::take(&mut self.state) { - BinaryClientState::Message { event, data } => { - self.state = BinaryClientState::PartialEventLength { buf_len: 0, buf: [0; 4] }; - Some(BinaryMessage { - event: event, - data: data, - }) - }, - st => { - self.state = st; - None + pub fn pump(&mut self) -> Erm> { + let mut events = polling::Events::new(); + self.poller.wait(&mut events, Some(std::time::Duration::from_secs(0))) + .wrap_err("failed to poll message bus")?; + for ev in events.iter() { + if ev.readable { + let mut buf = [0; 1024]; + let sz = self.socket.read(&mut buf).wrap_err("failed to read from bus socket")?; + self.in_buf.extend_from_slice(&buf[..sz]); + } + if ev.writable { + let sz = self.socket.write(&self.out_buf).wrap_err("failed to write to bus socket")?; + self.out_buf.drain(..sz); } + self.poller.modify(&self.socket, polling::Event::all(0)).wrap_err("failed to update event to poll")?; } + Ok(self.pop_incoming_message()) } } diff --git a/crates/teleia/src/utils.rs b/crates/teleia/src/utils.rs index 05e9251..c479560 100644 --- a/crates/teleia/src/utils.rs +++ b/crates/teleia/src/utils.rs @@ -9,6 +9,23 @@ pub fn erm(e: E) -> Erm where E: std::error::Error + std::marker::Send Err(e.into()) } +#[derive(Debug)] +pub struct Error { + msg: String +} +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "teleia error: {}", self.msg) + } +} +impl std::error::Error for Error {} + +pub fn erm_msg(msg: &str) -> Erm { + Err(Error { + msg: msg.to_owned(), + }.into()) +} + pub struct ErrorHandler; impl color_eyre::eyre::EyreHandler for ErrorHandler { fn debug( -- cgit v1.2.3