From 2b296d79c7a1a1a7e5500344c5965570c4aba3f5 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sat, 15 Jan 2022 23:18:29 +0100 Subject: [PATCH] Repair error in dataspace assertion idempotency. If a facet, during X, asserts X, for all X, then X includes all `Observe` assertions. Assertion of X should be a no-op (though subsequent retractions of X will have no effect!) since duplicates are ignored. However, the implementation had been ignoring whether it had seen `Observe` assertions before, and was *always* (re)placing them into the index, leading to runaway growth. The repair is to only process `Observe` records on first assertion and last retraction. As part of this change, Dataspaces have been given names, and some cruft from the previous implementation has been removed. --- syndicate-server/src/main.rs | 4 +- syndicate-server/src/script/mod.rs | 2 +- syndicate/src/dataspace.rs | 85 ++++++++++-------------------- syndicate/src/skeleton.rs | 12 +++-- 4 files changed, 38 insertions(+), 65 deletions(-) diff --git a/syndicate-server/src/main.rs b/syndicate-server/src/main.rs index 165975e..e5d8c10 100644 --- a/syndicate-server/src/main.rs +++ b/syndicate-server/src/main.rs @@ -84,8 +84,8 @@ async fn main() -> Result<(), Box> { tracing::trace!("startup"); Actor::new(None).boot(tracing::Span::current(), move |t| { - let server_config_ds = Cap::new(&t.create(Dataspace::new())); - let log_ds = Cap::new(&t.create(Dataspace::new())); + let server_config_ds = Cap::new(&t.create(Dataspace::new(Some(syndicate::name!("config"))))); + let log_ds = Cap::new(&t.create(Dataspace::new(Some(syndicate::name!("log"))))); if config.inferior { tracing::info!("inferior server instance"); diff --git a/syndicate-server/src/script/mod.rs b/syndicate-server/src/script/mod.rs index 543e4d6..9cc3ac1 100644 --- a/syndicate-server/src/script/mod.rs +++ b/syndicate-server/src/script/mod.rs @@ -484,7 +484,7 @@ impl Env { pub fn eval_expr(&self, t: &mut Activation, e: &Expr) -> io::Result { match e { Expr::Template { template } => self.instantiate_value(template), - Expr::Dataspace => Ok(AnyValue::domain(Cap::new(&t.create(Dataspace::new())))), + Expr::Dataspace => Ok(AnyValue::domain(Cap::new(&t.create(Dataspace::new(None))))), Expr::Timestamp => Ok(AnyValue::new(chrono::Utc::now().to_rfc3339())), Expr::Facet => Ok(AnyValue::domain(Cap::new(&t.create(FacetHandle::new())))), } diff --git a/syndicate/src/dataspace.rs b/syndicate/src/dataspace.rs index 52d74e5..ae65ef4 100644 --- a/syndicate/src/dataspace.rs +++ b/syndicate/src/dataspace.rs @@ -17,56 +17,26 @@ use super::schemas::dataspace::_Any; use preserves::value::Map; use preserves_schema::Codec; -// #[derive(Debug)] -// pub struct Churn { -// pub assertions_added: usize, -// pub assertions_removed: usize, -// pub endpoints_added: usize, -// pub endpoints_removed: usize, -// pub observers_added: usize, -// pub observers_removed: usize, -// pub messages_injected: usize, -// pub messages_delivered: usize, -// } - -// impl Churn { -// pub fn new() -> Self { -// Self { -// assertions_added: 0, -// assertions_removed: 0, -// endpoints_added: 0, -// endpoints_removed: 0, -// observers_added: 0, -// observers_removed: 0, -// messages_injected: 0, -// messages_delivered: 0, -// } -// } -// -// pub fn reset(&mut self) { -// *self = Churn::new() -// } -// } - /// A Dataspace object (entity). #[derive(Debug)] pub struct Dataspace { + pub name: tracing::Span, /// Index over assertions placed in the dataspace; used to /// efficiently route assertion changes and messages to observers. pub index: skeleton::Index, /// Local memory of assertions indexed by `Handle`, used to remove /// assertions from the `index` when they are retracted. - pub handle_map: Map)>, - // pub churn: Churn, + pub handle_map: Map, } impl Dataspace { /// Construct a new, empty dataspace. - pub fn new() -> Self { + pub fn new(name: Option) -> Self { Self { + name: name.map_or_else(|| crate::name!("anonymous_dataspace"), + |n| crate::name!(parent: &n, "dataspace")), index: skeleton::Index::new(), handle_map: Map::new(), - // churn: Churn::new(), } } @@ -92,46 +62,45 @@ impl Dataspace { impl Entity<_Any> for Dataspace { fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult { - tracing::trace!(assertion = ?a, handle = ?h, "assert"); + let _guard = self.name.enter(); - // let old_assertions = self.index.assertion_count(); - self.index.insert(t, &a); - // self.churn.assertions_added += self.index.assertion_count() - old_assertions; - // self.churn.endpoints_added += 1; + let is_new = self.index.insert(t, &a); + tracing::trace!(assertion = ?a, handle = ?h, ?is_new, "assert"); - if let Ok(o) = language().parse::(&a) { - self.index.add_observer(t, &o.pattern, &o.observer); - // self.churn.observers_added += 1; - self.handle_map.insert(h, (a, Some(o))); - } else { - self.handle_map.insert(h, (a, None)); + if is_new { + if let Ok(o) = language().parse::(&a) { + self.index.add_observer(t, &o.pattern, &o.observer); + } } + + self.handle_map.insert(h, a); Ok(()) } fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult { - tracing::trace!(handle = ?h, "retract"); + let _guard = self.name.enter(); - if let Some((a, maybe_o)) = self.handle_map.remove(&h) { - if let Some(o) = maybe_o { - self.index.remove_observer(t, o.pattern, &o.observer); - // self.churn.observers_removed += 1; + match self.handle_map.remove(&h) { + None => tracing::warn!(handle = ?h, "retract of unknown handle"), + Some(a) => { + let is_last = self.index.remove(t, &a); + tracing::trace!(assertion = ?a, handle = ?h, ?is_last, "retract"); + + if is_last { + if let Ok(o) = language().parse::(&a) { + self.index.remove_observer(t, o.pattern, &o.observer); + } + } } - - // let old_assertions = self.index.assertion_count(); - self.index.remove(t, &a); - // self.churn.assertions_removed += old_assertions - self.index.assertion_count(); - // self.churn.endpoints_removed += 1; } Ok(()) } fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult { + let _guard = self.name.enter(); tracing::trace!(body = ?m, "message"); - // self.index.send(t, &m, &mut self.churn.messages_delivered); self.index.send(t, &m); - // self.churn.messages_injected += 1; Ok(()) } } diff --git a/syndicate/src/skeleton.rs b/syndicate/src/skeleton.rs index 1dbd24a..e86ae57 100644 --- a/syndicate/src/skeleton.rs +++ b/syndicate/src/skeleton.rs @@ -112,7 +112,8 @@ impl Index { } /// Inserts an assertion into the index, notifying matching observers. - pub fn insert(&mut self, t: &mut Activation, outer_value: &AnyValue) { + /// Answers `true` iff the assertion was new to the dataspace. + pub fn insert(&mut self, t: &mut Activation, outer_value: &AnyValue) -> bool { let net = self.all_assertions.change(outer_value.clone(), 1); match net { bag::Net::AbsentToPresent => { @@ -131,14 +132,16 @@ impl Index { } }) .perform(&mut self.root); + true } - bag::Net::PresentToPresent => (), + bag::Net::PresentToPresent => false, _ => unreachable!(), } } /// Removes an assertion from the index, notifying matching observers. - pub fn remove(&mut self, t: &mut Activation, outer_value: &AnyValue) { + /// Answers `true` if it is the last in its equivalence class to be removed. + pub fn remove(&mut self, t: &mut Activation, outer_value: &AnyValue) -> bool { let net = self.all_assertions.change(outer_value.clone(), -1); match net { bag::Net::PresentToAbsent => { @@ -157,8 +160,9 @@ impl Index { } }) .perform(&mut self.root); + true } - bag::Net::PresentToPresent => (), + bag::Net::PresentToPresent => false, _ => unreachable!(), } }