From 836760c3c5f80674def4c0994c6b0d20f08ae148 Mon Sep 17 00:00:00 2001 From: LLLL Colonq Date: Sun, 25 Jan 2026 16:16:10 -0500 Subject: Networking --- crates/teleia/Cargo.toml | 3 +- crates/teleia/src/audio.rs | 63 +++++++++++----- crates/teleia/src/fig.rs | 100 ------------------------- crates/teleia/src/lib.rs | 5 +- crates/teleia/src/net.rs | 5 +- crates/teleia/src/net/client.rs | 2 - crates/teleia/src/net/client/wasm.rs | 67 ----------------- crates/teleia/src/net/fig.rs | 100 +++++++++++++++++++++++++ crates/teleia/src/net/ws.rs | 10 +++ crates/teleia/src/net/ws/client.rs | 9 +++ crates/teleia/src/net/ws/client/native.rs | 119 ++++++++++++++++++++++++++++++ crates/teleia/src/net/ws/client/wasm.rs | 73 ++++++++++++++++++ crates/teleia/src/net/ws/server.rs | 71 ++++++++++++++++++ crates/teleia/src/save.rs | 3 +- crates/teleia/src/state.rs | 47 ------------ 15 files changed, 437 insertions(+), 240 deletions(-) delete mode 100644 crates/teleia/src/fig.rs delete mode 100644 crates/teleia/src/net/client.rs delete mode 100644 crates/teleia/src/net/client/wasm.rs create mode 100644 crates/teleia/src/net/fig.rs create mode 100644 crates/teleia/src/net/ws.rs create mode 100644 crates/teleia/src/net/ws/client.rs create mode 100644 crates/teleia/src/net/ws/client/native.rs create mode 100644 crates/teleia/src/net/ws/client/wasm.rs create mode 100644 crates/teleia/src/net/ws/server.rs (limited to 'crates') diff --git a/crates/teleia/Cargo.toml b/crates/teleia/Cargo.toml index 66edbc8..18718a4 100644 --- a/crates/teleia/Cargo.toml +++ b/crates/teleia/Cargo.toml @@ -48,4 +48,5 @@ env_logger = "*" # configurable logging to stdout glfw = { git = "https://github.com/lcolonq/glfw-rs", features = ["serde"] } # window management kira = { version = "=0.9.6", default-features = false, features = ["cpal", "ogg", "wav"] } # audio directories = { git = "https://github.com/lcolonq/directories-rs" } # standard system directories -polling = "*" # interface to epoll \ No newline at end of file +polling = "*" # interface to epoll +tungstenite = { version = "*", features = ["native-tls"] } # websockets \ No newline at end of file diff --git a/crates/teleia/src/audio.rs b/crates/teleia/src/audio.rs index e95a86d..8106c20 100644 --- a/crates/teleia/src/audio.rs +++ b/crates/teleia/src/audio.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; -#[cfg(target_arch = "wasm32")] use std::sync::{Arc, Mutex}; #[cfg(target_arch = "wasm32")] @@ -71,7 +70,10 @@ impl Audio { } } - pub fn play(&self, ctx: &Context, looping: Option<(Option, Option)>) -> Option { + pub fn play(&self, + ctx: &mut Context, + looping: Option<(Option, Option)> + ) -> Option { let source = ctx.audio.create_buffer_source().ok()?; if let Some(ab) = &*self.buffer.lock().unwrap() { source.set_buffer(Some(&ab)); @@ -148,6 +150,24 @@ impl Context { } } +#[cfg(not(target_arch = "wasm32"))] +pub struct AudioPlayingHandle { + handle: Arc> +} + +#[cfg(not(target_arch = "wasm32"))] +impl AudioPlayingHandle { + pub fn stop(&self, _ctx: &Context) { + self.handle.lock().unwrap().stop(kira::tween::Tween::default()); + } + pub fn fade_out(&self, _ctx: &Context, time: f32) { + self.handle.lock().unwrap().stop(kira::tween::Tween { + duration: std::time::Duration::from_secs_f32(time + 0.5), + ..Default::default() + }); + } +} + #[cfg(not(target_arch = "wasm32"))] pub struct Audio { data: kira::sound::static_sound::StaticSoundData, @@ -155,18 +175,30 @@ pub struct Audio { #[cfg(not(target_arch = "wasm32"))] impl Audio { - pub fn new(_ctx: &Context, bytes: &'static [u8]) -> Self { + pub fn new(_ctx: &Context, bytes: &[u8]) -> Self { Self { - data: kira::sound::static_sound::StaticSoundData::from_cursor(std::io::Cursor::new(bytes)) + data: kira::sound::static_sound::StaticSoundData::from_cursor(std::io::Cursor::new(bytes.to_owned())) .expect("failed to decode audio"), } } + pub fn from_samples(_ctx: &Context, sample_rate: f32, samples: &[f32]) -> Self { + let frames: Vec = samples.iter().map(|f| kira::Frame { left: *f, right: *f }).collect(); + Self { + data: kira::sound::static_sound::StaticSoundData { + sample_rate: sample_rate as u32, + frames: frames.into(), + settings: kira::sound::static_sound::StaticSoundSettings::default(), + slice: None, + }, + } + } + pub fn play( &self, ctx: &mut Context, looping: Option<(Option, Option)> - ) -> Result + ) -> Option { let sd = if let Some((ss, se)) = looping { let start = if let Some(s) = ss { s } else { 0.0 }; @@ -178,10 +210,7 @@ impl Audio { } else { self.data.clone() }; - match ctx.manager.play(sd) { - Ok(h) => Ok(h), - Err(e) => Err(e.to_string()), - } + ctx.manager.play(sd).ok().map(|h| AudioPlayingHandle { handle: Arc::new(Mutex::new(h)) }) } } @@ -189,7 +218,7 @@ impl Audio { pub struct Assets { pub ctx: Context, pub audio: HashMap, - pub music_handle: Option, + pub music_handle: Option, } #[cfg(not(target_arch = "wasm32"))] @@ -208,29 +237,29 @@ impl Assets { pub fn play_sfx(&mut self, name: &str) { if let Some(a) = self.audio.get(name) { - if let Err(e) = a.play(&mut self.ctx, None) { - log::warn!("failed to play sound {}: {}", name, e); + if a.play(&mut self.ctx, None).is_none() { + log::warn!("failed to play sound {}", name); } } } pub fn is_music_playing(&self) -> bool { if let Some(mh) = &self.music_handle { - mh.state() == kira::sound::PlaybackState::Playing + mh.handle.lock().unwrap().state() == kira::sound::PlaybackState::Playing } else { false } } pub fn play_music(&mut self, name: &str, start: Option, end: Option) { if let Some(s) = &mut self.music_handle { - let _ = s.stop(kira::tween::Tween::default()); + let _ = s.stop(&self.ctx); } if let Some(a) = self.audio.get(name) { match a.play(&mut self.ctx, Some((start, end))) { - Ok(h) => { + Some(h) => { self.music_handle = Some(h); }, - Err(e) => { - log::warn!("failed to play music {}: {}", name, e); + _ => { + log::warn!("failed to play music {}", name); } } } diff --git a/crates/teleia/src/fig.rs b/crates/teleia/src/fig.rs deleted file mode 100644 index 9dfbea8..0000000 --- a/crates/teleia/src/fig.rs +++ /dev/null @@ -1,100 +0,0 @@ -use std::io::{Read, Write}; -use byteorder::{LE, ReadBytesExt, WriteBytesExt}; - -use crate::{Erm, WrapErr}; - -const KEY: usize = 42; - -pub fn read_length_prefixed_utf8(r: &mut R) -> Erm where R: std::io::Read { - let len = r.read_u32::()?; - let mut bs = vec![0; len as usize]; - r.read_exact(&mut bs)?; - Ok(String::from_utf8(bs)?) -} - -#[derive(Debug, Clone)] -pub struct BinaryMessage { - pub event: Vec, - pub data: Vec -} - -pub struct BinaryClient { - in_buf: Vec, - out_buf: Vec, - socket: std::net::TcpStream, - poller: polling::Poller, -} -impl BinaryClient { - pub fn new(addr: &str, subs: &[&[u8]]) -> Erm { - let mut socket = std::net::TcpStream::connect(addr).wrap_err("failed to connect to message bus")?; - for s in subs { - write!(socket, "s").wrap_err("failed to send subscribe message to bus")?; - socket.write_u32::(s.len() as u32).wrap_err("failed to send subscribe message length to bus")?; - socket.write_all(s).wrap_err("failed to send subscribe message to bus")?; - } - socket.set_nonblocking(true).wrap_err("failed to set message bus socket nonblocking")?; - socket.flush().wrap_err("failed to flush bus connection")?; - let in_buf = Vec::new(); - let out_buf = Vec::new(); - let poller = polling::Poller::new()?; - unsafe { - poller.add(&socket, polling::Event::all(KEY)).wrap_err("failed to add event to poll")?; - } - Ok(Self { - in_buf, out_buf, - socket, - poller, - }) - } - fn write_length_prefixed(&mut self, buf: &[u8]) -> Erm<()> { - self.out_buf.write_u32::(buf.len() as u32).wrap_err("failed to send message")?; - self.out_buf.write_all(buf).wrap_err("failed to send message")?; - Ok(()) - - } - pub fn publish(&mut self, ev: &[u8], data: &[u8]) -> Erm<()> { - write!(self.out_buf, "p").wrap_err("failed to send publish message to bus")?; - self.write_length_prefixed(ev)?; - self.write_length_prefixed(data)?; - Ok(()) - } - fn pop_incoming_message(&mut self) -> Option { - let mut reader = std::io::Cursor::new(&self.in_buf); - let event_len = reader.read_u32::().ok()?; - let mut event = vec![0 as u8; event_len as usize]; - reader.read_exact(&mut event).ok()?; - let data_len = reader.read_u32::().ok()?; - let mut data = vec![0 as u8; data_len as usize]; - reader.read_exact(&mut data).ok()?; - let len = reader.position() as usize; - self.in_buf.drain(..len); - Some(BinaryMessage { event, data }) - } - pub fn pump(&mut self) -> Erm> { - let mut events = polling::Events::new(); - events.clear(); - self.poller.wait(&mut events, Some(std::time::Duration::from_secs(0))) - .wrap_err("failed to poll message bus")?; - for ev in events.iter() { - if ev.key == KEY { - if ev.readable { - let mut buf = [0; 1024]; - match self.socket.read(&mut buf) { - Ok(sz) => self.in_buf.extend_from_slice(&buf[..sz]), - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}, - e => { e.wrap_err("failed to read from bus socket")?; }, - } - } - if ev.writable && !self.out_buf.is_empty() { - match self.socket.write(&self.out_buf) { - Ok(sz) => { self.out_buf.drain(..sz); }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}, - e => { e.wrap_err("failed to write to bus socket")?; }, - } - } - self.poller.modify(&self.socket, polling::Event::all(KEY)).wrap_err("failed to update event to poll")?; - } - } - Ok(self.pop_incoming_message()) - } -} diff --git a/crates/teleia/src/lib.rs b/crates/teleia/src/lib.rs index 81bc617..5cdf777 100644 --- a/crates/teleia/src/lib.rs +++ b/crates/teleia/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(try_blocks)] + pub mod utils; pub mod ui; pub mod context; @@ -15,9 +17,6 @@ pub mod physics; pub mod save; pub mod level2d; -#[cfg(not(target_arch = "wasm32"))] -pub mod fig; - pub use utils::{erm, install_error_handler, Erm}; pub use audio::AudioPlayback; pub use simple_eyre::eyre::WrapErr; diff --git a/crates/teleia/src/net.rs b/crates/teleia/src/net.rs index b9babe5..0fee34d 100644 --- a/crates/teleia/src/net.rs +++ b/crates/teleia/src/net.rs @@ -1 +1,4 @@ -pub mod client; +pub mod ws; + +#[cfg(not(target_arch = "wasm32"))] +pub mod fig; diff --git a/crates/teleia/src/net/client.rs b/crates/teleia/src/net/client.rs deleted file mode 100644 index fcd51b8..0000000 --- a/crates/teleia/src/net/client.rs +++ /dev/null @@ -1,2 +0,0 @@ -#[cfg(target_arch = "wasm32")] -pub mod wasm; diff --git a/crates/teleia/src/net/client/wasm.rs b/crates/teleia/src/net/client/wasm.rs deleted file mode 100644 index d9936e5..0000000 --- a/crates/teleia/src/net/client/wasm.rs +++ /dev/null @@ -1,67 +0,0 @@ -use std::{collections::VecDeque, sync::{Arc, Mutex}}; - -use wasm_bindgen::prelude::*; - -#[derive(Debug)] -pub enum Message { - Binary(Vec), - Text(String), -} - -impl Message { - pub fn from_messageevent(e: web_sys::MessageEvent) -> Self { - if let Ok(abuf) = e.data().dyn_into::() { - let array = js_sys::Uint8Array::new(&abuf).to_vec(); - Message::Binary(array) - } else if let Ok(txt) = e.data().dyn_into::() { - Message::Text(txt.into()) - } else { - panic!("received weird websocked message: {:?}", e); - } - } -} - -pub struct Client { - pub ws: Option, - pub messages: Arc>>, -} - -impl Client { - pub fn new() -> Self { - Self { - ws: None, - messages: Arc::new(Mutex::new(VecDeque::new())), - } - } - pub fn connect(&mut self, url: &str) { - let ws = web_sys::WebSocket::new(url).expect("failed to open websocket"); - let messages_ref = self.messages.clone(); - let cb: Closure = Closure::new(move |e: web_sys::MessageEvent| { - let msg = Message::from_messageevent(e); - log::info!("incoming: {:?}", msg); - messages_ref.lock().unwrap().push_back(msg); - }); - ws.set_onmessage(Some(cb.as_ref().unchecked_ref())); - cb.forget(); - self.ws = Some(ws); - } - pub fn poll(&mut self) -> Option { - self.messages.lock().unwrap().pop_front() - } - pub fn send(&self, msg: Message) { - if let Some(ws) = &self.ws { - match msg { - Message::Text(txt) => { - if let Err(e) = ws.send_with_str(&txt) { - log::warn!("failed to send string: {:?}", e); - } - }, - Message::Binary(bytes) => { - if let Err(e) = ws.send_with_u8_array(&bytes) { - log::warn!("failed to send bytes: {:?}", e); - } - }, - } - } - } -} diff --git a/crates/teleia/src/net/fig.rs b/crates/teleia/src/net/fig.rs new file mode 100644 index 0000000..9dfbea8 --- /dev/null +++ b/crates/teleia/src/net/fig.rs @@ -0,0 +1,100 @@ +use std::io::{Read, Write}; +use byteorder::{LE, ReadBytesExt, WriteBytesExt}; + +use crate::{Erm, WrapErr}; + +const KEY: usize = 42; + +pub fn read_length_prefixed_utf8(r: &mut R) -> Erm where R: std::io::Read { + let len = r.read_u32::()?; + let mut bs = vec![0; len as usize]; + r.read_exact(&mut bs)?; + Ok(String::from_utf8(bs)?) +} + +#[derive(Debug, Clone)] +pub struct BinaryMessage { + pub event: Vec, + pub data: Vec +} + +pub struct BinaryClient { + in_buf: Vec, + out_buf: Vec, + socket: std::net::TcpStream, + poller: polling::Poller, +} +impl BinaryClient { + pub fn new(addr: &str, subs: &[&[u8]]) -> Erm { + let mut socket = std::net::TcpStream::connect(addr).wrap_err("failed to connect to message bus")?; + for s in subs { + write!(socket, "s").wrap_err("failed to send subscribe message to bus")?; + socket.write_u32::(s.len() as u32).wrap_err("failed to send subscribe message length to bus")?; + socket.write_all(s).wrap_err("failed to send subscribe message to bus")?; + } + socket.set_nonblocking(true).wrap_err("failed to set message bus socket nonblocking")?; + socket.flush().wrap_err("failed to flush bus connection")?; + let in_buf = Vec::new(); + let out_buf = Vec::new(); + let poller = polling::Poller::new()?; + unsafe { + poller.add(&socket, polling::Event::all(KEY)).wrap_err("failed to add event to poll")?; + } + Ok(Self { + in_buf, out_buf, + socket, + poller, + }) + } + fn write_length_prefixed(&mut self, buf: &[u8]) -> Erm<()> { + self.out_buf.write_u32::(buf.len() as u32).wrap_err("failed to send message")?; + self.out_buf.write_all(buf).wrap_err("failed to send message")?; + Ok(()) + + } + pub fn publish(&mut self, ev: &[u8], data: &[u8]) -> Erm<()> { + write!(self.out_buf, "p").wrap_err("failed to send publish message to bus")?; + self.write_length_prefixed(ev)?; + self.write_length_prefixed(data)?; + Ok(()) + } + fn pop_incoming_message(&mut self) -> Option { + let mut reader = std::io::Cursor::new(&self.in_buf); + let event_len = reader.read_u32::().ok()?; + let mut event = vec![0 as u8; event_len as usize]; + reader.read_exact(&mut event).ok()?; + let data_len = reader.read_u32::().ok()?; + let mut data = vec![0 as u8; data_len as usize]; + reader.read_exact(&mut data).ok()?; + let len = reader.position() as usize; + self.in_buf.drain(..len); + Some(BinaryMessage { event, data }) + } + pub fn pump(&mut self) -> Erm> { + let mut events = polling::Events::new(); + events.clear(); + self.poller.wait(&mut events, Some(std::time::Duration::from_secs(0))) + .wrap_err("failed to poll message bus")?; + for ev in events.iter() { + if ev.key == KEY { + if ev.readable { + let mut buf = [0; 1024]; + match self.socket.read(&mut buf) { + Ok(sz) => self.in_buf.extend_from_slice(&buf[..sz]), + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}, + e => { e.wrap_err("failed to read from bus socket")?; }, + } + } + if ev.writable && !self.out_buf.is_empty() { + match self.socket.write(&self.out_buf) { + Ok(sz) => { self.out_buf.drain(..sz); }, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}, + e => { e.wrap_err("failed to write to bus socket")?; }, + } + } + self.poller.modify(&self.socket, polling::Event::all(KEY)).wrap_err("failed to update event to poll")?; + } + } + Ok(self.pop_incoming_message()) + } +} diff --git a/crates/teleia/src/net/ws.rs b/crates/teleia/src/net/ws.rs new file mode 100644 index 0000000..bedcc29 --- /dev/null +++ b/crates/teleia/src/net/ws.rs @@ -0,0 +1,10 @@ +pub mod client; + +#[cfg(not(target_arch = "wasm32"))] +pub mod server; + +#[derive(Debug)] +pub enum Message { + Binary(Vec), + Text(String), +} diff --git a/crates/teleia/src/net/ws/client.rs b/crates/teleia/src/net/ws/client.rs new file mode 100644 index 0000000..ebd7681 --- /dev/null +++ b/crates/teleia/src/net/ws/client.rs @@ -0,0 +1,9 @@ +#[cfg(target_arch = "wasm32")] +pub mod wasm; +#[cfg(target_arch = "wasm32")] +pub use wasm::*; + +#[cfg(not(target_arch = "wasm32"))] +pub mod native; +#[cfg(not(target_arch = "wasm32"))] +pub use native::*; diff --git a/crates/teleia/src/net/ws/client/native.rs b/crates/teleia/src/net/ws/client/native.rs new file mode 100644 index 0000000..efefdf2 --- /dev/null +++ b/crates/teleia/src/net/ws/client/native.rs @@ -0,0 +1,119 @@ +use std::sync::mpsc::{Sender, Receiver, channel}; +use std::sync::{Arc, Mutex}; +use std::thread::spawn; + +use crate::net::ws::Message; +use crate::{Erm, WrapErr}; + +const KEY: usize = 42; + +pub struct Client { + channels: Arc, Receiver)>>>, +} + +impl Client { + pub fn new() -> Self { + Self { + channels: Arc::new(Mutex::new(None)), + } + } + pub fn connect(&mut self, url: &str) { + let (mut socket, _resp) = if let Ok(v) = tungstenite::connect(url) { v } else { + log::warn!("failed to connect to websocket: {}", url); + return; + }; + let poller = match polling::Poller::new() { + Ok(v) => v, + Err(e) => { + log::warn!("failed to create poller: {}", e); + return; + } + }; + match socket.get_ref() { + tungstenite::stream::MaybeTlsStream::Plain(s) => unsafe { + if let Err(e) = poller.add(s, polling::Event::readable(KEY)) { + log::warn!("failed to add event to poll: {}", e); + let _ = socket.close(None); + return; + }; + }, + tungstenite::stream::MaybeTlsStream::NativeTls(s) => unsafe { + if let Err(e) = poller.add(s.get_ref(), polling::Event::readable(KEY)) { + log::warn!("failed to add event to poll: {}", e); + let _ = socket.close(None); + return; + }; + }, + _ => { + log::warn!("unknown socket type; cannot poll!"); + let _ = socket.close(None); + return; + }, + } + let (outgoing_sender, outgoing_receiver) = channel(); + let (incoming_sender, incoming_receiver) = channel(); + *self.channels.lock().unwrap() = Some((outgoing_sender, incoming_receiver)); + let channels_ref = self.channels.clone(); + let mut events = polling::Events::new(); + spawn(move || loop { + let res: Erm<()> = try { + // send all outgoing messages + while let Ok(msg) = outgoing_receiver.try_recv() { + match msg { + Message::Text(txt) => socket.send(tungstenite::Message::Text(txt)) + .wrap_err("failed to send websocket text")?, + Message::Binary(bytes) => socket.send(tungstenite::Message::Binary(bytes)) + .wrap_err("failed to send websocket bytes")?, + } + } + // wait until we've received data + events.clear(); + poller.wait(&mut events, None).wrap_err("failed to wait on websocket poller")?; + for ev in events.iter() { + if ev.key == KEY { + if ev.readable { match socket.read().wrap_err("failed to recv on websocket")? { + tungstenite::Message::Text(txt) => incoming_sender.send(Message::Text(txt)) + .wrap_err("failed to send incoming websocket message on channel")?, + tungstenite::Message::Binary(bytes) => incoming_sender.send(Message::Binary(bytes)) + .wrap_err("failed to send incoming websocket message on channel")?, + tungstenite::Message::Ping(bs) => socket.send(tungstenite::Message::Pong(bs)) + .wrap_err("failed to send websocket pong")?, + m => log::warn!("unhandled incoming websocket message: {}", m), + }} + match socket.get_ref() { + tungstenite::stream::MaybeTlsStream::Plain(s) => + poller.modify(s, polling::Event::readable(KEY)) + .wrap_err("failed to modify event to poll")?, + tungstenite::stream::MaybeTlsStream::NativeTls(s) => + poller.modify(s.get_ref(), polling::Event::readable(KEY)) + .wrap_err("failed to modify event to poll")?, + _ => log::warn!("unknown socket type; cannot modify polling!"), + } + } + } + }; + if let Err(e) = res { + log::warn!("error in websocket thread: {}", e); + *channels_ref.lock().unwrap() = None; + return; + } + }); + } + pub fn is_connected(&self) -> bool { + self.channels.lock().unwrap().is_some() + } + pub fn poll(&mut self) -> Option { + if let Some((_, incoming)) = &*self.channels.lock().unwrap() { + incoming.try_recv().ok() + } else { None } + } + pub fn send(&self, msg: Message) { + if let Some((outgoing, _)) = &mut *self.channels.lock().unwrap() { + if let Err(e) = outgoing.send(msg) { + log::warn!("failed to send websocket message on channel: {}", e); + } + } else { + log::warn!("tried to send message, but websocket is not connected"); + } + } +} diff --git a/crates/teleia/src/net/ws/client/wasm.rs b/crates/teleia/src/net/ws/client/wasm.rs new file mode 100644 index 0000000..a8942c4 --- /dev/null +++ b/crates/teleia/src/net/ws/client/wasm.rs @@ -0,0 +1,73 @@ +use std::{collections::VecDeque, sync::{Arc, Mutex}}; + +use wasm_bindgen::prelude::*; + +use crate::net::ws::Message; + +impl Message { + fn from_messageevent(e: web_sys::MessageEvent) -> Self { + if let Ok(abuf) = e.data().dyn_into::() { + let array = js_sys::Uint8Array::new(&abuf).to_vec(); + Message::Binary(array) + } else if let Ok(txt) = e.data().dyn_into::() { + Message::Text(txt.into()) + } else { + panic!("received weird websocket message: {:?}", e); + } + } +} + +pub struct Client { + ws: Arc>>, + messages: Arc>>, +} + +impl Client { + pub fn new() -> Self { + Self { + ws: Arc::new(Mutex::new(None)), + messages: Arc::new(Mutex::new(VecDeque::new())), + } + } + pub fn connect(&mut self, url: &str) { + let ws_ref = self.ws.clone(); + let messages_ref = self.messages.clone(); + let cb: Closure = Closure::new(move |e: web_sys::MessageEvent| { + let msg = Message::from_messageevent(e); + log::info!("incoming: {:?}", msg); + messages_ref.lock().unwrap().push_back(msg); + }); + let close_cb: Closure = Closure::new(move |_: web_sys::MessageEvent| { + log::info!("closed!"); + *ws_ref.lock().unwrap() = None; + }); + let ws = web_sys::WebSocket::new(url).expect("failed to open websocket"); + ws.set_onmessage(Some(cb.as_ref().unchecked_ref())); + ws.set_onclose(Some(close_cb.as_ref().unchecked_ref())); + cb.forget(); + close_cb.forget(); + *self.ws.lock().unwrap() = Some(ws); + } + pub fn is_connected(&self) -> bool { + self.ws.lock().unwrap().is_none() + } + pub fn poll(&mut self) -> Option { + self.messages.lock().unwrap().pop_front() + } + pub fn send(&self, msg: Message) { + if let Some(ws) = &*self.ws.lock().unwrap() { + match msg { + Message::Text(txt) => { + if let Err(e) = ws.send_with_str(&txt) { + log::warn!("failed to send string: {:?}", e); + } + }, + Message::Binary(bytes) => { + if let Err(e) = ws.send_with_u8_array(&bytes) { + log::warn!("failed to send bytes: {:?}", e); + } + }, + } + } + } +} diff --git a/crates/teleia/src/net/ws/server.rs b/crates/teleia/src/net/ws/server.rs new file mode 100644 index 0000000..60537a1 --- /dev/null +++ b/crates/teleia/src/net/ws/server.rs @@ -0,0 +1,71 @@ +use std::collections::HashMap; +use std::net::{TcpListener, TcpStream}; +use std::sync::mpsc::{Sender, Receiver, channel}; +use std::sync::{Arc, Mutex}; +use std::thread::spawn; + +use crate::net::ws::Message; +use crate::{Erm, WrapErr}; + +const ACCEPT_KEY: usize = 1; + +type ClientId = usize; +type ClientMessage = (ClientId, Message); + +pub struct Server { + clients: Arc>>>, + channels: Arc, Receiver)>>>, +} + +impl Server { + pub fn new() -> Self { + Self { + clients: Arc::new(Mutex::new(HashMap::new())), + channels: Arc::new(Mutex::new(None)), + } + } + pub fn start(&mut self, addr: &str) { + let clients_ref = self.clients.clone(); + let (outgoing_sender, outgoing_receiver) = channel(); + let (incoming_sender, incoming_receiver) = channel(); + *self.channels.lock().unwrap() = Some((outgoing_sender, incoming_receiver)); + let channels_ref = self.channels.clone(); + let listener = TcpListener::bind(addr).expect("failed to bind server socket"); + let poller = polling::Poller::new().expect("failed to create poller for websocket server"); + unsafe { + poller.add(&listener, polling::Event::readable(ACCEPT_KEY)) + .expect("failed to add poll server socket"); + } + let mut events = polling::Events::new(); + let mut next_id = ACCEPT_KEY + 1; + spawn(move || loop { + let res: Erm<()> = try { + events.clear(); + poller.wait(&mut events, None).wrap_err("failed to wait on websocket server poller")?; + for ev in events.iter() { + if ev.key == ACCEPT_KEY { + let (sock, client_addr) = listener.accept().wrap_err("failed to accept on socket")?; + let id = next_id; next_id += 1; + let res = try { + unsafe { poller.add(&sock, polling::Event::readable(id)) + .wrap_err("failed to add socket to poller")?; } + let conn = tungstenite::accept(sock).wrap_err("failed to upgrade to websocket")?; + clients_ref.lock().unwrap().insert(id, conn); + }; + if let Err(e) = res { + log::warn!("error during connection from {}: {}", client_addr, e); + }; + poller.modify(&listener, polling::Event::readable(ACCEPT_KEY)) + .wrap_err("failed to modify event to poll")?; + } else { + } + } + }; + if let Err(e) = res { + log::warn!("unhandled error in websocket thread, stopping server: {}", e); + *channels_ref.lock().unwrap() = None; + return; + } + }); + } +} diff --git a/crates/teleia/src/save.rs b/crates/teleia/src/save.rs index dea4332..60bbb43 100644 --- a/crates/teleia/src/save.rs +++ b/crates/teleia/src/save.rs @@ -1,3 +1,4 @@ +#[cfg(target_arch = "wasm32")] use base64::prelude::*; #[cfg(target_arch = "wasm32")] @@ -32,7 +33,6 @@ pub fn save(id: &str, data: &W) where W: serde::Serialize { let _ = std::fs::create_dir_all(pd.data_dir()); let path = pd.data_dir().join("teleia.save"); let mut file = std::fs::File::create(&path).expect("failed to open save file"); - // serde_json::to_writer(file, data).expect("failed to write save file"); bincode::serde::encode_into_std_write(data, &mut file, bincode::config::standard()) .expect("failed to write save file"); } @@ -43,6 +43,5 @@ pub fn load(id: &str) -> Option where W: serde::de::DeserializeOwned { let _ = std::fs::create_dir_all(pd.data_dir()); let path = pd.data_dir().join("teleia.save"); let mut file = std::fs::File::open(&path).ok()?; - // serde_json::from_reader(file).ok() bincode::serde::decode_from_std_read(&mut file, bincode::config::standard()).ok() } diff --git a/crates/teleia/src/state.rs b/crates/teleia/src/state.rs index 76be948..9c288a2 100644 --- a/crates/teleia/src/state.rs +++ b/crates/teleia/src/state.rs @@ -9,20 +9,6 @@ use crate::{audio, context, framebuffer, mesh, shader, utils}; const DELTA_TIME: f64 = 0.016; // todo -// pub struct WinitWaker {} -// impl WinitWaker { -// fn new() -> Self { Self {} } -// } -// impl std::task::Wake for WinitWaker { -// fn wake(self: std::sync::Arc) {} -// } - -// pub struct Response { -// pub url: String, -// pub status: reqwest::StatusCode, -// pub body: bytes::Bytes, -// } - pub trait Game { fn initialize(&self, ctx: &context::Context, st: &State) -> utils::Erm<()> { Ok(()) } fn finalize(&self, ctx: &context::Context, st: &State) -> utils::Erm<()> { Ok(()) } @@ -35,7 +21,6 @@ pub trait Game { fn finish_title(&mut self, st: &mut State) {} fn mouse_move(&mut self, ctx: &context::Context, st: &mut State, x: i32, y: i32) {} fn mouse_press(&mut self, ctx: &context::Context, st: &mut State) {} - // fn request_return(&mut self, ctx: &context::Context, st: &mut State, res: Response) {} fn update(&mut self, ctx: &context::Context, st: &mut State) -> utils::Erm<()> { Ok(()) } fn render(&mut self, ctx: &context::Context, st: &mut State) -> utils::Erm<()> { Ok(()) } } @@ -188,10 +173,6 @@ pub struct State { pub lighting: (glam::Vec3, glam::Vec3, glam::Vec3), pub point_lights: Vec, - // pub waker_ctx: std::task::Context<'static>, - // pub http_client: reqwest::Client, - // pub request: Option>>>>, - pub log: Vec<(u64, String)>, } @@ -258,10 +239,6 @@ impl State { ); let mesh_square = mesh::Mesh::from_obj(ctx, include_bytes!("assets/meshes/square.obj")); - // let waker = std::sync::Arc::new(WinitWaker::new()); - // let cwaker = Box::leak(Box::new(waker.into())); - // let waker_ctx = std::task::Context::from_waker(cwaker); - let nextframe = now(ctx); Self { @@ -521,30 +498,6 @@ impl State { self.rebinding = Some(k); } - // pub fn request(&mut self, f: F) - // where F: Fn(&reqwest::Client) -> reqwest::RequestBuilder - // { - // let builder = f(&self.http_client); - // let fut = async { - // let resp = builder.send().await?; - // let url = resp.url().clone().to_string(); - // let status = resp.status().clone(); - // let body = resp.bytes().await?; - // reqwest::Result::Ok(Response { - // url, - // status, - // body, - // }) - // }; - // self.request = Some(Box::pin(fut)); - // } - // pub fn requesting(&self) -> bool { self.request.is_some() } - // pub fn request_returned(&mut self, ctx: &context::Context, game: &mut G, res: Response) - // where G: Game - // { - // game.request_return(ctx, self, res); - // } - pub fn run_update(&mut self, ctx: &context::Context, game: &mut G) -> utils::Erm<()> where G: Game { let now = now(ctx); if now > self.nextframe { -- cgit v1.2.3