syndicate-rs/src/dataspace.rs

230 lines
7.8 KiB
Rust
Raw Normal View History

2019-10-20 15:01:27 +00:00
use super::V;
use super::ConnId;
use super::packets::{self, Assertion, EndpointName, Captures};
2019-10-20 21:25:01 +00:00
use super::skeleton;
2019-10-20 15:01:27 +00:00
use preserves::value::{self, Map, NestedValue};
2020-05-18 11:16:14 +00:00
use std::sync::{Arc, RwLock, atomic::{AtomicUsize, Ordering}};
2019-10-20 15:01:27 +00:00
use tokio::sync::mpsc::UnboundedSender;
pub type DataspaceRef = Arc<RwLock<Dataspace>>;
pub type DataspaceError = (String, V);
2019-10-20 15:01:27 +00:00
#[derive(Debug)]
struct Actor {
2020-05-11 20:08:27 +00:00
tx: UnboundedSender<packets::S2C>,
2020-05-18 11:16:14 +00:00
queue_depth: Arc<AtomicUsize>,
endpoints: Map<EndpointName, ActorEndpoint>,
2019-10-20 15:01:27 +00:00
}
/// A single endpoint asserted by a peer.
#[derive(Debug)]
struct ActorEndpoint {
    /// Skeleton analysis of the observed pattern; `Some` only when the
    /// assertion is a one-field `Observe` record, `None` otherwise.
    analysis_results: Option<skeleton::AnalysisResults>,
    /// The assertion the peer placed at this endpoint.
    assertion: Assertion,
}
2020-05-18 09:36:44 +00:00
/// Activity counters for a dataspace, accumulated since creation or since
/// the last call to [`Churn::reset`].
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Churn {
    /// Number of peers registered.
    pub peers_added: usize,
    /// Number of peers deregistered.
    pub peers_removed: usize,
    /// Growth in the index's assertion count caused by `Assert` actions.
    pub assertions_added: usize,
    /// Shrinkage in the index's assertion count caused by endpoint removal.
    pub assertions_removed: usize,
    /// Number of endpoints created.
    pub endpoints_added: usize,
    /// Number of endpoints removed.
    pub endpoints_removed: usize,
    /// Number of `Message` actions processed.
    pub messages_injected: usize,
    /// Counter handed to the index when a message is sent; presumably the
    /// number of deliveries made (the index implementation is not visible
    /// from here — verify).
    pub messages_delivered: usize,
}

impl Churn {
    /// Creates a set of counters, all zero.
    ///
    /// Equivalent to `Churn::default()`; kept so existing callers continue
    /// to work.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets every counter back to zero.
    pub fn reset(&mut self) {
        // Going through `Default` means a newly added counter cannot be
        // forgotten here.
        *self = Self::default();
    }
}
2019-10-20 15:01:27 +00:00
#[derive(Debug)]
pub struct Dataspace {
name: V,
peers: Map<ConnId, Actor>,
2019-10-20 21:25:01 +00:00
index: skeleton::Index,
2020-05-18 09:36:44 +00:00
pub churn: Churn,
2019-10-20 15:01:27 +00:00
}
impl Dataspace {
pub fn new(name: &V) -> Self {
2020-05-18 09:36:44 +00:00
Self {
name: name.clone(),
peers: Map::new(),
index: skeleton::Index::new(),
churn: Churn::new(),
}
2019-10-20 15:01:27 +00:00
}
/// Creates an empty dataspace called `name`, wrapped in `Arc<RwLock<_>>`
/// so it can be shared.
pub fn new_ref(name: &V) -> DataspaceRef {
    let dataspace = Self::new(name);
    Arc::new(RwLock::new(dataspace))
}
2020-05-18 11:16:14 +00:00
pub fn register(&mut self, id: ConnId,
tx: UnboundedSender<packets::S2C>,
queue_depth: Arc<AtomicUsize>)
{
2019-10-20 15:01:27 +00:00
assert!(!self.peers.contains_key(&id));
self.peers.insert(id, Actor {
tx,
2020-05-18 11:16:14 +00:00
queue_depth,
endpoints: Map::new(),
});
2020-05-18 09:36:44 +00:00
self.churn.peers_added += 1;
2019-10-20 15:01:27 +00:00
}
pub fn deregister(&mut self, id: ConnId) {
let ac = self.peers.remove(&id).unwrap();
2020-05-18 09:36:44 +00:00
self.churn.peers_removed += 1;
let mut outbound_turns: Map<ConnId, Vec<packets::Event>> = Map::new();
for (epname, ep) in ac.endpoints {
self.remove_endpoint(&mut outbound_turns, id, &epname, ep);
}
outbound_turns.remove(&id);
self.deliver_outbound_turns(outbound_turns);
2019-10-20 15:01:27 +00:00
}
fn remove_endpoint(&mut self,
mut outbound_turns: &mut Map<ConnId, Vec<packets::Event>>,
id: ConnId,
epname: &EndpointName,
ep: ActorEndpoint)
{
let ActorEndpoint{ analysis_results, assertion } = ep;
if let Some(ar) = analysis_results {
self.index.remove_endpoint(&ar, skeleton::Endpoint {
connection: id,
name: epname.clone(),
});
}
2020-05-18 09:36:44 +00:00
let old_assertions = self.index.assertion_count();
schedule_events(&mut outbound_turns,
self.index.remove((&assertion).into()),
|epname, cs| packets::Event::Del(epname, cs));
2020-05-18 09:36:44 +00:00
self.churn.assertions_removed += old_assertions - self.index.assertion_count();
self.churn.endpoints_removed += 1;
}
pub fn turn(&mut self, id: ConnId, actions: Vec<packets::Action>) ->
Result<(), DataspaceError>
{
let mut outbound_turns: Map<ConnId, Vec<packets::Event>> = Map::new();
2019-10-20 15:01:27 +00:00
for a in actions {
2020-05-18 12:33:36 +00:00
tracing::trace!(action = debug(&a), "turn");
match a {
packets::Action::Assert(ref epname, ref assertion) => {
let ac = self.peers.get_mut(&id).unwrap();
if ac.endpoints.contains_key(&epname) {
return Err(("Duplicate endpoint name".to_string(),
value::to_value(a).unwrap()));
}
let ar =
if let Some(fs) = assertion.value().as_simple_record("Observe", Some(1)) {
let ar = skeleton::analyze(&fs[0]);
let events = self.index.add_endpoint(&ar, skeleton::Endpoint {
connection: id,
name: epname.clone(),
});
outbound_turns.entry(id).or_insert_with(Vec::new).extend(events);
Some(ar)
} else {
None
};
2020-05-18 09:36:44 +00:00
let old_assertions = self.index.assertion_count();
schedule_events(&mut outbound_turns,
self.index.insert(assertion.into()),
|epname, cs| packets::Event::Add(epname, cs));
2020-05-18 09:36:44 +00:00
self.churn.assertions_added += self.index.assertion_count() - old_assertions;
self.churn.endpoints_added += 1;
ac.endpoints.insert(epname.clone(), ActorEndpoint {
analysis_results: ar,
assertion: assertion.clone()
});
}
packets::Action::Clear(ref epname) => {
let ac = self.peers.get_mut(&id).unwrap();
match ac.endpoints.remove(epname) {
None => {
return Err(("Nonexistent endpoint name".to_string(),
value::to_value(a).unwrap()));
}
Some(ep) => {
self.remove_endpoint(&mut outbound_turns, id, epname, ep);
outbound_turns.entry(id).or_insert_with(Vec::new)
.push(packets::Event::End(epname.clone()));
}
}
}
packets::Action::Message(ref assertion) => {
schedule_events(&mut outbound_turns,
self.index.send(assertion.into(),
&mut self.churn.messages_delivered),
|epname, cs| packets::Event::Msg(epname, cs));
self.churn.messages_injected += 1;
}
}
2019-10-20 15:01:27 +00:00
}
self.deliver_outbound_turns(outbound_turns);
2019-10-20 15:01:27 +00:00
Ok(())
}
fn deliver_outbound_turns(&mut self, outbound_turns: Map<ConnId, Vec<packets::Event>>) {
for (target, events) in outbound_turns {
2020-05-18 11:16:14 +00:00
let actor = self.peers.get_mut(&target).unwrap();
let _ = actor.tx.send(packets::S2C::Turn(events));
actor.queue_depth.fetch_add(1, Ordering::Relaxed);
}
}
2020-05-18 09:36:44 +00:00
/// Returns the number of currently registered peers.
pub fn peer_count(&self) -> usize {
    let registered = &self.peers;
    registered.len()
}
/// Returns the assertion count reported by the index.
pub fn assertion_count(&self) -> usize {
    let index = &self.index;
    index.assertion_count()
}
/// Returns the endpoint count reported by the index.
pub fn endpoint_count(&self) -> isize {
    let index = &self.index;
    index.endpoint_count()
}
}
/// Turns index results into per-connection event queues.
///
/// `events` yields pairs of (endpoints, captures). For every endpoint in a
/// pair, `ctor` builds an event from the endpoint's name and a clone of the
/// captures, and the event is appended to the queue for that endpoint's
/// connection in `outbound_turns` (creating the queue if needed).
fn schedule_events<C>(
    outbound_turns: &mut Map<ConnId, Vec<packets::Event>>,
    events: skeleton::Events,
    ctor: C,
) where
    C: Fn(EndpointName, Captures) -> packets::Event,
{
    for (recipients, captures) in events {
        for recipient in recipients {
            let queue = outbound_turns
                .entry(recipient.connection)
                .or_insert_with(Vec::new);
            queue.push(ctor(recipient.name, captures.clone()));
        }
    }
}