From 82252cf4e53e7ebf3e01792dd00f9fabcdd5b4aa Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Tue, 15 Oct 2019 16:58:52 +0100 Subject: [PATCH] Start to kick main.rs into being a syndicate-server-shaped object --- src/main.rs | 172 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 103 insertions(+), 69 deletions(-) diff --git a/src/main.rs b/src/main.rs index fe229dd..c93a42b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,9 +4,7 @@ mod bag; mod skeleton; use bytes::BytesMut; -use preserves::value::{self, NestedValue}; -use std::collections::BTreeMap; -use std::sync::Arc; +use preserves::value::{self, Map}; use tokio::prelude::*; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::mpsc::{channel, Sender, Receiver}; @@ -21,21 +19,31 @@ type ConnId = u64; type V = value::ArcValue; mod packets { + use super::V; + #[derive(Debug, serde::Serialize, serde::Deserialize)] - pub struct Error(pub String); + pub enum Action { + Assert(V, V), + } + + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub enum In { + Connect(V), + Turn(Vec), + Ping(), + } + + #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] + pub enum Out { + Err(String), + Pong(), + } } #[derive(Debug)] -pub enum RelayMessage { - Hello(ConnId, Sender>), - Speak(ConnId, V), - Goodbye(ConnId), -} - -#[derive(Debug, Clone, serde::Serialize)] -pub enum PeerMessage { - Join(ConnId), - Speak(ConnId, V), +pub enum DataspaceMessage { + Join(ConnId, Sender), + Input(ConnId, packets::In), Leave(ConnId), } @@ -77,17 +85,23 @@ impl Encoder for ValueCodec { struct Peer { id: ConnId, - rx: Receiver>, - relay: Sender, + rx: Receiver, + dataspace: Sender, frames: Framed, } impl Peer { - async fn new(id: ConnId, mut relay: Sender, stream: TcpStream) -> Self { + async fn new(id: ConnId, mut dataspace: Sender, stream: TcpStream) -> Self { let (tx, rx) = channel(1); - let frames = Framed::new(stream, ValueCodec::new(value::Codec::without_placeholders())); - relay.send(RelayMessage::Hello(id, tx)).await.unwrap(); - Peer{ id, rx, relay, frames } + let frames = Framed::new(stream, ValueCodec::new(value::Codec::new({ + let mut m = Map::new(); + m.insert(0, value::Value::symbol("Discard")); + m.insert(1, value::Value::symbol("Capture")); + m.insert(2, value::Value::symbol("Observe")); + m + }))); + dataspace.send(DataspaceMessage::Join(id, tx)).await.unwrap(); + Peer{ id, rx, dataspace, frames } } async fn run(&mut self) -> Result<(), std::io::Error> { @@ -99,31 +113,34 @@ impl Peer { frame = self.frames.next().boxed().fuse() => match frame { Some(res) => match res { Ok(v) => { - if (v.value().as_symbol() == Some(&"die".to_string())) { - panic!(); - } else { - self.relay.send(RelayMessage::Speak(self.id, v)).await.unwrap() - } + println!("Input {}: {:?}", self.id, &v); + let p = value::from_value(&v).unwrap(); + self.dataspace.send(DataspaceMessage::Input(self.id, p)).await.unwrap() } Err(value::codec::Error::Eof) => running = false, Err(value::codec::Error::Io(e)) => return Err(e), Err(value::codec::Error::Syntax(s)) => { - let v = value::to_value(packets::Error(s.to_string())).unwrap(); + let v = value::to_value(packets::Out::Err(s.to_string())).unwrap(); to_send.push(v); + println!("Connection {} crashed with Preserves syntax error {:?}", + self.id, + s); running = false; } } None => running = false, }, msgopt = self.rx.recv().boxed().fuse() => { - println!("MSGOPT {:?}", &msgopt); match msgopt { - Some(msg) => to_send.push(value::to_value(&*msg).unwrap()), + Some(msg) => to_send.push(value::to_value(msg).unwrap()), None => /* weird. */ running = false, } }, } - for v in to_send { self.frames.send(v).await?; } + for v in to_send { + println!("Output {}: {:?}", self.id, &v); + self.frames.send(v).await?; + } } Ok(()) } @@ -131,71 +148,86 @@ impl Peer { impl Drop for Peer { fn drop(&mut self) { - let mut relay = self.relay.clone(); + let mut dataspace = self.dataspace.clone(); let id = self.id; tokio::spawn(async move { - let _ = relay.send(RelayMessage::Goodbye(id)).await; + let _ = dataspace.send(DataspaceMessage::Leave(id)).await; }); } } -struct Relay { - rx: Receiver, - peers: BTreeMap>>, - pending: Vec, +struct Dataspace { + rx: Receiver, + peers: Map>, } -impl Relay { - fn new(rx: Receiver) -> Self { - Relay { rx, peers: BTreeMap::new(), pending: Vec::new() } +impl Dataspace { + fn new(rx: Receiver) -> Self { + Dataspace { rx, peers: Map::new() } } - async fn send(&mut self, i: ConnId, s: &mut Sender>, m: &Arc) + async fn send(&mut self, i: ConnId, s: &mut Sender, m: &packets::Out) -> bool { - match s.send(Arc::clone(m)).await { + match s.send(m.clone()).await { Ok(_) => true, Err(_) => { self.remove(i); false } } } - fn remove(&mut self, i: ConnId) { - self.peers.remove(&i); - self.pending.push(PeerMessage::Leave(i)); - } - - async fn broadcast(&mut self, m: &Arc) { - for (i, ref mut s) in self.peers.clone() { - self.send(i, s, m).await; + async fn send_to(&mut self, i: ConnId, m: &packets::Out) -> bool { + let mut ms = self.peers.get(&i).map(|s| s.clone()); + match ms { + Some(ref mut s) => self.send(i, s, m).await, + None => false, } } + fn remove(&mut self, i: ConnId) { + self.peers.remove(&i); + // TODO: cleanup. Previously, this was: + // self.pending.push(PeerMessage::Leave(i)); + } + + // async fn broadcast(&mut self, m: &Arc) { + // for (i, ref mut s) in self.peers.clone() { + // self.send(i, s, m).await; + // } + // } + async fn run(&mut self) { loop { - println!("Relay waiting for message ({} connected)", self.peers.len()); + println!("Dataspace waiting for message ({} connected)", self.peers.len()); let msg = self.rx.recv().await.unwrap(); - println!("Relay: {:?}", msg); + println!("Dataspace: {:?}", msg); match msg { - RelayMessage::Hello(i, mut s) => { - let mut ok = true; - let i_join = &Arc::new(PeerMessage::Join(i)); - for (p, ref mut r) in self.peers.clone() { - ok = ok && self.send(i, &mut s, &Arc::new(PeerMessage::Join(p))).await; - self.send(p, r, i_join).await; - } - ok = ok && self.send(i, &mut s, i_join).await; - if ok { - self.peers.insert(i, s); - } + DataspaceMessage::Join(i, s) => { + // let mut ok = true; + // let i_join = &Arc::new(PeerMessage::Join(i)); + // for (p, ref mut r) in self.peers.clone() { + // ok = ok && self.send(i, &mut s, &Arc::new(PeerMessage::Join(p))).await; + // self.send(p, r, i_join).await; + // } + // ok = ok && self.send(i, &mut s, i_join).await; + // if ok { + // self.peers.insert(i, s); + // } + + self.peers.insert(i, s); } - RelayMessage::Speak(i, v) => { - self.broadcast(&Arc::new(PeerMessage::Speak(i, v))).await; + DataspaceMessage::Input(i, p) => { + match p { + packets::In::Connect(dsname) => (), + packets::In::Turn(actions) => (), + packets::In::Ping() => { self.send_to(i, &packets::Out::Pong()).await; } + } + // self.broadcast(&Arc::new(PeerMessage::Speak(i, v))).await; } - RelayMessage::Goodbye(i) => self.remove(i), - } - while let Some(m) = self.pending.pop() { - self.broadcast(&Arc::new(m)).await; + DataspaceMessage::Leave(i) => self.remove(i), } + // while let Some(m) = self.pending.pop() { + // self.broadcast(&Arc::new(m)).await; + // } } } } @@ -207,12 +239,14 @@ async fn main() -> Result<(), Box> { // Unlike std channels, a zero buffer is not supported let (tx, rx) = channel(100); // but ugh a big buffer is needed to avoid deadlocks??? tokio::spawn(async { - Relay::new(rx).run().await; + Dataspace::new(rx).run().await; }); let mut id = 0; - let mut listener = TcpListener::bind("0.0.0.0:5889").await?; + let port = 8001; + let mut listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?; + println!("Listening on port {}", port); loop { let (stream, addr) = listener.accept().await?; let tx = tx.clone();