use std::io::{BufRead, Read, Write}; use byteorder::WriteBytesExt; #[derive(Debug, Clone)] pub struct Message { pub event: lexpr::Value, pub data: lexpr::Value, } pub struct Client { reader: std::io::BufReader, buf: String, } impl Client { pub fn new(addr: &str, subs: &[lexpr::Value]) -> Self { let mut socket = std::net::TcpStream::connect(addr).expect("failed to connect to message bus"); socket.set_nonblocking(true).expect("failed to set message bus socket nonblocking"); for s in subs { write!(socket, "(sub {})\n", s).expect("failed to send subscribe message to bus"); } let reader = std::io::BufReader::new(socket); Self { reader, buf: String::new(), } } pub fn pump(&mut self) -> Option { match self.reader.read_line(&mut self.buf) { Ok(l) => { // log::info!("read line: {}", self.buf); let mv = lexpr::from_str(&self.buf); self.buf.clear(); match mv { Ok(v) => { match v.as_cons() { Some(cs) => { Some(Message { event: cs.car().clone(), data: cs.cdr().clone() }) }, _ => { log::error!("malformed message bus input s-expression: {}", v); None }, } }, Err(e) => { log::error!("malformed message bus input line: {}", e); None }, } }, Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { // if self.buf.len() > 0 { // log::error!("error wouldblock: buf is {}", self.buf); // } None }, Err(e) => panic!("IO error on message bus: {}", e), } } } #[derive(Debug, Clone)] pub struct BinaryMessage { pub event: Vec, pub data: Vec } #[derive(Debug, Clone)] pub enum BinaryClientState { PartialEventLength { buf_len: usize, buf: [u8; 4] }, PartialEvent { len: usize, buf_len: usize, buf: Vec }, PartialDataLength { event: Vec, buf_len: usize, buf: [u8; 4] }, PartialData { event: Vec, len: usize, buf_len: usize, buf: Vec }, Message { event: Vec, data: Vec }, } impl Default for BinaryClientState { fn default() -> Self { Self::PartialEventLength { buf_len: 0, buf: [0; 4] } } } pub struct BinaryClient { state: BinaryClientState, reader: std::io::BufReader, } impl BinaryClient { pub fn new(addr: &str, subs: &[&[u8]]) -> Self { let mut socket = std::net::TcpStream::connect(addr).expect("failed to connect to message bus"); socket.set_nonblocking(true).expect("failed to set message bus socket nonblocking"); for s in subs { write!(socket, "s").expect("failed to send subscribe message to bus"); socket.write_u32::(s.len() as u32).expect("failed to send subscribe message length to bus"); socket.write_all(s).expect("failed to send subscribe message to bus"); } socket.flush().expect("failed to flush bus connection"); let reader = std::io::BufReader::new(socket); Self { state: BinaryClientState::PartialEventLength { buf_len: 0, buf: [0; 4] }, reader } } fn read(reader: &mut std::io::BufReader, buf: &mut [u8]) -> Option { match reader.read(buf) { Ok(sz) => Some(sz), Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { None }, Err(e) => panic!("IO error on message bus: {}", e), } } fn update_state( reader: &mut std::io::BufReader, mut state: BinaryClientState, ) -> BinaryClientState { loop { let new_state = match state { BinaryClientState::PartialEventLength { mut buf_len, mut buf } => { buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) { x } else { break; }; if buf_len == 4 { let len = u32::from_le_bytes(buf) as usize; BinaryClientState::PartialEvent { len, buf_len: 0, buf: vec![0; len], } } else { BinaryClientState::PartialEventLength { buf_len, buf } } }, BinaryClientState::PartialEvent { len, mut buf_len, mut buf } => { buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) { x } else { break; }; if buf_len == len { BinaryClientState::PartialDataLength { event: buf.clone(), buf_len: 0, buf: [0; 4], } } else { BinaryClientState::PartialEvent { len, buf_len, buf } } }, BinaryClientState::PartialDataLength { event, mut buf_len, mut buf } => { buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) { x } else { break; }; if buf_len == 4 { let len = u32::from_le_bytes(buf) as usize; BinaryClientState::PartialData { event, len, buf_len: 0, buf: vec![0; len], } } else { BinaryClientState::PartialDataLength { event, buf_len, buf } } }, BinaryClientState::PartialData { event, len, mut buf_len, mut buf } => { buf_len += if let Some(x) = Self::read(reader, &mut buf[buf_len..]) { x } else { break; }; if buf_len == len { BinaryClientState::Message { event, data: buf.clone(), } } else { BinaryClientState::PartialData { event, len, buf_len, buf } } }, BinaryClientState::Message{..} => break, }; state = new_state; }; state } pub fn pump(&mut self) -> Option { None // loop { // if let Some(new) = self.update_state(mem::take(&mut self.state)) { // self.state = new; // } else { break; } // } // if let BinaryClientState::Message { event, data } = self.state.clone() { // self.state = BinaryClientState::PartialEventLength { buf_len: 0, buf: [0; 4] }; // Some(BinaryMessage { event, data }) // } else { // None // } } }