From 836760c3c5f80674def4c0994c6b0d20f08ae148 Mon Sep 17 00:00:00 2001 From: LLLL Colonq Date: Sun, 25 Jan 2026 16:16:10 -0500 Subject: Networking --- .dir-locals.el | 2 +- .gitignore | 3 +- Cargo.lock | 268 +++++++++++++++++++++++++++++- crates/teleia/Cargo.toml | 3 +- crates/teleia/src/audio.rs | 63 +++++-- crates/teleia/src/fig.rs | 100 ----------- crates/teleia/src/lib.rs | 5 +- crates/teleia/src/net.rs | 5 +- crates/teleia/src/net/client.rs | 2 - crates/teleia/src/net/client/wasm.rs | 67 -------- crates/teleia/src/net/fig.rs | 100 +++++++++++ crates/teleia/src/net/ws.rs | 10 ++ crates/teleia/src/net/ws/client.rs | 9 + crates/teleia/src/net/ws/client/native.rs | 119 +++++++++++++ crates/teleia/src/net/ws/client/wasm.rs | 73 ++++++++ crates/teleia/src/net/ws/server.rs | 71 ++++++++ crates/teleia/src/save.rs | 3 +- crates/teleia/src/state.rs | 47 ------ 18 files changed, 702 insertions(+), 248 deletions(-) delete mode 100644 crates/teleia/src/fig.rs delete mode 100644 crates/teleia/src/net/client.rs delete mode 100644 crates/teleia/src/net/client/wasm.rs create mode 100644 crates/teleia/src/net/fig.rs create mode 100644 crates/teleia/src/net/ws.rs create mode 100644 crates/teleia/src/net/ws/client.rs create mode 100644 crates/teleia/src/net/ws/client/native.rs create mode 100644 crates/teleia/src/net/ws/client/wasm.rs create mode 100644 crates/teleia/src/net/ws/server.rs diff --git a/.dir-locals.el b/.dir-locals.el index 7d26985..9a84ba1 100644 --- a/.dir-locals.el +++ b/.dir-locals.el @@ -2,7 +2,7 @@ ((eglot-workspace-configuration . (:rust-analyzer ( :cargo - ( :target "wasm32-unknown-unknown" + ( ;; :target "wasm32-unknown-unknown" :targetDir t) :hover (:show (:fields 10)))))))) diff --git a/.gitignore b/.gitignore index caf058f..7011f7f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ /.direnv /target /.cargo -/dist \ No newline at end of file +/dist +/result \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 2ef0a87..50375a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -263,6 +263,15 @@ version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "block-sys" version = "0.2.1" @@ -471,7 +480,7 @@ dependencies = [ "bitflags 1.3.2", "core-foundation", "core-graphics-types", - "foreign-types", + "foreign-types 0.5.0", "libc", ] @@ -552,6 +561,15 @@ dependencies = [ "windows", ] +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + [[package]] name = "crc32fast" version = "1.4.0" @@ -617,6 +635,16 @@ version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" +[[package]] +name = "crypto-common" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "cursor-icon" version = "1.1.0" @@ -632,6 +660,22 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c87e182de0887fd5361989c677c4e8f5000cd9491d6d563161a8f3a5519fc7f" +[[package]] +name = "data-encoding" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" + +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "directories" version = "6.0.0" @@ -781,6 +825,12 @@ dependencies = [ "once_cell", ] +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "fdeflate" version = "0.3.4" @@ -828,6 +878,15 @@ dependencies = [ "ttf-parser 0.20.0", ] +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared 0.1.1", +] + [[package]] name = "foreign-types" version = "0.5.0" @@ -835,7 +894,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d737d9aa519fb7b749cbc3b962edcf310a8dd1f4b67c91c4f83975dbdd17d965" dependencies = [ "foreign-types-macros", - "foreign-types-shared", + "foreign-types-shared 0.3.1", ] [[package]] @@ -849,12 +908,28 @@ dependencies = [ "syn", ] +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "foreign-types-shared" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aa9a19cbb55df58761df49b23516a86d432839add4af60fc256da840f66ed35b" +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "gethostname" version = "0.4.3" @@ -995,6 +1070,22 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "http" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a" +dependencies = [ + "bytes", + "itoa", +] + +[[package]] +name = "httparse" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" + [[package]] name = "humantime" version = "2.1.0" @@ -1102,10 +1193,11 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.68" +version = "0.3.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "406cda4b368d531c842222cf9d2600a9a4acce8d29423695379c6868a143a9ee" +checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f" dependencies = [ + "once_cell", "wasm-bindgen", ] @@ -1133,9 +1225,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.153" +version = "0.2.179" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +checksum = "c5a2d376baa530d1238d133232d15e239abad80d05838b4b59354e5268af431f" [[package]] name = "libloading" @@ -1276,6 +1368,23 @@ dependencies = [ "syn", ] +[[package]] +name = "native-tls" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "ndk" version = "0.8.0" @@ -1473,6 +1582,50 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "openssl" +version = "0.10.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08838db121398ad17ab8531ce9de97b244589089e290a384c900cb9ff7434328" +dependencies = [ + "bitflags 2.6.0", + "cfg-if", + "foreign-types 0.3.2", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "openssl-probe" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" + +[[package]] +name = "openssl-sys" +version = "0.9.111" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82cab2d520aa75e3c58898289429321eb788c3106963d0dc886ec7a5f4adc321" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -1902,6 +2055,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schannel" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "scoped-tls" version = "1.0.1" @@ -1921,6 +2083,29 @@ dependencies = [ "tiny-skia", ] +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags 2.6.0", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc1f0cbffaac4852523ce30d8bd3c5cdc873501d96ff467ca09b6767bb8cd5c0" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "self_cell" version = "1.2.1" @@ -1965,6 +2150,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -2285,6 +2481,7 @@ dependencies = [ "strum", "tobj", "tracing-wasm", + "tungstenite", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", @@ -2301,6 +2498,19 @@ dependencies = [ "walkdir", ] +[[package]] +name = "tempfile" +version = "3.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04cbcdd0c794ebb0d4cf35e88edd2f7d2c4c3e9a5a6dab322839b321c6a87a64" +dependencies = [ + "cfg-if", + "fastrand", + "once_cell", + "rustix", + "windows-sys 0.59.0", +] + [[package]] name = "thiserror" version = "1.0.57" @@ -2491,6 +2701,25 @@ version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c591d83f69777866b9126b24c6dd9a18351f177e49d625920d19f989fd31cf8" +[[package]] +name = "tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "native-tls", + "rand", + "sha1", + "thiserror 1.0.57", + "utf-8", +] + [[package]] name = "typenum" version = "1.18.0" @@ -2557,12 +2786,24 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8parse" version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.4" @@ -2851,6 +3092,12 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + [[package]] name = "windows-result" version = "0.1.2" @@ -2896,6 +3143,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" +dependencies = [ + "windows-link", +] + [[package]] name = "windows-targets" version = "0.42.2" diff --git a/crates/teleia/Cargo.toml b/crates/teleia/Cargo.toml index 66edbc8..18718a4 100644 --- a/crates/teleia/Cargo.toml +++ b/crates/teleia/Cargo.toml @@ -48,4 +48,5 @@ env_logger = "*" # configurable logging to stdout glfw = { git = "https://github.com/lcolonq/glfw-rs", features = ["serde"] } # window management kira = { version = "=0.9.6", default-features = false, features = ["cpal", "ogg", "wav"] } # audio directories = { git = "https://github.com/lcolonq/directories-rs" } # standard system directories -polling = "*" # interface to epoll \ No newline at end of file +polling = "*" # interface to epoll +tungstenite = { version = "*", features = ["native-tls"] } # websockets \ No newline at end of file diff --git a/crates/teleia/src/audio.rs b/crates/teleia/src/audio.rs index e95a86d..8106c20 100644 --- a/crates/teleia/src/audio.rs +++ b/crates/teleia/src/audio.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; -#[cfg(target_arch = "wasm32")] use std::sync::{Arc, Mutex}; #[cfg(target_arch = "wasm32")] @@ -71,7 +70,10 @@ impl Audio { } } - pub fn play(&self, ctx: &Context, looping: Option<(Option, Option)>) -> Option { + pub fn play(&self, + ctx: &mut Context, + looping: Option<(Option, Option)> + ) -> Option { let source = ctx.audio.create_buffer_source().ok()?; if let Some(ab) = &*self.buffer.lock().unwrap() { source.set_buffer(Some(&ab)); @@ -148,6 +150,24 @@ impl Context { } } +#[cfg(not(target_arch = "wasm32"))] +pub struct AudioPlayingHandle { + handle: Arc> +} + +#[cfg(not(target_arch = "wasm32"))] +impl AudioPlayingHandle { + pub fn stop(&self, _ctx: &Context) { + self.handle.lock().unwrap().stop(kira::tween::Tween::default()); + } + pub fn fade_out(&self, _ctx: &Context, time: f32) { + self.handle.lock().unwrap().stop(kira::tween::Tween { + duration: std::time::Duration::from_secs_f32(time + 0.5), + ..Default::default() + }); + } +} + #[cfg(not(target_arch = "wasm32"))] pub struct Audio { data: kira::sound::static_sound::StaticSoundData, @@ -155,18 +175,30 @@ pub struct Audio { #[cfg(not(target_arch = "wasm32"))] impl Audio { - pub fn new(_ctx: &Context, bytes: &'static [u8]) -> Self { + pub fn new(_ctx: &Context, bytes: &[u8]) -> Self { Self { - data: kira::sound::static_sound::StaticSoundData::from_cursor(std::io::Cursor::new(bytes)) + data: kira::sound::static_sound::StaticSoundData::from_cursor(std::io::Cursor::new(bytes.to_owned())) .expect("failed to decode audio"), } } + pub fn from_samples(_ctx: &Context, sample_rate: f32, samples: &[f32]) -> Self { + let frames: Vec = samples.iter().map(|f| kira::Frame { left: *f, right: *f }).collect(); + Self { + data: kira::sound::static_sound::StaticSoundData { + sample_rate: sample_rate as u32, + frames: frames.into(), + settings: kira::sound::static_sound::StaticSoundSettings::default(), + slice: None, + }, + } + } + pub fn play( &self, ctx: &mut Context, looping: Option<(Option, Option)> - ) -> Result + ) -> Option { let sd = if let Some((ss, se)) = looping { let start = if let Some(s) = ss { s } else { 0.0 }; @@ -178,10 +210,7 @@ impl Audio { } else { self.data.clone() }; - match ctx.manager.play(sd) { - Ok(h) => Ok(h), - Err(e) => Err(e.to_string()), - } + ctx.manager.play(sd).ok().map(|h| AudioPlayingHandle { handle: Arc::new(Mutex::new(h)) }) } } @@ -189,7 +218,7 @@ impl Audio { pub struct Assets { pub ctx: Context, pub audio: HashMap, - pub music_handle: Option, + pub music_handle: Option, } #[cfg(not(target_arch = "wasm32"))] @@ -208,29 +237,29 @@ impl Assets { pub fn play_sfx(&mut self, name: &str) { if let Some(a) = self.audio.get(name) { - if let Err(e) = a.play(&mut self.ctx, None) { - log::warn!("failed to play sound {}: {}", name, e); + if a.play(&mut self.ctx, None).is_none() { + log::warn!("failed to play sound {}", name); } } } pub fn is_music_playing(&self) -> bool { if let Some(mh) = &self.music_handle { - mh.state() == kira::sound::PlaybackState::Playing + mh.handle.lock().unwrap().state() == kira::sound::PlaybackState::Playing } else { false } } pub fn play_music(&mut self, name: &str, start: Option, end: Option) { if let Some(s) = &mut self.music_handle { - let _ = s.stop(kira::tween::Tween::default()); + let _ = s.stop(&self.ctx); } if let Some(a) = self.audio.get(name) { match a.play(&mut self.ctx, Some((start, end))) { - Ok(h) => { + Some(h) => { self.music_handle = Some(h); }, - Err(e) => { - log::warn!("failed to play music {}: {}", name, e); + _ => { + log::warn!("failed to play music {}", name); } } } diff --git a/crates/teleia/src/fig.rs b/crates/teleia/src/fig.rs deleted file mode 100644 index 9dfbea8..0000000 --- a/crates/teleia/src/fig.rs +++ /dev/null @@ -1,100 +0,0 @@ -use std::io::{Read, Write}; -use byteorder::{LE, ReadBytesExt, WriteBytesExt}; - -use crate::{Erm, WrapErr}; - -const KEY: usize = 42; - -pub fn read_length_prefixed_utf8(r: &mut R) -> Erm where R: std::io::Read { - let len = r.read_u32::()?; - let mut bs = vec![0; len as usize]; - r.read_exact(&mut bs)?; - Ok(String::from_utf8(bs)?) -} - -#[derive(Debug, Clone)] -pub struct BinaryMessage { - pub event: Vec, - pub data: Vec -} - -pub struct BinaryClient { - in_buf: Vec, - out_buf: Vec, - socket: std::net::TcpStream, - poller: polling::Poller, -} -impl BinaryClient { - pub fn new(addr: &str, subs: &[&[u8]]) -> Erm { - let mut socket = std::net::TcpStream::connect(addr).wrap_err("failed to connect to message bus")?; - for s in subs { - write!(socket, "s").wrap_err("failed to send subscribe message to bus")?; - socket.write_u32::(s.len() as u32).wrap_err("failed to send subscribe message length to bus")?; - socket.write_all(s).wrap_err("failed to send subscribe message to bus")?; - } - socket.set_nonblocking(true).wrap_err("failed to set message bus socket nonblocking")?; - socket.flush().wrap_err("failed to flush bus connection")?; - let in_buf = Vec::new(); - let out_buf = Vec::new(); - let poller = polling::Poller::new()?; - unsafe { - poller.add(&socket, polling::Event::all(KEY)).wrap_err("failed to add event to poll")?; - } - Ok(Self { - in_buf, out_buf, - socket, - poller, - }) - } - fn write_length_prefixed(&mut self, buf: &[u8]) -> Erm<()> { - self.out_buf.write_u32::(buf.len() as u32).wrap_err("failed to send message")?; - self.out_buf.write_all(buf).wrap_err("failed to send message")?; - Ok(()) - - } - pub fn publish(&mut self, ev: &[u8], data: &[u8]) -> Erm<()> { - write!(self.out_buf, "p").wrap_err("failed to send publish message to bus")?; - self.write_length_prefixed(ev)?; - self.write_length_prefixed(data)?; - Ok(()) - } - fn pop_incoming_message(&mut self) -> Option { - let mut reader = std::io::Cursor::new(&self.in_buf); - let event_len = reader.read_u32::().ok()?; - let mut event = vec![0 as u8; event_len as usize]; - reader.read_exact(&mut event).ok()?; - let data_len = reader.read_u32::().ok()?; - let mut data = vec![0 as u8; data_len as usize]; - reader.read_exact(&mut data).ok()?; - let len = reader.position() as usize; - self.in_buf.drain(..len); - Some(BinaryMessage { event, data }) - } - pub fn pump(&mut self) -> Erm> { - let mut events = polling::Events::new(); - events.clear(); - self.poller.wait(&mut events, Some(std::time::Duration::from_secs(0))) - .wrap_err("failed to poll message bus")?; - for ev in events.iter() { - if ev.key == KEY { - if ev.readable { - let mut buf = [0; 1024]; - match self.socket.read(&mut buf) { - Ok(sz) => self.in_buf.extend_from_slice(&buf[..sz]), - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}, - e => { e.wrap_err("failed to read from bus socket")?; }, - } - } - if ev.writable && !self.out_buf.is_empty() { - match self.socket.write(&self.out_buf) { - Ok(sz) => { self.out_buf.drain(..sz); }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}, - e => { e.wrap_err("failed to write to bus socket")?; }, - } - } - self.poller.modify(&self.socket, polling::Event::all(KEY)).wrap_err("failed to update event to poll")?; - } - } - Ok(self.pop_incoming_message()) - } -} diff --git a/crates/teleia/src/lib.rs b/crates/teleia/src/lib.rs index 81bc617..5cdf777 100644 --- a/crates/teleia/src/lib.rs +++ b/crates/teleia/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(try_blocks)] + pub mod utils; pub mod ui; pub mod context; @@ -15,9 +17,6 @@ pub mod physics; pub mod save; pub mod level2d; -#[cfg(not(target_arch = "wasm32"))] -pub mod fig; - pub use utils::{erm, install_error_handler, Erm}; pub use audio::AudioPlayback; pub use simple_eyre::eyre::WrapErr; diff --git a/crates/teleia/src/net.rs b/crates/teleia/src/net.rs index b9babe5..0fee34d 100644 --- a/crates/teleia/src/net.rs +++ b/crates/teleia/src/net.rs @@ -1 +1,4 @@ -pub mod client; +pub mod ws; + +#[cfg(not(target_arch = "wasm32"))] +pub mod fig; diff --git a/crates/teleia/src/net/client.rs b/crates/teleia/src/net/client.rs deleted file mode 100644 index fcd51b8..0000000 --- a/crates/teleia/src/net/client.rs +++ /dev/null @@ -1,2 +0,0 @@ -#[cfg(target_arch = "wasm32")] -pub mod wasm; diff --git a/crates/teleia/src/net/client/wasm.rs b/crates/teleia/src/net/client/wasm.rs deleted file mode 100644 index d9936e5..0000000 --- a/crates/teleia/src/net/client/wasm.rs +++ /dev/null @@ -1,67 +0,0 @@ -use std::{collections::VecDeque, sync::{Arc, Mutex}}; - -use wasm_bindgen::prelude::*; - -#[derive(Debug)] -pub enum Message { - Binary(Vec), - Text(String), -} - -impl Message { - pub fn from_messageevent(e: web_sys::MessageEvent) -> Self { - if let Ok(abuf) = e.data().dyn_into::() { - let array = js_sys::Uint8Array::new(&abuf).to_vec(); - Message::Binary(array) - } else if let Ok(txt) = e.data().dyn_into::() { - Message::Text(txt.into()) - } else { - panic!("received weird websocked message: {:?}", e); - } - } -} - -pub struct Client { - pub ws: Option, - pub messages: Arc>>, -} - -impl Client { - pub fn new() -> Self { - Self { - ws: None, - messages: Arc::new(Mutex::new(VecDeque::new())), - } - } - pub fn connect(&mut self, url: &str) { - let ws = web_sys::WebSocket::new(url).expect("failed to open websocket"); - let messages_ref = self.messages.clone(); - let cb: Closure = Closure::new(move |e: web_sys::MessageEvent| { - let msg = Message::from_messageevent(e); - log::info!("incoming: {:?}", msg); - messages_ref.lock().unwrap().push_back(msg); - }); - ws.set_onmessage(Some(cb.as_ref().unchecked_ref())); - cb.forget(); - self.ws = Some(ws); - } - pub fn poll(&mut self) -> Option { - self.messages.lock().unwrap().pop_front() - } - pub fn send(&self, msg: Message) { - if let Some(ws) = &self.ws { - match msg { - Message::Text(txt) => { - if let Err(e) = ws.send_with_str(&txt) { - log::warn!("failed to send string: {:?}", e); - } - }, - Message::Binary(bytes) => { - if let Err(e) = ws.send_with_u8_array(&bytes) { - log::warn!("failed to send bytes: {:?}", e); - } - }, - } - } - } -} diff --git a/crates/teleia/src/net/fig.rs b/crates/teleia/src/net/fig.rs new file mode 100644 index 0000000..9dfbea8 --- /dev/null +++ b/crates/teleia/src/net/fig.rs @@ -0,0 +1,100 @@ +use std::io::{Read, Write}; +use byteorder::{LE, ReadBytesExt, WriteBytesExt}; + +use crate::{Erm, WrapErr}; + +const KEY: usize = 42; + +pub fn read_length_prefixed_utf8(r: &mut R) -> Erm where R: std::io::Read { + let len = r.read_u32::()?; + let mut bs = vec![0; len as usize]; + r.read_exact(&mut bs)?; + Ok(String::from_utf8(bs)?) +} + +#[derive(Debug, Clone)] +pub struct BinaryMessage { + pub event: Vec, + pub data: Vec +} + +pub struct BinaryClient { + in_buf: Vec, + out_buf: Vec, + socket: std::net::TcpStream, + poller: polling::Poller, +} +impl BinaryClient { + pub fn new(addr: &str, subs: &[&[u8]]) -> Erm { + let mut socket = std::net::TcpStream::connect(addr).wrap_err("failed to connect to message bus")?; + for s in subs { + write!(socket, "s").wrap_err("failed to send subscribe message to bus")?; + socket.write_u32::(s.len() as u32).wrap_err("failed to send subscribe message length to bus")?; + socket.write_all(s).wrap_err("failed to send subscribe message to bus")?; + } + socket.set_nonblocking(true).wrap_err("failed to set message bus socket nonblocking")?; + socket.flush().wrap_err("failed to flush bus connection")?; + let in_buf = Vec::new(); + let out_buf = Vec::new(); + let poller = polling::Poller::new()?; + unsafe { + poller.add(&socket, polling::Event::all(KEY)).wrap_err("failed to add event to poll")?; + } + Ok(Self { + in_buf, out_buf, + socket, + poller, + }) + } + fn write_length_prefixed(&mut self, buf: &[u8]) -> Erm<()> { + self.out_buf.write_u32::(buf.len() as u32).wrap_err("failed to send message")?; + self.out_buf.write_all(buf).wrap_err("failed to send message")?; + Ok(()) + + } + pub fn publish(&mut self, ev: &[u8], data: &[u8]) -> Erm<()> { + write!(self.out_buf, "p").wrap_err("failed to send publish message to bus")?; + self.write_length_prefixed(ev)?; + self.write_length_prefixed(data)?; + Ok(()) + } + fn pop_incoming_message(&mut self) -> Option { + let mut reader = std::io::Cursor::new(&self.in_buf); + let event_len = reader.read_u32::().ok()?; + let mut event = vec![0 as u8; event_len as usize]; + reader.read_exact(&mut event).ok()?; + let data_len = reader.read_u32::().ok()?; + let mut data = vec![0 as u8; data_len as usize]; + reader.read_exact(&mut data).ok()?; + let len = reader.position() as usize; + self.in_buf.drain(..len); + Some(BinaryMessage { event, data }) + } + pub fn pump(&mut self) -> Erm> { + let mut events = polling::Events::new(); + events.clear(); + self.poller.wait(&mut events, Some(std::time::Duration::from_secs(0))) + .wrap_err("failed to poll message bus")?; + for ev in events.iter() { + if ev.key == KEY { + if ev.readable { + let mut buf = [0; 1024]; + match self.socket.read(&mut buf) { + Ok(sz) => self.in_buf.extend_from_slice(&buf[..sz]), + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}, + e => { e.wrap_err("failed to read from bus socket")?; }, + } + } + if ev.writable && !self.out_buf.is_empty() { + match self.socket.write(&self.out_buf) { + Ok(sz) => { self.out_buf.drain(..sz); }, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}, + e => { e.wrap_err("failed to write to bus socket")?; }, + } + } + self.poller.modify(&self.socket, polling::Event::all(KEY)).wrap_err("failed to update event to poll")?; + } + } + Ok(self.pop_incoming_message()) + } +} diff --git a/crates/teleia/src/net/ws.rs b/crates/teleia/src/net/ws.rs new file mode 100644 index 0000000..bedcc29 --- /dev/null +++ b/crates/teleia/src/net/ws.rs @@ -0,0 +1,10 @@ +pub mod client; + +#[cfg(not(target_arch = "wasm32"))] +pub mod server; + +#[derive(Debug)] +pub enum Message { + Binary(Vec), + Text(String), +} diff --git a/crates/teleia/src/net/ws/client.rs b/crates/teleia/src/net/ws/client.rs new file mode 100644 index 0000000..ebd7681 --- /dev/null +++ b/crates/teleia/src/net/ws/client.rs @@ -0,0 +1,9 @@ +#[cfg(target_arch = "wasm32")] +pub mod wasm; +#[cfg(target_arch = "wasm32")] +pub use wasm::*; + +#[cfg(not(target_arch = "wasm32"))] +pub mod native; +#[cfg(not(target_arch = "wasm32"))] +pub use native::*; diff --git a/crates/teleia/src/net/ws/client/native.rs b/crates/teleia/src/net/ws/client/native.rs new file mode 100644 index 0000000..efefdf2 --- /dev/null +++ b/crates/teleia/src/net/ws/client/native.rs @@ -0,0 +1,119 @@ +use std::sync::mpsc::{Sender, Receiver, channel}; +use std::sync::{Arc, Mutex}; +use std::thread::spawn; + +use crate::net::ws::Message; +use crate::{Erm, WrapErr}; + +const KEY: usize = 42; + +pub struct Client { + channels: Arc, Receiver)>>>, +} + +impl Client { + pub fn new() -> Self { + Self { + channels: Arc::new(Mutex::new(None)), + } + } + pub fn connect(&mut self, url: &str) { + let (mut socket, _resp) = if let Ok(v) = tungstenite::connect(url) { v } else { + log::warn!("failed to connect to websocket: {}", url); + return; + }; + let poller = match polling::Poller::new() { + Ok(v) => v, + Err(e) => { + log::warn!("failed to create poller: {}", e); + return; + } + }; + match socket.get_ref() { + tungstenite::stream::MaybeTlsStream::Plain(s) => unsafe { + if let Err(e) = poller.add(s, polling::Event::readable(KEY)) { + log::warn!("failed to add event to poll: {}", e); + let _ = socket.close(None); + return; + }; + }, + tungstenite::stream::MaybeTlsStream::NativeTls(s) => unsafe { + if let Err(e) = poller.add(s.get_ref(), polling::Event::readable(KEY)) { + log::warn!("failed to add event to poll: {}", e); + let _ = socket.close(None); + return; + }; + }, + _ => { + log::warn!("unknown socket type; cannot poll!"); + let _ = socket.close(None); + return; + }, + } + let (outgoing_sender, outgoing_receiver) = channel(); + let (incoming_sender, incoming_receiver) = channel(); + *self.channels.lock().unwrap() = Some((outgoing_sender, incoming_receiver)); + let channels_ref = self.channels.clone(); + let mut events = polling::Events::new(); + spawn(move || loop { + let res: Erm<()> = try { + // send all outgoing messages + while let Ok(msg) = outgoing_receiver.try_recv() { + match msg { + Message::Text(txt) => socket.send(tungstenite::Message::Text(txt)) + .wrap_err("failed to send websocket text")?, + Message::Binary(bytes) => socket.send(tungstenite::Message::Binary(bytes)) + .wrap_err("failed to send websocket bytes")?, + } + } + // wait until we've received data + events.clear(); + poller.wait(&mut events, None).wrap_err("failed to wait on websocket poller")?; + for ev in events.iter() { + if ev.key == KEY { + if ev.readable { match socket.read().wrap_err("failed to recv on websocket")? { + tungstenite::Message::Text(txt) => incoming_sender.send(Message::Text(txt)) + .wrap_err("failed to send incoming websocket message on channel")?, + tungstenite::Message::Binary(bytes) => incoming_sender.send(Message::Binary(bytes)) + .wrap_err("failed to send incoming websocket message on channel")?, + tungstenite::Message::Ping(bs) => socket.send(tungstenite::Message::Pong(bs)) + .wrap_err("failed to send websocket pong")?, + m => log::warn!("unhandled incoming websocket message: {}", m), + }} + match socket.get_ref() { + tungstenite::stream::MaybeTlsStream::Plain(s) => + poller.modify(s, polling::Event::readable(KEY)) + .wrap_err("failed to modify event to poll")?, + tungstenite::stream::MaybeTlsStream::NativeTls(s) => + poller.modify(s.get_ref(), polling::Event::readable(KEY)) + .wrap_err("failed to modify event to poll")?, + _ => log::warn!("unknown socket type; cannot modify polling!"), + } + } + } + }; + if let Err(e) = res { + log::warn!("error in websocket thread: {}", e); + *channels_ref.lock().unwrap() = None; + return; + } + }); + } + pub fn is_connected(&self) -> bool { + self.channels.lock().unwrap().is_some() + } + pub fn poll(&mut self) -> Option { + if let Some((_, incoming)) = &*self.channels.lock().unwrap() { + incoming.try_recv().ok() + } else { None } + } + pub fn send(&self, msg: Message) { + if let Some((outgoing, _)) = &mut *self.channels.lock().unwrap() { + if let Err(e) = outgoing.send(msg) { + log::warn!("failed to send websocket message on channel: {}", e); + } + } else { + log::warn!("tried to send message, but websocket is not connected"); + } + } +} diff --git a/crates/teleia/src/net/ws/client/wasm.rs b/crates/teleia/src/net/ws/client/wasm.rs new file mode 100644 index 0000000..a8942c4 --- /dev/null +++ b/crates/teleia/src/net/ws/client/wasm.rs @@ -0,0 +1,73 @@ +use std::{collections::VecDeque, sync::{Arc, Mutex}}; + +use wasm_bindgen::prelude::*; + +use crate::net::ws::Message; + +impl Message { + fn from_messageevent(e: web_sys::MessageEvent) -> Self { + if let Ok(abuf) = e.data().dyn_into::() { + let array = js_sys::Uint8Array::new(&abuf).to_vec(); + Message::Binary(array) + } else if let Ok(txt) = e.data().dyn_into::() { + Message::Text(txt.into()) + } else { + panic!("received weird websocket message: {:?}", e); + } + } +} + +pub struct Client { + ws: Arc>>, + messages: Arc>>, +} + +impl Client { + pub fn new() -> Self { + Self { + ws: Arc::new(Mutex::new(None)), + messages: Arc::new(Mutex::new(VecDeque::new())), + } + } + pub fn connect(&mut self, url: &str) { + let ws_ref = self.ws.clone(); + let messages_ref = self.messages.clone(); + let cb: Closure = Closure::new(move |e: web_sys::MessageEvent| { + let msg = Message::from_messageevent(e); + log::info!("incoming: {:?}", msg); + messages_ref.lock().unwrap().push_back(msg); + }); + let close_cb: Closure = Closure::new(move |_: web_sys::MessageEvent| { + log::info!("closed!"); + *ws_ref.lock().unwrap() = None; + }); + let ws = web_sys::WebSocket::new(url).expect("failed to open websocket"); + ws.set_onmessage(Some(cb.as_ref().unchecked_ref())); + ws.set_onclose(Some(close_cb.as_ref().unchecked_ref())); + cb.forget(); + close_cb.forget(); + *self.ws.lock().unwrap() = Some(ws); + } + pub fn is_connected(&self) -> bool { + self.ws.lock().unwrap().is_none() + } + pub fn poll(&mut self) -> Option { + self.messages.lock().unwrap().pop_front() + } + pub fn send(&self, msg: Message) { + if let Some(ws) = &*self.ws.lock().unwrap() { + match msg { + Message::Text(txt) => { + if let Err(e) = ws.send_with_str(&txt) { + log::warn!("failed to send string: {:?}", e); + } + }, + Message::Binary(bytes) => { + if let Err(e) = ws.send_with_u8_array(&bytes) { + log::warn!("failed to send bytes: {:?}", e); + } + }, + } + } + } +} diff --git a/crates/teleia/src/net/ws/server.rs b/crates/teleia/src/net/ws/server.rs new file mode 100644 index 0000000..60537a1 --- /dev/null +++ b/crates/teleia/src/net/ws/server.rs @@ -0,0 +1,71 @@ +use std::collections::HashMap; +use std::net::{TcpListener, TcpStream}; +use std::sync::mpsc::{Sender, Receiver, channel}; +use std::sync::{Arc, Mutex}; +use std::thread::spawn; + +use crate::net::ws::Message; +use crate::{Erm, WrapErr}; + +const ACCEPT_KEY: usize = 1; + +type ClientId = usize; +type ClientMessage = (ClientId, Message); + +pub struct Server { + clients: Arc>>>, + channels: Arc, Receiver)>>>, +} + +impl Server { + pub fn new() -> Self { + Self { + clients: Arc::new(Mutex::new(HashMap::new())), + channels: Arc::new(Mutex::new(None)), + } + } + pub fn start(&mut self, addr: &str) { + let clients_ref = self.clients.clone(); + let (outgoing_sender, outgoing_receiver) = channel(); + let (incoming_sender, incoming_receiver) = channel(); + *self.channels.lock().unwrap() = Some((outgoing_sender, incoming_receiver)); + let channels_ref = self.channels.clone(); + let listener = TcpListener::bind(addr).expect("failed to bind server socket"); + let poller = polling::Poller::new().expect("failed to create poller for websocket server"); + unsafe { + poller.add(&listener, polling::Event::readable(ACCEPT_KEY)) + .expect("failed to add poll server socket"); + } + let mut events = polling::Events::new(); + let mut next_id = ACCEPT_KEY + 1; + spawn(move || loop { + let res: Erm<()> = try { + events.clear(); + poller.wait(&mut events, None).wrap_err("failed to wait on websocket server poller")?; + for ev in events.iter() { + if ev.key == ACCEPT_KEY { + let (sock, client_addr) = listener.accept().wrap_err("failed to accept on socket")?; + let id = next_id; next_id += 1; + let res = try { + unsafe { poller.add(&sock, polling::Event::readable(id)) + .wrap_err("failed to add socket to poller")?; } + let conn = tungstenite::accept(sock).wrap_err("failed to upgrade to websocket")?; + clients_ref.lock().unwrap().insert(id, conn); + }; + if let Err(e) = res { + log::warn!("error during connection from {}: {}", client_addr, e); + }; + poller.modify(&listener, polling::Event::readable(ACCEPT_KEY)) + .wrap_err("failed to modify event to poll")?; + } else { + } + } + }; + if let Err(e) = res { + log::warn!("unhandled error in websocket thread, stopping server: {}", e); + *channels_ref.lock().unwrap() = None; + return; + } + }); + } +} diff --git a/crates/teleia/src/save.rs b/crates/teleia/src/save.rs index dea4332..60bbb43 100644 --- a/crates/teleia/src/save.rs +++ b/crates/teleia/src/save.rs @@ -1,3 +1,4 @@ +#[cfg(target_arch = "wasm32")] use base64::prelude::*; #[cfg(target_arch = "wasm32")] @@ -32,7 +33,6 @@ pub fn save(id: &str, data: &W) where W: serde::Serialize { let _ = std::fs::create_dir_all(pd.data_dir()); let path = pd.data_dir().join("teleia.save"); let mut file = std::fs::File::create(&path).expect("failed to open save file"); - // serde_json::to_writer(file, data).expect("failed to write save file"); bincode::serde::encode_into_std_write(data, &mut file, bincode::config::standard()) .expect("failed to write save file"); } @@ -43,6 +43,5 @@ pub fn load(id: &str) -> Option where W: serde::de::DeserializeOwned { let _ = std::fs::create_dir_all(pd.data_dir()); let path = pd.data_dir().join("teleia.save"); let mut file = std::fs::File::open(&path).ok()?; - // serde_json::from_reader(file).ok() bincode::serde::decode_from_std_read(&mut file, bincode::config::standard()).ok() } diff --git a/crates/teleia/src/state.rs b/crates/teleia/src/state.rs index 76be948..9c288a2 100644 --- a/crates/teleia/src/state.rs +++ b/crates/teleia/src/state.rs @@ -9,20 +9,6 @@ use crate::{audio, context, framebuffer, mesh, shader, utils}; const DELTA_TIME: f64 = 0.016; // todo -// pub struct WinitWaker {} -// impl WinitWaker { -// fn new() -> Self { Self {} } -// } -// impl std::task::Wake for WinitWaker { -// fn wake(self: std::sync::Arc) {} -// } - -// pub struct Response { -// pub url: String, -// pub status: reqwest::StatusCode, -// pub body: bytes::Bytes, -// } - pub trait Game { fn initialize(&self, ctx: &context::Context, st: &State) -> utils::Erm<()> { Ok(()) } fn finalize(&self, ctx: &context::Context, st: &State) -> utils::Erm<()> { Ok(()) } @@ -35,7 +21,6 @@ pub trait Game { fn finish_title(&mut self, st: &mut State) {} fn mouse_move(&mut self, ctx: &context::Context, st: &mut State, x: i32, y: i32) {} fn mouse_press(&mut self, ctx: &context::Context, st: &mut State) {} - // fn request_return(&mut self, ctx: &context::Context, st: &mut State, res: Response) {} fn update(&mut self, ctx: &context::Context, st: &mut State) -> utils::Erm<()> { Ok(()) } fn render(&mut self, ctx: &context::Context, st: &mut State) -> utils::Erm<()> { Ok(()) } } @@ -188,10 +173,6 @@ pub struct State { pub lighting: (glam::Vec3, glam::Vec3, glam::Vec3), pub point_lights: Vec, - // pub waker_ctx: std::task::Context<'static>, - // pub http_client: reqwest::Client, - // pub request: Option>>>>, - pub log: Vec<(u64, String)>, } @@ -258,10 +239,6 @@ impl State { ); let mesh_square = mesh::Mesh::from_obj(ctx, include_bytes!("assets/meshes/square.obj")); - // let waker = std::sync::Arc::new(WinitWaker::new()); - // let cwaker = Box::leak(Box::new(waker.into())); - // let waker_ctx = std::task::Context::from_waker(cwaker); - let nextframe = now(ctx); Self { @@ -521,30 +498,6 @@ impl State { self.rebinding = Some(k); } - // pub fn request(&mut self, f: F) - // where F: Fn(&reqwest::Client) -> reqwest::RequestBuilder - // { - // let builder = f(&self.http_client); - // let fut = async { - // let resp = builder.send().await?; - // let url = resp.url().clone().to_string(); - // let status = resp.status().clone(); - // let body = resp.bytes().await?; - // reqwest::Result::Ok(Response { - // url, - // status, - // body, - // }) - // }; - // self.request = Some(Box::pin(fut)); - // } - // pub fn requesting(&self) -> bool { self.request.is_some() } - // pub fn request_returned(&mut self, ctx: &context::Context, game: &mut G, res: Response) - // where G: Game - // { - // game.request_return(ctx, self, res); - // } - pub fn run_update(&mut self, ctx: &context::Context, game: &mut G) -> utils::Erm<()> where G: Game { let now = now(ctx); if now > self.nextframe { -- cgit v1.2.3