From 2e2d5bfb5de869ff87310f32307611f4730149c6 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Fri, 13 Aug 2021 20:39:27 -0400 Subject: [PATCH] Document dataspace.rs; remove "churn" field --- syndicate/src/dataspace.rs | 105 ++++++++++++++++++++++--------------- syndicate/src/skeleton.rs | 4 +- 2 files changed, 66 insertions(+), 43 deletions(-) diff --git a/syndicate/src/dataspace.rs b/syndicate/src/dataspace.rs index e0fb195..dcc6387 100644 --- a/syndicate/src/dataspace.rs +++ b/syndicate/src/dataspace.rs @@ -1,3 +1,12 @@ +//! Implements a [*dataspace*](#GarnockJones2017) entity. +//! +//! **References.** +//! +//! - Garnock-Jones, Tony. “Conversational +//! Concurrency.” PhD, Northeastern University, 2017. [Available +//! on the web](https://syndicate-lang.org/tonyg-dissertation/). +//! [PDF](https://syndicate-lang.org/papers/conversational-concurrency-201712310922.pdf). + use super::skeleton; use super::actor::*; use super::schemas::dataspace::*; @@ -7,61 +16,74 @@ use preserves::value::Map; use std::convert::TryFrom; -#[derive(Debug)] -pub struct Churn { - pub assertions_added: usize, - pub assertions_removed: usize, - pub endpoints_added: usize, - pub endpoints_removed: usize, - pub observers_added: usize, - pub observers_removed: usize, - pub messages_injected: usize, - pub messages_delivered: usize, -} +// #[derive(Debug)] +// pub struct Churn { +// pub assertions_added: usize, +// pub assertions_removed: usize, +// pub endpoints_added: usize, +// pub endpoints_removed: usize, +// pub observers_added: usize, +// pub observers_removed: usize, +// pub messages_injected: usize, +// pub messages_delivered: usize, +// } -impl Churn { - pub fn new() -> Self { - Self { - assertions_added: 0, - assertions_removed: 0, - endpoints_added: 0, - endpoints_removed: 0, - observers_added: 0, - observers_removed: 0, - messages_injected: 0, - messages_delivered: 0, - } - } - - pub fn reset(&mut self) { - *self = Churn::new() - } -} +// impl Churn { +// pub fn new() -> Self { +// Self { +// assertions_added: 0, +// assertions_removed: 0, +// endpoints_added: 0, +// endpoints_removed: 0, +// observers_added: 0, +// observers_removed: 0, +// messages_injected: 0, +// messages_delivered: 0, +// } +// } +// +// pub fn reset(&mut self) { +// *self = Churn::new() +// } +// } +/// A Dataspace object (entity). #[derive(Debug)] pub struct Dataspace { + /// Index over assertions placed in the dataspace; used to + /// efficiently route assertion changes and messages to observers. pub index: skeleton::Index, + /// Local memory of assertions indexed by `Handle`, used to remove + /// assertions from the `index` when they are retracted. pub handle_map: Map)>, - pub churn: Churn, + // pub churn: Churn, } impl Dataspace { + /// Construct a new, empty dataspace. pub fn new() -> Self { Self { index: skeleton::Index::new(), handle_map: Map::new(), - churn: Churn::new(), + // churn: Churn::new(), } } + /// Retrieve the current count of *distinct* assertions placed in + /// the dataspace. pub fn assertion_count(&self) -> usize { self.index.assertion_count() } + /// Retrieve the current count of assertions, including + /// duplicates, placed in the dataspace. pub fn endpoint_count(&self) -> isize { self.index.endpoint_count() } + /// Retrieve the current count of + /// [`Observe`][crate::schemas::dataspace::Observe] assertions in + /// the dataspace. pub fn observer_count(&self) -> usize { self.index.observer_count() } @@ -71,14 +93,14 @@ impl Entity<_Any> for Dataspace { fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult { // tracing::trace!(assertion = debug(&a), handle = debug(&h), "assert"); - let old_assertions = self.index.assertion_count(); + // let old_assertions = self.index.assertion_count(); self.index.insert(t, &a); - self.churn.assertions_added += self.index.assertion_count() - old_assertions; - self.churn.endpoints_added += 1; + // self.churn.assertions_added += self.index.assertion_count() - old_assertions; + // self.churn.endpoints_added += 1; if let Ok(o) = Observe::try_from(&a) { self.index.add_observer(t, &o.pattern, &o.observer); - self.churn.observers_added += 1; + // self.churn.observers_added += 1; self.handle_map.insert(h, (a, Some(o))); } else { self.handle_map.insert(h, (a, None)); @@ -92,13 +114,13 @@ impl Entity<_Any> for Dataspace { if let Some((a, maybe_o)) = self.handle_map.remove(&h) { if let Some(o) = maybe_o { self.index.remove_observer(t, o.pattern, &o.observer); - self.churn.observers_removed += 1; + // self.churn.observers_removed += 1; } - let old_assertions = self.index.assertion_count(); + // let old_assertions = self.index.assertion_count(); self.index.remove(t, &a); - self.churn.assertions_removed += old_assertions - self.index.assertion_count(); - self.churn.endpoints_removed += 1; + // self.churn.assertions_removed += old_assertions - self.index.assertion_count(); + // self.churn.endpoints_removed += 1; } Ok(()) } @@ -106,8 +128,9 @@ impl Entity<_Any> for Dataspace { fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult { // tracing::trace!(body = debug(&m), "message"); - self.index.send(t, &m, &mut self.churn.messages_delivered); - self.churn.messages_injected += 1; + // self.index.send(t, &m, &mut self.churn.messages_delivered); + self.index.send(t, &m); + // self.churn.messages_injected += 1; Ok(()) } } diff --git a/syndicate/src/skeleton.rs b/syndicate/src/skeleton.rs index 91884c4..7865551 100644 --- a/syndicate/src/skeleton.rs +++ b/syndicate/src/skeleton.rs @@ -144,14 +144,14 @@ impl Index { } } - pub fn send(&mut self, t: &mut Activation, outer_value: &AnyValue, delivery_count: &mut usize) { + pub fn send(&mut self, t: &mut Activation, outer_value: &AnyValue) { Modification::new( false, &outer_value, |_c, _v| (), |_l, _v| (), |es, cs| { - *delivery_count += es.endpoints.len(); + // *delivery_count += es.endpoints.len(); for observer in es.endpoints.keys() { observer.message(t, cs.clone()); }