Apparently we *do* see pings, and the loop was insane anyway, so filter_map it is

This commit is contained in:
Tony Garnock-Jones 2020-07-22 10:02:12 +02:00
parent eb57378570
commit 8b7cafda81
1 changed files with 22 additions and 21 deletions

View File

@ -1,5 +1,5 @@
use syndicate::{config, spaces, packets, ConnId}; use syndicate::{config, spaces, packets, ConnId};
use syndicate::peer::{Peer, ResultC2S}; use syndicate::peer::Peer;
use std::sync::{Mutex, Arc}; use std::sync::{Mutex, Arc};
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
@ -34,27 +34,28 @@ fn message_encoder(p: packets::S2C) -> futures::future::Ready<Result<Message, pa
futures::future::ready(encode_message(p)) futures::future::ready(encode_message(p))
} }
fn message_decoder(r: Result<Message, tungstenite::Error>) -> ResultC2S async fn message_decoder(r: Result<Message, tungstenite::Error>) -> Option<Result<packets::C2S, packets::Error>>
{ {
loop { match r {
return match r { Ok(ref m) => match m {
Ok(ref m) => match m { Message::Text(_) =>
Message::Text(_) => Some(Err(preserves::error::syntax_error("Text websocket frames are not accepted"))),
Err(preserves::error::syntax_error("Text websocket frames are not accepted")), Message::Binary(ref bs) =>
Message::Binary(ref bs) => match preserves::de::from_bytes(bs) {
Ok(preserves::de::from_bytes(bs)?), Ok(p) => Some(Ok(p)),
Message::Ping(_) => Err(e) => Some(Err(e.into())),
continue, // pings are handled by tungstenite before we see them },
Message::Pong(_) => Message::Ping(_) =>
continue, // unsolicited pongs are to be ignored None, // pings are handled by tungstenite before we see them
Message::Close(_) => Message::Pong(_) =>
Err(preserves::error::eof()), None, // unsolicited pongs are to be ignored
} Message::Close(_) =>
Err(tungstenite::Error::Io(e)) => Some(Err(preserves::error::eof())),
Err(e.into()),
Err(e) =>
Err(message_error(e)),
} }
Err(tungstenite::Error::Io(e)) =>
Some(Err(e.into())),
Err(e) =>
Some(Err(message_error(e))),
} }
} }
@ -73,7 +74,7 @@ async fn run_connection(connid: ConnId,
let s = tokio_tungstenite::accept_async(stream).await let s = tokio_tungstenite::accept_async(stream).await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
let (o, i) = s.split(); let (o, i) = s.split();
let i = i.map(message_decoder); let i = i.filter_map(message_decoder);
let o = o.sink_map_err(message_error).with(message_encoder); let o = o.sink_map_err(message_error).with(message_encoder);
let mut p = Peer::new(connid, i, o); let mut p = Peer::new(connid, i, o);
p.run(spaces, &config).await? p.run(spaces, &config).await?