diff --git a/src/main.rs b/src/main.rs index 727f575..27d517e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,14 +2,13 @@ mod bag; mod skeleton; +mod packets; -use bytes::BytesMut; use core::time::Duration; use futures::select; use preserves::value::{self, Map}; -use std::io; use std::sync::{Mutex, RwLock, Arc}; -use tokio::codec::{Framed, Encoder, Decoder}; +use tokio::codec::Framed; use tokio::net::{TcpListener, TcpStream}; use tokio::prelude::*; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver}; @@ -19,130 +18,7 @@ use tokio::timer::Interval; type ConnId = u64; -// type V = value::PlainValue; -type V = value::ArcValue; - -mod packets { - use super::V; - - pub type EndpointName = V; - pub type Assertion = V; - pub type Captures = Vec; - - #[derive(Debug, serde::Serialize, serde::Deserialize)] - pub enum Action { - Assert(EndpointName, Assertion), - Clear(EndpointName), - Message(Assertion), - } - - #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] - pub enum Event { - Add(EndpointName, Captures), - Del(EndpointName, Captures), - Msg(EndpointName, Captures), - End(EndpointName), - } - - #[derive(Debug, serde::Serialize, serde::Deserialize)] - pub enum In { - Connect(V), - Turn(Vec), - Ping(), - Pong(), - } - - #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] - pub enum Out { - Err(String), - Turn(Vec), - Ping(), - Pong(), - } -} - -#[derive(Debug)] -enum PacketDecodeError { - Read(value::decoder::Error), - Parse(value::error::Error), -} - -impl From for PacketDecodeError { - fn from(v: io::Error) -> Self { - PacketDecodeError::Read(v.into()) - } -} - -impl From for PacketDecodeError { - fn from(v: value::error::Error) -> Self { - PacketDecodeError::Parse(v) - } -} - -#[derive(Debug)] -enum PacketEncodeError { - Write(value::encoder::Error), - Unparse(value::error::Error), -} - -impl From for PacketEncodeError { - fn from(v: io::Error) -> Self { - PacketEncodeError::Write(v.into()) - } -} - -impl From for PacketEncodeError { - fn from(v: value::error::Error) -> Self { - PacketEncodeError::Unparse(v) - } -} - -impl From for io::Error { - fn from(v: PacketEncodeError) -> Self { - match v { - PacketEncodeError::Write(e) => e, - PacketEncodeError::Unparse(e) => - Self::new(io::ErrorKind::InvalidData, format!("{:?}", e)), - } - } -} - -struct PacketCodec { - codec: value::Codec, -} - -impl PacketCodec { - fn new(codec: value::Codec) -> Self { - PacketCodec { codec } - } -} - -impl Decoder for PacketCodec { - type Item = packets::In; - type Error = PacketDecodeError; - fn decode(&mut self, bs: &mut BytesMut) -> Result, Self::Error> { - let mut buf = &bs[..]; - let orig_len = buf.len(); - let res = self.codec.decode(&mut buf); - let final_len = buf.len(); - bs.advance(orig_len - final_len); - match res { - Ok(v) => Ok(Some(value::from_value(&v)?)), - Err(value::decoder::Error::Eof) => Ok(None), - Err(e) => Err(PacketDecodeError::Read(e)), - } - } -} - -impl Encoder for PacketCodec { - type Item = packets::Out; - type Error = PacketEncodeError; - fn encode(&mut self, item: Self::Item, bs: &mut BytesMut) -> Result<(), Self::Error> { - let v: V = value::to_value(&item)?; - bs.extend(self.codec.encode_bytes(&v)?); - Ok(()) - } -} +pub type V = value::ArcValue; fn err(s: &str) -> packets::Out { packets::Out::Err(s.into()) @@ -152,13 +28,13 @@ struct Peer { id: ConnId, tx: UnboundedSender, rx: UnboundedReceiver, - frames: Framed, + frames: Framed, } impl Peer { async fn new(id: ConnId, stream: TcpStream) -> Self { let (tx, rx) = unbounded_channel(); - let frames = Framed::new(stream, PacketCodec::new(value::Codec::new({ + let frames = Framed::new(stream, packets::Codec::new(value::Codec::new({ let mut m = Map::new(); m.insert(0, value::Value::symbol("Discard")); m.insert(1, value::Value::symbol("Capture")); @@ -168,7 +44,7 @@ impl Peer { Peer{ id, tx, rx, frames } } - async fn run(&mut self, spaces: Arc>>) -> Result<(), io::Error> { + async fn run(&mut self, spaces: Arc>>) -> Result<(), std::io::Error> { println!("{:?}: got {:?}", self.id, &self.frames.get_ref()); let firstpacket = self.frames.next().await; @@ -220,13 +96,13 @@ impl Peer { } } } - Err(PacketDecodeError::Read(value::decoder::Error::Eof)) => running = false, - Err(PacketDecodeError::Read(value::decoder::Error::Io(e))) => return Err(e), - Err(PacketDecodeError::Read(value::decoder::Error::Syntax(s))) => { + Err(packets::DecodeError::Read(value::decoder::Error::Eof)) => running = false, + Err(packets::DecodeError::Read(value::decoder::Error::Io(e))) => return Err(e), + Err(packets::DecodeError::Read(value::decoder::Error::Syntax(s))) => { to_send.push(err(s)); running = false; } - Err(PacketDecodeError::Parse(e)) => { + Err(packets::DecodeError::Parse(e)) => { to_send.push(err(&format!("Packet deserialization error: {:?}", e))); running = false; } diff --git a/src/packets.rs b/src/packets.rs new file mode 100644 index 0000000..91a19eb --- /dev/null +++ b/src/packets.rs @@ -0,0 +1,129 @@ +use super::V; + +use bytes::BytesMut; +use preserves::value; +use std::io; + +pub type EndpointName = V; +pub type Assertion = V; +pub type Captures = Vec; + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub enum Action { + Assert(EndpointName, Assertion), + Clear(EndpointName), + Message(Assertion), +} + +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub enum Event { + Add(EndpointName, Captures), + Del(EndpointName, Captures), + Msg(EndpointName, Captures), + End(EndpointName), +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub enum In { + Connect(V), + Turn(Vec), + Ping(), + Pong(), +} + +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub enum Out { + Err(String), + Turn(Vec), + Ping(), + Pong(), +} + +//--------------------------------------------------------------------------- + +#[derive(Debug)] +pub enum DecodeError { + Read(value::decoder::Error), + Parse(value::error::Error), +} + +impl From for DecodeError { + fn from(v: io::Error) -> Self { + DecodeError::Read(v.into()) + } +} + +impl From for DecodeError { + fn from(v: value::error::Error) -> Self { + DecodeError::Parse(v) + } +} + +//--------------------------------------------------------------------------- + +#[derive(Debug)] +pub enum EncodeError { + Write(value::encoder::Error), + Unparse(value::error::Error), +} + +impl From for EncodeError { + fn from(v: io::Error) -> Self { + EncodeError::Write(v.into()) + } +} + +impl From for EncodeError { + fn from(v: value::error::Error) -> Self { + EncodeError::Unparse(v) + } +} + +impl From for io::Error { + fn from(v: EncodeError) -> Self { + match v { + EncodeError::Write(e) => e, + EncodeError::Unparse(e) => + Self::new(io::ErrorKind::InvalidData, format!("{:?}", e)), + } + } +} + +//--------------------------------------------------------------------------- + +pub struct Codec { + codec: value::Codec, +} + +impl Codec { + pub fn new(codec: value::Codec) -> Self { + Codec { codec } + } +} + +impl tokio::codec::Decoder for Codec { + type Item = In; + type Error = DecodeError; + fn decode(&mut self, bs: &mut BytesMut) -> Result, Self::Error> { + let mut buf = &bs[..]; + let orig_len = buf.len(); + let res = self.codec.decode(&mut buf); + let final_len = buf.len(); + bs.advance(orig_len - final_len); + match res { + Ok(v) => Ok(Some(value::from_value(&v)?)), + Err(value::decoder::Error::Eof) => Ok(None), + Err(e) => Err(DecodeError::Read(e)), + } + } +} + +impl tokio::codec::Encoder for Codec { + type Item = Out; + type Error = EncodeError; + fn encode(&mut self, item: Self::Item, bs: &mut BytesMut) -> Result<(), Self::Error> { + let v: V = value::to_value(&item)?; + bs.extend(self.codec.encode_bytes(&v)?); + Ok(()) + } +}