From 61005d308f524540c629c5e8b6bec42bbe7901c9 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sun, 20 Oct 2019 16:01:27 +0100 Subject: [PATCH] Start on Dataspace impl --- src/dataspace.rs | 50 ++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 10 +++++----- src/peer.rs | 40 ++++++++++++++++---------------------- src/spaces.rs | 31 ++++++++++++++++++++++++++++++ 4 files changed, 103 insertions(+), 28 deletions(-) create mode 100644 src/dataspace.rs create mode 100644 src/spaces.rs diff --git a/src/dataspace.rs b/src/dataspace.rs new file mode 100644 index 0000000..4519909 --- /dev/null +++ b/src/dataspace.rs @@ -0,0 +1,50 @@ +use super::V; +use super::ConnId; +use super::packets; + +use preserves::value::Map; +use std::sync::{Arc, RwLock}; +use tokio::sync::mpsc::UnboundedSender; + +pub type DataspaceRef = Arc>; + +pub enum DataspaceError { +} + +impl From for std::io::Error { + fn from(v: DataspaceError) -> Self { + panic!("No DataspaceErrors to convert!") + } +} + +#[derive(Debug)] +pub struct Dataspace { + name: V, + peers: Map>, +} + +impl Dataspace { + pub fn new(name: &V) -> Self { + Self { name: name.clone(), peers: Map::new() } + } + + pub fn new_ref(name: &V) -> DataspaceRef { + Arc::new(RwLock::new(Self::new(name))) + } + + pub fn register(&mut self, id: ConnId, tx: UnboundedSender) { + assert!(!self.peers.contains_key(&id)); + self.peers.insert(id, tx); + } + + pub fn deregister(&mut self, id: ConnId) { + self.peers.remove(&id); + } + + pub fn turn(&mut self, actions: Vec) -> Result<(), DataspaceError> { + for a in actions { + println!("Turn action: {:?}", &a); + } + Ok(()) + } +} diff --git a/src/main.rs b/src/main.rs index d519092..916fd4e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,22 +1,22 @@ #![recursion_limit="512"] mod bag; -mod skeleton; +mod dataspace; mod packets; mod peer; +mod skeleton; +mod spaces; -use preserves::value::{self, Map}; +use preserves::value; use std::sync::{Mutex, Arc}; use tokio::net::TcpListener; -// use self::skeleton::Index; - pub type ConnId = u64; pub type V = value::ArcValue; #[tokio::main] async fn main() -> Result<(), Box> { - let spaces = Arc::new(Mutex::new(Map::new())); + let spaces = Arc::new(Mutex::new(spaces::Spaces::new())); let mut id = 0; let port = 8001; diff --git a/src/peer.rs b/src/peer.rs index 88230a1..52f5c56 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -1,5 +1,7 @@ -use super::{ConnId, V}; +use super::ConnId; +use super::dataspace; use super::packets; +use super::spaces; use core::time::Duration; use futures::select; @@ -16,6 +18,7 @@ pub struct Peer { tx: UnboundedSender, rx: UnboundedReceiver, frames: Framed, + space: Option, } fn err(s: &str) -> packets::Out { @@ -32,10 +35,10 @@ impl Peer { m.insert(2, value::Value::symbol("Observe")); m }))); - Peer{ id, tx, rx, frames } + Peer{ id, tx, rx, frames, space: None } } - pub async fn run(&mut self, spaces: Arc>>) -> Result<(), std::io::Error> { + pub async fn run(&mut self, spaces: Arc>) -> Result<(), std::io::Error> { println!("{:?}: {:?}", self.id, &self.frames.get_ref()); let firstpacket = self.frames.next().await; @@ -48,21 +51,8 @@ impl Peer { return Ok(()) }; - let is_new = { - let mut s = spaces.lock().unwrap(); - match s.get(&dsname) { - Some(_) => false, - None => { - s.insert(dsname.clone(), ()); - true - } - } - }; - - println!("{:?}: connected to {} dataspace {:?}", - self.id, - if is_new { "new" } else { "existing" }, - dsname); + self.space = Some(spaces.lock().unwrap().lookup(&dsname)); + self.space.as_ref().unwrap().write().unwrap().register(self.id, self.tx.clone()); let mut ping_timer = Interval::new_interval(Duration::from_secs(60)); @@ -76,11 +66,12 @@ impl Peer { Ok(p) => { println!("{:?}: input {:?}", self.id, &p); match p { - packets::In::Turn(actions) => (), - packets::In::Ping() => { - to_send.push(packets::Out::Pong()) - } - packets::In::Pong() => (), + packets::In::Turn(actions) => + self.space.as_ref().unwrap().write().unwrap().turn(actions)?, + packets::In::Ping() => + to_send.push(packets::Out::Pong()), + packets::In::Pong() => + (), packets::In::Connect(dsname) => { to_send.push(err("Unexpected Connect")); running = false; @@ -127,5 +118,8 @@ impl Peer { impl Drop for Peer { fn drop(&mut self) { + if let Some(ref s) = self.space { + s.write().unwrap().deregister(self.id); + } } } diff --git a/src/spaces.rs b/src/spaces.rs new file mode 100644 index 0000000..9e53200 --- /dev/null +++ b/src/spaces.rs @@ -0,0 +1,31 @@ +use super::V; +use super::dataspace; + +use preserves::value::Map; + +pub struct Spaces { + index: Map, +} + +impl Spaces { + pub fn new() -> Self { + Self { index: Map::new() } + } + + pub fn lookup(&mut self, name: &V) -> dataspace::DataspaceRef { + let (is_new, space) = match self.index.get(name) { + Some(s) => (false, s.clone()), + None => { + let s = dataspace::Dataspace::new_ref(name); + self.index.insert(name.clone(), s.clone()); + (true, s) + } + }; + + println!("Dataspace {:?} {}", + name, + if is_new { "created" } else { "accessed" }); + + space + } +}