New preserves interface

This commit is contained in:
Tony Garnock-Jones 2020-05-25 15:04:50 +02:00
parent a5b00a45bf
commit 78a45116b9
3 changed files with 35 additions and 41 deletions

View File

@ -52,33 +52,29 @@ fn message_decoder(codec: &value::Codec<V, Syndicate>)
return match r { return match r {
Ok(ref m) => match m { Ok(ref m) => match m {
Message::Text(_) => Err(packets::DecodeError::Read( Message::Text(_) => Err(packets::DecodeError::Read(
value::decoder::Error::Syntax("Text websocket frames are not accepted"))), value::reader::err("Text websocket frames are not accepted"))),
Message::Binary(ref bs) => { Message::Binary(ref bs) => {
let mut buf = &bs[..]; let mut buf = &bs[..];
match codec.decode(&mut buf) { let mut vs = codec.decode_all(&mut buf)?;
Ok(v) => if buf.len() > 0 { if vs.len() > 1 {
Err(packets::DecodeError::Read( Err(packets::DecodeError::Read(
value::decoder::Error::Io( std::io::Error::new(std::io::ErrorKind::Other,
std::io::Error::new(std::io::ErrorKind::Other, "Multiple packets in a single message")))
format!("{} trailing bytes", } else if vs.len() == 0 {
buf.len()))))) Err(packets::DecodeError::Read(
} else { std::io::Error::new(std::io::ErrorKind::Other,
value::from_value(&v).map_err(|e| packets::DecodeError::Parse(e, v)) "Empty message")))
} } else {
Err(value::decoder::Error::Eof) => value::from_value(&vs[0])
Err(packets::DecodeError::Read( .map_err(|e| packets::DecodeError::Parse(e, vs.swap_remove(0)))
value::decoder::Error::Io(
std::io::Error::new(std::io::ErrorKind::UnexpectedEof,
"short packet")))),
Err(e) => Err(e.into()),
} }
} }
Message::Ping(_) => continue, // pings are handled by tungstenite before we see them Message::Ping(_) => continue, // pings are handled by tungstenite before we see them
Message::Pong(_) => continue, // unsolicited pongs are to be ignored Message::Pong(_) => continue, // unsolicited pongs are to be ignored
Message::Close(_) => Err(packets::DecodeError::Read(value::decoder::Error::Eof)), Message::Close(_) => Err(packets::DecodeError::Read(value::reader::eof())),
} }
Err(tungstenite::Error::Io(e)) => Err(e.into()), Err(tungstenite::Error::Io(e)) => Err(e.into()),
Err(e) => Err(packets::DecodeError::Read(value::decoder::Error::Io(other_eio(e)))), Err(e) => Err(packets::DecodeError::Read(other_eio(e))),
} }
} }
}; };

View File

@ -2,7 +2,7 @@ use super::V;
use super::Syndicate; use super::Syndicate;
use bytes::{Buf, buf::BufMutExt, BytesMut}; use bytes::{Buf, buf::BufMutExt, BytesMut};
use preserves::{value, ser::Serializer}; use preserves::{value, ser::Serializer, value::Reader};
use std::io; use std::io;
use std::sync::Arc; use std::sync::Arc;
use std::marker::PhantomData; use std::marker::PhantomData;
@ -50,12 +50,6 @@ pub enum DecodeError {
Parse(value::error::Error<Syndicate>, V), Parse(value::error::Error<Syndicate>, V),
} }
impl From<value::decoder::Error> for DecodeError {
fn from(v: value::decoder::Error) -> Self {
DecodeError::Read(v)
}
}
impl From<io::Error> for DecodeError { impl From<io::Error> for DecodeError {
fn from(v: io::Error) -> Self { fn from(v: io::Error) -> Self {
DecodeError::Read(v.into()) DecodeError::Read(v.into())
@ -109,6 +103,7 @@ impl std::fmt::Display for EncodeError {
impl std::error::Error for EncodeError { impl std::error::Error for EncodeError {
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
pub struct Codec<InT, OutT> { pub struct Codec<InT, OutT> {
@ -148,18 +143,19 @@ impl<InT: serde::de::DeserializeOwned, OutT> tokio_util::codec::Decoder for Code
fn decode(&mut self, bs: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { fn decode(&mut self, bs: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let mut buf = &bs[..]; let mut buf = &bs[..];
let orig_len = buf.len(); let orig_len = buf.len();
let res = self.codec.decode(&mut buf); let mut d = self.codec.decoder(&mut buf);
let final_len = buf.len(); match d.next() {
match res { None => Ok(None),
Ok(v) => { Some(res) => {
bs.advance(orig_len - final_len); let v = res?;
let buffered_len = d.read.buffered_len()?;
let final_len = buf.len();
bs.advance(orig_len - final_len - buffered_len);
match value::from_value(&v) { match value::from_value(&v) {
Ok(p) => Ok(Some(p)), Ok(p) => Ok(Some(p)),
Err(e) => Err(DecodeError::Parse(e, v)) Err(e) => Err(DecodeError::Parse(e, v))
} }
} }
Err(value::decoder::Error::Eof) => Ok(None),
Err(e) => Err(DecodeError::Read(e)),
} }
} }
} }

View File

@ -121,14 +121,16 @@ where I: Stream<Item = ResultC2S> + Send,
} }
} }
} }
Err(packets::DecodeError::Read(value::decoder::Error::Eof)) => { Err(packets::DecodeError::Read(e)) => {
tracing::trace!("eof"); if value::is_eof_error(&e) {
running = false; tracing::trace!("eof");
} running = false;
Err(packets::DecodeError::Read(value::decoder::Error::Io(e))) => return Err(e), } else if value::is_syntax_error(&e) {
Err(packets::DecodeError::Read(value::decoder::Error::Syntax(s))) => { to_send.push(err(&e.to_string(), value::Value::from(false).wrap()));
to_send.push(err(s, value::Value::from(false).wrap())); running = false;
running = false; } else {
return Err(e)
}
} }
Err(packets::DecodeError::Parse(e, v)) => { Err(packets::DecodeError::Parse(e, v)) => {
to_send.push(err(&format!("Packet deserialization error: {}", e), v)); to_send.push(err(&format!("Packet deserialization error: {}", e), v));