diff --git a/src/bin/syndicate-server.rs b/src/bin/syndicate-server.rs index 1850641..09d16b6 100644 --- a/src/bin/syndicate-server.rs +++ b/src/bin/syndicate-server.rs @@ -1,5 +1,5 @@ use syndicate::{config, spaces, packets, ConnId}; -use syndicate::peer::{Peer, ResultC2S}; +use syndicate::peer::Peer; use std::sync::{Mutex, Arc}; use futures::{SinkExt, StreamExt}; @@ -34,27 +34,28 @@ fn message_encoder(p: packets::S2C) -> futures::future::Ready) -> ResultC2S +async fn message_decoder(r: Result) -> Option> { - loop { - return match r { - Ok(ref m) => match m { - Message::Text(_) => - Err(preserves::error::syntax_error("Text websocket frames are not accepted")), - Message::Binary(ref bs) => - Ok(preserves::de::from_bytes(bs)?), - Message::Ping(_) => - continue, // pings are handled by tungstenite before we see them - Message::Pong(_) => - continue, // unsolicited pongs are to be ignored - Message::Close(_) => - Err(preserves::error::eof()), - } - Err(tungstenite::Error::Io(e)) => - Err(e.into()), - Err(e) => - Err(message_error(e)), + match r { + Ok(ref m) => match m { + Message::Text(_) => + Some(Err(preserves::error::syntax_error("Text websocket frames are not accepted"))), + Message::Binary(ref bs) => + match preserves::de::from_bytes(bs) { + Ok(p) => Some(Ok(p)), + Err(e) => Some(Err(e.into())), + }, + Message::Ping(_) => + None, // pings are handled by tungstenite before we see them + Message::Pong(_) => + None, // unsolicited pongs are to be ignored + Message::Close(_) => + Some(Err(preserves::error::eof())), } + Err(tungstenite::Error::Io(e)) => + Some(Err(e.into())), + Err(e) => + Some(Err(message_error(e))), } } @@ -73,7 +74,7 @@ async fn run_connection(connid: ConnId, let s = tokio_tungstenite::accept_async(stream).await .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; let (o, i) = s.split(); - let i = i.map(message_decoder); + let i = i.filter_map(message_decoder); let o = o.sink_map_err(message_error).with(message_encoder); let mut p = Peer::new(connid, i, o); p.run(spaces, &config).await?