Repair error in dataspace assertion idempotency.

If a facet, during X, asserts X, for all X, then X includes all
`Observe` assertions. Assertion of X should be a no-op (though
subsequent retractions of X will have no effect!) since duplicates are
ignored. However, the implementation had been ignoring whether it had
seen `Observe` assertions before, and was *always* (re)placing them
into the index, leading to runaway growth.

The repair is to only process `Observe` records on first assertion and
last retraction.

As part of this change, Dataspaces have been given names, and some
cruft from the previous implementation has been removed.
This commit is contained in:
Tony Garnock-Jones 2022-01-15 23:18:29 +01:00
parent af4af8b048
commit 2b296d79c7
4 changed files with 38 additions and 65 deletions

View File

@ -84,8 +84,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing::trace!("startup");
Actor::new(None).boot(tracing::Span::current(), move |t| {
let server_config_ds = Cap::new(&t.create(Dataspace::new()));
let log_ds = Cap::new(&t.create(Dataspace::new()));
let server_config_ds = Cap::new(&t.create(Dataspace::new(Some(syndicate::name!("config")))));
let log_ds = Cap::new(&t.create(Dataspace::new(Some(syndicate::name!("log")))));
if config.inferior {
tracing::info!("inferior server instance");

View File

@ -484,7 +484,7 @@ impl Env {
pub fn eval_expr(&self, t: &mut Activation, e: &Expr) -> io::Result<AnyValue> {
match e {
Expr::Template { template } => self.instantiate_value(template),
Expr::Dataspace => Ok(AnyValue::domain(Cap::new(&t.create(Dataspace::new())))),
Expr::Dataspace => Ok(AnyValue::domain(Cap::new(&t.create(Dataspace::new(None))))),
Expr::Timestamp => Ok(AnyValue::new(chrono::Utc::now().to_rfc3339())),
Expr::Facet => Ok(AnyValue::domain(Cap::new(&t.create(FacetHandle::new())))),
}

View File

@ -17,56 +17,26 @@ use super::schemas::dataspace::_Any;
use preserves::value::Map;
use preserves_schema::Codec;
// #[derive(Debug)]
// pub struct Churn {
// pub assertions_added: usize,
// pub assertions_removed: usize,
// pub endpoints_added: usize,
// pub endpoints_removed: usize,
// pub observers_added: usize,
// pub observers_removed: usize,
// pub messages_injected: usize,
// pub messages_delivered: usize,
// }
// impl Churn {
// pub fn new() -> Self {
// Self {
// assertions_added: 0,
// assertions_removed: 0,
// endpoints_added: 0,
// endpoints_removed: 0,
// observers_added: 0,
// observers_removed: 0,
// messages_injected: 0,
// messages_delivered: 0,
// }
// }
//
// pub fn reset(&mut self) {
// *self = Churn::new()
// }
// }
/// A Dataspace object (entity).
#[derive(Debug)]
pub struct Dataspace {
pub name: tracing::Span,
/// Index over assertions placed in the dataspace; used to
/// efficiently route assertion changes and messages to observers.
pub index: skeleton::Index,
/// Local memory of assertions indexed by `Handle`, used to remove
/// assertions from the `index` when they are retracted.
pub handle_map: Map<Handle, (_Any, Option<Observe>)>,
// pub churn: Churn,
pub handle_map: Map<Handle, _Any>,
}
impl Dataspace {
/// Construct a new, empty dataspace.
pub fn new() -> Self {
pub fn new(name: Option<tracing::Span>) -> Self {
Self {
name: name.map_or_else(|| crate::name!("anonymous_dataspace"),
|n| crate::name!(parent: &n, "dataspace")),
index: skeleton::Index::new(),
handle_map: Map::new(),
// churn: Churn::new(),
}
}
@ -92,46 +62,45 @@ impl Dataspace {
impl Entity<_Any> for Dataspace {
fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult {
tracing::trace!(assertion = ?a, handle = ?h, "assert");
let _guard = self.name.enter();
// let old_assertions = self.index.assertion_count();
self.index.insert(t, &a);
// self.churn.assertions_added += self.index.assertion_count() - old_assertions;
// self.churn.endpoints_added += 1;
let is_new = self.index.insert(t, &a);
tracing::trace!(assertion = ?a, handle = ?h, ?is_new, "assert");
if let Ok(o) = language().parse::<Observe>(&a) {
self.index.add_observer(t, &o.pattern, &o.observer);
// self.churn.observers_added += 1;
self.handle_map.insert(h, (a, Some(o)));
} else {
self.handle_map.insert(h, (a, None));
if is_new {
if let Ok(o) = language().parse::<Observe>(&a) {
self.index.add_observer(t, &o.pattern, &o.observer);
}
}
self.handle_map.insert(h, a);
Ok(())
}
fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult {
tracing::trace!(handle = ?h, "retract");
let _guard = self.name.enter();
if let Some((a, maybe_o)) = self.handle_map.remove(&h) {
if let Some(o) = maybe_o {
self.index.remove_observer(t, o.pattern, &o.observer);
// self.churn.observers_removed += 1;
match self.handle_map.remove(&h) {
None => tracing::warn!(handle = ?h, "retract of unknown handle"),
Some(a) => {
let is_last = self.index.remove(t, &a);
tracing::trace!(assertion = ?a, handle = ?h, ?is_last, "retract");
if is_last {
if let Ok(o) = language().parse::<Observe>(&a) {
self.index.remove_observer(t, o.pattern, &o.observer);
}
}
}
// let old_assertions = self.index.assertion_count();
self.index.remove(t, &a);
// self.churn.assertions_removed += old_assertions - self.index.assertion_count();
// self.churn.endpoints_removed += 1;
}
Ok(())
}
fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult {
let _guard = self.name.enter();
tracing::trace!(body = ?m, "message");
// self.index.send(t, &m, &mut self.churn.messages_delivered);
self.index.send(t, &m);
// self.churn.messages_injected += 1;
Ok(())
}
}

View File

@ -112,7 +112,8 @@ impl Index {
}
/// Inserts an assertion into the index, notifying matching observers.
pub fn insert(&mut self, t: &mut Activation, outer_value: &AnyValue) {
/// Answers `true` iff the assertion was new to the dataspace.
pub fn insert(&mut self, t: &mut Activation, outer_value: &AnyValue) -> bool {
let net = self.all_assertions.change(outer_value.clone(), 1);
match net {
bag::Net::AbsentToPresent => {
@ -131,14 +132,16 @@ impl Index {
}
})
.perform(&mut self.root);
true
}
bag::Net::PresentToPresent => (),
bag::Net::PresentToPresent => false,
_ => unreachable!(),
}
}
/// Removes an assertion from the index, notifying matching observers.
pub fn remove(&mut self, t: &mut Activation, outer_value: &AnyValue) {
/// Answers `true` if it is the last in its equivalence class to be removed.
pub fn remove(&mut self, t: &mut Activation, outer_value: &AnyValue) -> bool {
let net = self.all_assertions.change(outer_value.clone(), -1);
match net {
bag::Net::PresentToAbsent => {
@ -157,8 +160,9 @@ impl Index {
}
})
.perform(&mut self.root);
true
}
bag::Net::PresentToPresent => (),
bag::Net::PresentToPresent => false,
_ => unreachable!(),
}
}