use super::V; use super::ConnId; use super::packets::{self, Assertion, EndpointName, Captures}; use super::skeleton; use preserves::value::{self, Map, NestedValue}; use std::sync::{Arc, RwLock, atomic::{AtomicUsize, Ordering}}; use tokio::sync::mpsc::UnboundedSender; pub type DataspaceRef = Arc>; pub type DataspaceError = (String, V); #[derive(Debug)] struct Actor { tx: UnboundedSender, queue_depth: Arc, endpoints: Map, } #[derive(Debug)] struct ActorEndpoint { analysis_results: Option, assertion: Assertion, } #[derive(Debug)] pub struct Churn { pub peers_added: usize, pub peers_removed: usize, pub assertions_added: usize, pub assertions_removed: usize, pub endpoints_added: usize, pub endpoints_removed: usize, pub messages_injected: usize, pub messages_delivered: usize, } impl Churn { pub fn new() -> Self { Self { peers_added: 0, peers_removed: 0, assertions_added: 0, assertions_removed: 0, endpoints_added: 0, endpoints_removed: 0, messages_injected: 0, messages_delivered: 0, } } pub fn reset(&mut self) { self.peers_added = 0; self.peers_removed = 0; self.assertions_added = 0; self.assertions_removed = 0; self.endpoints_added = 0; self.endpoints_removed = 0; self.messages_injected = 0; self.messages_delivered = 0; } } #[derive(Debug)] pub struct Dataspace { name: V, peers: Map, index: skeleton::Index, pub churn: Churn, } impl Dataspace { pub fn new(name: &V) -> Self { Self { name: name.clone(), peers: Map::new(), index: skeleton::Index::new(), churn: Churn::new(), } } pub fn new_ref(name: &V) -> DataspaceRef { Arc::new(RwLock::new(Self::new(name))) } pub fn register(&mut self, id: ConnId, tx: UnboundedSender, queue_depth: Arc) { assert!(!self.peers.contains_key(&id)); self.peers.insert(id, Actor { tx, queue_depth, endpoints: Map::new(), }); self.churn.peers_added += 1; } pub fn deregister(&mut self, id: ConnId) { let ac = self.peers.remove(&id).unwrap(); self.churn.peers_removed += 1; let mut outbound_turns: Map> = Map::new(); for (epname, ep) in ac.endpoints { self.remove_endpoint(&mut outbound_turns, id, &epname, ep); } outbound_turns.remove(&id); self.deliver_outbound_turns(outbound_turns); } fn remove_endpoint(&mut self, mut outbound_turns: &mut Map>, id: ConnId, epname: &EndpointName, ep: ActorEndpoint) { let ActorEndpoint{ analysis_results, assertion } = ep; if let Some(ar) = analysis_results { self.index.remove_endpoint(&ar, skeleton::Endpoint { connection: id, name: epname.clone(), }); } let old_assertions = self.index.assertion_count(); schedule_events(&mut outbound_turns, self.index.remove((&assertion).into()), |epname, cs| packets::Event::Del(epname, cs)); self.churn.assertions_removed += old_assertions - self.index.assertion_count(); self.churn.endpoints_removed += 1; } pub fn turn(&mut self, id: ConnId, actions: Vec) -> Result<(), DataspaceError> { let mut outbound_turns: Map> = Map::new(); for a in actions { // println!("Turn action: {:?}", &a); match a { packets::Action::Assert(ref epname, ref assertion) => { let ac = self.peers.get_mut(&id).unwrap(); if ac.endpoints.contains_key(&epname) { return Err(("Duplicate endpoint name".to_string(), value::to_value(a).unwrap())); } let ar = if let Some(fs) = assertion.value().as_simple_record("Observe", Some(1)) { let ar = skeleton::analyze(&fs[0]); let events = self.index.add_endpoint(&ar, skeleton::Endpoint { connection: id, name: epname.clone(), }); outbound_turns.entry(id).or_insert_with(Vec::new).extend(events); Some(ar) } else { None }; let old_assertions = self.index.assertion_count(); schedule_events(&mut outbound_turns, self.index.insert(assertion.into()), |epname, cs| packets::Event::Add(epname, cs)); self.churn.assertions_added += self.index.assertion_count() - old_assertions; self.churn.endpoints_added += 1; ac.endpoints.insert(epname.clone(), ActorEndpoint { analysis_results: ar, assertion: assertion.clone() }); } packets::Action::Clear(ref epname) => { let ac = self.peers.get_mut(&id).unwrap(); match ac.endpoints.remove(epname) { None => { return Err(("Nonexistent endpoint name".to_string(), value::to_value(a).unwrap())); } Some(ep) => { self.remove_endpoint(&mut outbound_turns, id, epname, ep); outbound_turns.entry(id).or_insert_with(Vec::new) .push(packets::Event::End(epname.clone())); } } } packets::Action::Message(ref assertion) => { schedule_events(&mut outbound_turns, self.index.send(assertion.into(), &mut self.churn.messages_delivered), |epname, cs| packets::Event::Msg(epname, cs)); self.churn.messages_injected += 1; } } } self.deliver_outbound_turns(outbound_turns); Ok(()) } fn deliver_outbound_turns(&mut self, outbound_turns: Map>) { for (target, events) in outbound_turns { let actor = self.peers.get_mut(&target).unwrap(); let _ = actor.tx.send(packets::S2C::Turn(events)); actor.queue_depth.fetch_add(1, Ordering::Relaxed); } } pub fn peer_count(&self) -> usize { self.peers.len() } pub fn assertion_count(&self) -> usize { self.index.assertion_count() } pub fn endpoint_count(&self) -> isize { self.index.endpoint_count() } } fn schedule_events(outbound_turns: &mut Map>, events: skeleton::Events, ctor: C) where C: Fn(EndpointName, Captures) -> packets::Event { for (eps, cs) in events { for ep in eps { outbound_turns.entry(ep.connection).or_insert_with(Vec::new) .push(ctor(ep.name, cs.clone())); } } }