Bidirectional codec
This commit is contained in:
parent
87bcaacaa4
commit
a786f4b79b
|
@ -5,12 +5,13 @@ use bytes::{Buf, BytesMut};
|
|||
use preserves::value;
|
||||
use std::io;
|
||||
use std::sync::Arc;
|
||||
use std::marker::PhantomData;
|
||||
|
||||
pub type EndpointName = V;
|
||||
pub type Assertion = V;
|
||||
pub type Captures = Arc<Vec<Assertion>>;
|
||||
|
||||
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
||||
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
|
||||
pub enum Action {
|
||||
Assert(EndpointName, Assertion),
|
||||
Clear(EndpointName),
|
||||
|
@ -25,7 +26,7 @@ pub enum Event {
|
|||
End(EndpointName),
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
||||
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
|
||||
pub enum In {
|
||||
Connect(V),
|
||||
Turn(Vec<Action>),
|
||||
|
@ -85,20 +86,41 @@ impl From<EncodeError> for io::Error {
|
|||
}
|
||||
}
|
||||
|
||||
//---------------------------------------------------------------------------
|
||||
|
||||
pub struct Codec {
|
||||
codec: value::Codec<V, Syndicate>,
|
||||
}
|
||||
|
||||
impl Codec {
|
||||
pub fn new(codec: value::Codec<V, Syndicate>) -> Self {
|
||||
Codec { codec }
|
||||
impl std::fmt::Display for EncodeError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "{:?}", self)
|
||||
}
|
||||
}
|
||||
|
||||
impl tokio_util::codec::Decoder for Codec {
|
||||
type Item = In;
|
||||
impl std::error::Error for EncodeError {
|
||||
}
|
||||
|
||||
//---------------------------------------------------------------------------
|
||||
|
||||
pub struct Codec<InT, OutT> {
|
||||
codec: value::Codec<V, Syndicate>,
|
||||
ph_in: PhantomData<InT>,
|
||||
ph_out: PhantomData<OutT>,
|
||||
}
|
||||
|
||||
impl<InT, OutT> Codec<InT, OutT> {
|
||||
pub fn new(codec: value::Codec<V, Syndicate>) -> Self {
|
||||
Codec { codec, ph_in: PhantomData, ph_out: PhantomData }
|
||||
}
|
||||
|
||||
pub fn standard() -> Self {
|
||||
Self::new(value::Codec::new({
|
||||
let mut m = value::Map::new();
|
||||
m.insert(0, value::Value::symbol("Discard"));
|
||||
m.insert(1, value::Value::symbol("Capture"));
|
||||
m.insert(2, value::Value::symbol("Observe"));
|
||||
m
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl<InT: serde::de::DeserializeOwned, OutT> tokio_util::codec::Decoder for Codec<InT, OutT> {
|
||||
type Item = InT;
|
||||
type Error = DecodeError;
|
||||
fn decode(&mut self, bs: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||
let mut buf = &bs[..];
|
||||
|
@ -119,10 +141,10 @@ impl tokio_util::codec::Decoder for Codec {
|
|||
}
|
||||
}
|
||||
|
||||
impl tokio_util::codec::Encoder<Out> for Codec
|
||||
impl<InT, OutT: serde::Serialize> tokio_util::codec::Encoder<OutT> for Codec<InT, OutT>
|
||||
{
|
||||
type Error = EncodeError;
|
||||
fn encode(&mut self, item: Out, bs: &mut BytesMut) -> Result<(), Self::Error> {
|
||||
fn encode(&mut self, item: OutT, bs: &mut BytesMut) -> Result<(), Self::Error> {
|
||||
let v: V = value::to_value(&item)?;
|
||||
bs.extend(self.codec.encode_bytes(&v)?);
|
||||
Ok(())
|
||||
|
|
12
src/peer.rs
12
src/peer.rs
|
@ -8,7 +8,7 @@ use core::time::Duration;
|
|||
use futures::FutureExt;
|
||||
use futures::SinkExt;
|
||||
use futures::select;
|
||||
use preserves::value::{self, Map};
|
||||
use preserves::value;
|
||||
use std::sync::{Mutex, Arc};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::stream::StreamExt;
|
||||
|
@ -20,7 +20,7 @@ pub struct Peer {
|
|||
id: ConnId,
|
||||
tx: UnboundedSender<packets::Out>,
|
||||
rx: UnboundedReceiver<packets::Out>,
|
||||
frames: Framed<TcpStream, packets::Codec>,
|
||||
frames: Framed<TcpStream, packets::Codec<packets::In, packets::Out>>,
|
||||
space: Option<dataspace::DataspaceRef>,
|
||||
}
|
||||
|
||||
|
@ -31,13 +31,7 @@ fn err(s: &str, ctx: V) -> packets::Out {
|
|||
impl Peer {
|
||||
pub async fn new(id: ConnId, stream: TcpStream) -> Self {
|
||||
let (tx, rx) = unbounded_channel();
|
||||
let frames = Framed::new(stream, packets::Codec::new(value::Codec::new({
|
||||
let mut m = Map::new();
|
||||
m.insert(0, value::Value::symbol("Discard"));
|
||||
m.insert(1, value::Value::symbol("Capture"));
|
||||
m.insert(2, value::Value::symbol("Observe"));
|
||||
m
|
||||
})));
|
||||
let frames = Framed::new(stream, packets::Codec::standard());
|
||||
Peer{ id, tx, rx, frames, space: None }
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue