Start to kick main.rs into being a syndicate-server-shaped object

This commit is contained in:
Tony Garnock-Jones 2019-10-15 16:58:52 +01:00
parent 4399f17db4
commit 82252cf4e5
1 changed files with 103 additions and 69 deletions

View File

@ -4,9 +4,7 @@ mod bag;
mod skeleton; mod skeleton;
use bytes::BytesMut; use bytes::BytesMut;
use preserves::value::{self, NestedValue}; use preserves::value::{self, Map};
use std::collections::BTreeMap;
use std::sync::Arc;
use tokio::prelude::*; use tokio::prelude::*;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::{channel, Sender, Receiver}; use tokio::sync::mpsc::{channel, Sender, Receiver};
@ -21,21 +19,31 @@ type ConnId = u64;
type V = value::ArcValue; type V = value::ArcValue;
mod packets { mod packets {
use super::V;
#[derive(Debug, serde::Serialize, serde::Deserialize)] #[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Error(pub String); pub enum Action {
Assert(V, V),
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum In {
Connect(V),
Turn(Vec<Action>),
Ping(),
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub enum Out {
Err(String),
Pong(),
}
} }
#[derive(Debug)] #[derive(Debug)]
pub enum RelayMessage { pub enum DataspaceMessage {
Hello(ConnId, Sender<Arc<PeerMessage>>), Join(ConnId, Sender<packets::Out>),
Speak(ConnId, V), Input(ConnId, packets::In),
Goodbye(ConnId),
}
#[derive(Debug, Clone, serde::Serialize)]
pub enum PeerMessage {
Join(ConnId),
Speak(ConnId, V),
Leave(ConnId), Leave(ConnId),
} }
@ -77,17 +85,23 @@ impl Encoder for ValueCodec {
struct Peer { struct Peer {
id: ConnId, id: ConnId,
rx: Receiver<Arc<PeerMessage>>, rx: Receiver<packets::Out>,
relay: Sender<RelayMessage>, dataspace: Sender<DataspaceMessage>,
frames: Framed<TcpStream, ValueCodec>, frames: Framed<TcpStream, ValueCodec>,
} }
impl Peer { impl Peer {
async fn new(id: ConnId, mut relay: Sender<RelayMessage>, stream: TcpStream) -> Self { async fn new(id: ConnId, mut dataspace: Sender<DataspaceMessage>, stream: TcpStream) -> Self {
let (tx, rx) = channel(1); let (tx, rx) = channel(1);
let frames = Framed::new(stream, ValueCodec::new(value::Codec::without_placeholders())); let frames = Framed::new(stream, ValueCodec::new(value::Codec::new({
relay.send(RelayMessage::Hello(id, tx)).await.unwrap(); let mut m = Map::new();
Peer{ id, rx, relay, frames } m.insert(0, value::Value::symbol("Discard"));
m.insert(1, value::Value::symbol("Capture"));
m.insert(2, value::Value::symbol("Observe"));
m
})));
dataspace.send(DataspaceMessage::Join(id, tx)).await.unwrap();
Peer{ id, rx, dataspace, frames }
} }
async fn run(&mut self) -> Result<(), std::io::Error> { async fn run(&mut self) -> Result<(), std::io::Error> {
@ -99,31 +113,34 @@ impl Peer {
frame = self.frames.next().boxed().fuse() => match frame { frame = self.frames.next().boxed().fuse() => match frame {
Some(res) => match res { Some(res) => match res {
Ok(v) => { Ok(v) => {
if (v.value().as_symbol() == Some(&"die".to_string())) { println!("Input {}: {:?}", self.id, &v);
panic!(); let p = value::from_value(&v).unwrap();
} else { self.dataspace.send(DataspaceMessage::Input(self.id, p)).await.unwrap()
self.relay.send(RelayMessage::Speak(self.id, v)).await.unwrap()
}
} }
Err(value::codec::Error::Eof) => running = false, Err(value::codec::Error::Eof) => running = false,
Err(value::codec::Error::Io(e)) => return Err(e), Err(value::codec::Error::Io(e)) => return Err(e),
Err(value::codec::Error::Syntax(s)) => { Err(value::codec::Error::Syntax(s)) => {
let v = value::to_value(packets::Error(s.to_string())).unwrap(); let v = value::to_value(packets::Out::Err(s.to_string())).unwrap();
to_send.push(v); to_send.push(v);
println!("Connection {} crashed with Preserves syntax error {:?}",
self.id,
s);
running = false; running = false;
} }
} }
None => running = false, None => running = false,
}, },
msgopt = self.rx.recv().boxed().fuse() => { msgopt = self.rx.recv().boxed().fuse() => {
println!("MSGOPT {:?}", &msgopt);
match msgopt { match msgopt {
Some(msg) => to_send.push(value::to_value(&*msg).unwrap()), Some(msg) => to_send.push(value::to_value(msg).unwrap()),
None => /* weird. */ running = false, None => /* weird. */ running = false,
} }
}, },
} }
for v in to_send { self.frames.send(v).await?; } for v in to_send {
println!("Output {}: {:?}", self.id, &v);
self.frames.send(v).await?;
}
} }
Ok(()) Ok(())
} }
@ -131,71 +148,86 @@ impl Peer {
impl Drop for Peer { impl Drop for Peer {
fn drop(&mut self) { fn drop(&mut self) {
let mut relay = self.relay.clone(); let mut dataspace = self.dataspace.clone();
let id = self.id; let id = self.id;
tokio::spawn(async move { tokio::spawn(async move {
let _ = relay.send(RelayMessage::Goodbye(id)).await; let _ = dataspace.send(DataspaceMessage::Leave(id)).await;
}); });
} }
} }
struct Relay { struct Dataspace {
rx: Receiver<RelayMessage>, rx: Receiver<DataspaceMessage>,
peers: BTreeMap<ConnId, Sender<Arc<PeerMessage>>>, peers: Map<ConnId, Sender<packets::Out>>,
pending: Vec<PeerMessage>,
} }
impl Relay { impl Dataspace {
fn new(rx: Receiver<RelayMessage>) -> Self { fn new(rx: Receiver<DataspaceMessage>) -> Self {
Relay { rx, peers: BTreeMap::new(), pending: Vec::new() } Dataspace { rx, peers: Map::new() }
} }
async fn send(&mut self, i: ConnId, s: &mut Sender<Arc<PeerMessage>>, m: &Arc<PeerMessage>) async fn send(&mut self, i: ConnId, s: &mut Sender<packets::Out>, m: &packets::Out)
-> bool -> bool
{ {
match s.send(Arc::clone(m)).await { match s.send(m.clone()).await {
Ok(_) => true, Ok(_) => true,
Err(_) => { self.remove(i); false } Err(_) => { self.remove(i); false }
} }
} }
fn remove(&mut self, i: ConnId) { async fn send_to(&mut self, i: ConnId, m: &packets::Out) -> bool {
self.peers.remove(&i); let mut ms = self.peers.get(&i).map(|s| s.clone());
self.pending.push(PeerMessage::Leave(i)); match ms {
} Some(ref mut s) => self.send(i, s, m).await,
None => false,
async fn broadcast(&mut self, m: &Arc<PeerMessage>) {
for (i, ref mut s) in self.peers.clone() {
self.send(i, s, m).await;
} }
} }
fn remove(&mut self, i: ConnId) {
self.peers.remove(&i);
// TODO: cleanup. Previously, this was:
// self.pending.push(PeerMessage::Leave(i));
}
// async fn broadcast(&mut self, m: &Arc<PeerMessage>) {
// for (i, ref mut s) in self.peers.clone() {
// self.send(i, s, m).await;
// }
// }
async fn run(&mut self) { async fn run(&mut self) {
loop { loop {
println!("Relay waiting for message ({} connected)", self.peers.len()); println!("Dataspace waiting for message ({} connected)", self.peers.len());
let msg = self.rx.recv().await.unwrap(); let msg = self.rx.recv().await.unwrap();
println!("Relay: {:?}", msg); println!("Dataspace: {:?}", msg);
match msg { match msg {
RelayMessage::Hello(i, mut s) => { DataspaceMessage::Join(i, s) => {
let mut ok = true; // let mut ok = true;
let i_join = &Arc::new(PeerMessage::Join(i)); // let i_join = &Arc::new(PeerMessage::Join(i));
for (p, ref mut r) in self.peers.clone() { // for (p, ref mut r) in self.peers.clone() {
ok = ok && self.send(i, &mut s, &Arc::new(PeerMessage::Join(p))).await; // ok = ok && self.send(i, &mut s, &Arc::new(PeerMessage::Join(p))).await;
self.send(p, r, i_join).await; // self.send(p, r, i_join).await;
} // }
ok = ok && self.send(i, &mut s, i_join).await; // ok = ok && self.send(i, &mut s, i_join).await;
if ok { // if ok {
self.peers.insert(i, s); // self.peers.insert(i, s);
} // }
self.peers.insert(i, s);
} }
RelayMessage::Speak(i, v) => { DataspaceMessage::Input(i, p) => {
self.broadcast(&Arc::new(PeerMessage::Speak(i, v))).await; match p {
packets::In::Connect(dsname) => (),
packets::In::Turn(actions) => (),
packets::In::Ping() => { self.send_to(i, &packets::Out::Pong()).await; }
}
// self.broadcast(&Arc::new(PeerMessage::Speak(i, v))).await;
} }
RelayMessage::Goodbye(i) => self.remove(i), DataspaceMessage::Leave(i) => self.remove(i),
}
while let Some(m) = self.pending.pop() {
self.broadcast(&Arc::new(m)).await;
} }
// while let Some(m) = self.pending.pop() {
// self.broadcast(&Arc::new(m)).await;
// }
} }
} }
} }
@ -207,12 +239,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Unlike std channels, a zero buffer is not supported // Unlike std channels, a zero buffer is not supported
let (tx, rx) = channel(100); // but ugh a big buffer is needed to avoid deadlocks??? let (tx, rx) = channel(100); // but ugh a big buffer is needed to avoid deadlocks???
tokio::spawn(async { tokio::spawn(async {
Relay::new(rx).run().await; Dataspace::new(rx).run().await;
}); });
let mut id = 0; let mut id = 0;
let mut listener = TcpListener::bind("0.0.0.0:5889").await?; let port = 8001;
let mut listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?;
println!("Listening on port {}", port);
loop { loop {
let (stream, addr) = listener.accept().await?; let (stream, addr) = listener.accept().await?;
let tx = tx.clone(); let tx = tx.clone();