summaryrefslogtreecommitdiff
path: root/crates
diff options
context:
space:
mode:
Diffstat (limited to 'crates')
-rw-r--r--crates/teleia/src/lib.rs2
-rw-r--r--crates/teleia/src/net/ws.rs2
-rw-r--r--crates/teleia/src/net/ws/client/native.rs73
-rw-r--r--crates/teleia/src/net/ws/server.rs80
-rw-r--r--crates/teleia/src/state.rs4
5 files changed, 120 insertions, 41 deletions
diff --git a/crates/teleia/src/lib.rs b/crates/teleia/src/lib.rs
index 5cdf777..df1a528 100644
--- a/crates/teleia/src/lib.rs
+++ b/crates/teleia/src/lib.rs
@@ -62,7 +62,7 @@ where
pub fn run<'a, F, G>(title: &str, w: u32, h: u32, options: Options, gnew: F) -> Erm<()>
where
G: state::Game + 'static,
- F: (Fn(&'a context::Context) -> G),
+ F: (FnOnce(&'a context::Context) -> G),
{
env_logger::Builder::new()
.filter(None, log::LevelFilter::Info)
diff --git a/crates/teleia/src/net/ws.rs b/crates/teleia/src/net/ws.rs
index bedcc29..8ea1173 100644
--- a/crates/teleia/src/net/ws.rs
+++ b/crates/teleia/src/net/ws.rs
@@ -3,7 +3,7 @@ pub mod client;
#[cfg(not(target_arch = "wasm32"))]
pub mod server;
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub enum Message {
Binary(Vec<u8>),
Text(String),
diff --git a/crates/teleia/src/net/ws/client/native.rs b/crates/teleia/src/net/ws/client/native.rs
index efefdf2..3b8502c 100644
--- a/crates/teleia/src/net/ws/client/native.rs
+++ b/crates/teleia/src/net/ws/client/native.rs
@@ -3,7 +3,7 @@ use std::sync::{Arc, Mutex};
use std::thread::spawn;
use crate::net::ws::Message;
-use crate::{Erm, WrapErr};
+use crate::{utils, Erm, WrapErr};
const KEY: usize = 42;
@@ -24,63 +24,70 @@ impl Client {
};
let poller = match polling::Poller::new() {
Ok(v) => v,
- Err(e) => {
- log::warn!("failed to create poller: {}", e);
- return;
- }
+ Err(e) => { log::warn!("failed to create poller: {}", e); return; }
};
- match socket.get_ref() {
+ let res: Erm<()> = try { match socket.get_ref() {
tungstenite::stream::MaybeTlsStream::Plain(s) => unsafe {
- if let Err(e) = poller.add(s, polling::Event::readable(KEY)) {
- log::warn!("failed to add event to poll: {}", e);
- let _ = socket.close(None);
- return;
- };
+ s.set_nonblocking(true).wrap_err("failed to set socket nonblocking")?;
+ poller.add(s, polling::Event::readable(KEY)).wrap_err("failed to add event to poll")?;
},
tungstenite::stream::MaybeTlsStream::NativeTls(s) => unsafe {
- if let Err(e) = poller.add(s.get_ref(), polling::Event::readable(KEY)) {
- log::warn!("failed to add event to poll: {}", e);
- let _ = socket.close(None);
- return;
- };
- },
- _ => {
- log::warn!("unknown socket type; cannot poll!");
- let _ = socket.close(None);
- return;
+ s.get_ref().set_nonblocking(true).wrap_err("failed to set socket nonblocking")?;
+ poller.add(s.get_ref(), polling::Event::readable(KEY)).wrap_err("failed to add event to poll")?;
},
- }
+ _ => utils::erm_msg("unknown socket type; cannot poll!")?,
+ }};
+ if let Err(e) = res {
+ log::warn!("failed to add event to poll: {}", e);
+ let _ = socket.close(None);
+ return;
+ };
let (outgoing_sender, outgoing_receiver) = channel();
let (incoming_sender, incoming_receiver) = channel();
*self.channels.lock().unwrap() = Some((outgoing_sender, incoming_receiver));
- let channels_ref = self.channels.clone();
+ let send_channels_ref = self.channels.clone();
+ let recv_channels_ref = self.channels.clone();
+ let send_socket_ref = Arc::new(Mutex::new(socket));
+ let recv_socket_ref = send_socket_ref.clone();
let mut events = polling::Events::new();
spawn(move || loop {
+ // write all outgoing messages
let res: Erm<()> = try {
- // send all outgoing messages
- while let Ok(msg) = outgoing_receiver.try_recv() {
+ while let Ok(msg) = outgoing_receiver.recv() {
match msg {
- Message::Text(txt) => socket.send(tungstenite::Message::Text(txt))
+ Message::Text(txt) => send_socket_ref.lock().unwrap()
+ .send(tungstenite::Message::Text(txt))
.wrap_err("failed to send websocket text")?,
- Message::Binary(bytes) => socket.send(tungstenite::Message::Binary(bytes))
+ Message::Binary(bytes) => send_socket_ref.lock().unwrap()
+ .send(tungstenite::Message::Binary(bytes))
.wrap_err("failed to send websocket bytes")?,
}
}
- // wait until we've received data
+ };
+ if let Err(e) = res {
+ log::warn!("error in websocket send thread: {}", e);
+ *send_channels_ref.lock().unwrap() = None;
+ return;
+ }
+ });
+ spawn(move || loop {
+ // wait for and receive all incoming messages
+ let res: Erm<()> = try {
events.clear();
poller.wait(&mut events, None).wrap_err("failed to wait on websocket poller")?;
for ev in events.iter() {
if ev.key == KEY {
- if ev.readable { match socket.read().wrap_err("failed to recv on websocket")? {
+ if ev.readable { match recv_socket_ref.lock().unwrap().read().wrap_err("failed to recv on websocket")? {
tungstenite::Message::Text(txt) => incoming_sender.send(Message::Text(txt))
.wrap_err("failed to send incoming websocket message on channel")?,
tungstenite::Message::Binary(bytes) => incoming_sender.send(Message::Binary(bytes))
.wrap_err("failed to send incoming websocket message on channel")?,
- tungstenite::Message::Ping(bs) => socket.send(tungstenite::Message::Pong(bs))
+ tungstenite::Message::Ping(bs) => recv_socket_ref.lock().unwrap()
+ .send(tungstenite::Message::Pong(bs))
.wrap_err("failed to send websocket pong")?,
m => log::warn!("unhandled incoming websocket message: {}", m),
}}
- match socket.get_ref() {
+ match recv_socket_ref.lock().unwrap().get_ref() {
tungstenite::stream::MaybeTlsStream::Plain(s) =>
poller.modify(s, polling::Event::readable(KEY))
.wrap_err("failed to modify event to poll")?,
@@ -93,8 +100,8 @@ impl Client {
}
};
if let Err(e) = res {
- log::warn!("error in websocket thread: {}", e);
- *channels_ref.lock().unwrap() = None;
+ log::warn!("error in websocket recv thread: {}", e);
+ *recv_channels_ref.lock().unwrap() = None;
return;
}
});
diff --git a/crates/teleia/src/net/ws/server.rs b/crates/teleia/src/net/ws/server.rs
index 60537a1..7cd0cf8 100644
--- a/crates/teleia/src/net/ws/server.rs
+++ b/crates/teleia/src/net/ws/server.rs
@@ -5,7 +5,7 @@ use std::sync::{Arc, Mutex};
use std::thread::spawn;
use crate::net::ws::Message;
-use crate::{Erm, WrapErr};
+use crate::{utils, Erm, WrapErr};
const ACCEPT_KEY: usize = 1;
@@ -25,7 +25,8 @@ impl Server {
}
}
pub fn start(&mut self, addr: &str) {
- let clients_ref = self.clients.clone();
+ let recv_clients_ref = self.clients.clone();
+ let send_clients_ref = self.clients.clone();
let (outgoing_sender, outgoing_receiver) = channel();
let (incoming_sender, incoming_receiver) = channel();
*self.channels.lock().unwrap() = Some((outgoing_sender, incoming_receiver));
@@ -39,6 +40,31 @@ impl Server {
let mut events = polling::Events::new();
let mut next_id = ACCEPT_KEY + 1;
spawn(move || loop {
+ // write all outgoing messages
+ while let Ok((cid, msg)) = outgoing_receiver.recv() {
+ let res: Erm<()> = try {
+ if let Some(sock) = send_clients_ref.lock().unwrap().get_mut(&cid) {
+ let res = match msg {
+ Message::Text(txt) => sock.send(tungstenite::Message::Text(txt)),
+ Message::Binary(bytes) => sock.send(tungstenite::Message::Binary(bytes))
+ };
+ match res {
+ Ok(_) => {},
+ Err(tungstenite::Error::Io(e)) if e.kind() == std::io::ErrorKind::WouldBlock => {
+ log::info!("client {} receive send block", cid);
+ },
+ Err(e) => utils::erm(e)?,
+ }
+ }
+ };
+ if let Err(e) = res {
+ log::warn!("error when sending to client {}, disconnecting: {}", cid, e);
+ send_clients_ref.lock().unwrap().remove(&cid);
+ }
+ }
+ });
+ spawn(move || loop {
+ // wait for and receive all new connections and incoming messages
let res: Erm<()> = try {
events.clear();
poller.wait(&mut events, None).wrap_err("failed to wait on websocket server poller")?;
@@ -46,11 +72,12 @@ impl Server {
if ev.key == ACCEPT_KEY {
let (sock, client_addr) = listener.accept().wrap_err("failed to accept on socket")?;
let id = next_id; next_id += 1;
- let res = try {
+ let res: Erm<()> = try {
unsafe { poller.add(&sock, polling::Event::readable(id))
.wrap_err("failed to add socket to poller")?; }
let conn = tungstenite::accept(sock).wrap_err("failed to upgrade to websocket")?;
- clients_ref.lock().unwrap().insert(id, conn);
+ recv_clients_ref.lock().unwrap().insert(id, conn);
+ log::info!("new connected client! {}", id);
};
if let Err(e) = res {
log::warn!("error during connection from {}: {}", client_addr, e);
@@ -58,6 +85,34 @@ impl Server {
poller.modify(&listener, polling::Event::readable(ACCEPT_KEY))
.wrap_err("failed to modify event to poll")?;
} else {
+ let res: Erm<()> = try {
+ if let Some(sock) = recv_clients_ref.lock().unwrap().get_mut(&ev.key) {
+ match sock.read() {
+ Ok(tungstenite::Message::Text(t)) =>
+ incoming_sender.send((ev.key, Message::Text(t)))
+ .wrap_err("failed to send incoming message on channel")?,
+ Ok(tungstenite::Message::Binary(b)) =>
+ incoming_sender.send((ev.key, Message::Binary(b)))
+ .wrap_err("failed to send incoming message on channel")?,
+ Ok(tungstenite::Message::Ping(x)) =>
+ sock.write(tungstenite::Message::Pong(x))
+ .wrap_err("failed to reply with pong")?,
+ Ok(m) => log::info!("received unhandled websocket message: {}", m),
+ Err(tungstenite::Error::Io(e)) if e.kind() == std::io::ErrorKind::WouldBlock => {
+ log::info!("client {} receive would block", ev.key);
+ },
+ Err(e) => utils::erm(e)?,
+ }
+ poller.modify(sock.get_ref(), polling::Event::readable(ev.key))
+ .wrap_err("failed to modify event to poll")?;
+ } else {
+ log::warn!("poller indicated event for unknown client: {}", ev.key);
+ }
+ };
+ if let Err(e) = res {
+ log::warn!("error on client {} connection, disconnecting: {}", ev.key, e);
+ recv_clients_ref.lock().unwrap().remove(&ev.key);
+ }
}
}
};
@@ -68,4 +123,21 @@ impl Server {
}
});
}
+ pub fn clients(&self) -> Vec<ClientId> {
+ self.clients.lock().unwrap().keys().copied().collect()
+ }
+ pub fn poll(&mut self) -> Option<(ClientId, Message)> {
+ if let Some((_, incoming)) = &*self.channels.lock().unwrap() {
+ incoming.try_recv().ok()
+ } else { None }
+ }
+ pub fn send(&self, client: ClientId, msg: Message) {
+ if let Some((outgoing, _)) = &mut *self.channels.lock().unwrap() {
+ if let Err(e) = outgoing.send((client, msg)) {
+ log::warn!("failed to send websocket message to client {} on channel: {}", client, e);
+ }
+ } else {
+ log::warn!("tried to send message to client {}, but websocket is not connected", client);
+ }
+ }
}
diff --git a/crates/teleia/src/state.rs b/crates/teleia/src/state.rs
index 9c288a2..5bec34e 100644
--- a/crates/teleia/src/state.rs
+++ b/crates/teleia/src/state.rs
@@ -10,8 +10,8 @@ use crate::{audio, context, framebuffer, mesh, shader, utils};
const DELTA_TIME: f64 = 0.016; // todo
pub trait Game {
- fn initialize(&self, ctx: &context::Context, st: &State) -> utils::Erm<()> { Ok(()) }
- fn finalize(&self, ctx: &context::Context, st: &State) -> utils::Erm<()> { Ok(()) }
+ fn initialize(&mut self, ctx: &context::Context, st: &mut State) -> utils::Erm<()> { Ok(()) }
+ fn finalize(&mut self, ctx: &context::Context, st: &mut State) -> utils::Erm<()> { Ok(()) }
fn initialize_audio(
&self, ctx: &context::Context, st: &State,
actx: &audio::Context