summaryrefslogtreecommitdiff
path: root/crates
diff options
context:
space:
mode:
authorLLLL Colonq <llll@colonq>2025-11-06 20:32:11 -0500
committerLLLL Colonq <llll@colonq>2025-11-06 20:32:25 -0500
commit6a397916e0177c9e7c6b93fb9cc7510c14a659ff (patch)
treea859364ea5ff5ff038b6171bba9d5d99d7771a61 /crates
parent71b22b03b51aaba01df786f70becb03a30429f03 (diff)
Fix fig client
Diffstat (limited to 'crates')
-rw-r--r--crates/teleia/Cargo.toml3
-rw-r--r--crates/teleia/src/fig.rs229
-rw-r--r--crates/teleia/src/utils.rs17
3 files changed, 77 insertions, 172 deletions
diff --git a/crates/teleia/Cargo.toml b/crates/teleia/Cargo.toml
index 9c1037f..6de4df0 100644
--- a/crates/teleia/Cargo.toml
+++ b/crates/teleia/Cargo.toml
@@ -47,4 +47,5 @@ web-sys = { version = "*", features = ["Document", "Window", "Element", "HtmlCan
env_logger = "*" # configurable logging to stdout
glfw = { git = "https://github.com/lcolonq/glfw-rs", features = ["serde"] } # window management
kira = { version = "=0.9.6", default-features = false, features = ["cpal", "ogg", "wav"] } # audio
-directories = { git = "https://github.com/lcolonq/directories-rs" } # standard system directories \ No newline at end of file
+directories = { git = "https://github.com/lcolonq/directories-rs" } # standard system directories
+polling = "*" # interface to epoll \ No newline at end of file
diff --git a/crates/teleia/src/fig.rs b/crates/teleia/src/fig.rs
index 13b4773..0dd2f68 100644
--- a/crates/teleia/src/fig.rs
+++ b/crates/teleia/src/fig.rs
@@ -1,193 +1,80 @@
-use std::io::{BufRead, Read, Write};
-use byteorder::WriteBytesExt;
+use std::io::{Read, Write};
+use byteorder::{ReadBytesExt, WriteBytesExt};
-#[derive(Debug, Clone)]
-pub struct SexpMessage {
- pub event: lexpr::Value,
- pub data: lexpr::Value,
-}
-
-pub struct SexpClient {
- reader: std::io::BufReader<std::net::TcpStream>,
- buf: String,
-}
-impl SexpClient {
- pub fn new(addr: &str, subs: &[lexpr::Value]) -> Self {
- let mut socket = std::net::TcpStream::connect(addr).expect("failed to connect to message bus");
- socket.set_nonblocking(true).expect("failed to set message bus socket nonblocking");
- for s in subs {
- write!(socket, "(sub {})\n", s).expect("failed to send subscribe message to bus");
- }
- let reader = std::io::BufReader::new(socket);
- Self { reader, buf: String::new(), }
- }
- pub fn pump(&mut self) -> Option<SexpMessage> {
- match self.reader.read_line(&mut self.buf) {
- Ok(_l) => {
- let mv = lexpr::from_str(&self.buf);
- self.buf.clear();
- match mv {
- Ok(v) => {
- match v.as_cons() {
- Some(cs) => {
- Some(SexpMessage { event: cs.car().clone(), data: cs.cdr().clone() })
- },
- _ => { log::error!("malformed message bus input s-expression: {}", v); None },
- }
- },
- Err(e) => { log::error!("malformed message bus input line: {}", e); None },
- }
- },
- Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
- None
- },
- Err(e) => panic!("IO error on message bus: {}", e),
- }
- }
-}
+use crate::{Erm, WrapErr};
#[derive(Debug, Clone)]
pub struct BinaryMessage {
pub event: Vec<u8>,
pub data: Vec<u8>
}
-#[derive(Debug, Clone)]
-pub enum BinaryClientState {
- PartialEventLength { buf_len: usize, buf: [u8; 4] },
- PartialEvent { len: usize, buf_len: usize, buf: Vec<u8> },
- PartialDataLength { event: Vec<u8>, buf_len: usize, buf: [u8; 4] },
- PartialData { event: Vec<u8>, len: usize, buf_len: usize, buf: Vec<u8> },
- Message { event: Vec<u8>, data: Vec<u8> },
-}
-impl Default for BinaryClientState {
- fn default() -> Self {
- Self::PartialEventLength { buf_len: 0, buf: [0; 4] }
- }
-}
+
pub struct BinaryClient {
- state: BinaryClientState,
- writer: std::net::TcpStream,
- reader: std::io::BufReader<std::net::TcpStream>,
- blocking: bool,
+ in_buf: Vec<u8>,
+ out_buf: Vec<u8>,
+ socket: std::net::TcpStream,
+ poller: polling::Poller,
}
impl BinaryClient {
- pub fn new(addr: &str, blocking: bool, subs: &[&[u8]]) -> Self {
- let mut socket = std::net::TcpStream::connect(addr).expect("failed to connect to message bus");
- socket.set_nonblocking(!blocking).expect("failed to set message bus socket nonblocking");
+ pub fn new(addr: &str, subs: &[&[u8]]) -> Erm<Self> {
+ let mut socket = std::net::TcpStream::connect(addr).wrap_err("failed to connect to message bus")?;
for s in subs {
- write!(socket, "s").expect("failed to send subscribe message to bus");
- socket.write_u32::<byteorder::LE>(s.len() as u32).expect("failed to send subscribe message length to bus");
- socket.write_all(s).expect("failed to send subscribe message to bus");
+ write!(socket, "s").wrap_err("failed to send subscribe message to bus")?;
+ socket.write_u32::<byteorder::LE>(s.len() as u32).wrap_err("failed to send subscribe message length to bus")?;
+ socket.write_all(s).wrap_err("failed to send subscribe message to bus")?;
}
- socket.flush().expect("failed to flush bus connection");
- let writer = socket.try_clone().expect("failed to clone socket");
- let reader = std::io::BufReader::new(socket);
- Self {
- state: BinaryClientState::PartialEventLength { buf_len: 0, buf: [0; 4] },
- writer,
- reader,
- blocking,
+ // socket.set_nonblocking(false).wrap_err("failed to set message bus socket nonblocking")?;
+ socket.flush().wrap_err("failed to flush bus connection")?;
+ let in_buf = vec![0; 1024];
+ let out_buf = vec![0; 1024];
+ let poller = polling::Poller::new()?;
+ unsafe {
+ poller.add(&socket, polling::Event::all(0)).wrap_err("failed to add event to poll")?;
}
+ Ok(Self {
+ in_buf, out_buf,
+ socket,
+ poller,
+ })
}
- fn write_length_prefixed(&mut self, buf: &[u8]) {
- self.writer.write_u32::<byteorder::LE>(buf.len() as u32).expect("failed to send message");
- self.writer.write_all(buf).expect("failed to send message");
+ fn write_length_prefixed(&mut self, buf: &[u8]) -> Erm<()> {
+ self.out_buf.write_u32::<byteorder::LE>(buf.len() as u32).wrap_err("failed to send message")?;
+ self.out_buf.write_all(buf).wrap_err("failed to send message")?;
+ Ok(())
}
- pub fn publish(&mut self, ev: &[u8], data: &[u8]) {
- if !self.blocking {
- self.writer.set_nonblocking(false).expect("failed to set message bus socket nonblocking");
- }
- write!(self.writer, "p").expect("failed to send publish message to bus");
- self.write_length_prefixed(ev);
- self.write_length_prefixed(data);
- if !self.blocking {
- self.writer.set_nonblocking(true).expect("failed to set message bus socket nonblocking");
- }
- }
- fn read(reader: &mut std::io::BufReader<std::net::TcpStream>, buf: &mut [u8]) -> Option<usize> {
- match reader.read(buf) {
- Ok(sz) => Some(sz),
- Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
- None
- },
- Err(e) => panic!("IO error on message bus: {}", e),
- }
+ pub fn publish(&mut self, ev: &[u8], data: &[u8]) -> Erm<()> {
+ write!(self.out_buf, "p").wrap_err("failed to send publish message to bus")?;
+ self.write_length_prefixed(ev)?;
+ self.write_length_prefixed(data)?;
+ Ok(())
}
- fn update_state(
- reader: &mut std::io::BufReader<std::net::TcpStream>,
- mut state: BinaryClientState,
- ) -> BinaryClientState {
- loop {
- state = match state {
- BinaryClientState::PartialEventLength { mut buf_len, mut buf } => {
- buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) {
- x
- } else { break BinaryClientState::PartialEventLength { buf_len, buf }; };
- if buf_len == 4 {
- let len = u32::from_le_bytes(buf) as usize;
- BinaryClientState::PartialEvent {
- len,
- buf_len: 0,
- buf: vec![0; len],
- }
- } else { BinaryClientState::PartialEventLength { buf_len, buf } }
- },
- BinaryClientState::PartialEvent { len, mut buf_len, mut buf } => {
- buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) {
- x
- } else { break BinaryClientState::PartialEvent { len, buf_len, buf }; };
- if buf_len == len {
- BinaryClientState::PartialDataLength {
- event: buf.clone(),
- buf_len: 0,
- buf: [0; 4],
- }
- } else { BinaryClientState::PartialEvent { len, buf_len, buf } }
- },
- BinaryClientState::PartialDataLength { event, mut buf_len, mut buf } => {
- buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) {
- x
- } else { break BinaryClientState::PartialDataLength { event, buf_len, buf }; };
- if buf_len == 4 {
- let len = u32::from_le_bytes(buf) as usize;
- BinaryClientState::PartialData {
- event,
- len,
- buf_len: 0,
- buf: vec![0; len],
- }
- } else { BinaryClientState::PartialDataLength { event, buf_len, buf } }
- },
- BinaryClientState::PartialData { event, len, mut buf_len, mut buf } => {
- buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) {
- x
- } else { break BinaryClientState::PartialData { event, len, buf_len, buf }; };
- if buf_len == len {
- BinaryClientState::Message {
- event,
- data: buf.clone(),
- }
- } else { BinaryClientState::PartialData { event, len, buf_len, buf } }
- },
- st@BinaryClientState::Message{..} => break st,
- };
- }
+ fn pop_incoming_message(&mut self) -> Option<BinaryMessage> {
+ let mut reader = std::io::Cursor::new(&self.in_buf);
+ let event_len = reader.read_u32::<byteorder::LE>().ok()?;
+ let mut event = vec![0 as u8; event_len as usize];
+ reader.read_exact(&mut event).ok()?;
+ let data_len = reader.read_u32::<byteorder::LE>().ok()?;
+ let mut data = vec![0 as u8; data_len as usize];
+ reader.read_exact(&mut data).ok()?;
+ Some(BinaryMessage { event, data })
}
- pub fn pump(&mut self) -> Option<BinaryMessage> {
- self.state = Self::update_state(&mut self.reader, std::mem::take(&mut self.state));
- match std::mem::take(&mut self.state) {
- BinaryClientState::Message { event, data } => {
- self.state = BinaryClientState::PartialEventLength { buf_len: 0, buf: [0; 4] };
- Some(BinaryMessage {
- event: event,
- data: data,
- })
- },
- st => {
- self.state = st;
- None
+ pub fn pump(&mut self) -> Erm<Option<BinaryMessage>> {
+ let mut events = polling::Events::new();
+ self.poller.wait(&mut events, Some(std::time::Duration::from_secs(0)))
+ .wrap_err("failed to poll message bus")?;
+ for ev in events.iter() {
+ if ev.readable {
+ let mut buf = [0; 1024];
+ let sz = self.socket.read(&mut buf).wrap_err("failed to read from bus socket")?;
+ self.in_buf.extend_from_slice(&buf[..sz]);
+ }
+ if ev.writable {
+ let sz = self.socket.write(&self.out_buf).wrap_err("failed to write to bus socket")?;
+ self.out_buf.drain(..sz);
}
+ self.poller.modify(&self.socket, polling::Event::all(0)).wrap_err("failed to update event to poll")?;
}
+ Ok(self.pop_incoming_message())
}
}
diff --git a/crates/teleia/src/utils.rs b/crates/teleia/src/utils.rs
index 05e9251..c479560 100644
--- a/crates/teleia/src/utils.rs
+++ b/crates/teleia/src/utils.rs
@@ -9,6 +9,23 @@ pub fn erm<E, T>(e: E) -> Erm<T> where E: std::error::Error + std::marker::Send
Err(e.into())
}
+#[derive(Debug)]
+pub struct Error {
+ msg: String
+}
+impl std::fmt::Display for Error {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "teleia error: {}", self.msg)
+ }
+}
+impl std::error::Error for Error {}
+
+pub fn erm_msg<T>(msg: &str) -> Erm<T> {
+ Err(Error {
+ msg: msg.to_owned(),
+ }.into())
+}
+
pub struct ErrorHandler;
impl color_eyre::eyre::EyreHandler for ErrorHandler {
fn debug(