From c5cf226d8666a24500bff5d7e09a539b65adac29 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 28 May 2020 23:32:33 +0200 Subject: [PATCH] Major Preserves 0.6.0 API changes --- Cargo.lock | 5 ++- Cargo.toml | 2 +- Makefile | 2 +- examples/consumer.rs | 3 +- examples/producer.rs | 3 +- src/bin/syndicate-server.rs | 79 +++++++++++++-------------------- src/lib.rs | 32 +++++++------- src/packets.rs | 85 ++++++++++------------------------- src/peer.rs | 45 +++++++++---------- src/skeleton.rs | 88 ++++++++++++++++++------------------- 10 files changed, 141 insertions(+), 203 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ed06d7e..d9e2d6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -727,10 +727,11 @@ checksum = "237a5ed80e274dbc66f86bd59c1e25edc039660be53194b5fe0a482e0f2612ea" [[package]] name = "preserves" -version = "0.5.1" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ace656c2cf8a56d62009e911a999220f19afaca0d2483e4c307ff15e568396c" +checksum = "6647e1b2a91ac7ace6798481d95d8bb0d407b9cdb6cff3c4708a3cd07e411306" dependencies = [ + "lazy_static", "num", "num_enum", "serde", diff --git a/Cargo.toml b/Cargo.toml index 350dddb..c28cbe9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ debug = true name = "syndicate" [dependencies] -preserves = "0.5.1" +preserves = "0.6.0" serde = { version = "1.0", features = ["derive", "rc"] } serde_bytes = "0.11" diff --git a/Makefile b/Makefile index ac646e8..2363b40 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ watch: cargo watch -c -x check -x 'test -- --nocapture' run-watch: - RUST_BACKTRACE=1 cargo watch -c -x run + RUST_BACKTRACE=1 cargo watch -c -x 'build --all-targets' -x 'run' clippy-watch: cargo watch -c -x clippy diff --git a/examples/consumer.rs b/examples/consumer.rs index 08d5448..62708b3 100644 --- a/examples/consumer.rs +++ b/examples/consumer.rs @@ -16,8 +16,7 @@ async fn main() -> Result<(), Box> { let discard: V = Value::simple_record("Discard", vec![]).wrap(); let capture: V = Value::simple_record("Capture", vec![discard]).wrap(); - let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?, - ClientCodec::standard()); + let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?, ClientCodec::new()); frames.send(C2S::Connect(Value::from("chat").wrap())).await?; frames.send( C2S::Turn(vec![Action::Assert( diff --git a/examples/producer.rs b/examples/producer.rs index c7051f0..dae7f37 100644 --- a/examples/producer.rs +++ b/examples/producer.rs @@ -7,8 +7,7 @@ use std::task::Poll; #[tokio::main] async fn main() -> Result<(), Box> { - let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?, - ClientCodec::standard()); + let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?, ClientCodec::new()); frames.send(C2S::Connect(Value::from("chat").wrap())).await?; let mut counter: u64 = 0; diff --git a/src/bin/syndicate-server.rs b/src/bin/syndicate-server.rs index 82cafeb..15ebbb3 100644 --- a/src/bin/syndicate-server.rs +++ b/src/bin/syndicate-server.rs @@ -1,6 +1,5 @@ -use syndicate::{config, spaces, packets, ConnId, V, Syndicate}; +use syndicate::{config, spaces, packets, ConnId}; use syndicate::peer::{Peer, ResultC2S}; -use preserves::value; use std::sync::{Mutex, Arc}; use futures::{SinkExt, StreamExt}; @@ -18,62 +17,45 @@ use structopt::StructOpt; // for from_args in main type UnitAsyncResult = Result<(), std::io::Error>; -fn other_eio(e: E) -> std::io::Error { - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) +fn message_error(e: E) -> packets::Error { + packets::Error::Message(e.to_string()) } -fn encode_message(codec: &value::Codec, p: packets::S2C) -> - Result +fn encode_message(p: packets::S2C) -> + Result { - use serde::ser::Serialize; - use preserves::ser::Serializer; let mut bs = Vec::with_capacity(128); - let mut ser: Serializer<_, V, Syndicate> = - Serializer::new(&mut bs, codec.encode_placeholders.as_ref()); - p.serialize(&mut ser)?; + preserves::ser::to_writer(&mut bs, &p)?; Ok(Message::Binary(bs)) } -fn message_encoder(codec: &value::Codec) - -> impl Fn(packets::S2C) -> futures::future::Ready> + '_ +fn message_encoder(p: packets::S2C) -> futures::future::Ready> { - return move |p| futures::future::ready(encode_message(codec, p)); + futures::future::ready(encode_message(p)) } -fn message_decoder(codec: &value::Codec) - -> impl Fn(Result) -> ResultC2S + '_ +fn message_decoder(r: Result) -> ResultC2S { - return move |r| { - loop { - return match r { - Ok(ref m) => match m { - Message::Text(_) => Err(packets::DecodeError::Read( - value::reader::err("Text websocket frames are not accepted"))), - Message::Binary(ref bs) => { - let mut buf = &bs[..]; - let mut vs = codec.decode_all(&mut buf)?; - if vs.len() > 1 { - Err(packets::DecodeError::Read( - std::io::Error::new(std::io::ErrorKind::Other, - "Multiple packets in a single message"))) - } else if vs.len() == 0 { - Err(packets::DecodeError::Read( - std::io::Error::new(std::io::ErrorKind::Other, - "Empty message"))) - } else { - value::from_value(&vs[0]) - .map_err(|e| packets::DecodeError::Parse(e, vs.swap_remove(0))) - } - } - Message::Ping(_) => continue, // pings are handled by tungstenite before we see them - Message::Pong(_) => continue, // unsolicited pongs are to be ignored - Message::Close(_) => Err(packets::DecodeError::Read(value::reader::eof())), - } - Err(tungstenite::Error::Io(e)) => Err(e.into()), - Err(e) => Err(packets::DecodeError::Read(other_eio(e))), + loop { + return match r { + Ok(ref m) => match m { + Message::Text(_) => + Err(preserves::error::syntax_error("Text websocket frames are not accepted")), + Message::Binary(ref bs) => + Ok(preserves::de::from_bytes(bs)?), + Message::Ping(_) => + continue, // pings are handled by tungstenite before we see them + Message::Pong(_) => + continue, // unsolicited pongs are to be ignored + Message::Close(_) => + Err(preserves::error::eof()), } + Err(tungstenite::Error::Io(e)) => + Err(e.into()), + Err(e) => + Err(message_error(e)), } - }; + } } async fn run_connection(connid: ConnId, @@ -91,15 +73,14 @@ async fn run_connection(connid: ConnId, let s = tokio_tungstenite::accept_async(stream).await .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; let (o, i) = s.split(); - let codec = packets::standard_preserves_codec(); - let i = i.map(message_decoder(&codec)); - let o = o.sink_map_err(other_eio).with(message_encoder(&codec)); + let i = i.map(message_decoder); + let o = o.sink_map_err(message_error).with(message_encoder); let mut p = Peer::new(connid, i, o); p.run(spaces, &config).await? }, _ => { info!(protocol = display("raw"), peer = debug(addr)); - let (o, i) = Framed::new(stream, packets::Codec::standard()).split(); + let (o, i) = Framed::new(stream, packets::Codec::new()).split(); let mut p = Peer::new(connid, i, o); p.run(spaces, &config).await? } diff --git a/src/lib.rs b/src/lib.rs index 939e29e..1932afc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,21 +10,21 @@ pub mod spaces; pub use preserves::value; -use std::sync::atomic::{AtomicUsize, Ordering}; - -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub enum Syndicate { - Placeholder(usize), -} - -impl value::Domain for Syndicate {} - -static NEXT_PLACEHOLDER: AtomicUsize = AtomicUsize::new(0); -impl Syndicate { - pub fn new_placeholder() -> Self { - Self::Placeholder(NEXT_PLACEHOLDER.fetch_add(1, Ordering::SeqCst)) - } -} +// use std::sync::atomic::{AtomicUsize, Ordering}; +// +// #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +// pub enum Syndicate { +// Placeholder(usize), +// } +// +// impl value::Domain for Syndicate {} +// +// static NEXT_PLACEHOLDER: AtomicUsize = AtomicUsize::new(0); +// impl Syndicate { +// pub fn new_placeholder() -> Self { +// Self::Placeholder(NEXT_PLACEHOLDER.fetch_add(1, Ordering::SeqCst)) +// } +// } pub type ConnId = u64; -pub type V = value::ArcValue; +pub type V = value::IOValue; // value::ArcValue; diff --git a/src/packets.rs b/src/packets.rs index daa7837..f501c92 100644 --- a/src/packets.rs +++ b/src/packets.rs @@ -1,12 +1,16 @@ use super::V; -use super::Syndicate; use bytes::{Buf, buf::BufMutExt, BytesMut}; -use preserves::{value, ser::Serializer}; -use std::io; use std::sync::Arc; use std::marker::PhantomData; +use preserves::{ + de::Deserializer, + error, + ser::to_writer, + value::reader::from_bytes, +}; + pub type EndpointName = V; pub type Assertion = V; pub type Captures = Arc>; @@ -44,31 +48,9 @@ pub enum S2C { //--------------------------------------------------------------------------- -#[derive(Debug)] -pub enum DecodeError { - Read(io::Error), - Parse(value::de::error::Error, V), -} - -impl From for DecodeError { - fn from(v: io::Error) -> Self { - DecodeError::Read(v.into()) - } -} - -impl std::fmt::Display for DecodeError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{:?}", self) - } -} - -impl std::error::Error for DecodeError { -} - -//--------------------------------------------------------------------------- +pub type Error = error::Error; pub struct Codec { - codec: value::Codec, ph_in: PhantomData, ph_out: PhantomData, } @@ -76,45 +58,25 @@ pub struct Codec { pub type ServerCodec = Codec; pub type ClientCodec = Codec; -pub fn standard_preserves_placeholders() -> value::DecodePlaceholderMap { - let mut m = value::Map::new(); - m.insert(0, value::Value::symbol("Discard")); - m.insert(1, value::Value::symbol("Capture")); - m.insert(2, value::Value::symbol("Observe")); - m -} - -pub fn standard_preserves_codec() -> value::Codec { - value::Codec::new(standard_preserves_placeholders()) -} - impl Codec { - pub fn new(codec: value::Codec) -> Self { - Codec { codec, ph_in: PhantomData, ph_out: PhantomData } - } - - pub fn standard() -> Self { - Self::new(standard_preserves_codec()) + pub fn new() -> Self { + Codec { ph_in: PhantomData, ph_out: PhantomData } } } impl tokio_util::codec::Decoder for Codec { type Item = InT; - type Error = DecodeError; + type Error = Error; fn decode(&mut self, bs: &mut BytesMut) -> Result, Self::Error> { - let mut buf = &bs[..]; - let orig_len = buf.len(); - let mut d = self.codec.decoder(&mut buf); - match d.next() { - None => Ok(None), - Some(res) => { - let v = res?; - let final_len = buf.len(); - bs.advance(orig_len - final_len); - match value::from_value(&v) { - Ok(p) => Ok(Some(p)), - Err(e) => Err(DecodeError::Parse(e, v)) - } + let mut r = from_bytes(bs); + let mut d = Deserializer::from_reader(&mut r); + match Self::Item::deserialize(&mut d) { + Err(e) if error::is_eof_error(&e) => Ok(None), + Err(e) => Err(e), + Ok(item) => { + let count = d.read.source.index; + bs.advance(count); + Ok(Some(item)) } } } @@ -122,11 +84,8 @@ impl tokio_util::codec::Decoder for Code impl tokio_util::codec::Encoder for Codec { - type Error = io::Error; + type Error = Error; fn encode(&mut self, item: OutT, bs: &mut BytesMut) -> Result<(), Self::Error> { - let mut w = bs.writer(); - let mut ser: Serializer<_, V, Syndicate> = Serializer::new(&mut w, self.codec.encode_placeholders.as_ref()); - item.serialize(&mut ser).map_err(|e| std::io::Error::from(e))?; - Ok(()) + to_writer(&mut bs.writer(), &item) } } diff --git a/src/peer.rs b/src/peer.rs index 797dd0a..f193429 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -16,11 +16,11 @@ use tokio::stream::StreamExt; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver, error::TryRecvError}; use tokio::time::interval; -pub type ResultC2S = Result; +pub type ResultC2S = Result; pub struct Peer where I: Stream + Send, - O: Sink, + O: Sink, { id: ConnId, tx: UnboundedSender, @@ -36,7 +36,7 @@ fn err(s: &str, ctx: V) -> packets::S2C { impl Peer where I: Stream + Send, - O: Sink, + O: Sink, { pub fn new(id: ConnId, i: I, o: O) -> Self { let (tx, rx) = unbounded_channel(); @@ -44,15 +44,15 @@ where I: Stream + Send, } pub async fn run(&mut self, spaces: Arc>, config: &config::ServerConfig) -> - Result<(), std::io::Error> + Result<(), packets::Error> { let firstpacket = self.i.next().await; let dsname = if let Some(Ok(packets::C2S::Connect(dsname))) = firstpacket { dsname } else { let e = format!("Expected initial Connect, got {:?}", firstpacket); - self.o.send(err(&e, value::Value::from(false).wrap())).await?; - return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)) + self.o.send(err(&e, value::FALSE.clone())).await?; + return Err(preserves::error::syntax_error(&e)) }; self.space = Some(spaces.lock().unwrap().lookup(&dsname)); @@ -121,21 +121,23 @@ where I: Stream + Send, } } } - Err(packets::DecodeError::Read(e)) => { - if value::is_eof_error(&e) { - tracing::trace!("eof"); - running = false; - } else if value::is_syntax_error(&e) { - to_send.push(err(&e.to_string(), value::Value::from(false).wrap())); - running = false; - } else { - return Err(e) - } - } - Err(packets::DecodeError::Parse(e, v)) => { - to_send.push(err(&format!("Packet deserialization error: {}", e), v)); + Err(e) if preserves::error::is_eof_error(&e) => { + tracing::trace!("eof"); running = false; } + Err(e) if preserves::error::is_syntax_error(&e) => { + to_send.push(err(&e.to_string(), value::FALSE.clone())); + running = false; + } + Err(e) => { + if preserves::error::is_io_error(&e) { + return Err(e); + } else { + to_send.push(err(&format!("Packet deserialization error: {}", e), + value::FALSE.clone())); + running = false; + } + } } None => running = false, }, @@ -162,8 +164,7 @@ where I: Stream + Send, } if !ok { /* weird. */ - to_send.push(err("Outbound channel closed unexpectedly", - value::Value::from(false).wrap())); + to_send.push(err("Outbound channel closed unexpectedly", value::FALSE.clone())); running = false; } }, @@ -184,7 +185,7 @@ where I: Stream + Send, impl Drop for Peer where I: Stream + Send, - O: Sink, + O: Sink, { fn drop(&mut self) { if let Some(ref s) = self.space { diff --git a/src/skeleton.rs b/src/skeleton.rs index 90e6ac2..5cb125c 100644 --- a/src/skeleton.rs +++ b/src/skeleton.rs @@ -1,5 +1,4 @@ use super::ConnId; -use super::Syndicate; use super::bag; use super::packets::Assertion; use super::packets::Captures; @@ -9,7 +8,6 @@ use super::packets::Event; use preserves::value::{Map, Set, Value, NestedValue}; use std::cmp::Ordering; use std::collections::btree_map::Entry; -use std::iter::FromIterator; use std::sync::Arc; type Bag = bag::BTreeBag; @@ -555,47 +553,47 @@ pub fn analyze(a: &Assertion) -> AnalysisResults { } } -pub fn instantiate_assertion(a: &Assertion, cs: Captures) -> CachedAssertion { - let mut capture_paths = Vec::new(); - let mut path = Vec::new(); - let mut vs: Vec = (*cs).clone(); - vs.reverse(); - let instantiated = instantiate_assertion_walk(&mut capture_paths, &mut path, &mut vs, a); - CachedAssertion::VisibilityRestricted(capture_paths, instantiated) -} +// pub fn instantiate_assertion(a: &Assertion, cs: Captures) -> CachedAssertion { +// let mut capture_paths = Vec::new(); +// let mut path = Vec::new(); +// let mut vs: Vec = (*cs).clone(); +// vs.reverse(); +// let instantiated = instantiate_assertion_walk(&mut capture_paths, &mut path, &mut vs, a); +// CachedAssertion::VisibilityRestricted(capture_paths, instantiated) +// } -fn instantiate_assertion_walk(capture_paths: &mut Paths, - path: &mut Path, - vs: &mut Vec, - a: &Assertion) -> Assertion { - if let Some(fields) = a.value().as_simple_record("Capture", Some(1)) { - capture_paths.push(path.clone()); - let v = vs.pop().unwrap(); - instantiate_assertion_walk(capture_paths, path, vs, &fields[0]); - v - } else if a.value().is_simple_record("Discard", Some(0)) { - Value::Domain(Syndicate::new_placeholder()).wrap() - } else { - let f = |(i, aa)| { - path.push(i); - let vv = instantiate_assertion_walk(capture_paths, - path, - vs, - aa); - path.pop(); - vv - }; - match class_of(a) { - Some(Guard::Seq(_)) => - Value::from(Vec::from_iter(a.value().as_sequence().unwrap() - .iter().enumerate().map(f))) - .wrap(), - Some(Guard::Rec(l, fieldcount)) => - Value::record(l, a.value().as_record(Some(fieldcount)).unwrap().1 - .iter().enumerate().map(f).collect()) - .wrap(), - None => - a.clone(), - } - } -} +// fn instantiate_assertion_walk(capture_paths: &mut Paths, +// path: &mut Path, +// vs: &mut Vec, +// a: &Assertion) -> Assertion { +// if let Some(fields) = a.value().as_simple_record("Capture", Some(1)) { +// capture_paths.push(path.clone()); +// let v = vs.pop().unwrap(); +// instantiate_assertion_walk(capture_paths, path, vs, &fields[0]); +// v +// } else if a.value().is_simple_record("Discard", Some(0)) { +// Value::Domain(Syndicate::new_placeholder()).wrap() +// } else { +// let f = |(i, aa)| { +// path.push(i); +// let vv = instantiate_assertion_walk(capture_paths, +// path, +// vs, +// aa); +// path.pop(); +// vv +// }; +// match class_of(a) { +// Some(Guard::Seq(_)) => +// Value::from(Vec::from_iter(a.value().as_sequence().unwrap() +// .iter().enumerate().map(f))) +// .wrap(), +// Some(Guard::Rec(l, fieldcount)) => +// Value::record(l, a.value().as_record(Some(fieldcount)).unwrap().1 +// .iter().enumerate().map(f).collect()) +// .wrap(), +// None => +// a.clone(), +// } +// } +// }