diff --git a/src/main.rs b/src/main.rs index 27d517e..804330c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,141 +3,17 @@ mod bag; mod skeleton; mod packets; +mod peer; -use core::time::Duration; -use futures::select; use preserves::value::{self, Map}; -use std::sync::{Mutex, RwLock, Arc}; -use tokio::codec::Framed; -use tokio::net::{TcpListener, TcpStream}; -use tokio::prelude::*; -use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver}; -use tokio::timer::Interval; +use std::sync::{Mutex, Arc}; +use tokio::net::TcpListener; // use self::skeleton::Index; -type ConnId = u64; - +pub type ConnId = u64; pub type V = value::ArcValue; -fn err(s: &str) -> packets::Out { - packets::Out::Err(s.into()) -} - -struct Peer { - id: ConnId, - tx: UnboundedSender, - rx: UnboundedReceiver, - frames: Framed, -} - -impl Peer { - async fn new(id: ConnId, stream: TcpStream) -> Self { - let (tx, rx) = unbounded_channel(); - let frames = Framed::new(stream, packets::Codec::new(value::Codec::new({ - let mut m = Map::new(); - m.insert(0, value::Value::symbol("Discard")); - m.insert(1, value::Value::symbol("Capture")); - m.insert(2, value::Value::symbol("Observe")); - m - }))); - Peer{ id, tx, rx, frames } - } - - async fn run(&mut self, spaces: Arc>>) -> Result<(), std::io::Error> { - println!("{:?}: got {:?}", self.id, &self.frames.get_ref()); - - let firstpacket = self.frames.next().await; - let dsname = if let Some(Ok(packets::In::Connect(dsname))) = firstpacket { - dsname - } else { - let e: String = format!("Expected initial Connect, got {:?}", firstpacket); - println!("{:?}: {}", self.id, e); - self.frames.send(err(&e)).await?; - return Ok(()) - }; - - let is_new = { - let mut s = spaces.lock().unwrap(); - match s.get(&dsname) { - Some(_) => false, - None => { - s.insert(dsname.clone(), ()); - true - } - } - }; - - println!("{:?}: connected to {} dataspace {:?}", - self.id, - if is_new { "new" } else { "existing" }, - dsname); - - let mut ping_timer = Interval::new_interval(Duration::from_secs(60)); - - let mut running = true; - while running { - let mut to_send = Vec::new(); - select! { - _instant = ping_timer.next().boxed().fuse() => to_send.push(packets::Out::Ping()), - frame = self.frames.next().boxed().fuse() => match frame { - Some(res) => match res { - Ok(p) => { - println!("{:?}: input {:?}", self.id, &p); - match p { - packets::In::Turn(actions) => (), - packets::In::Ping() => { - to_send.push(packets::Out::Pong()) - } - packets::In::Pong() => (), - packets::In::Connect(dsname) => { - to_send.push(err("Unexpected Connect")); - running = false; - } - } - } - Err(packets::DecodeError::Read(value::decoder::Error::Eof)) => running = false, - Err(packets::DecodeError::Read(value::decoder::Error::Io(e))) => return Err(e), - Err(packets::DecodeError::Read(value::decoder::Error::Syntax(s))) => { - to_send.push(err(s)); - running = false; - } - Err(packets::DecodeError::Parse(e)) => { - to_send.push(err(&format!("Packet deserialization error: {:?}", e))); - running = false; - } - } - None => running = false, - }, - msgopt = self.rx.recv().boxed().fuse() => { - match msgopt { - Some(msg) => to_send.push(msg), - None => { - /* weird. */ - to_send.push(err("Outbound channel closed unexpectedly")); - running = false; - } - } - }, - } - for v in to_send { - if let packets::Out::Err(ref msg) = v { - println!("{:?}: Connection crashed with error {:?}", self.id, msg); - } else { - println!("{:?}: Output {:?}", self.id, &v); - } - self.frames.send(v).await?; - } - } - Ok(()) - } -} - -impl Drop for Peer { - fn drop(&mut self) { - } -} - #[tokio::main] async fn main() -> Result<(), Box> { let spaces = Arc::new(Mutex::new(Map::new())); @@ -152,7 +28,7 @@ async fn main() -> Result<(), Box> { let spaces = Arc::clone(&spaces); id = id + 1; tokio::spawn(async move { - match Peer::new(connid, stream).await.run(spaces).await { + match peer::Peer::new(connid, stream).await.run(spaces).await { Ok(_) => (), Err(e) => println!("Connection {:?} died with {:?}", addr, e), } diff --git a/src/peer.rs b/src/peer.rs new file mode 100644 index 0000000..51ee63f --- /dev/null +++ b/src/peer.rs @@ -0,0 +1,130 @@ +use super::{ConnId, V}; +use super::packets; + +use core::time::Duration; +use futures::select; +use preserves::value::{self, Map}; +use std::sync::{Mutex, Arc}; +use tokio::codec::Framed; +use tokio::net::TcpStream; +use tokio::prelude::*; +use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver}; +use tokio::timer::Interval; + +pub struct Peer { + id: ConnId, + tx: UnboundedSender, + rx: UnboundedReceiver, + frames: Framed, +} + +fn err(s: &str) -> packets::Out { + packets::Out::Err(s.into()) +} + +impl Peer { + pub async fn new(id: ConnId, stream: TcpStream) -> Self { + let (tx, rx) = unbounded_channel(); + let frames = Framed::new(stream, packets::Codec::new(value::Codec::new({ + let mut m = Map::new(); + m.insert(0, value::Value::symbol("Discard")); + m.insert(1, value::Value::symbol("Capture")); + m.insert(2, value::Value::symbol("Observe")); + m + }))); + Peer{ id, tx, rx, frames } + } + + pub async fn run(&mut self, spaces: Arc>>) -> Result<(), std::io::Error> { + println!("{:?}: got {:?}", self.id, &self.frames.get_ref()); + + let firstpacket = self.frames.next().await; + let dsname = if let Some(Ok(packets::In::Connect(dsname))) = firstpacket { + dsname + } else { + let e: String = format!("Expected initial Connect, got {:?}", firstpacket); + println!("{:?}: {}", self.id, e); + self.frames.send(err(&e)).await?; + return Ok(()) + }; + + let is_new = { + let mut s = spaces.lock().unwrap(); + match s.get(&dsname) { + Some(_) => false, + None => { + s.insert(dsname.clone(), ()); + true + } + } + }; + + println!("{:?}: connected to {} dataspace {:?}", + self.id, + if is_new { "new" } else { "existing" }, + dsname); + + let mut ping_timer = Interval::new_interval(Duration::from_secs(60)); + + let mut running = true; + while running { + let mut to_send = Vec::new(); + select! { + _instant = ping_timer.next().boxed().fuse() => to_send.push(packets::Out::Ping()), + frame = self.frames.next().boxed().fuse() => match frame { + Some(res) => match res { + Ok(p) => { + println!("{:?}: input {:?}", self.id, &p); + match p { + packets::In::Turn(actions) => (), + packets::In::Ping() => { + to_send.push(packets::Out::Pong()) + } + packets::In::Pong() => (), + packets::In::Connect(dsname) => { + to_send.push(err("Unexpected Connect")); + running = false; + } + } + } + Err(packets::DecodeError::Read(value::decoder::Error::Eof)) => running = false, + Err(packets::DecodeError::Read(value::decoder::Error::Io(e))) => return Err(e), + Err(packets::DecodeError::Read(value::decoder::Error::Syntax(s))) => { + to_send.push(err(s)); + running = false; + } + Err(packets::DecodeError::Parse(e)) => { + to_send.push(err(&format!("Packet deserialization error: {:?}", e))); + running = false; + } + } + None => running = false, + }, + msgopt = self.rx.recv().boxed().fuse() => { + match msgopt { + Some(msg) => to_send.push(msg), + None => { + /* weird. */ + to_send.push(err("Outbound channel closed unexpectedly")); + running = false; + } + } + }, + } + for v in to_send { + if let packets::Out::Err(ref msg) = v { + println!("{:?}: Connection crashed with error {:?}", self.id, msg); + } else { + println!("{:?}: Output {:?}", self.id, &v); + } + self.frames.send(v).await?; + } + } + Ok(()) + } +} + +impl Drop for Peer { + fn drop(&mut self) { + } +}