diff options
Diffstat (limited to 'crates')
| -rw-r--r-- | crates/teleia/Cargo.toml | 3 | ||||
| -rw-r--r-- | crates/teleia/src/audio.rs | 63 | ||||
| -rw-r--r-- | crates/teleia/src/lib.rs | 5 | ||||
| -rw-r--r-- | crates/teleia/src/net.rs | 5 | ||||
| -rw-r--r-- | crates/teleia/src/net/client.rs | 2 | ||||
| -rw-r--r-- | crates/teleia/src/net/fig.rs (renamed from crates/teleia/src/fig.rs) | 0 | ||||
| -rw-r--r-- | crates/teleia/src/net/ws.rs | 10 | ||||
| -rw-r--r-- | crates/teleia/src/net/ws/client.rs | 9 | ||||
| -rw-r--r-- | crates/teleia/src/net/ws/client/native.rs | 119 | ||||
| -rw-r--r-- | crates/teleia/src/net/ws/client/wasm.rs (renamed from crates/teleia/src/net/client/wasm.rs) | 32 | ||||
| -rw-r--r-- | crates/teleia/src/net/ws/server.rs | 71 | ||||
| -rw-r--r-- | crates/teleia/src/save.rs | 3 | ||||
| -rw-r--r-- | crates/teleia/src/state.rs | 47 |
13 files changed, 283 insertions, 86 deletions
diff --git a/crates/teleia/Cargo.toml b/crates/teleia/Cargo.toml index 66edbc8..18718a4 100644 --- a/crates/teleia/Cargo.toml +++ b/crates/teleia/Cargo.toml @@ -48,4 +48,5 @@ env_logger = "*" # configurable logging to stdout glfw = { git = "https://github.com/lcolonq/glfw-rs", features = ["serde"] } # window management kira = { version = "=0.9.6", default-features = false, features = ["cpal", "ogg", "wav"] } # audio directories = { git = "https://github.com/lcolonq/directories-rs" } # standard system directories -polling = "*" # interface to epoll
\ No newline at end of file +polling = "*" # interface to epoll +tungstenite = { version = "*", features = ["native-tls"] } # websockets
\ No newline at end of file diff --git a/crates/teleia/src/audio.rs b/crates/teleia/src/audio.rs index e95a86d..8106c20 100644 --- a/crates/teleia/src/audio.rs +++ b/crates/teleia/src/audio.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; -#[cfg(target_arch = "wasm32")] use std::sync::{Arc, Mutex}; #[cfg(target_arch = "wasm32")] @@ -71,7 +70,10 @@ impl Audio { } } - pub fn play(&self, ctx: &Context, looping: Option<(Option<f64>, Option<f64>)>) -> Option<AudioPlayingHandle> { + pub fn play(&self, + ctx: &mut Context, + looping: Option<(Option<f64>, Option<f64>)> + ) -> Option<AudioPlayingHandle> { let source = ctx.audio.create_buffer_source().ok()?; if let Some(ab) = &*self.buffer.lock().unwrap() { source.set_buffer(Some(&ab)); @@ -149,24 +151,54 @@ impl Context { } #[cfg(not(target_arch = "wasm32"))] +pub struct AudioPlayingHandle { + handle: Arc<Mutex<kira::sound::static_sound::StaticSoundHandle>> +} + +#[cfg(not(target_arch = "wasm32"))] +impl AudioPlayingHandle { + pub fn stop(&self, _ctx: &Context) { + self.handle.lock().unwrap().stop(kira::tween::Tween::default()); + } + pub fn fade_out(&self, _ctx: &Context, time: f32) { + self.handle.lock().unwrap().stop(kira::tween::Tween { + duration: std::time::Duration::from_secs_f32(time + 0.5), + ..Default::default() + }); + } +} + +#[cfg(not(target_arch = "wasm32"))] pub struct Audio { data: kira::sound::static_sound::StaticSoundData, } #[cfg(not(target_arch = "wasm32"))] impl Audio { - pub fn new(_ctx: &Context, bytes: &'static [u8]) -> Self { + pub fn new(_ctx: &Context, bytes: &[u8]) -> Self { Self { - data: kira::sound::static_sound::StaticSoundData::from_cursor(std::io::Cursor::new(bytes)) + data: kira::sound::static_sound::StaticSoundData::from_cursor(std::io::Cursor::new(bytes.to_owned())) .expect("failed to decode audio"), } } + pub fn from_samples(_ctx: &Context, sample_rate: f32, samples: &[f32]) -> Self { + let frames: Vec<kira::Frame> = samples.iter().map(|f| kira::Frame { left: *f, right: *f }).collect(); + Self { + data: kira::sound::static_sound::StaticSoundData { + sample_rate: sample_rate as u32, + frames: frames.into(), + settings: kira::sound::static_sound::StaticSoundSettings::default(), + slice: None, + }, + } + } + pub fn play( &self, ctx: &mut Context, looping: Option<(Option<f64>, Option<f64>)> - ) -> Result<kira::sound::static_sound::StaticSoundHandle, String> + ) -> Option<AudioPlayingHandle> { let sd = if let Some((ss, se)) = looping { let start = if let Some(s) = ss { s } else { 0.0 }; @@ -178,10 +210,7 @@ impl Audio { } else { self.data.clone() }; - match ctx.manager.play(sd) { - Ok(h) => Ok(h), - Err(e) => Err(e.to_string()), - } + ctx.manager.play(sd).ok().map(|h| AudioPlayingHandle { handle: Arc::new(Mutex::new(h)) }) } } @@ -189,7 +218,7 @@ impl Audio { pub struct Assets { pub ctx: Context, pub audio: HashMap<String, Audio>, - pub music_handle: Option<kira::sound::static_sound::StaticSoundHandle>, + pub music_handle: Option<AudioPlayingHandle>, } #[cfg(not(target_arch = "wasm32"))] @@ -208,29 +237,29 @@ impl Assets { pub fn play_sfx(&mut self, name: &str) { if let Some(a) = self.audio.get(name) { - if let Err(e) = a.play(&mut self.ctx, None) { - log::warn!("failed to play sound {}: {}", name, e); + if a.play(&mut self.ctx, None).is_none() { + log::warn!("failed to play sound {}", name); } } } pub fn is_music_playing(&self) -> bool { if let Some(mh) = &self.music_handle { - mh.state() == kira::sound::PlaybackState::Playing + mh.handle.lock().unwrap().state() == kira::sound::PlaybackState::Playing } else { false } } pub fn play_music(&mut self, name: &str, start: Option<f64>, end: Option<f64>) { if let Some(s) = &mut self.music_handle { - let _ = s.stop(kira::tween::Tween::default()); + let _ = s.stop(&self.ctx); } if let Some(a) = self.audio.get(name) { match a.play(&mut self.ctx, Some((start, end))) { - Ok(h) => { + Some(h) => { self.music_handle = Some(h); }, - Err(e) => { - log::warn!("failed to play music {}: {}", name, e); + _ => { + log::warn!("failed to play music {}", name); } } } diff --git a/crates/teleia/src/lib.rs b/crates/teleia/src/lib.rs index 81bc617..5cdf777 100644 --- a/crates/teleia/src/lib.rs +++ b/crates/teleia/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(try_blocks)] + pub mod utils; pub mod ui; pub mod context; @@ -15,9 +17,6 @@ pub mod physics; pub mod save; pub mod level2d; -#[cfg(not(target_arch = "wasm32"))] -pub mod fig; - pub use utils::{erm, install_error_handler, Erm}; pub use audio::AudioPlayback; pub use simple_eyre::eyre::WrapErr; diff --git a/crates/teleia/src/net.rs b/crates/teleia/src/net.rs index b9babe5..0fee34d 100644 --- a/crates/teleia/src/net.rs +++ b/crates/teleia/src/net.rs @@ -1 +1,4 @@ -pub mod client; +pub mod ws; + +#[cfg(not(target_arch = "wasm32"))] +pub mod fig; diff --git a/crates/teleia/src/net/client.rs b/crates/teleia/src/net/client.rs deleted file mode 100644 index fcd51b8..0000000 --- a/crates/teleia/src/net/client.rs +++ /dev/null @@ -1,2 +0,0 @@ -#[cfg(target_arch = "wasm32")] -pub mod wasm; diff --git a/crates/teleia/src/fig.rs b/crates/teleia/src/net/fig.rs index 9dfbea8..9dfbea8 100644 --- a/crates/teleia/src/fig.rs +++ b/crates/teleia/src/net/fig.rs diff --git a/crates/teleia/src/net/ws.rs b/crates/teleia/src/net/ws.rs new file mode 100644 index 0000000..bedcc29 --- /dev/null +++ b/crates/teleia/src/net/ws.rs @@ -0,0 +1,10 @@ +pub mod client; + +#[cfg(not(target_arch = "wasm32"))] +pub mod server; + +#[derive(Debug)] +pub enum Message { + Binary(Vec<u8>), + Text(String), +} diff --git a/crates/teleia/src/net/ws/client.rs b/crates/teleia/src/net/ws/client.rs new file mode 100644 index 0000000..ebd7681 --- /dev/null +++ b/crates/teleia/src/net/ws/client.rs @@ -0,0 +1,9 @@ +#[cfg(target_arch = "wasm32")] +pub mod wasm; +#[cfg(target_arch = "wasm32")] +pub use wasm::*; + +#[cfg(not(target_arch = "wasm32"))] +pub mod native; +#[cfg(not(target_arch = "wasm32"))] +pub use native::*; diff --git a/crates/teleia/src/net/ws/client/native.rs b/crates/teleia/src/net/ws/client/native.rs new file mode 100644 index 0000000..efefdf2 --- /dev/null +++ b/crates/teleia/src/net/ws/client/native.rs @@ -0,0 +1,119 @@ +use std::sync::mpsc::{Sender, Receiver, channel}; +use std::sync::{Arc, Mutex}; +use std::thread::spawn; + +use crate::net::ws::Message; +use crate::{Erm, WrapErr}; + +const KEY: usize = 42; + +pub struct Client { + channels: Arc<Mutex<Option<(Sender<Message>, Receiver<Message>)>>>, +} + +impl Client { + pub fn new() -> Self { + Self { + channels: Arc::new(Mutex::new(None)), + } + } + pub fn connect(&mut self, url: &str) { + let (mut socket, _resp) = if let Ok(v) = tungstenite::connect(url) { v } else { + log::warn!("failed to connect to websocket: {}", url); + return; + }; + let poller = match polling::Poller::new() { + Ok(v) => v, + Err(e) => { + log::warn!("failed to create poller: {}", e); + return; + } + }; + match socket.get_ref() { + tungstenite::stream::MaybeTlsStream::Plain(s) => unsafe { + if let Err(e) = poller.add(s, polling::Event::readable(KEY)) { + log::warn!("failed to add event to poll: {}", e); + let _ = socket.close(None); + return; + }; + }, + tungstenite::stream::MaybeTlsStream::NativeTls(s) => unsafe { + if let Err(e) = poller.add(s.get_ref(), polling::Event::readable(KEY)) { + log::warn!("failed to add event to poll: {}", e); + let _ = socket.close(None); + return; + }; + }, + _ => { + log::warn!("unknown socket type; cannot poll!"); + let _ = socket.close(None); + return; + }, + } + let (outgoing_sender, outgoing_receiver) = channel(); + let (incoming_sender, incoming_receiver) = channel(); + *self.channels.lock().unwrap() = Some((outgoing_sender, incoming_receiver)); + let channels_ref = self.channels.clone(); + let mut events = polling::Events::new(); + spawn(move || loop { + let res: Erm<()> = try { + // send all outgoing messages + while let Ok(msg) = outgoing_receiver.try_recv() { + match msg { + Message::Text(txt) => socket.send(tungstenite::Message::Text(txt)) + .wrap_err("failed to send websocket text")?, + Message::Binary(bytes) => socket.send(tungstenite::Message::Binary(bytes)) + .wrap_err("failed to send websocket bytes")?, + } + } + // wait until we've received data + events.clear(); + poller.wait(&mut events, None).wrap_err("failed to wait on websocket poller")?; + for ev in events.iter() { + if ev.key == KEY { + if ev.readable { match socket.read().wrap_err("failed to recv on websocket")? { + tungstenite::Message::Text(txt) => incoming_sender.send(Message::Text(txt)) + .wrap_err("failed to send incoming websocket message on channel")?, + tungstenite::Message::Binary(bytes) => incoming_sender.send(Message::Binary(bytes)) + .wrap_err("failed to send incoming websocket message on channel")?, + tungstenite::Message::Ping(bs) => socket.send(tungstenite::Message::Pong(bs)) + .wrap_err("failed to send websocket pong")?, + m => log::warn!("unhandled incoming websocket message: {}", m), + }} + match socket.get_ref() { + tungstenite::stream::MaybeTlsStream::Plain(s) => + poller.modify(s, polling::Event::readable(KEY)) + .wrap_err("failed to modify event to poll")?, + tungstenite::stream::MaybeTlsStream::NativeTls(s) => + poller.modify(s.get_ref(), polling::Event::readable(KEY)) + .wrap_err("failed to modify event to poll")?, + _ => log::warn!("unknown socket type; cannot modify polling!"), + } + } + } + }; + if let Err(e) = res { + log::warn!("error in websocket thread: {}", e); + *channels_ref.lock().unwrap() = None; + return; + } + }); + } + pub fn is_connected(&self) -> bool { + self.channels.lock().unwrap().is_some() + } + pub fn poll(&mut self) -> Option<Message> { + if let Some((_, incoming)) = &*self.channels.lock().unwrap() { + incoming.try_recv().ok() + } else { None } + } + pub fn send(&self, msg: Message) { + if let Some((outgoing, _)) = &mut *self.channels.lock().unwrap() { + if let Err(e) = outgoing.send(msg) { + log::warn!("failed to send websocket message on channel: {}", e); + } + } else { + log::warn!("tried to send message, but websocket is not connected"); + } + } +} diff --git a/crates/teleia/src/net/client/wasm.rs b/crates/teleia/src/net/ws/client/wasm.rs index d9936e5..a8942c4 100644 --- a/crates/teleia/src/net/client/wasm.rs +++ b/crates/teleia/src/net/ws/client/wasm.rs @@ -2,54 +2,60 @@ use std::{collections::VecDeque, sync::{Arc, Mutex}}; use wasm_bindgen::prelude::*; -#[derive(Debug)] -pub enum Message { - Binary(Vec<u8>), - Text(String), -} +use crate::net::ws::Message; impl Message { - pub fn from_messageevent(e: web_sys::MessageEvent) -> Self { + fn from_messageevent(e: web_sys::MessageEvent) -> Self { if let Ok(abuf) = e.data().dyn_into::<js_sys::ArrayBuffer>() { let array = js_sys::Uint8Array::new(&abuf).to_vec(); Message::Binary(array) } else if let Ok(txt) = e.data().dyn_into::<js_sys::JsString>() { Message::Text(txt.into()) } else { - panic!("received weird websocked message: {:?}", e); + panic!("received weird websocket message: {:?}", e); } } } pub struct Client { - pub ws: Option<web_sys::WebSocket>, - pub messages: Arc<Mutex<VecDeque<Message>>>, + ws: Arc<Mutex<Option<web_sys::WebSocket>>>, + messages: Arc<Mutex<VecDeque<Message>>>, } impl Client { pub fn new() -> Self { Self { - ws: None, + ws: Arc::new(Mutex::new(None)), messages: Arc::new(Mutex::new(VecDeque::new())), } } pub fn connect(&mut self, url: &str) { - let ws = web_sys::WebSocket::new(url).expect("failed to open websocket"); + let ws_ref = self.ws.clone(); let messages_ref = self.messages.clone(); let cb: Closure<dyn Fn(web_sys::MessageEvent)> = Closure::new(move |e: web_sys::MessageEvent| { let msg = Message::from_messageevent(e); log::info!("incoming: {:?}", msg); messages_ref.lock().unwrap().push_back(msg); }); + let close_cb: Closure<dyn Fn(web_sys::MessageEvent)> = Closure::new(move |_: web_sys::MessageEvent| { + log::info!("closed!"); + *ws_ref.lock().unwrap() = None; + }); + let ws = web_sys::WebSocket::new(url).expect("failed to open websocket"); ws.set_onmessage(Some(cb.as_ref().unchecked_ref())); + ws.set_onclose(Some(close_cb.as_ref().unchecked_ref())); cb.forget(); - self.ws = Some(ws); + close_cb.forget(); + *self.ws.lock().unwrap() = Some(ws); + } + pub fn is_connected(&self) -> bool { + self.ws.lock().unwrap().is_none() } pub fn poll(&mut self) -> Option<Message> { self.messages.lock().unwrap().pop_front() } pub fn send(&self, msg: Message) { - if let Some(ws) = &self.ws { + if let Some(ws) = &*self.ws.lock().unwrap() { match msg { Message::Text(txt) => { if let Err(e) = ws.send_with_str(&txt) { diff --git a/crates/teleia/src/net/ws/server.rs b/crates/teleia/src/net/ws/server.rs new file mode 100644 index 0000000..60537a1 --- /dev/null +++ b/crates/teleia/src/net/ws/server.rs @@ -0,0 +1,71 @@ +use std::collections::HashMap; +use std::net::{TcpListener, TcpStream}; +use std::sync::mpsc::{Sender, Receiver, channel}; +use std::sync::{Arc, Mutex}; +use std::thread::spawn; + +use crate::net::ws::Message; +use crate::{Erm, WrapErr}; + +const ACCEPT_KEY: usize = 1; + +type ClientId = usize; +type ClientMessage = (ClientId, Message); + +pub struct Server { + clients: Arc<Mutex<HashMap<ClientId, tungstenite::WebSocket<TcpStream>>>>, + channels: Arc<Mutex<Option<(Sender<ClientMessage>, Receiver<ClientMessage>)>>>, +} + +impl Server { + pub fn new() -> Self { + Self { + clients: Arc::new(Mutex::new(HashMap::new())), + channels: Arc::new(Mutex::new(None)), + } + } + pub fn start(&mut self, addr: &str) { + let clients_ref = self.clients.clone(); + let (outgoing_sender, outgoing_receiver) = channel(); + let (incoming_sender, incoming_receiver) = channel(); + *self.channels.lock().unwrap() = Some((outgoing_sender, incoming_receiver)); + let channels_ref = self.channels.clone(); + let listener = TcpListener::bind(addr).expect("failed to bind server socket"); + let poller = polling::Poller::new().expect("failed to create poller for websocket server"); + unsafe { + poller.add(&listener, polling::Event::readable(ACCEPT_KEY)) + .expect("failed to add poll server socket"); + } + let mut events = polling::Events::new(); + let mut next_id = ACCEPT_KEY + 1; + spawn(move || loop { + let res: Erm<()> = try { + events.clear(); + poller.wait(&mut events, None).wrap_err("failed to wait on websocket server poller")?; + for ev in events.iter() { + if ev.key == ACCEPT_KEY { + let (sock, client_addr) = listener.accept().wrap_err("failed to accept on socket")?; + let id = next_id; next_id += 1; + let res = try { + unsafe { poller.add(&sock, polling::Event::readable(id)) + .wrap_err("failed to add socket to poller")?; } + let conn = tungstenite::accept(sock).wrap_err("failed to upgrade to websocket")?; + clients_ref.lock().unwrap().insert(id, conn); + }; + if let Err(e) = res { + log::warn!("error during connection from {}: {}", client_addr, e); + }; + poller.modify(&listener, polling::Event::readable(ACCEPT_KEY)) + .wrap_err("failed to modify event to poll")?; + } else { + } + } + }; + if let Err(e) = res { + log::warn!("unhandled error in websocket thread, stopping server: {}", e); + *channels_ref.lock().unwrap() = None; + return; + } + }); + } +} diff --git a/crates/teleia/src/save.rs b/crates/teleia/src/save.rs index dea4332..60bbb43 100644 --- a/crates/teleia/src/save.rs +++ b/crates/teleia/src/save.rs @@ -1,3 +1,4 @@ +#[cfg(target_arch = "wasm32")] use base64::prelude::*; #[cfg(target_arch = "wasm32")] @@ -32,7 +33,6 @@ pub fn save<W>(id: &str, data: &W) where W: serde::Serialize { let _ = std::fs::create_dir_all(pd.data_dir()); let path = pd.data_dir().join("teleia.save"); let mut file = std::fs::File::create(&path).expect("failed to open save file"); - // serde_json::to_writer(file, data).expect("failed to write save file"); bincode::serde::encode_into_std_write(data, &mut file, bincode::config::standard()) .expect("failed to write save file"); } @@ -43,6 +43,5 @@ pub fn load<W>(id: &str) -> Option<W> where W: serde::de::DeserializeOwned { let _ = std::fs::create_dir_all(pd.data_dir()); let path = pd.data_dir().join("teleia.save"); let mut file = std::fs::File::open(&path).ok()?; - // serde_json::from_reader(file).ok() bincode::serde::decode_from_std_read(&mut file, bincode::config::standard()).ok() } diff --git a/crates/teleia/src/state.rs b/crates/teleia/src/state.rs index 76be948..9c288a2 100644 --- a/crates/teleia/src/state.rs +++ b/crates/teleia/src/state.rs @@ -9,20 +9,6 @@ use crate::{audio, context, framebuffer, mesh, shader, utils}; const DELTA_TIME: f64 = 0.016; // todo -// pub struct WinitWaker {} -// impl WinitWaker { -// fn new() -> Self { Self {} } -// } -// impl std::task::Wake for WinitWaker { -// fn wake(self: std::sync::Arc<Self>) {} -// } - -// pub struct Response { -// pub url: String, -// pub status: reqwest::StatusCode, -// pub body: bytes::Bytes, -// } - pub trait Game { fn initialize(&self, ctx: &context::Context, st: &State) -> utils::Erm<()> { Ok(()) } fn finalize(&self, ctx: &context::Context, st: &State) -> utils::Erm<()> { Ok(()) } @@ -35,7 +21,6 @@ pub trait Game { fn finish_title(&mut self, st: &mut State) {} fn mouse_move(&mut self, ctx: &context::Context, st: &mut State, x: i32, y: i32) {} fn mouse_press(&mut self, ctx: &context::Context, st: &mut State) {} - // fn request_return(&mut self, ctx: &context::Context, st: &mut State, res: Response) {} fn update(&mut self, ctx: &context::Context, st: &mut State) -> utils::Erm<()> { Ok(()) } fn render(&mut self, ctx: &context::Context, st: &mut State) -> utils::Erm<()> { Ok(()) } } @@ -188,10 +173,6 @@ pub struct State { pub lighting: (glam::Vec3, glam::Vec3, glam::Vec3), pub point_lights: Vec<PointLight>, - // pub waker_ctx: std::task::Context<'static>, - // pub http_client: reqwest::Client, - // pub request: Option<std::pin::Pin<Box<dyn std::future::Future<Output = reqwest::Result<Response>>>>>, - pub log: Vec<(u64, String)>, } @@ -258,10 +239,6 @@ impl State { ); let mesh_square = mesh::Mesh::from_obj(ctx, include_bytes!("assets/meshes/square.obj")); - // let waker = std::sync::Arc::new(WinitWaker::new()); - // let cwaker = Box::leak(Box::new(waker.into())); - // let waker_ctx = std::task::Context::from_waker(cwaker); - let nextframe = now(ctx); Self { @@ -521,30 +498,6 @@ impl State { self.rebinding = Some(k); } - // pub fn request<F>(&mut self, f: F) - // where F: Fn(&reqwest::Client) -> reqwest::RequestBuilder - // { - // let builder = f(&self.http_client); - // let fut = async { - // let resp = builder.send().await?; - // let url = resp.url().clone().to_string(); - // let status = resp.status().clone(); - // let body = resp.bytes().await?; - // reqwest::Result::Ok(Response { - // url, - // status, - // body, - // }) - // }; - // self.request = Some(Box::pin(fut)); - // } - // pub fn requesting(&self) -> bool { self.request.is_some() } - // pub fn request_returned<G>(&mut self, ctx: &context::Context, game: &mut G, res: Response) - // where G: Game - // { - // game.request_return(ctx, self, res); - // } - pub fn run_update<G>(&mut self, ctx: &context::Context, game: &mut G) -> utils::Erm<()> where G: Game { let now = now(ctx); if now > self.nextframe { |
