From 9e256376a147b35f51a69c7c9c68345e74c24fe0 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Tue, 15 Oct 2019 19:49:06 +0100 Subject: [PATCH] More --- src/main.rs | 69 ++++++++++++++++++++++++++++------------------------- 1 file changed, 37 insertions(+), 32 deletions(-) diff --git a/src/main.rs b/src/main.rs index c93a42b..3d0ede5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,9 +21,23 @@ type V = value::ArcValue; mod packets { use super::V; + pub type EndpointName = V; + pub type Assertion = V; + pub type Captures = Vec; + #[derive(Debug, serde::Serialize, serde::Deserialize)] pub enum Action { - Assert(V, V), + Assert(EndpointName, Assertion), + Clear(EndpointName), + Message(Assertion), + } + + #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] + pub enum Event { + Add(EndpointName, Captures), + Del(EndpointName, Captures), + Msg(EndpointName, Captures), + End(EndpointName), } #[derive(Debug, serde::Serialize, serde::Deserialize)] @@ -31,11 +45,14 @@ mod packets { Connect(V), Turn(Vec), Ping(), + Pong(), } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum Out { Err(String), + Turn(Vec), + Ping(), Pong(), } } @@ -114,17 +131,21 @@ impl Peer { Some(res) => match res { Ok(v) => { println!("Input {}: {:?}", self.id, &v); - let p = value::from_value(&v).unwrap(); - self.dataspace.send(DataspaceMessage::Input(self.id, p)).await.unwrap() + match value::from_value(&v) { + Ok(p) => { + let m = DataspaceMessage::Input(self.id, p); + self.dataspace.send(m).await.unwrap() + } + Err(e) => { + to_send.push(packets::Out::Err(format!("{:?}", e))); + running = false; + } + } } Err(value::codec::Error::Eof) => running = false, Err(value::codec::Error::Io(e)) => return Err(e), Err(value::codec::Error::Syntax(s)) => { - let v = value::to_value(packets::Out::Err(s.to_string())).unwrap(); - to_send.push(v); - println!("Connection {} crashed with Preserves syntax error {:?}", - self.id, - s); + to_send.push(packets::Out::Err(s.to_string())); running = false; } } @@ -132,14 +153,18 @@ impl Peer { }, msgopt = self.rx.recv().boxed().fuse() => { match msgopt { - Some(msg) => to_send.push(value::to_value(msg).unwrap()), + Some(msg) => to_send.push(msg), None => /* weird. */ running = false, } }, } for v in to_send { - println!("Output {}: {:?}", self.id, &v); - self.frames.send(v).await?; + if let packets::Out::Err(ref msg) = v { + println!("Connection {} crashed with error {:?}", self.id, msg); + } else { + println!("Output {}: {:?}", self.id, &v); + } + self.frames.send(value::to_value(v).unwrap()).await?; } } Ok(()) @@ -189,12 +214,6 @@ impl Dataspace { // self.pending.push(PeerMessage::Leave(i)); } - // async fn broadcast(&mut self, m: &Arc) { - // for (i, ref mut s) in self.peers.clone() { - // self.send(i, s, m).await; - // } - // } - async fn run(&mut self) { loop { println!("Dataspace waiting for message ({} connected)", self.peers.len()); @@ -202,17 +221,6 @@ impl Dataspace { println!("Dataspace: {:?}", msg); match msg { DataspaceMessage::Join(i, s) => { - // let mut ok = true; - // let i_join = &Arc::new(PeerMessage::Join(i)); - // for (p, ref mut r) in self.peers.clone() { - // ok = ok && self.send(i, &mut s, &Arc::new(PeerMessage::Join(p))).await; - // self.send(p, r, i_join).await; - // } - // ok = ok && self.send(i, &mut s, i_join).await; - // if ok { - // self.peers.insert(i, s); - // } - self.peers.insert(i, s); } DataspaceMessage::Input(i, p) => { @@ -220,14 +228,11 @@ impl Dataspace { packets::In::Connect(dsname) => (), packets::In::Turn(actions) => (), packets::In::Ping() => { self.send_to(i, &packets::Out::Pong()).await; } + packets::In::Pong() => (), } - // self.broadcast(&Arc::new(PeerMessage::Speak(i, v))).await; } DataspaceMessage::Leave(i) => self.remove(i), } - // while let Some(m) = self.pending.pop() { - // self.broadcast(&Arc::new(m)).await; - // } } } }