From a786f4b79b4fb7b9b58616d780d145befc7a54b4 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 11 May 2020 22:02:43 +0200 Subject: [PATCH] Bidirectional codec --- src/packets.rs | 52 +++++++++++++++++++++++++++++++++++--------------- src/peer.rs | 12 +++--------- 2 files changed, 40 insertions(+), 24 deletions(-) diff --git a/src/packets.rs b/src/packets.rs index 8e04c91..a5ca46e 100644 --- a/src/packets.rs +++ b/src/packets.rs @@ -5,12 +5,13 @@ use bytes::{Buf, BytesMut}; use preserves::value; use std::io; use std::sync::Arc; +use std::marker::PhantomData; pub type EndpointName = V; pub type Assertion = V; pub type Captures = Arc>; -#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum Action { Assert(EndpointName, Assertion), Clear(EndpointName), @@ -25,7 +26,7 @@ pub enum Event { End(EndpointName), } -#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum In { Connect(V), Turn(Vec), @@ -85,20 +86,41 @@ impl From for io::Error { } } -//--------------------------------------------------------------------------- - -pub struct Codec { - codec: value::Codec, -} - -impl Codec { - pub fn new(codec: value::Codec) -> Self { - Codec { codec } +impl std::fmt::Display for EncodeError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{:?}", self) } } -impl tokio_util::codec::Decoder for Codec { - type Item = In; +impl std::error::Error for EncodeError { +} + +//--------------------------------------------------------------------------- + +pub struct Codec { + codec: value::Codec, + ph_in: PhantomData, + ph_out: PhantomData, +} + +impl Codec { + pub fn new(codec: value::Codec) -> Self { + Codec { codec, ph_in: PhantomData, ph_out: PhantomData } + } + + pub fn standard() -> Self { + Self::new(value::Codec::new({ + let mut m = value::Map::new(); + m.insert(0, value::Value::symbol("Discard")); + m.insert(1, value::Value::symbol("Capture")); + m.insert(2, value::Value::symbol("Observe")); + m + })) + } +} + +impl tokio_util::codec::Decoder for Codec { + type Item = InT; type Error = DecodeError; fn decode(&mut self, bs: &mut BytesMut) -> Result, Self::Error> { let mut buf = &bs[..]; @@ -119,10 +141,10 @@ impl tokio_util::codec::Decoder for Codec { } } -impl tokio_util::codec::Encoder for Codec +impl tokio_util::codec::Encoder for Codec { type Error = EncodeError; - fn encode(&mut self, item: Out, bs: &mut BytesMut) -> Result<(), Self::Error> { + fn encode(&mut self, item: OutT, bs: &mut BytesMut) -> Result<(), Self::Error> { let v: V = value::to_value(&item)?; bs.extend(self.codec.encode_bytes(&v)?); Ok(()) diff --git a/src/peer.rs b/src/peer.rs index 53ce601..3158334 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -8,7 +8,7 @@ use core::time::Duration; use futures::FutureExt; use futures::SinkExt; use futures::select; -use preserves::value::{self, Map}; +use preserves::value; use std::sync::{Mutex, Arc}; use tokio::net::TcpStream; use tokio::stream::StreamExt; @@ -20,7 +20,7 @@ pub struct Peer { id: ConnId, tx: UnboundedSender, rx: UnboundedReceiver, - frames: Framed, + frames: Framed>, space: Option, } @@ -31,13 +31,7 @@ fn err(s: &str, ctx: V) -> packets::Out { impl Peer { pub async fn new(id: ConnId, stream: TcpStream) -> Self { let (tx, rx) = unbounded_channel(); - let frames = Framed::new(stream, packets::Codec::new(value::Codec::new({ - let mut m = Map::new(); - m.insert(0, value::Value::symbol("Discard")); - m.insert(1, value::Value::symbol("Capture")); - m.insert(2, value::Value::symbol("Observe")); - m - }))); + let frames = Framed::new(stream, packets::Codec::standard()); Peer{ id, tx, rx, frames, space: None } }