Start on Dataspace impl

This commit is contained in:
Tony Garnock-Jones 2019-10-20 16:01:27 +01:00
parent 3830b76a95
commit 61005d308f
4 changed files with 103 additions and 28 deletions

50
src/dataspace.rs Normal file
View File

@ -0,0 +1,50 @@
use super::V;
use super::ConnId;
use super::packets;
use preserves::value::Map;
use std::sync::{Arc, RwLock};
use tokio::sync::mpsc::UnboundedSender;
pub type DataspaceRef = Arc<RwLock<Dataspace>>;
pub enum DataspaceError {
}
impl From<DataspaceError> for std::io::Error {
fn from(v: DataspaceError) -> Self {
panic!("No DataspaceErrors to convert!")
}
}
#[derive(Debug)]
pub struct Dataspace {
name: V,
peers: Map<ConnId, UnboundedSender<packets::Out>>,
}
impl Dataspace {
pub fn new(name: &V) -> Self {
Self { name: name.clone(), peers: Map::new() }
}
pub fn new_ref(name: &V) -> DataspaceRef {
Arc::new(RwLock::new(Self::new(name)))
}
pub fn register(&mut self, id: ConnId, tx: UnboundedSender<packets::Out>) {
assert!(!self.peers.contains_key(&id));
self.peers.insert(id, tx);
}
pub fn deregister(&mut self, id: ConnId) {
self.peers.remove(&id);
}
pub fn turn(&mut self, actions: Vec<packets::Action>) -> Result<(), DataspaceError> {
for a in actions {
println!("Turn action: {:?}", &a);
}
Ok(())
}
}

View File

@ -1,22 +1,22 @@
#![recursion_limit="512"]
mod bag;
mod skeleton;
mod dataspace;
mod packets;
mod peer;
mod skeleton;
mod spaces;
use preserves::value::{self, Map};
use preserves::value;
use std::sync::{Mutex, Arc};
use tokio::net::TcpListener;
// use self::skeleton::Index;
pub type ConnId = u64;
pub type V = value::ArcValue;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spaces = Arc::new(Mutex::new(Map::new()));
let spaces = Arc::new(Mutex::new(spaces::Spaces::new()));
let mut id = 0;
let port = 8001;

View File

@ -1,5 +1,7 @@
use super::{ConnId, V};
use super::ConnId;
use super::dataspace;
use super::packets;
use super::spaces;
use core::time::Duration;
use futures::select;
@ -16,6 +18,7 @@ pub struct Peer {
tx: UnboundedSender<packets::Out>,
rx: UnboundedReceiver<packets::Out>,
frames: Framed<TcpStream, packets::Codec>,
space: Option<dataspace::DataspaceRef>,
}
fn err(s: &str) -> packets::Out {
@ -32,10 +35,10 @@ impl Peer {
m.insert(2, value::Value::symbol("Observe"));
m
})));
Peer{ id, tx, rx, frames }
Peer{ id, tx, rx, frames, space: None }
}
pub async fn run(&mut self, spaces: Arc<Mutex<Map<V, ()>>>) -> Result<(), std::io::Error> {
pub async fn run(&mut self, spaces: Arc<Mutex<spaces::Spaces>>) -> Result<(), std::io::Error> {
println!("{:?}: {:?}", self.id, &self.frames.get_ref());
let firstpacket = self.frames.next().await;
@ -48,21 +51,8 @@ impl Peer {
return Ok(())
};
let is_new = {
let mut s = spaces.lock().unwrap();
match s.get(&dsname) {
Some(_) => false,
None => {
s.insert(dsname.clone(), ());
true
}
}
};
println!("{:?}: connected to {} dataspace {:?}",
self.id,
if is_new { "new" } else { "existing" },
dsname);
self.space = Some(spaces.lock().unwrap().lookup(&dsname));
self.space.as_ref().unwrap().write().unwrap().register(self.id, self.tx.clone());
let mut ping_timer = Interval::new_interval(Duration::from_secs(60));
@ -76,11 +66,12 @@ impl Peer {
Ok(p) => {
println!("{:?}: input {:?}", self.id, &p);
match p {
packets::In::Turn(actions) => (),
packets::In::Ping() => {
to_send.push(packets::Out::Pong())
}
packets::In::Pong() => (),
packets::In::Turn(actions) =>
self.space.as_ref().unwrap().write().unwrap().turn(actions)?,
packets::In::Ping() =>
to_send.push(packets::Out::Pong()),
packets::In::Pong() =>
(),
packets::In::Connect(dsname) => {
to_send.push(err("Unexpected Connect"));
running = false;
@ -127,5 +118,8 @@ impl Peer {
impl Drop for Peer {
fn drop(&mut self) {
if let Some(ref s) = self.space {
s.write().unwrap().deregister(self.id);
}
}
}

31
src/spaces.rs Normal file
View File

@ -0,0 +1,31 @@
use super::V;
use super::dataspace;
use preserves::value::Map;
pub struct Spaces {
index: Map<V, dataspace::DataspaceRef>,
}
impl Spaces {
pub fn new() -> Self {
Self { index: Map::new() }
}
pub fn lookup(&mut self, name: &V) -> dataspace::DataspaceRef {
let (is_new, space) = match self.index.get(name) {
Some(s) => (false, s.clone()),
None => {
let s = dataspace::Dataspace::new_ref(name);
self.index.insert(name.clone(), s.clone());
(true, s)
}
};
println!("Dataspace {:?} {}",
name,
if is_new { "created" } else { "accessed" });
space
}
}