Intra-actor dataflow and fields; `enclose!` macro

Tony Garnock-Jones 2021-09-23 21:43:32 +02:00
@ -10,6 +10,7 @@ use syndicate::language;
use syndicate::actor::*;
use syndicate::during::entity;
use syndicate::dataspace::Dataspace;
use syndicate::enclose;
use syndicate::schemas::dataspace::Observe;
use syndicate::schemas::dataspace_patterns as p;
use syndicate::value::Map;
@ -60,17 +61,16 @@ pub fn bench_pub(c: &mut Criterion) {
let account = Account::new(syndicate::name!("sender-account"));
t.linked_task(syndicate::name!("sender"), async move {
for _ in 0..iters {
let ds = Arc::clone(&ds);
external_event(&Arc::clone(&ds.mailbox), &account, Box::new(
move |t| t.with_entity(
external_event(&ds.mailbox, &account, Box::new(
enclose!((ds) move |t| t.with_entity(
|t, e| e.message(t, says(AnyValue::new("bench_pub"),
external_event(&Arc::clone(&shutdown.mailbox), &account, Box::new(
move |t| t.with_entity(
external_event(&shutdown.mailbox, &account, Box::new(
enclose!((shutdown) move |t| t.with_entity(
|t, e| e.message(t, AnyValue::new(true)))))?;
|t, e| e.message(t, AnyValue::new(true))))))?;

@ -5,12 +5,13 @@
use super::schemas::sturdy;
use super::dataflow::Graph;
use super::error::Error;
use super::error::encode_error;
use super::error::error;
use super::rewrite::CaveatError;
use super::rewrite::CheckedCaveat;
use super::schemas::sturdy;
use preserves::value::Domain;
use preserves::value::IOValue;
@ -21,10 +22,12 @@ use preserves_schema::ParseError;
use preserves_schema::support::Parse;
use preserves_schema::support::Unparse;
use std::any::Any;
use std::boxed::Box;
use std::collections::hash_map::HashMap;
use std::convert::TryFrom;
use std::convert::TryInto;
use std::marker::PhantomData;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::sync::Mutex;
@ -65,6 +68,12 @@ pub type FacetId = NonZeroU64;
/// local state with some incoming assertion in an entity.
pub type Handle = u64;
/// The type of process-unique field instance IDs.
pub type FieldId = NonZeroU64;
/// The type of process-unique field observer block IDs.
pub type BlockId = NonZeroU64;
/// Responses to events must have type `ActorResult`.
pub type ActorResult = Result<(), Error>;
@ -208,6 +217,7 @@ enum CleanupAction {
type CleanupActions = Map<Handle, CleanupAction>;
type Action = Box<dyn Send + FnOnce(&mut Activation) -> ActorResult>;
type Block = Box<dyn Send + FnMut(&mut Activation) -> ActorResult>;
pub type PendingEventQueue = Vec<Action>;
@ -233,6 +243,7 @@ pub struct Activation<'activation> {
pub facet: FacetRef,
/// A reference to the current state of the active [`Actor`].
pub state: &'activation mut RunningActor,
active_block: Option<BlockId>,
pending: EventBuffer,
@ -271,6 +282,7 @@ pub struct LoanedItem<T> {
enum SystemMessage {
@ -325,6 +337,9 @@ pub struct RunningActor {
pub actor_id: ActorId,
tx: UnboundedSender<SystemMessage>,
mailbox: Weak<Mailbox>,
dataflow: Graph<FieldId, BlockId>,
fields: HashMap<FieldId, Box<dyn Any + Send>>,
blocks: HashMap<BlockId, (FacetId, Block)>,
exit_hooks: Vec<Box<dyn Send + FnOnce(&mut Activation, &Arc<ActorResult>) -> ActorResult>>,
cleanup_actions: CleanupActions,
facet_nodes: Map<FacetId, Facet>,
@ -332,6 +347,20 @@ pub struct RunningActor {
root: FacetId,
/// Handle to a shared, mutable field (i.e. a *dataflow variable*) within a [`RunningActor`].
/// Use [`Activation::field`] to create fields, and use [`Activation::get`],
/// [`::get_mut`][Activation::get_mut], and [`::set`][Activation::set] to read and write field
/// values. Use [`Activation::dataflow`] to create a reactive block within a facet that will be
/// (re-)executed whenever some dependent field changes value.
pub struct Field<T: Any + Send> {
pub field_id: FieldId,
tx: UnboundedSender<SystemMessage>,
phantom: PhantomData<T>,
/// State associated with each facet in an [`Actor`]'s facet tree.
/// # Inert facets
@ -416,6 +445,13 @@ where
/// subsequently retracted.
pub struct StopOnRetract;
/// Returned from the function given to [`FacetRef::activate_exit`] to indicate how the actor
/// should proceed.
pub enum RunDisposition {
const BUMP_AMOUNT: u8 = 10;
@ -443,6 +479,10 @@ static NEXT_ACCOUNT_ID: AtomicU64 = AtomicU64::new(4);
static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(5);
static NEXT_FIELD_ID: AtomicU64 = AtomicU64::new(6);
static NEXT_BLOCK_ID: AtomicU64 = AtomicU64::new(7);
preserves_schema::support::lazy_static! {
pub static ref SYNDICATE_CREDIT: i64 = {
@ -487,6 +527,15 @@ impl<'a> Unparse<&'a (), AnyValue> for Synced {
impl From<ActorResult> for RunDisposition {
fn from(v: ActorResult) -> Self {
match v {
Ok(()) => RunDisposition::Continue,
Err(e) => RunDisposition::Terminate(Err(e)),
impl FacetRef {
/// Executes `f` in a new "[turn][Activation]" for `actor`. If `f` returns `Ok(())`,
/// [commits the turn][Activation::deliver] and performs the buffered actions; otherwise,
@ -500,13 +549,7 @@ impl FacetRef {
) -> ActorResult where
F: FnOnce(&mut Activation) -> ActorResult,
match self.activate_exit(account, |t| match f(t) {
Ok(()) => None,
Err(e) => Some(Err(e)),
}) {
None => Ok(()),
Some(e) => Err(error("Could not activate terminated actor", encode_error(e))),
self.activate_exit(account, |t| f(t).into())
/// Executes `f` in a new "[turn][Activation]" for `actor`. If `f` returns
@ -519,20 +562,22 @@ impl FacetRef {
account: Arc<Account>,
f: F,
) -> Option<ActorResult> where
F: FnOnce(&mut Activation) -> Option<ActorResult>,
) -> ActorResult where
F: FnOnce(&mut Activation) -> RunDisposition,
match {
Err(_) => panicked_err(),
Ok(mut g) => match &mut *g {
ActorState::Terminated { exit_status } =>
Err(error("Could not activate terminated actor",
ActorState::Running(state) => {
tracing::trace!(actor_id=?, "activate");
let mut activation = Activation::make(self, account, state);
let result = match f(&mut activation) {
None => None,
Some(exit_status) => {
let f_result = f(&mut activation);
let result = match activation.restore_invariants(f_result) {
RunDisposition::Continue => Ok(()),
RunDisposition::Terminate(exit_status) => {
if exit_status.is_err() {
@ -557,7 +602,7 @@ impl FacetRef {
*g = ActorState::Terminated {
exit_status: Arc::clone(&exit_status),
tracing::trace!(actor_id=?, "deactivate");
@ -577,6 +622,7 @@ impl<'activation> Activation<'activation> {
Activation {
facet: facet.clone(),
active_block: None,
pending: EventBuffer::new(account),
@ -699,6 +745,22 @@ impl<'activation> Activation<'activation> {
/// Core API: assert, retract, or replace an assertion.
pub fn update<M: 'static + Send + std::fmt::Debug>(
&mut self,
handle: &mut Option<Handle>,
r: &Arc<Ref<M>>,
a: Option<M>,
) {
let saved = handle.take();
if let Some(a) = a {
*handle = Some(self.assert(r, a));
if let Some(h) = saved {
/// Core API: send message `m` to recipient `r`.
pub fn message<M: 'static + Send + std::fmt::Debug>(&mut self, r: &Arc<Ref<M>>, m: M) {
tracing::trace!(?r, ?m, "message");
@ -973,7 +1035,7 @@ impl<'activation> Activation<'activation> {
/// Arranges for the active facet to be stopped cleanly when `self` commits.
/// Equivalent to `self.stop_facet(self.facet_id.unwrap(), None)`.
/// Equivalent to `self.stop_facet(self.facet.facet_id, None)`.
pub fn stop(&mut self) {
self.stop_facet(self.facet.facet_id, None)
@ -1030,6 +1092,132 @@ impl<'activation> Activation<'activation> {
/// Create a new dataflow variable (field) within the active [`Actor`].
pub fn field<T: Any + Send>(&mut self, initial_value: T) -> Arc<Field<T>> {
let field_id = FieldId::new(NEXT_FIELD_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed))
.expect("Internal error: Attempt to allocate FieldId of zero. Too many FieldIds allocated. Restart the process.");
self.state.fields.insert(field_id, Box::new(initial_value));
Arc::new(Field {
tx: self.state.tx.clone(),
phantom: PhantomData,
/// Retrieve a reference to the current value of a dataflow variable (field); if execution
/// is currently within a [dataflow block][Activation::dataflow], marks the block as
/// *depending upon* the field.
pub fn get<T: Any + Send>(&mut self, field: &Field<T>) -> &T {
tracing::trace!(field = ?field.field_id, block = ?self.active_block, "get");
if let Some(block) = self.active_block {
self.state.dataflow.record_observation(block, field.field_id);
let any = self.state.fields.get(&field.field_id)
.expect("Attempt to get() missing field: wrong actor?");
any.downcast_ref().expect("Attempt to access field at incorrect type")
/// Retrieve a mutable reference to the contents of a dataflow variable (field). As for
/// [`get`][Activation::get], if used within a dataflow block, marks the block as
/// *depending upon* the field. In addition, because the caller may mutate the field, this
/// function (pessimistically) marks the field as dirty, which will lead to later
/// reevaluation of dependent blocks.
pub fn get_mut<T: Any + Send>(&mut self, field: &Field<T>) -> &mut T {
tracing::trace!(field = ?field.field_id, block = ?self.active_block, "get_mut");
// Overapproximation.
if let Some(block) = self.active_block {
self.state.dataflow.record_observation(block, field.field_id);
let any = self.state.fields.get_mut(&field.field_id)
.expect("Attempt to get_mut() missing field: wrong actor?");
any.downcast_mut().expect("Attempt to access field at incorrect type")
/// Overwrite the value of a dataflow variable (field). Marks the field as dirty, even if
/// the new value is [`eq`][std::cmp::PartialEq::eq] to the value being overwritten.
pub fn set<T: Any + Send>(&mut self, field: &Field<T>, value: T) {
tracing::trace!(field = ?field.field_id, block = ?self.active_block, "set");
// Overapproximation in many cases, since the new value may not produce an
// observable difference (may be equal to the current value).
let any = self.state.fields.get_mut(&field.field_id)
.expect("Attempt to set() missing field: wrong actor?");
*any = Box::new(value);
fn with_block(&mut self, block_id: BlockId, block: &mut Block) -> ActorResult {
let saved = self.active_block.replace(block_id);
let result = block(self);
self.active_block = saved;
/// Creates (and immediately executes) a new *dataflow block* that will be reexecuted if
/// any of its *dependent fields* (accessed via e.g. [`get`][Activation::get] or
/// [`get_mut`][Activation::get_mut]) are mutated.
pub fn dataflow<F: 'static + Send + FnMut(&mut Activation) -> ActorResult>(&mut self, block: F) -> ActorResult {
let block_id = BlockId::new(NEXT_BLOCK_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed))
.expect("Internal error: Attempt to allocate BlockId of zero. Too many BlockIds allocated. Restart the process.");
let mut block: Block = Box::new(block);
self.with_block(block_id, &mut block)?;
self.state.blocks.insert(block_id, (self.facet.facet_id, block));
fn repair_dataflow(&mut self) -> Result<bool, Error> {
let mut pass_number = 0;
while !self.state.dataflow.is_clean() {
pass_number += 1;
tracing::trace!(?pass_number, "repair_dataflow");
let damaged_field_ids = self.state.dataflow.take_damaged_nodes();
for field_id in damaged_field_ids.into_iter() {
let block_ids = self.state.dataflow.take_observers_of(&field_id);
for block_id in block_ids.into_iter() {
if let Some((facet_id, mut block)) = self.state.blocks.remove(&block_id) {
let result = self.with_facet(
true, facet_id, |t| t.with_block(block_id, &mut block));
self.state.blocks.insert(block_id, (facet_id, block));
tracing::trace!(passes = ?pass_number, "repair_dataflow complete");
Ok(pass_number > 0)
fn _restore_invariants(&mut self) -> ActorResult {
loop {
loop {
let actions = std::mem::take(&mut self.pending.for_myself);
if actions.is_empty() { break; }
for action in actions.into_iter() { action(self)? }
if !self.repair_dataflow()? {
fn restore_invariants(&mut self, d: RunDisposition) -> RunDisposition {
match d {
RunDisposition::Continue =>
RunDisposition::Terminate(Ok(())) =>
RunDisposition::Terminate(Err(_)) =>
impl EventBuffer {
@ -1215,6 +1403,9 @@ impl Actor {
mailbox: Weak::new(),
dataflow: Graph::new(),
fields: HashMap::new(),
blocks: HashMap::new(),
exit_hooks: Vec::new(),
cleanup_actions: Map::new(),
facet_nodes: Map::new(),
@ -1276,7 +1467,7 @@ impl Actor {
let terminate = |result: ActorResult| {
let _ = root_facet_ref.activate_exit(Account::new(crate::name!("shutdown")),
|_| Some(result));
|_| RunDisposition::Terminate(result));
if root_facet_ref.activate(Account::new(crate::name!("boot")), boot).is_err() {
@ -1294,15 +1485,18 @@ impl Actor {
tracing::trace!(actor_id = ?self.ac_ref.actor_id, "SystemMessage::Release");
return terminate(Ok(()));
SystemMessage::ReleaseField(field_id) => {
tracing::trace!(actor_id = ?self.ac_ref.actor_id,
"SystemMessage::ReleaseField({})", field_id);
self.ac_ref.access(|s| if let ActorState::Running(ra) = s.unwrap() {
SystemMessage::Turn(mut loaned_item) => {
tracing::trace!(actor_id = ?self.ac_ref.actor_id, "SystemMessage::Turn");
let mut actions = std::mem::take(&mut loaned_item.item);
let actions = std::mem::take(&mut loaned_item.item);
let r = root_facet_ref.activate(Arc::clone(&loaned_item.account), |t| {
loop {
for action in actions.into_iter() { action(t)? }
actions = std::mem::take(&mut t.pending.for_myself);
if actions.is_empty() { break; }
for action in actions.into_iter() { action(t)? }
if r.is_err() { return; }
@ -1353,8 +1547,8 @@ impl Facet {
fn panicked_err() -> Option<ActorResult> {
Some(Err(error("Actor panicked", AnyValue::new(false))))
fn panicked_err() -> ActorResult {
Err(error("Actor panicked", AnyValue::new(false)))
impl ActorRef {
@ -1374,7 +1568,7 @@ impl ActorRef {
/// abnormally.
pub fn exit_status(&self) -> Option<ActorResult> {
self.access(|s| s.map_or_else(
|| Some(panicked_err()),
|state| match state {
ActorState::Running(_) => None,
ActorState::Terminated { exit_status } => Some((**exit_status).clone()),
@ -1467,6 +1661,20 @@ impl RunningActor {
impl<T: Any + Send> Eq for Field<T> {}
impl<T: Any + Send> PartialEq for Field<T> {
fn eq(&self, other: &Field<T>) -> bool {
self.field_id == other.field_id
impl<T: Any + Send> Drop for Field<T> {
fn drop(&mut self) {
let _ = self.tx.send(SystemMessage::ReleaseField(self.field_id));
impl Drop for Actor {
fn drop(&mut self) {
@ -1623,6 +1831,17 @@ impl Cap {
self.rewrite(m.unparse(literals)).map(|m| t.assert(&self.underlying, m))
/// `update` is to [`assert`] as [`Activation::update`] is to [`Activation::assert`].
pub fn update<L, M: Unparse<L, AnyValue>>(
t: &mut Activation,
handle: &mut Option<Handle>,
literals: L,
m: Option<&M>,
) {
t.update(handle, &self.underlying, m.and_then(|m| self.rewrite(m.unparse(literals))))
/// Translates `m` into an `AnyValue`, passes it through
/// [`rewrite`][Self::rewrite], and then sends it via method
/// [`message`][Activation::message] on the activation `t`.
@ -1717,3 +1936,18 @@ macro_rules! name {
task_id = tracing::field::Empty,
oid = tracing::field::Empty)}
/// A convenient way of cloning a bunch of state shared among [entities][Entity], actions,
/// linked tasks, etc.
/// Directly drawn from the discussion [here](
macro_rules! enclose {
( ( $($name:ident),* ) $closure:expr ) => {
$(let $name = $name.clone();)*

syndicate/src/ Normal file
@ -0,0 +1,64 @@
use preserves::value::Map;
use preserves::value::Set;
use std::fmt::Debug;
pub struct Graph<ObjectId: Debug + Clone + Eq + Ord, SubjectId: Debug + Clone + Eq + Ord>
forward_edges: Map<ObjectId, Set<SubjectId>>,
reverse_edges: Map<SubjectId, Set<ObjectId>>,
damaged_nodes: Set<ObjectId>,
impl<ObjectId: Debug + Clone + Eq + Ord, SubjectId: Debug + Clone + Eq + Ord>
Graph<ObjectId, SubjectId>
pub fn new() -> Self {
Graph {
forward_edges: Map::new(),
reverse_edges: Map::new(),
damaged_nodes: Set::new(),
pub fn is_clean(&self) -> bool {
pub fn record_observation(&mut self, observer: SubjectId, observed: ObjectId) {
pub fn record_damage(&mut self, observed: ObjectId) {
fn forget_subject(&mut self, observer: &SubjectId) {
if let Some(observeds) = self.reverse_edges.remove(observer) {
for observed in observeds.into_iter() {
if let Some(observers) = self.forward_edges.get_mut(&observed) {
// To repair: repeat:
// - take_damaged_nodes; if none, return successfully - otherwise:
// - for each, take_observers_of.
// - for each, invoke the observer's repair function.
pub fn take_damaged_nodes(&mut self) -> Set<ObjectId> {
std::mem::take(&mut self.damaged_nodes)
pub fn take_observers_of(&mut self, observed: &ObjectId) -> Set<SubjectId> {
let observers = self.forward_edges.remove(&observed).unwrap_or_default();
for observer in observers.iter() {

@ -11,6 +11,7 @@ pub use preserves_schema;
pub mod actor;
pub mod bag;
pub mod dataflow;
pub mod dataspace;
pub mod during;
pub mod error;