From 99c0be80a32d45043b223dd40e4569aaa37ef058 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 18 May 2020 16:46:41 +0200 Subject: [PATCH] Better error reporting and tracing --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- src/bin/syndicate-server.rs | 26 ++++++++++++++++++++++---- src/peer.rs | 5 ++++- src/skeleton.rs | 4 ++-- 5 files changed, 31 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1099b45..b0edd8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -717,9 +717,9 @@ checksum = "237a5ed80e274dbc66f86bd59c1e25edc039660be53194b5fe0a482e0f2612ea" [[package]] name = "preserves" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0dd44fa1ee67d7a3b89fd813b128dab86b8f4aa2b8fb6746756ad4c2082588f4" +checksum = "10c5593fd32171d91d438556bbde8666a8ab38ce3260e53c977833564a923f83" dependencies = [ "num", "num_enum", diff --git a/Cargo.toml b/Cargo.toml index c29b1a2..60f77cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ debug = true name = "syndicate" [dependencies] -preserves = "0.4.0" +preserves = "0.5.0" serde = { version = "1.0", features = ["derive", "rc"] } serde_bytes = "0.11" diff --git a/src/bin/syndicate-server.rs b/src/bin/syndicate-server.rs index 2d013b4..1daa01c 100644 --- a/src/bin/syndicate-server.rs +++ b/src/bin/syndicate-server.rs @@ -49,8 +49,24 @@ fn message_decoder(codec: &value::Codec) Message::Text(_) => Err(packets::DecodeError::Read( value::decoder::Error::Syntax("Text websocket frames are not accepted"))), Message::Binary(ref bs) => { - let v = codec.decode(&mut &bs[..])?; - value::from_value(&v).map_err(|e| packets::DecodeError::Parse(e, v)) + let mut buf = &bs[..]; + match codec.decode(&mut buf) { + Ok(v) => if buf.len() > 0 { + Err(packets::DecodeError::Read( + value::decoder::Error::Io( + std::io::Error::new(std::io::ErrorKind::Other, + format!("{} trailing bytes", + buf.len()))))) + } else { + value::from_value(&v).map_err(|e| packets::DecodeError::Parse(e, v)) + } + Err(value::decoder::Error::Eof) => + Err(packets::DecodeError::Read( + value::decoder::Error::Io( + std::io::Error::new(std::io::ErrorKind::UnexpectedEof, + "short packet")))), + Err(e) => Err(e.into()), + } } Message::Ping(_) => continue, // pings are handled by tungstenite before we see them Message::Pong(_) => continue, // unsolicited pongs are to be ignored @@ -66,6 +82,7 @@ fn message_decoder(codec: &value::Codec) async fn run_connection(connid: ConnId, mut stream: TcpStream, spaces: Arc>, + addr: std::net::SocketAddr, config: config::ServerConfigRef) -> UnitAsyncResult { @@ -73,6 +90,7 @@ async fn run_connection(connid: ConnId, match stream.peek(&mut buf).await? { 1 => match buf[0] { 71 /* ASCII 'G' for "GET" */ => { + info!(protocol = display("websocket"), peer = debug(addr)); let s = tokio_tungstenite::accept_async(stream).await .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; let (o, i) = s.split(); @@ -83,6 +101,7 @@ async fn run_connection(connid: ConnId, p.run(spaces, &config).await? }, _ => { + info!(protocol = display("raw"), peer = debug(addr)); let (o, i) = Framed::new(stream, packets::Codec::standard()).split(); let mut p = Peer::new(connid, i, o); p.run(spaces, &config).await? @@ -109,8 +128,7 @@ async fn run_listener(spaces: Arc>, port: u16, config: con if let Some(n) = config.recv_buffer_size { stream.set_recv_buffer_size(n)?; } if let Some(n) = config.send_buffer_size { stream.set_send_buffer_size(n)?; } tokio::spawn(async move { - info!(addr = display(addr), "accepted"); - match run_connection(id, stream, spaces, config).await { + match run_connection(id, stream, spaces, addr, config).await { Ok(()) => info!("closed"), Err(e) => info!(error = display(e), "closed"), } diff --git a/src/peer.rs b/src/peer.rs index 2fa78bf..c6d602c 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -121,7 +121,10 @@ where I: Stream + Send, } } } - Err(packets::DecodeError::Read(value::decoder::Error::Eof)) => running = false, + Err(packets::DecodeError::Read(value::decoder::Error::Eof)) => { + tracing::trace!("eof"); + running = false; + } Err(packets::DecodeError::Read(value::decoder::Error::Io(e))) => return Err(e), Err(packets::DecodeError::Read(value::decoder::Error::Syntax(s))) => { to_send.push(err(s, value::Value::from(false).wrap())); diff --git a/src/skeleton.rs b/src/skeleton.rs index dc4a895..90e6ac2 100644 --- a/src/skeleton.rs +++ b/src/skeleton.rs @@ -590,8 +590,8 @@ fn instantiate_assertion_walk(capture_paths: &mut Paths, Value::from(Vec::from_iter(a.value().as_sequence().unwrap() .iter().enumerate().map(f))) .wrap(), - Some(Guard::Rec(l, _)) => - Value::record(l, a.value().as_record().unwrap().1 + Some(Guard::Rec(l, fieldcount)) => + Value::record(l, a.value().as_record(Some(fieldcount)).unwrap().1 .iter().enumerate().map(f).collect()) .wrap(), None =>