From e51d53893362bf6ce4a7da5c4c235931ddbf7a9d Mon Sep 17 00:00:00 2001 From: LLLL Colonq Date: Fri, 30 Jan 2026 18:33:17 -0500 Subject: Update --- crates/teleia/src/lib.rs | 2 +- crates/teleia/src/net/ws.rs | 2 +- crates/teleia/src/net/ws/client/native.rs | 73 +++++++++++++++------------- crates/teleia/src/net/ws/server.rs | 80 +++++++++++++++++++++++++++++-- crates/teleia/src/state.rs | 4 +- 5 files changed, 120 insertions(+), 41 deletions(-) diff --git a/crates/teleia/src/lib.rs b/crates/teleia/src/lib.rs index 5cdf777..df1a528 100644 --- a/crates/teleia/src/lib.rs +++ b/crates/teleia/src/lib.rs @@ -62,7 +62,7 @@ where pub fn run<'a, F, G>(title: &str, w: u32, h: u32, options: Options, gnew: F) -> Erm<()> where G: state::Game + 'static, - F: (Fn(&'a context::Context) -> G), + F: (FnOnce(&'a context::Context) -> G), { env_logger::Builder::new() .filter(None, log::LevelFilter::Info) diff --git a/crates/teleia/src/net/ws.rs b/crates/teleia/src/net/ws.rs index bedcc29..8ea1173 100644 --- a/crates/teleia/src/net/ws.rs +++ b/crates/teleia/src/net/ws.rs @@ -3,7 +3,7 @@ pub mod client; #[cfg(not(target_arch = "wasm32"))] pub mod server; -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum Message { Binary(Vec), Text(String), diff --git a/crates/teleia/src/net/ws/client/native.rs b/crates/teleia/src/net/ws/client/native.rs index efefdf2..3b8502c 100644 --- a/crates/teleia/src/net/ws/client/native.rs +++ b/crates/teleia/src/net/ws/client/native.rs @@ -3,7 +3,7 @@ use std::sync::{Arc, Mutex}; use std::thread::spawn; use crate::net::ws::Message; -use crate::{Erm, WrapErr}; +use crate::{utils, Erm, WrapErr}; const KEY: usize = 42; @@ -24,63 +24,70 @@ impl Client { }; let poller = match polling::Poller::new() { Ok(v) => v, - Err(e) => { - log::warn!("failed to create poller: {}", e); - return; - } + Err(e) => { log::warn!("failed to create poller: {}", e); return; } }; - match socket.get_ref() { + let res: Erm<()> = try { match socket.get_ref() { tungstenite::stream::MaybeTlsStream::Plain(s) => unsafe { - if let Err(e) = poller.add(s, polling::Event::readable(KEY)) { - log::warn!("failed to add event to poll: {}", e); - let _ = socket.close(None); - return; - }; + s.set_nonblocking(true).wrap_err("failed to set socket nonblocking")?; + poller.add(s, polling::Event::readable(KEY)).wrap_err("failed to add event to poll")?; }, tungstenite::stream::MaybeTlsStream::NativeTls(s) => unsafe { - if let Err(e) = poller.add(s.get_ref(), polling::Event::readable(KEY)) { - log::warn!("failed to add event to poll: {}", e); - let _ = socket.close(None); - return; - }; - }, - _ => { - log::warn!("unknown socket type; cannot poll!"); - let _ = socket.close(None); - return; + s.get_ref().set_nonblocking(true).wrap_err("failed to set socket nonblocking")?; + poller.add(s.get_ref(), polling::Event::readable(KEY)).wrap_err("failed to add event to poll")?; }, - } + _ => utils::erm_msg("unknown socket type; cannot poll!")?, + }}; + if let Err(e) = res { + log::warn!("failed to add event to poll: {}", e); + let _ = socket.close(None); + return; + }; let (outgoing_sender, outgoing_receiver) = channel(); let (incoming_sender, incoming_receiver) = channel(); *self.channels.lock().unwrap() = Some((outgoing_sender, incoming_receiver)); - let channels_ref = self.channels.clone(); + let send_channels_ref = self.channels.clone(); + let recv_channels_ref = self.channels.clone(); + let send_socket_ref = Arc::new(Mutex::new(socket)); + let recv_socket_ref = send_socket_ref.clone(); let mut events = polling::Events::new(); spawn(move || loop { + // write all outgoing messages let res: Erm<()> = try { - // send all outgoing messages - while let Ok(msg) = outgoing_receiver.try_recv() { + while let Ok(msg) = outgoing_receiver.recv() { match msg { - Message::Text(txt) => socket.send(tungstenite::Message::Text(txt)) + Message::Text(txt) => send_socket_ref.lock().unwrap() + .send(tungstenite::Message::Text(txt)) .wrap_err("failed to send websocket text")?, - Message::Binary(bytes) => socket.send(tungstenite::Message::Binary(bytes)) + Message::Binary(bytes) => send_socket_ref.lock().unwrap() + .send(tungstenite::Message::Binary(bytes)) .wrap_err("failed to send websocket bytes")?, } } - // wait until we've received data + }; + if let Err(e) = res { + log::warn!("error in websocket send thread: {}", e); + *send_channels_ref.lock().unwrap() = None; + return; + } + }); + spawn(move || loop { + // wait for and receive all incoming messages + let res: Erm<()> = try { events.clear(); poller.wait(&mut events, None).wrap_err("failed to wait on websocket poller")?; for ev in events.iter() { if ev.key == KEY { - if ev.readable { match socket.read().wrap_err("failed to recv on websocket")? { + if ev.readable { match recv_socket_ref.lock().unwrap().read().wrap_err("failed to recv on websocket")? { tungstenite::Message::Text(txt) => incoming_sender.send(Message::Text(txt)) .wrap_err("failed to send incoming websocket message on channel")?, tungstenite::Message::Binary(bytes) => incoming_sender.send(Message::Binary(bytes)) .wrap_err("failed to send incoming websocket message on channel")?, - tungstenite::Message::Ping(bs) => socket.send(tungstenite::Message::Pong(bs)) + tungstenite::Message::Ping(bs) => recv_socket_ref.lock().unwrap() + .send(tungstenite::Message::Pong(bs)) .wrap_err("failed to send websocket pong")?, m => log::warn!("unhandled incoming websocket message: {}", m), }} - match socket.get_ref() { + match recv_socket_ref.lock().unwrap().get_ref() { tungstenite::stream::MaybeTlsStream::Plain(s) => poller.modify(s, polling::Event::readable(KEY)) .wrap_err("failed to modify event to poll")?, @@ -93,8 +100,8 @@ impl Client { } }; if let Err(e) = res { - log::warn!("error in websocket thread: {}", e); - *channels_ref.lock().unwrap() = None; + log::warn!("error in websocket recv thread: {}", e); + *recv_channels_ref.lock().unwrap() = None; return; } }); diff --git a/crates/teleia/src/net/ws/server.rs b/crates/teleia/src/net/ws/server.rs index 60537a1..7cd0cf8 100644 --- a/crates/teleia/src/net/ws/server.rs +++ b/crates/teleia/src/net/ws/server.rs @@ -5,7 +5,7 @@ use std::sync::{Arc, Mutex}; use std::thread::spawn; use crate::net::ws::Message; -use crate::{Erm, WrapErr}; +use crate::{utils, Erm, WrapErr}; const ACCEPT_KEY: usize = 1; @@ -25,7 +25,8 @@ impl Server { } } pub fn start(&mut self, addr: &str) { - let clients_ref = self.clients.clone(); + let recv_clients_ref = self.clients.clone(); + let send_clients_ref = self.clients.clone(); let (outgoing_sender, outgoing_receiver) = channel(); let (incoming_sender, incoming_receiver) = channel(); *self.channels.lock().unwrap() = Some((outgoing_sender, incoming_receiver)); @@ -39,6 +40,31 @@ impl Server { let mut events = polling::Events::new(); let mut next_id = ACCEPT_KEY + 1; spawn(move || loop { + // write all outgoing messages + while let Ok((cid, msg)) = outgoing_receiver.recv() { + let res: Erm<()> = try { + if let Some(sock) = send_clients_ref.lock().unwrap().get_mut(&cid) { + let res = match msg { + Message::Text(txt) => sock.send(tungstenite::Message::Text(txt)), + Message::Binary(bytes) => sock.send(tungstenite::Message::Binary(bytes)) + }; + match res { + Ok(_) => {}, + Err(tungstenite::Error::Io(e)) if e.kind() == std::io::ErrorKind::WouldBlock => { + log::info!("client {} receive send block", cid); + }, + Err(e) => utils::erm(e)?, + } + } + }; + if let Err(e) = res { + log::warn!("error when sending to client {}, disconnecting: {}", cid, e); + send_clients_ref.lock().unwrap().remove(&cid); + } + } + }); + spawn(move || loop { + // wait for and receive all new connections and incoming messages let res: Erm<()> = try { events.clear(); poller.wait(&mut events, None).wrap_err("failed to wait on websocket server poller")?; @@ -46,11 +72,12 @@ impl Server { if ev.key == ACCEPT_KEY { let (sock, client_addr) = listener.accept().wrap_err("failed to accept on socket")?; let id = next_id; next_id += 1; - let res = try { + let res: Erm<()> = try { unsafe { poller.add(&sock, polling::Event::readable(id)) .wrap_err("failed to add socket to poller")?; } let conn = tungstenite::accept(sock).wrap_err("failed to upgrade to websocket")?; - clients_ref.lock().unwrap().insert(id, conn); + recv_clients_ref.lock().unwrap().insert(id, conn); + log::info!("new connected client! {}", id); }; if let Err(e) = res { log::warn!("error during connection from {}: {}", client_addr, e); @@ -58,6 +85,34 @@ impl Server { poller.modify(&listener, polling::Event::readable(ACCEPT_KEY)) .wrap_err("failed to modify event to poll")?; } else { + let res: Erm<()> = try { + if let Some(sock) = recv_clients_ref.lock().unwrap().get_mut(&ev.key) { + match sock.read() { + Ok(tungstenite::Message::Text(t)) => + incoming_sender.send((ev.key, Message::Text(t))) + .wrap_err("failed to send incoming message on channel")?, + Ok(tungstenite::Message::Binary(b)) => + incoming_sender.send((ev.key, Message::Binary(b))) + .wrap_err("failed to send incoming message on channel")?, + Ok(tungstenite::Message::Ping(x)) => + sock.write(tungstenite::Message::Pong(x)) + .wrap_err("failed to reply with pong")?, + Ok(m) => log::info!("received unhandled websocket message: {}", m), + Err(tungstenite::Error::Io(e)) if e.kind() == std::io::ErrorKind::WouldBlock => { + log::info!("client {} receive would block", ev.key); + }, + Err(e) => utils::erm(e)?, + } + poller.modify(sock.get_ref(), polling::Event::readable(ev.key)) + .wrap_err("failed to modify event to poll")?; + } else { + log::warn!("poller indicated event for unknown client: {}", ev.key); + } + }; + if let Err(e) = res { + log::warn!("error on client {} connection, disconnecting: {}", ev.key, e); + recv_clients_ref.lock().unwrap().remove(&ev.key); + } } } }; @@ -68,4 +123,21 @@ impl Server { } }); } + pub fn clients(&self) -> Vec { + self.clients.lock().unwrap().keys().copied().collect() + } + pub fn poll(&mut self) -> Option<(ClientId, Message)> { + if let Some((_, incoming)) = &*self.channels.lock().unwrap() { + incoming.try_recv().ok() + } else { None } + } + pub fn send(&self, client: ClientId, msg: Message) { + if let Some((outgoing, _)) = &mut *self.channels.lock().unwrap() { + if let Err(e) = outgoing.send((client, msg)) { + log::warn!("failed to send websocket message to client {} on channel: {}", client, e); + } + } else { + log::warn!("tried to send message to client {}, but websocket is not connected", client); + } + } } diff --git a/crates/teleia/src/state.rs b/crates/teleia/src/state.rs index 9c288a2..5bec34e 100644 --- a/crates/teleia/src/state.rs +++ b/crates/teleia/src/state.rs @@ -10,8 +10,8 @@ use crate::{audio, context, framebuffer, mesh, shader, utils}; const DELTA_TIME: f64 = 0.016; // todo pub trait Game { - fn initialize(&self, ctx: &context::Context, st: &State) -> utils::Erm<()> { Ok(()) } - fn finalize(&self, ctx: &context::Context, st: &State) -> utils::Erm<()> { Ok(()) } + fn initialize(&mut self, ctx: &context::Context, st: &mut State) -> utils::Erm<()> { Ok(()) } + fn finalize(&mut self, ctx: &context::Context, st: &mut State) -> utils::Erm<()> { Ok(()) } fn initialize_audio( &self, ctx: &context::Context, st: &State, actx: &audio::Context -- cgit v1.2.3