diff --git a/Cargo.toml b/Cargo.toml index 91c57f2..de7bb01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Tony Garnock-Jones "] edition = "2018" [dependencies] -preserves = "0.2.0" +preserves = "0.2.1" serde = { version = "1.0", features = ["derive"] } serde_bytes = "0.11" diff --git a/src/main.rs b/src/main.rs index 3d0ede5..357d3d7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -#![recursion_limit="256"] +#![recursion_limit="512"] mod bag; mod skeleton; @@ -7,9 +7,10 @@ use bytes::BytesMut; use preserves::value::{self, Map}; use tokio::prelude::*; use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::mpsc::{channel, Sender, Receiver}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver}; use tokio::codec::{Framed, Encoder, Decoder}; use futures::select; +use std::io; // use self::skeleton::Index; @@ -58,25 +59,64 @@ mod packets { } #[derive(Debug)] -pub enum DataspaceMessage { - Join(ConnId, Sender), - Input(ConnId, packets::In), - Leave(ConnId), +enum PacketDecodeError { + Read(value::decoder::Error), + Parse(value::error::Error), } -struct ValueCodec { - codec: value::Codec, -} - -impl ValueCodec { - fn new(codec: value::Codec) -> Self { - ValueCodec { codec } +impl From for PacketDecodeError { + fn from(v: io::Error) -> Self { + PacketDecodeError::Read(v.into()) } } -impl Decoder for ValueCodec { - type Item = V; - type Error = value::decoder::Error; +impl From for PacketDecodeError { + fn from(v: value::error::Error) -> Self { + PacketDecodeError::Parse(v) + } +} + +#[derive(Debug)] +enum PacketEncodeError { + Write(value::encoder::Error), + Unparse(value::error::Error), +} + +impl From for PacketEncodeError { + fn from(v: io::Error) -> Self { + PacketEncodeError::Write(v.into()) + } +} + +impl From for PacketEncodeError { + fn from(v: value::error::Error) -> Self { + PacketEncodeError::Unparse(v) + } +} + +impl From for io::Error { + fn from(v: PacketEncodeError) -> Self { + match v { + PacketEncodeError::Write(e) => e, + PacketEncodeError::Unparse(e) => + Self::new(io::ErrorKind::InvalidData, format!("{:?}", e)), + } + } +} + +struct PacketCodec { + codec: value::Codec, +} + +impl PacketCodec { + fn new(codec: value::Codec) -> Self { + PacketCodec { codec } + } +} + +impl Decoder for PacketCodec { + type Item = packets::In; + type Error = PacketDecodeError; fn decode(&mut self, bs: &mut BytesMut) -> Result, Self::Error> { let mut buf = &bs[..]; let orig_len = buf.len(); @@ -84,68 +124,90 @@ impl Decoder for ValueCodec { let final_len = buf.len(); bs.advance(orig_len - final_len); match res { - Ok(v) => Ok(Some(v)), - Err(value::codec::Error::Eof) => Ok(None), - Err(e) => Err(e), + Ok(v) => Ok(Some(value::from_value(&v)?)), + Err(value::decoder::Error::Eof) => Ok(None), + Err(e) => Err(PacketDecodeError::Read(e)), } } } -impl Encoder for ValueCodec { - type Item = V; - type Error = value::encoder::Error; +impl Encoder for PacketCodec { + type Item = packets::Out; + type Error = PacketEncodeError; fn encode(&mut self, item: Self::Item, bs: &mut BytesMut) -> Result<(), Self::Error> { - bs.extend(self.codec.encode_bytes(&item)?); + let v: V = value::to_value(&item)?; + bs.extend(self.codec.encode_bytes(&v)?); Ok(()) } } +fn err(s: &str) -> packets::Out { + packets::Out::Err(s.into()) +} + struct Peer { id: ConnId, - rx: Receiver, - dataspace: Sender, - frames: Framed, + tx: UnboundedSender, + rx: UnboundedReceiver, + frames: Framed, } impl Peer { - async fn new(id: ConnId, mut dataspace: Sender, stream: TcpStream) -> Self { - let (tx, rx) = channel(1); - let frames = Framed::new(stream, ValueCodec::new(value::Codec::new({ + async fn new(id: ConnId, stream: TcpStream) -> Self { + let (tx, rx) = unbounded_channel(); + let frames = Framed::new(stream, PacketCodec::new(value::Codec::new({ let mut m = Map::new(); m.insert(0, value::Value::symbol("Discard")); m.insert(1, value::Value::symbol("Capture")); m.insert(2, value::Value::symbol("Observe")); m }))); - dataspace.send(DataspaceMessage::Join(id, tx)).await.unwrap(); - Peer{ id, rx, dataspace, frames } + Peer{ id, tx, rx, frames } } - async fn run(&mut self) -> Result<(), std::io::Error> { + async fn run(&mut self) -> Result<(), io::Error> { println!("Got {:?} {:?}", self.id, &self.frames.get_ref()); + + let firstpacket = self.frames.next().await; + let dsname = if let Some(Ok(packets::In::Connect(dsname))) = firstpacket { + dsname + } else { + let e: String = format!("Expected initial Connect, got {:?}", firstpacket); + println!("{}", e); + self.frames.send(err(&e)).await?; + return Ok(()) + }; + + println!("{}: connected to dataspace {:?}", self.id, dsname); + let mut running = true; while running { let mut to_send = Vec::new(); select! { frame = self.frames.next().boxed().fuse() => match frame { Some(res) => match res { - Ok(v) => { - println!("Input {}: {:?}", self.id, &v); - match value::from_value(&v) { - Ok(p) => { - let m = DataspaceMessage::Input(self.id, p); - self.dataspace.send(m).await.unwrap() + Ok(p) => { + println!("Input {}: {:?}", self.id, &p); + match p { + packets::In::Turn(actions) => (), + packets::In::Ping() => { + to_send.push(packets::Out::Pong()) } - Err(e) => { - to_send.push(packets::Out::Err(format!("{:?}", e))); + packets::In::Pong() => (), + packets::In::Connect(dsname) => { + to_send.push(err("Unexpected Connect")); running = false; } } } - Err(value::codec::Error::Eof) => running = false, - Err(value::codec::Error::Io(e)) => return Err(e), - Err(value::codec::Error::Syntax(s)) => { - to_send.push(packets::Out::Err(s.to_string())); + Err(PacketDecodeError::Read(value::decoder::Error::Eof)) => running = false, + Err(PacketDecodeError::Read(value::decoder::Error::Io(e))) => return Err(e), + Err(PacketDecodeError::Read(value::decoder::Error::Syntax(s))) => { + to_send.push(err(s)); + running = false; + } + Err(PacketDecodeError::Parse(e)) => { + to_send.push(err(&format!("Packet deserialization error: {:?}", e))); running = false; } } @@ -154,7 +216,11 @@ impl Peer { msgopt = self.rx.recv().boxed().fuse() => { match msgopt { Some(msg) => to_send.push(msg), - None => /* weird. */ running = false, + None => { + /* weird. */ + to_send.push(err("Outbound channel closed unexpectedly")); + running = false; + } } }, } @@ -164,7 +230,7 @@ impl Peer { } else { println!("Output {}: {:?}", self.id, &v); } - self.frames.send(value::to_value(v).unwrap()).await?; + self.frames.send(v).await?; } } Ok(()) @@ -173,67 +239,6 @@ impl Peer { impl Drop for Peer { fn drop(&mut self) { - let mut dataspace = self.dataspace.clone(); - let id = self.id; - tokio::spawn(async move { - let _ = dataspace.send(DataspaceMessage::Leave(id)).await; - }); - } -} - -struct Dataspace { - rx: Receiver, - peers: Map>, -} - -impl Dataspace { - fn new(rx: Receiver) -> Self { - Dataspace { rx, peers: Map::new() } - } - - async fn send(&mut self, i: ConnId, s: &mut Sender, m: &packets::Out) - -> bool - { - match s.send(m.clone()).await { - Ok(_) => true, - Err(_) => { self.remove(i); false } - } - } - - async fn send_to(&mut self, i: ConnId, m: &packets::Out) -> bool { - let mut ms = self.peers.get(&i).map(|s| s.clone()); - match ms { - Some(ref mut s) => self.send(i, s, m).await, - None => false, - } - } - - fn remove(&mut self, i: ConnId) { - self.peers.remove(&i); - // TODO: cleanup. Previously, this was: - // self.pending.push(PeerMessage::Leave(i)); - } - - async fn run(&mut self) { - loop { - println!("Dataspace waiting for message ({} connected)", self.peers.len()); - let msg = self.rx.recv().await.unwrap(); - println!("Dataspace: {:?}", msg); - match msg { - DataspaceMessage::Join(i, s) => { - self.peers.insert(i, s); - } - DataspaceMessage::Input(i, p) => { - match p { - packets::In::Connect(dsname) => (), - packets::In::Turn(actions) => (), - packets::In::Ping() => { self.send_to(i, &packets::Out::Pong()).await; } - packets::In::Pong() => (), - } - } - DataspaceMessage::Leave(i) => self.remove(i), - } - } } } @@ -241,12 +246,6 @@ impl Dataspace { async fn main() -> Result<(), Box> { // let i = Index::new(); - // Unlike std channels, a zero buffer is not supported - let (tx, rx) = channel(100); // but ugh a big buffer is needed to avoid deadlocks??? - tokio::spawn(async { - Dataspace::new(rx).run().await; - }); - let mut id = 0; let port = 8001; @@ -254,11 +253,10 @@ async fn main() -> Result<(), Box> { println!("Listening on port {}", port); loop { let (stream, addr) = listener.accept().await?; - let tx = tx.clone(); let connid = id; id = id + 1; tokio::spawn(async move { - match Peer::new(connid, tx, stream).await.run().await { + match Peer::new(connid, stream).await.run().await { Ok(_) => (), Err(e) => println!("Connection {:?} died with {:?}", addr, e), }