From 531d66205b8daa205f1c12012a5bb7720164857f Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 23 Sep 2021 21:43:32 +0200 Subject: [PATCH] Intra-actor dataflow and fields; `enclose!` macro --- syndicate/benches/bench_dataspace.rs | 14 +- syndicate/src/actor.rs | 286 ++++++++++++++++++++++++--- syndicate/src/dataflow.rs | 64 ++++++ syndicate/src/lib.rs | 1 + 4 files changed, 332 insertions(+), 33 deletions(-) create mode 100644 syndicate/src/dataflow.rs diff --git a/syndicate/benches/bench_dataspace.rs b/syndicate/benches/bench_dataspace.rs index db95698..32d9124 100644 --- a/syndicate/benches/bench_dataspace.rs +++ b/syndicate/benches/bench_dataspace.rs @@ -10,6 +10,7 @@ use syndicate::language; use syndicate::actor::*; use syndicate::during::entity; use syndicate::dataspace::Dataspace; +use syndicate::enclose; use syndicate::schemas::dataspace::Observe; use syndicate::schemas::dataspace_patterns as p; use syndicate::value::Map; @@ -60,17 +61,16 @@ pub fn bench_pub(c: &mut Criterion) { let account = Account::new(syndicate::name!("sender-account")); t.linked_task(syndicate::name!("sender"), async move { for _ in 0..iters { - let ds = Arc::clone(&ds); - external_event(&Arc::clone(&ds.mailbox), &account, Box::new( - move |t| t.with_entity( + external_event(&ds.mailbox, &account, Box::new( + enclose!((ds) move |t| t.with_entity( &ds, |t, e| e.message(t, says(AnyValue::new("bench_pub"), - Value::ByteString(vec![]).wrap())))))? + Value::ByteString(vec![]).wrap()))))))? } - external_event(&Arc::clone(&shutdown.mailbox), &account, Box::new( - move |t| t.with_entity( + external_event(&shutdown.mailbox, &account, Box::new( + enclose!((shutdown) move |t| t.with_entity( &shutdown, - |t, e| e.message(t, AnyValue::new(true)))))?; + |t, e| e.message(t, AnyValue::new(true))))))?; Ok(()) }); Ok(()) diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index 33fd28f..d0d3411 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -5,12 +5,13 @@ include_str!("../doc/linked-tasks.md"), )] -use super::schemas::sturdy; +use super::dataflow::Graph; use super::error::Error; use super::error::encode_error; use super::error::error; use super::rewrite::CaveatError; use super::rewrite::CheckedCaveat; +use super::schemas::sturdy; use preserves::value::Domain; use preserves::value::IOValue; @@ -21,10 +22,12 @@ use preserves_schema::ParseError; use preserves_schema::support::Parse; use preserves_schema::support::Unparse; +use std::any::Any; use std::boxed::Box; use std::collections::hash_map::HashMap; use std::convert::TryFrom; use std::convert::TryInto; +use std::marker::PhantomData; use std::num::NonZeroU64; use std::sync::Arc; use std::sync::Mutex; @@ -65,6 +68,12 @@ pub type FacetId = NonZeroU64; /// local state with some incoming assertion in an entity. pub type Handle = u64; +/// The type of process-unique field instance IDs. +pub type FieldId = NonZeroU64; + +/// The type of process-unique field observer block IDs. +pub type BlockId = NonZeroU64; + /// Responses to events must have type `ActorResult`. pub type ActorResult = Result<(), Error>; @@ -208,6 +217,7 @@ enum CleanupAction { type CleanupActions = Map; type Action = Box ActorResult>; +type Block = Box ActorResult>; #[doc(hidden)] pub type PendingEventQueue = Vec; @@ -233,6 +243,7 @@ pub struct Activation<'activation> { pub facet: FacetRef, /// A reference to the current state of the active [`Actor`]. pub state: &'activation mut RunningActor, + active_block: Option, pending: EventBuffer, } @@ -271,6 +282,7 @@ pub struct LoanedItem { enum SystemMessage { Release, + ReleaseField(FieldId), Turn(LoanedItem), Crash(Error), } @@ -325,6 +337,9 @@ pub struct RunningActor { pub actor_id: ActorId, tx: UnboundedSender, mailbox: Weak, + dataflow: Graph, + fields: HashMap>, + blocks: HashMap, exit_hooks: Vec) -> ActorResult>>, cleanup_actions: CleanupActions, facet_nodes: Map, @@ -332,6 +347,20 @@ pub struct RunningActor { root: FacetId, } +/// Handle to a shared, mutable field (i.e. a *dataflow variable*) within a [`RunningActor`]. +/// +/// Use [`Activation::field`] to create fields, and use [`Activation::get`], +/// [`::get_mut`][Activation::get_mut], and [`::set`][Activation::set] to read and write field +/// values. Use [`Activation::dataflow`] to create a reactive block within a facet that will be +/// (re-)executed whenever some dependent field changes value. +/// +#[derive(Debug)] +pub struct Field { + pub field_id: FieldId, + tx: UnboundedSender, + phantom: PhantomData, +} + /// State associated with each facet in an [`Actor`]'s facet tree. /// /// # Inert facets @@ -416,6 +445,13 @@ where /// subsequently retracted. pub struct StopOnRetract; +/// Returned from the function given to [`FacetRef::activate_exit`] to indicate how the actor +/// should proceed. +pub enum RunDisposition { + Continue, + Terminate(ActorResult), +} + //--------------------------------------------------------------------------- const BUMP_AMOUNT: u8 = 10; @@ -443,6 +479,10 @@ static NEXT_ACCOUNT_ID: AtomicU64 = AtomicU64::new(4); static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(5); +static NEXT_FIELD_ID: AtomicU64 = AtomicU64::new(6); + +static NEXT_BLOCK_ID: AtomicU64 = AtomicU64::new(7); + preserves_schema::support::lazy_static! { #[doc(hidden)] pub static ref SYNDICATE_CREDIT: i64 = { @@ -487,6 +527,15 @@ impl<'a> Unparse<&'a (), AnyValue> for Synced { } } +impl From for RunDisposition { + fn from(v: ActorResult) -> Self { + match v { + Ok(()) => RunDisposition::Continue, + Err(e) => RunDisposition::Terminate(Err(e)), + } + } +} + impl FacetRef { /// Executes `f` in a new "[turn][Activation]" for `actor`. If `f` returns `Ok(())`, /// [commits the turn][Activation::deliver] and performs the buffered actions; otherwise, @@ -500,13 +549,7 @@ impl FacetRef { ) -> ActorResult where F: FnOnce(&mut Activation) -> ActorResult, { - match self.activate_exit(account, |t| match f(t) { - Ok(()) => None, - Err(e) => Some(Err(e)), - }) { - None => Ok(()), - Some(e) => Err(error("Could not activate terminated actor", encode_error(e))), - } + self.activate_exit(account, |t| f(t).into()) } /// Executes `f` in a new "[turn][Activation]" for `actor`. If `f` returns @@ -519,20 +562,22 @@ impl FacetRef { &self, account: Arc, f: F, - ) -> Option where - F: FnOnce(&mut Activation) -> Option, + ) -> ActorResult where + F: FnOnce(&mut Activation) -> RunDisposition, { match self.actor.state.lock() { Err(_) => panicked_err(), Ok(mut g) => match &mut *g { ActorState::Terminated { exit_status } => - Some((**exit_status).clone()), + Err(error("Could not activate terminated actor", + encode_error((**exit_status).clone()))), ActorState::Running(state) => { tracing::trace!(actor_id=?self.actor.actor_id, "activate"); let mut activation = Activation::make(self, account, state); - let result = match f(&mut activation) { - None => None, - Some(exit_status) => { + let f_result = f(&mut activation); + let result = match activation.restore_invariants(f_result) { + RunDisposition::Continue => Ok(()), + RunDisposition::Terminate(exit_status) => { if exit_status.is_err() { activation.clear(); } @@ -557,7 +602,7 @@ impl FacetRef { *g = ActorState::Terminated { exit_status: Arc::clone(&exit_status), }; - Some((*exit_status).clone()) + (*exit_status).clone() } }; tracing::trace!(actor_id=?self.actor.actor_id, "deactivate"); @@ -577,6 +622,7 @@ impl<'activation> Activation<'activation> { Activation { facet: facet.clone(), state, + active_block: None, pending: EventBuffer::new(account), } } @@ -699,6 +745,22 @@ impl<'activation> Activation<'activation> { } } + /// Core API: assert, retract, or replace an assertion. + pub fn update( + &mut self, + handle: &mut Option, + r: &Arc>, + a: Option, + ) { + let saved = handle.take(); + if let Some(a) = a { + *handle = Some(self.assert(r, a)); + } + if let Some(h) = saved { + self.retract(h); + } + } + /// Core API: send message `m` to recipient `r`. pub fn message(&mut self, r: &Arc>, m: M) { tracing::trace!(?r, ?m, "message"); @@ -973,7 +1035,7 @@ impl<'activation> Activation<'activation> { /// Arranges for the active facet to be stopped cleanly when `self` commits. /// - /// Equivalent to `self.stop_facet(self.facet_id.unwrap(), None)`. + /// Equivalent to `self.stop_facet(self.facet.facet_id, None)`. pub fn stop(&mut self) { self.stop_facet(self.facet.facet_id, None) } @@ -1030,6 +1092,132 @@ impl<'activation> Activation<'activation> { Ok(()) } } + + /// Create a new dataflow variable (field) within the active [`Actor`]. + pub fn field(&mut self, initial_value: T) -> Arc> { + let field_id = FieldId::new(NEXT_FIELD_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed)) + .expect("Internal error: Attempt to allocate FieldId of zero. Too many FieldIds allocated. Restart the process."); + self.state.fields.insert(field_id, Box::new(initial_value)); + Arc::new(Field { + field_id, + tx: self.state.tx.clone(), + phantom: PhantomData, + }) + } + + /// Retrieve a reference to the current value of a dataflow variable (field); if execution + /// is currently within a [dataflow block][Activation::dataflow], marks the block as + /// *depending upon* the field. + /// + pub fn get(&mut self, field: &Field) -> &T { + tracing::trace!(field = ?field.field_id, block = ?self.active_block, "get"); + if let Some(block) = self.active_block { + self.state.dataflow.record_observation(block, field.field_id); + } + let any = self.state.fields.get(&field.field_id) + .expect("Attempt to get() missing field: wrong actor?"); + any.downcast_ref().expect("Attempt to access field at incorrect type") + } + + /// Retrieve a mutable reference to the contents of a dataflow variable (field). As for + /// [`get`][Activation::get], if used within a dataflow block, marks the block as + /// *depending upon* the field. In addition, because the caller may mutate the field, this + /// function (pessimistically) marks the field as dirty, which will lead to later + /// reevaluation of dependent blocks. + /// + pub fn get_mut(&mut self, field: &Field) -> &mut T { + tracing::trace!(field = ?field.field_id, block = ?self.active_block, "get_mut"); + { + // Overapproximation. + if let Some(block) = self.active_block { + self.state.dataflow.record_observation(block, field.field_id); + } + self.state.dataflow.record_damage(field.field_id); + } + let any = self.state.fields.get_mut(&field.field_id) + .expect("Attempt to get_mut() missing field: wrong actor?"); + any.downcast_mut().expect("Attempt to access field at incorrect type") + } + + /// Overwrite the value of a dataflow variable (field). Marks the field as dirty, even if + /// the new value is [`eq`][std::cmp::PartialEq::eq] to the value being overwritten. + /// + pub fn set(&mut self, field: &Field, value: T) { + tracing::trace!(field = ?field.field_id, block = ?self.active_block, "set"); + // Overapproximation in many cases, since the new value may not produce an + // observable difference (may be equal to the current value). + self.state.dataflow.record_damage(field.field_id); + let any = self.state.fields.get_mut(&field.field_id) + .expect("Attempt to set() missing field: wrong actor?"); + *any = Box::new(value); + } + + fn with_block(&mut self, block_id: BlockId, block: &mut Block) -> ActorResult { + let saved = self.active_block.replace(block_id); + let result = block(self); + self.active_block = saved; + result + } + + /// Creates (and immediately executes) a new *dataflow block* that will be reexecuted if + /// any of its *dependent fields* (accessed via e.g. [`get`][Activation::get] or + /// [`get_mut`][Activation::get_mut]) are mutated. + /// + pub fn dataflow ActorResult>(&mut self, block: F) -> ActorResult { + let block_id = BlockId::new(NEXT_BLOCK_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed)) + .expect("Internal error: Attempt to allocate BlockId of zero. Too many BlockIds allocated. Restart the process."); + let mut block: Block = Box::new(block); + self.with_block(block_id, &mut block)?; + self.state.blocks.insert(block_id, (self.facet.facet_id, block)); + Ok(()) + } + + fn repair_dataflow(&mut self) -> Result { + let mut pass_number = 0; + while !self.state.dataflow.is_clean() { + pass_number += 1; + tracing::trace!(?pass_number, "repair_dataflow"); + let damaged_field_ids = self.state.dataflow.take_damaged_nodes(); + for field_id in damaged_field_ids.into_iter() { + let block_ids = self.state.dataflow.take_observers_of(&field_id); + for block_id in block_ids.into_iter() { + if let Some((facet_id, mut block)) = self.state.blocks.remove(&block_id) { + let result = self.with_facet( + true, facet_id, |t| t.with_block(block_id, &mut block)); + self.state.blocks.insert(block_id, (facet_id, block)); + result?; + } + } + } + } + tracing::trace!(passes = ?pass_number, "repair_dataflow complete"); + Ok(pass_number > 0) + } + + fn _restore_invariants(&mut self) -> ActorResult { + loop { + loop { + let actions = std::mem::take(&mut self.pending.for_myself); + if actions.is_empty() { break; } + for action in actions.into_iter() { action(self)? } + } + if !self.repair_dataflow()? { + break; + } + } + Ok(()) + } + + fn restore_invariants(&mut self, d: RunDisposition) -> RunDisposition { + match d { + RunDisposition::Continue => + self._restore_invariants().into(), + RunDisposition::Terminate(Ok(())) => + RunDisposition::Terminate(self._restore_invariants()), + RunDisposition::Terminate(Err(_)) => + d, + } + } } impl EventBuffer { @@ -1215,6 +1403,9 @@ impl Actor { actor_id, tx, mailbox: Weak::new(), + dataflow: Graph::new(), + fields: HashMap::new(), + blocks: HashMap::new(), exit_hooks: Vec::new(), cleanup_actions: Map::new(), facet_nodes: Map::new(), @@ -1276,7 +1467,7 @@ impl Actor { let terminate = |result: ActorResult| { let _ = root_facet_ref.activate_exit(Account::new(crate::name!("shutdown")), - |_| Some(result)); + |_| RunDisposition::Terminate(result)); }; if root_facet_ref.activate(Account::new(crate::name!("boot")), boot).is_err() { @@ -1294,15 +1485,18 @@ impl Actor { tracing::trace!(actor_id = ?self.ac_ref.actor_id, "SystemMessage::Release"); return terminate(Ok(())); } + SystemMessage::ReleaseField(field_id) => { + tracing::trace!(actor_id = ?self.ac_ref.actor_id, + "SystemMessage::ReleaseField({})", field_id); + self.ac_ref.access(|s| if let ActorState::Running(ra) = s.unwrap() { + ra.fields.remove(&field_id); + }) + } SystemMessage::Turn(mut loaned_item) => { tracing::trace!(actor_id = ?self.ac_ref.actor_id, "SystemMessage::Turn"); - let mut actions = std::mem::take(&mut loaned_item.item); + let actions = std::mem::take(&mut loaned_item.item); let r = root_facet_ref.activate(Arc::clone(&loaned_item.account), |t| { - loop { - for action in actions.into_iter() { action(t)? } - actions = std::mem::take(&mut t.pending.for_myself); - if actions.is_empty() { break; } - } + for action in actions.into_iter() { action(t)? } Ok(()) }); if r.is_err() { return; } @@ -1353,8 +1547,8 @@ impl Facet { } } -fn panicked_err() -> Option { - Some(Err(error("Actor panicked", AnyValue::new(false)))) +fn panicked_err() -> ActorResult { + Err(error("Actor panicked", AnyValue::new(false))) } impl ActorRef { @@ -1374,7 +1568,7 @@ impl ActorRef { /// abnormally. pub fn exit_status(&self) -> Option { self.access(|s| s.map_or_else( - panicked_err, + || Some(panicked_err()), |state| match state { ActorState::Running(_) => None, ActorState::Terminated { exit_status } => Some((**exit_status).clone()), @@ -1467,6 +1661,20 @@ impl RunningActor { } } +impl Eq for Field {} +impl PartialEq for Field { + fn eq(&self, other: &Field) -> bool { + self.field_id == other.field_id + } +} + +impl Drop for Field { + fn drop(&mut self) { + let _ = self.tx.send(SystemMessage::ReleaseField(self.field_id)); + () + } +} + impl Drop for Actor { fn drop(&mut self) { self.rx.close(); @@ -1623,6 +1831,17 @@ impl Cap { self.rewrite(m.unparse(literals)).map(|m| t.assert(&self.underlying, m)) } + /// `update` is to [`assert`] as [`Activation::update`] is to [`Activation::assert`]. + pub fn update>( + &self, + t: &mut Activation, + handle: &mut Option, + literals: L, + m: Option<&M>, + ) { + t.update(handle, &self.underlying, m.and_then(|m| self.rewrite(m.unparse(literals)))) + } + /// Translates `m` into an `AnyValue`, passes it through /// [`rewrite`][Self::rewrite], and then sends it via method /// [`message`][Activation::message] on the activation `t`. @@ -1717,3 +1936,18 @@ macro_rules! name { task_id = tracing::field::Empty, oid = tracing::field::Empty)} } + +/// A convenient way of cloning a bunch of state shared among [entities][Entity], actions, +/// linked tasks, etc. +/// +/// Directly drawn from the discussion [here](https://github.com/rust-lang/rfcs/issues/2407). +/// +#[macro_export] +macro_rules! enclose { + ( ( $($name:ident),* ) $closure:expr ) => { + { + $(let $name = $name.clone();)* + $closure + } + } +} diff --git a/syndicate/src/dataflow.rs b/syndicate/src/dataflow.rs new file mode 100644 index 0000000..d688a45 --- /dev/null +++ b/syndicate/src/dataflow.rs @@ -0,0 +1,64 @@ +use preserves::value::Map; +use preserves::value::Set; + +use std::fmt::Debug; + +#[derive(Debug)] +pub struct Graph +{ + forward_edges: Map>, + reverse_edges: Map>, + damaged_nodes: Set, +} + +impl + Graph +{ + pub fn new() -> Self { + Graph { + forward_edges: Map::new(), + reverse_edges: Map::new(), + damaged_nodes: Set::new(), + } + } + + pub fn is_clean(&self) -> bool { + self.damaged_nodes.is_empty() + } + + pub fn record_observation(&mut self, observer: SubjectId, observed: ObjectId) { + self.forward_edges.entry(observed.clone()).or_default().insert(observer.clone()); + self.reverse_edges.entry(observer).or_default().insert(observed); + } + + pub fn record_damage(&mut self, observed: ObjectId) { + self.damaged_nodes.insert(observed); + } + + fn forget_subject(&mut self, observer: &SubjectId) { + if let Some(observeds) = self.reverse_edges.remove(observer) { + for observed in observeds.into_iter() { + if let Some(observers) = self.forward_edges.get_mut(&observed) { + observers.remove(observer); + } + } + } + } + + // To repair: repeat: + // - take_damaged_nodes; if none, return successfully - otherwise: + // - for each, take_observers_of. + // - for each, invoke the observer's repair function. + + pub fn take_damaged_nodes(&mut self) -> Set { + std::mem::take(&mut self.damaged_nodes) + } + + pub fn take_observers_of(&mut self, observed: &ObjectId) -> Set { + let observers = self.forward_edges.remove(&observed).unwrap_or_default(); + for observer in observers.iter() { + self.forget_subject(observer); + } + observers + } +} diff --git a/syndicate/src/lib.rs b/syndicate/src/lib.rs index 4d5a40c..6dc83df 100644 --- a/syndicate/src/lib.rs +++ b/syndicate/src/lib.rs @@ -11,6 +11,7 @@ pub use preserves_schema; pub mod actor; pub mod bag; +pub mod dataflow; pub mod dataspace; pub mod during; pub mod error;