#![doc = concat!(
    include_str!("../doc/actor.md"),
    include_str!("../doc/what-is-an-actor.md"),
    include_str!("../doc/flow-control.md"),
    include_str!("../doc/linked-tasks.md"),
)]

use crate::enclose;

use super::dataflow::Graph;
use super::error::Error;
use super::error::error;
use super::rewrite::CaveatError;
use super::rewrite::CheckedCaveat;
use super::schemas::protocol;
use super::schemas::sturdy;
use super::trace;

use parking_lot::Mutex;
use parking_lot::RwLock;

use preserves::value::ArcValue;
use preserves::value::Domain;
use preserves::value::IOValue;
use preserves::value::Map;
use preserves::value::NestedValue;
use preserves::value::Set;
use preserves_schema::ParseError;
use preserves_schema::support::Parse;
use preserves_schema::support::Unparse;

use std::any::Any;
use std::collections::hash_map::HashMap;
use std::convert::TryFrom;
use std::convert::TryInto;
use std::marker::PhantomData;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::sync::Weak;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::time;

use tokio::select;
use tokio::sync::Notify;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver};
use tokio_util::sync::CancellationToken;

// use tracing::Instrument;

/// The type of messages and assertions that can be exchanged among
/// distributed objects, including via [dataspace][crate::dataspace].
///
/// A Preserves value where embedded references are instances of
/// [`Cap`].
///
/// While [`Ref`] can be used within a process, where arbitrary
/// `M`-values can be exchanged among objects, for distributed or
/// polyglot systems a *lingua franca* has to be chosen. `AnyValue` is
/// that language.
// NOTE(review): the generic argument list on the right-hand side looks truncated
// (`ArcValue>`); confirm against the upstream file before relying on this alias.
pub type AnyValue = ArcValue>;

/// The type of the optional names attached to actors, tasks, and [`Account`]s.
pub type Name = Option;

/// The type of process-unique actor IDs.
pub type ActorId = NonZeroU64;

/// The type of process-unique facet IDs.
pub type FacetId = NonZeroU64;

/// The type of process-unique assertion handles.
///
/// Used both as a reference to [retract][Entity::retract]
/// previously-asserted assertions and as an indexing key to associate
/// local state with some incoming assertion in an entity.
pub type Handle = u64;

/// The type of process-unique field instance IDs.
pub type FieldId = NonZeroU64;

/// The type of process-unique field observer block IDs.
pub type BlockId = NonZeroU64;

/// Error responses to events must have type `ActorError`.
pub type ActorError = Box;

/// Responses to events must have type `ActorResult`.
pub type ActorResult = Result<(), ActorError>;

/// Final exit status of an actor.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExitStatus {
    /// Converts to `Ok(())`.
    Normal,
    /// Converts to `Ok(())`, exactly like [`ExitStatus::Normal`].
    Dropped,
    /// Carries the error that ended the actor; converts to `Err(_)`.
    Error(Error),
}

/// Collapses an exit status to a plain result: only [`ExitStatus::Error`] becomes `Err`.
impl From for Result<(), Error> {
    fn from(status: ExitStatus) -> Self {
        match status {
            ExitStatus::Normal => Ok(()),
            ExitStatus::Dropped => Ok(()),
            ExitStatus::Error(e) => Err(e),
        }
    }
}

/// Same mapping as the conversion to `Result<(), Error>`, with the error converted into
/// an [`ActorError`].
impl From for ActorResult {
    fn from(status: ExitStatus) -> Self {
        Result::<(), Error>::from(status).map_err(|e| e.into())
    }
}

/// The [`Actor::boot`] method returns an `ActorHandle`, representing
/// the actor's mainloop task.
pub type ActorHandle = tokio::task::JoinHandle;

/// The type of the "disarm" function returned from [`Activation::prevent_inert_check`].
pub type DisarmFn = Box;

/// A small protocol for indicating successful synchronisation with
/// some peer; see [Entity::sync].
#[derive(Debug)]
pub struct Synced;

/// The core metaprotocol implemented by every object.
///
/// Entities communicate with each other by asserting and retracting
/// values and by sending messages (which can be understood very
/// approximately as "infinitesimally brief" assertions of the message
/// body).
///
/// Every assertion placed at a receiving entity *R* from some sending
/// entity *S* lives so long as *S*'s actor survives, *R*'s actor
/// survives, and *S* does not retract the assertion. Messages, by
/// contrast, are transient.
///
/// Implementors of [`Entity`] accept assertions from peers in method
/// [`assert`][Entity::assert]; notification of retraction of a
/// previously-asserted value happens in method
/// [`retract`][Entity::retract]; and notification of a message in
/// method [`message`][Entity::message].
///
/// In addition, entities may *synchronise* with each other: the
/// [`sync`][Entity::sync] method responds to a synchronisation
/// request.
///
/// Finally, the Rust implementation of the Syndicated Actor model
/// offers a hook for running some code at the end of an Entity's
/// containing [`Actor`]'s lifetime
/// ([`exit_hook`][Entity::exit_hook]).
///
/// # What to implement
///
/// The default implementations of the methods here generally do
/// nothing; override them to add actual behaviour to your entity.
///
// The default method bodies deliberately ignore their parameters.
#[allow(unused_variables)]
pub trait Entity: Send {
    /// Receive notification of a new assertion from a peer.
    ///
    /// The `turn` parameter represents the current
    /// [activation][Activation]; `assertion` is the value (of type
    /// `M`) asserted; and `handle` is the process-unique name for
    /// this particular assertion instance that will be used later
    /// when it is [retracted][Entity::retract].
    ///
    /// The default implementation does nothing.
    fn assert(&mut self, turn: &mut Activation, assertion: M, handle: Handle) -> ActorResult {
        Ok(())
    }

    /// Receive notification of retraction of a previous assertion from a peer.
    ///
    /// This happens either when the asserting peer explicitly
    /// retracts an assertion, or when its animating [`Actor`]
    /// terminates.
    ///
    /// The `turn` parameter represents the current
    /// [activation][Activation], and `handle` is the process-unique
    /// name for this particular assertion instance being retracted.
    ///
    /// Note that no `assertion` value is provided: entities needing
    /// to know the value that was previously asserted must remember
    /// it themselves (perhaps in a [`Map`] keyed by `handle`).
    ///
    /// The default implementation does nothing.
    fn retract(&mut self, turn: &mut Activation, handle: Handle) -> ActorResult {
        Ok(())
    }

    /// Receive notification of a message from a peer.
    ///
    /// The `turn` parameter represents the current
    /// [activation][Activation], and `message` is the body of the
    /// message sent.
    ///
    /// The default implementation does nothing.
    fn message(&mut self, turn: &mut Activation, message: M) -> ActorResult {
        Ok(())
    }

    /// Respond to a synchronisation request from a peer.
    ///
    /// Implementors of [`Entity`] will seldom override this. The
    /// default implementation fulfils the synchronisation protocol by
    /// responding to `peer` with a `Synced` message.
    ///
    /// In special cases, for example when an entity is a proxy for
    /// some remote entity, the right thing to do is to forward the
    /// synchronisation request on to another entity; in those cases,
    /// overriding the default behaviour is appropriate.
    fn sync(&mut self, turn: &mut Activation, peer: Arc>) -> ActorResult {
        turn.message(&peer, Synced);
        Ok(())
    }

    /// Optional callback for running actions when the entity's owning [Facet] terminates
    /// cleanly. Will not be called in case of abnormal shutdown (crash) of an actor. Callbacks
    /// run in the context of the owning facet's *parent* facet.
    ///
    /// Programs register an entity's stop hook with [Activation::on_stop_notify].
    ///
    /// The default implementation does nothing.
    fn stop(&mut self, turn: &mut Activation) -> ActorResult {
        Ok(())
    }

    /// Optional callback for running cleanup actions when the
    /// entity's animating [Actor] terminates.
    ///
    /// Programs register an entity's exit hook with
    /// [Activation::add_exit_hook].
    ///
    /// The default implementation does nothing.
    fn exit_hook(&mut self, turn: &mut Activation, exit_status: &Arc) {
    }
}

/// An "inert" entity, that does nothing in response to any event delivered to it.
///
/// Useful as a placeholder or dummy in various situations.
pub struct InertEntity;

// Relies entirely on the default (do-nothing) method bodies of `Entity`.
impl Entity for InertEntity {}

// A queued action paired with the optional trace event describing it; the queueing
// methods push `(description, Box::new(closure))` pairs of this shape.
type TracedAction = (Option, Action);
// Boxed closure yielding an `ActorResult`.
type Action = Box ActorResult>;
// Boxed closure yielding an `ActorResult`; presumably the body of a dataflow block,
// given the `blocks` table in `DataflowState` — TODO confirm.
type Block = Box ActorResult>;
// Boxed closure for actions that cannot fail (commit/rollback bookkeeping).
type InfallibleAction = Box;

/// Bookkeeping for one assertion this actor has placed at some peer.
#[doc(hidden)]
pub struct OutboundAssertion {
    // Set to `true` by a commit action once the asserting turn commits.
    established: bool,
    // Facet that was active when the assertion was made.
    asserting_facet_id: FacetId,
    // Mailbox of the actor owning the target entity.
    peer: Arc,
    // Action (with optional trace description) that delivers the retraction to the peer.
    retractor: TracedAction,
}

/// Table of outbound assertions, keyed by [`Handle`].
#[doc(hidden)]
pub type OutboundAssertions = Map>>>;

/// Events accumulated during a turn, awaiting delivery.
#[doc(hidden)]
pub type PendingEventQueue = Vec;

/// The main API for programming Syndicated Actor objects.
///
/// Through `Activation`s, programs can access the state of their
/// animating actor and their active [`Facet`].
///
/// Usually, an `Activation` will be supplied to code that needs one; but when non-Actor code
/// (such as a [linked task][crate::actor#linked-tasks]) needs to enter an Actor's execution
/// context, use [`FacetRef::activate`] to construct one.
///
/// Many actions that an entity can perform are methods directly on
/// `Activation`, but methods on [`FacetRef`] are also sometimes useful.
///
/// This is what other implementations call a "Turn", renamed here to
/// avoid conflicts with [`crate::schemas::protocol::Turn`].
pub struct Activation {
    // Do not poke at exposed but doc(hidden) fields here! You will violate invariants if you do!
    // They are exposed for debug/reflective purposes.

    //---------------------------------------------------------------------------
    // Fields related to an active turn for an actor.

    // INVARIANT: facet_id.is_none() any time the owning actor's Mutex is unlocked.
    facet_id: Option,
    // Account billed for the current turn; `None` outside a turn.
    account: Option>,
    // Trace record for the current turn; only present when the turn was given a cause.
    turn_description: Option,
    // Trace sink for the current turn, if tracing is active.
    trace_collector: Option,
    // Run (repeatedly, interleaved with dataflow repair) before a commit becomes final.
    pre_commit_actions: Vec,
    // Run only if the turn is rolled back; discarded on commit.
    rollback_actions: Vec,
    // Run only once the commit is final; discarded on rollback.
    commit_actions: Vec,
    // INVARIANT: At most one of single_queue and multiple_queues is non-None at any given time.
    single_queue: Option<(ActorId, UnboundedSender, PendingEventQueue)>,
    multiple_queues: Option, PendingEventQueue)>>,

    //---------------------------------------------------------------------------
    // Fields related to the actor itself, relevant even when it is suspended.

    actor_id: ActorId,
    actor_ref: ActorRef,
    // Sending half of this actor's own mailbox channel.
    tx: UnboundedSender,
    mailbox: Weak,
    exit_hooks: Vec)>>,
    #[doc(hidden)]
    pub outbound_assertions: OutboundAssertions,
    #[doc(hidden)]
    pub facet_nodes: Map,
    #[doc(hidden)]
    pub facet_children: Map>,
    pub root: FacetId, // TODO rename to root_facet_id, or ideally make it a FacetRef called root_facet
    #[doc(hidden)]
    dataflow: Option>
}

/// Dataflow bookkeeping for an actor: the stored field values plus the graph recording
/// which block observed, and which writes damaged, each field.
pub struct DataflowState {
    // Block currently executing, if any; field reads are attributed to it.
    active_block: Option,
    graph: Graph,
    // Field values, boxed and type-erased; accessed by downcasting.
    #[doc(hidden)]
    pub fields: HashMap>,
    blocks: HashMap,
}

/// An `Account` records a "debt" in terms of outstanding work items.
///
/// It is part of the flow control mechanism - see [the module-level
/// documentation][crate::actor#flow-control] for more.
#[derive(Debug)]
pub struct Account {
    id: u64,
    debt: Arc,
    notify: Notify,
    trace_collector: Option,
}

/// A `LoanedItem` is a `T` with an associated `cost` recorded
/// against it in the ledger of a given [`Account`]. The cost is
/// repaid automatically when the `LoanedItem` is `Drop`ped.
///
/// `LoanedItem`s are part of the flow control mechanism - see [the
/// module-level documentation][crate::actor#flow-control] for more.
#[derive(Debug)]
pub struct LoanedItem {
    /// The account against which this loan is recorded.
    pub account: Arc,
    /// The cost of this particular `T`.
    pub cost: usize,
    /// The underlying item itself.
    pub item: T,
}

// Control messages for an actor. `Crash` is what the panic guard sends over the
// actor's `tx` channel when a turn unwinds.
enum SystemMessage {
    Release,
    ReleaseField(FieldId),
    Turn(Option, LoanedItem),
    Crash(Error),
}

/// The mechanism by which events are delivered to a given [`Actor`].
pub struct Mailbox {
    /// The ID of the actor this mailbox corresponds to.
    pub actor_id: ActorId,
    tx: UnboundedSender,
}

/// Each actor owns an instance of this structure.
///
/// It holds the receive-half of the actor's mailbox, plus a reference
/// to the actor's private state.
pub struct Actor {
    rx: UnboundedReceiver,
    trace_collector: Option,
    ac_ref: ActorRef,
}

/// A reference to an actor's private [`ActorState`].
#[derive(Clone)]
pub struct ActorRef {
    /// The ID of the referenced actor.
    pub actor_id: ActorId,

    // Not intended for ordinary use! You'll break a bunch of invariants if you do!
    // Used internally, and made publicly accessible for reflective/debug use.
    #[doc(hidden)]
    pub state: Arc>,
}

/// A combination of an [`ActorRef`] with a [`FacetId`], acting as a capability to enter the
/// execution context of a facet from a linked task.
#[derive(Debug, Clone)]
pub struct FacetRef {
    /// The actor whose state is locked when entering a turn.
    pub actor: ActorRef,
    /// The facet, within `actor`, that becomes the active facet of the turn.
    pub facet_id: FacetId,
}

/// The state of an actor: either `Running` or `Terminated`.
pub enum ActorState {
    /// A non-terminated actor has an associated [`Activation`] state record.
    Running(Activation),
    /// A terminated actor has an [`ActorResult`] as its `exit_status`.
    // NOTE(review): termination is driven with an `ExitStatus` value elsewhere in this
    // file; the `ActorResult` / `Ok(())` / `Err(_)` wording here may be stale — confirm.
    Terminated {
        /// The exit status of the actor: `Ok(())` for normal
        /// termination, `Err(_)` for abnormal termination.
        exit_status: Arc,
    },
}

/// The type of process-unique task IDs.
pub type TaskId = u64;

/// Handle to a shared, mutable field (i.e. a *dataflow variable*) within a running actor.
///
/// Use [`Activation::field`] to create fields, and use [`Activation::get`],
/// [`::get_mut`][Activation::get_mut], and [`::set`][Activation::set] to read and write field
/// values. Use [`Activation::dataflow`] to create a reactive block within a facet that will be
/// (re-)executed whenever some dependent field changes value.
///
pub struct Field {
    /// Name given to the field when it was created.
    pub name: String,
    /// Process-unique ID; the key under which the field's value is stored.
    pub field_id: FieldId,
    tx: UnboundedSender,
    phantom: PhantomData,
}

/// State associated with each facet in an [`Actor`]'s facet tree.
///
/// # Inert facets
///
/// A facet is considered *inert* if either:
///
///  1. it has a parent facet and that parent facet is terminated; or
///  2. it is either the root facet or its parent is not yet terminated, and it:
///     1. has no child facets;
///     2. has no cleanup actions (that is, no assertions placed by any of its entities);
///     3. has no linked tasks; and
///     4. has no "inert check preventers" (see [Activation::prevent_inert_check]).
///
/// If a facet is created and is inert at the moment that its `boot` function returns, it is
/// automatically terminated.
///
/// When a facet is terminated, if its parent facet is inert, the parent is terminated.
///
/// If the root facet in an actor is terminated, the entire actor is terminated (with exit
/// status `Ok(())`).
///
// Do not poke at exposed but doc(hidden) fields here! You will violate invariants if you do!
// They are exposed for debug/reflective purposes.
pub struct Facet {
    /// The ID of the facet.
    pub facet_id: FacetId,
    /// The ID of the facet's parent facet, if any; if None, this facet is the `Actor`'s root facet.
    pub parent_facet_id: Option,
    // Handles of the assertions made while this facet was the active facet.
    #[doc(hidden)]
    pub outbound_handles: Set,
    // Actions registered via `on_stop` / `on_stop_notify`.
    stop_actions: Vec,
    #[doc(hidden)]
    pub linked_tasks: Map,
    #[doc(hidden)]
    pub inert_check_preventers: Arc,
}

/// A reference to an object that expects messages/assertions of type
/// `M`.
///
/// The object can be in the same actor, in a different local
/// (in-process) actor, or accessible across a network link.
pub struct Ref {
    /// Mailbox of the actor owning the referenced entity.
    pub mailbox: Arc,
    /// ID of the facet (within the actor) owning the referenced entity.
    pub facet_id: FacetId,
    /// Mutex owning and guarding the state backing the referenced entity.
    pub target: Mutex>>>,
}

/// Specialization of `Ref` for messages/assertions of type
/// [`AnyValue`].
///
/// All polyglot and network communication is done in terms of `Cap`s.
///
/// `Cap`s can also be *attenuated* ([Hardy 2017]; [Miller 2006]) to
/// reduce (or otherwise transform) the range of assertions and
/// messages they can be used to send to their referent.
The /// Syndicated Actor model uses /// [Macaroon](https://syndicate-lang.org/doc/capabilities/)-style /// capability attenuation. /// /// [Hardy 2017]: http://cap-lore.com/CapTheory/Patterns/Attenuation.html /// [Miller 2006]: http://www.erights.org/talks/thesis/markm-thesis.pdf #[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct Cap { #[doc(hidden)] pub underlying: Arc>, #[doc(hidden)] pub attenuation: Vec, } /// Adapter for converting an underlying [`Ref`] to a [`Cap`]. /// /// The [`Entity`] implementation for `Guard` decodes `AnyValue` /// assertions/messages to type `M` before passing them on to the /// underlying entity. pub struct Guard where M: for<'a> Unparse<&'a L, AnyValue>, M: for<'a> Parse<&'a L, AnyValue>, { underlying: Arc>, literals: Arc, } /// Simple entity that stops its containing facet when any assertion it receives is /// subsequently retracted. pub struct StopOnRetract; /// [Linked tasks][Activation::linked_task] terminate yielding values of this type. pub enum LinkedTaskTermination { /// Causes the task's associated [Facet] to be [stop][Activation::stop]ped when the task /// returns. Normal, /// Causes no action to be taken regarding the task's associated [Facet] at the time the /// task returns. KeepFacet, } #[derive(Debug, Clone, Copy)] enum TerminationDirection { BelowStartingPoint, AtOrAboveStartingPoint, } //--------------------------------------------------------------------------- const BUMP_AMOUNT: u8 = 10; static NEXT_ACTOR_ID: AtomicU64 = AtomicU64::new(1); #[doc(hidden)] pub fn next_actor_id() -> ActorId { ActorId::new(NEXT_ACTOR_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed)) .expect("Internal error: Attempt to allocate ActorId of zero. Too many ActorIds allocated. 
Restart the process.") } static NEXT_FACET_ID: AtomicU64 = AtomicU64::new(2); #[doc(hidden)] pub fn next_facet_id() -> FacetId { FacetId::new(NEXT_FACET_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed)) .expect("Internal error: Attempt to allocate FacetId of zero. Too many FacetIds allocated. Restart the process.") } static NEXT_HANDLE: AtomicU64 = AtomicU64::new(3); /// Allocate a process-unique `Handle`. pub fn next_handle() -> Handle { NEXT_HANDLE.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed) } static NEXT_ACCOUNT_ID: AtomicU64 = AtomicU64::new(4); static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(5); static NEXT_FIELD_ID: AtomicU64 = AtomicU64::new(6); static NEXT_BLOCK_ID: AtomicU64 = AtomicU64::new(7); static NEXT_ACTIVATION_ID: AtomicU64 = AtomicU64::new(9); preserves_schema::support::lazy_static! { #[doc(hidden)] pub static ref SYNDICATE_CREDIT: i64 = { let credit = std::env::var("SYNDICATE_CREDIT").unwrap_or("100".to_owned()) .parse::().expect("Valid SYNDICATE_CREDIT environment variable"); tracing::debug!("Configured SYNDICATE_CREDIT = {}", credit); credit }; #[doc(hidden)] pub static ref ACCOUNTS: RwLock)>> = Default::default(); #[doc(hidden)] pub static ref ACTORS: RwLock> = Default::default(); } impl TryFrom<&AnyValue> for Synced { type Error = ParseError; fn try_from(value: &AnyValue) -> Result { if let Some(true) = value.value().as_boolean() { Ok(Synced) } else { Err(ParseError::conformance_error("Synced")) } } } impl From<&Synced> for AnyValue { fn from(_value: &Synced) -> Self { AnyValue::new(true) } } impl<'a> Parse<&'a (), AnyValue> for Synced { fn parse(_language: &'a (), value: &AnyValue) -> Result { Synced::try_from(value) } } impl<'a> Unparse<&'a (), AnyValue> for Synced { fn unparse(&self, _language: &'a ()) -> AnyValue { self.into() } } mod panic_guard { use super::*; pub struct PanicGuard(Option>); impl PanicGuard { pub(super) fn new(tx: UnboundedSender) -> Self { tracing::trace!("Panic guard armed"); PanicGuard(Some(tx)) } pub fn 
disarm(&mut self) { tracing::trace!("Panic guard disarmed"); self.0 = None; } } impl Drop for PanicGuard { fn drop(&mut self) { if let Some(tx) = &self.0 { tracing::trace!("Panic guard triggering"); let _ = tx.send(SystemMessage::Crash( error("Actor panicked during activation", AnyValue::new(false)))); } } } } impl FacetRef { /// Executes `f` in a new "[turn][Activation]" for `actor`. If `f` returns `Ok(())`, /// [commits the turn][Activation::commit] and performs the buffered actions; otherwise, /// abandons the turn and discards the buffered actions. /// /// Returns `true` if, at the end of the activation, `actor` had not yet terminated. /// /// Bills any activity to `account`. pub fn activate( &self, account: &Arc, cause: Option, f: F, ) -> bool where F: FnOnce(&mut Activation) -> ActorResult, { let mut g = self.actor.state.lock(); match &mut *g { ActorState::Terminated { .. } => false, ActorState::Running(state) => { let mut panic_guard = panic_guard::PanicGuard::new(state.tx.clone()); // let _entry = tracing::info_span!(parent: None, "actor", actor_id = ?self.actor.actor_id).entered(); let maybe_exit_status = state.take_turn( self.facet_id, Arc::clone(account), cause, account.trace_collector.clone(), |t| match f(t) { Ok(()) => t.commit().map_or_else( |e| Some(Err(e)), |_| { // If we would otherwise continue, check the root facet: is it // still alive? If not, then the whole actor should terminate now. 
if t.get_facet(t.root).is_none() {
                                    tracing::trace!(
                                        "terminating actor because root facet no longer exists");
                                    Some(Ok(()))
                                } else {
                                    None
                                }
                            }),
                        Err(e) => {
                            // `f` failed: undo buffered bookkeeping, then report the error.
                            t.rollback();
                            Some(Err(e))
                        }
                    });
                // The turn finished without unwinding; the guard must not fire.
                panic_guard.disarm();
                drop(panic_guard);
                match maybe_exit_status {
                    None => true,
                    Some(result) => {
                        let exit_status = match result {
                            Ok(()) => ExitStatus::Normal,
                            Err(e) => ExitStatus::Error(e.into()),
                        };
                        g.terminate(exit_status, &self.actor, &account.trace_collector);
                        false
                    }
                }
            }
        }
    }
}

impl DataflowState {
    /// Creates an empty dataflow state: no active block, no fields, no blocks.
    fn new() -> Self {
        DataflowState {
            active_block: None,
            graph: Graph::new(),
            fields: HashMap::new(),
            blocks: HashMap::new(),
        }
    }

    /// Allocates a fresh field ID, stores `initial_value` under it, and returns a handle
    /// carrying `name`, the ID and `tx`.
    ///
    /// # Panics
    ///
    /// Panics if the field ID counter yields zero.
    fn named_field(
        &mut self,
        name: &str,
        initial_value: T,
        tx: UnboundedSender,
    ) -> Arc> {
        let field_id = FieldId::new(NEXT_FIELD_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed))
            .expect("Internal error: Attempt to allocate FieldId of zero. Too many FieldIds allocated. Restart the process.");
        self.fields.insert(field_id, Box::new(initial_value));
        Arc::new(Field {
            name: name.to_owned(),
            field_id,
            tx,
            phantom: PhantomData,
        })
    }

    /// Reads the value of `field`, recording the read as an observation by the active
    /// block (if there is one).
    ///
    /// # Panics
    ///
    /// Panics if the field is not present in this state, or if the stored value is not
    /// of the requested type.
    fn get(&mut self, field: &Field) -> &T {
        if let Some(block) = self.active_block {
            tracing::trace!(?field, ?block, action = "get", "observed");
            self.graph.record_observation(block, field.field_id);
        }
        let any = self.fields.get(&field.field_id)
            .expect("Attempt to get() missing field: wrong actor?");
        any.downcast_ref().expect("Attempt to access field at incorrect type")
    }

    /// Mutably borrows the value of `field`. The access is recorded both as an
    /// observation by the active block (if any) and as damage to the field.
    ///
    /// # Panics
    ///
    /// Panics if the field is not present in this state, or if the stored value is not
    /// of the requested type.
    fn get_mut(&mut self, field: &Field) -> &mut T {
        {
            // Overapproximation.
if let Some(block) = self.active_block {
                tracing::trace!(?field, ?block, action = "get_mut", "observed");
                self.graph.record_observation(block, field.field_id);
            }
            tracing::trace!(?field, active_block = ?self.active_block, action = "get_mut", "damaged");
            self.graph.record_damage(field.field_id);
        }
        let any = self.fields.get_mut(&field.field_id)
            .expect("Attempt to get_mut() missing field: wrong actor?");
        any.downcast_mut().expect("Attempt to access field at incorrect type")
    }

    /// Replaces the stored value of `field` with `value` and records the field as damaged.
    ///
    /// # Panics
    ///
    /// Panics if the field is not present in this state.
    fn set(&mut self, field: &Field, value: T) {
        tracing::trace!(?field, active_block = ?self.active_block, action = "set", "damaged");
        // Overapproximation in many cases, since the new value may not produce an
        // observable difference (may be equal to the current value).
        self.graph.record_damage(field.field_id);
        let any = self.fields.get_mut(&field.field_id)
            .expect("Attempt to set() missing field: wrong actor?");
        *any = Box::new(value);
    }
}

impl Activation {
    /// Runs `f` as one turn with `facet_id` as the active facet.
    ///
    /// Installs the turn-scoped fields (`facet_id`, `account`, `turn_description`,
    /// `trace_collector`), calls `f`, then clears all four again and returns `f`'s result.
    /// A turn description is only created when `cause` is `Some`.
    ///
    /// # Panics
    ///
    /// Panics if a turn is already in progress, i.e. `facet_id` is already set.
    fn take_turn R>(
        &mut self,
        facet_id: FacetId,
        account: Arc,
        cause: Option,
        trace_collector: Option,
        f: F,
    ) -> R {
        if self.facet_id.is_some() {
            panic!("Invariant violated: nested turns detected");
        }
        self.facet_id = Some(facet_id);
        self.account = Some(account);
        self.turn_description = cause.map(|c| trace::TurnDescription::new(
            NEXT_ACTIVATION_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed),
            c));
        self.trace_collector = trace_collector;
        let result = f(self);
        // Restore the "no turn in progress" state.
        self.facet_id = None;
        self.account = None;
        self.turn_description = None;
        self.trace_collector = None;
        result
    }

    /// Returns a clone of the trace collector attached to the current turn, if any.
    pub fn trace_collector(&self) -> Option {
        self.trace_collector.clone()
    }

    /// Constructs a new [`Account`] with the given `name`, inheriting
    /// its `trace_collector` from the current [`Activation`]'s cause.
pub fn create_account(&self, name: Name) -> Arc { Account::new(name, self.trace_collector()) } fn with_facet(&mut self, facet_id: FacetId, f: F) -> ActorResult where F: FnOnce(&mut Activation) -> ActorResult, { if self.facet_nodes.contains_key(&facet_id) { tracing::trace!(facet_id, "alive=true"); self._with_facet(facet_id, f) } else { tracing::trace!(facet_id, "alive=false"); Ok(()) } } /// Retrieve the [`FacetId`] of the currently-active facet. pub fn facet_id(&self) -> FacetId { self.facet_id.expect("Attempt to access active facet ID outside turn") } /// Retrieve a [`FacetRef`] for the currently-active facet. pub fn facet_ref(&self) -> FacetRef { FacetRef { actor: self.actor_ref.clone(), facet_id: self.facet_id(), } } fn _with_facet(&mut self, facet_id: FacetId, f: F) -> ActorResult where F: FnOnce(&mut Activation) -> ActorResult, { let saved = self.facet_id.replace(facet_id); // let _entry = tracing::info_span!("facet", ?facet_id).entered(); let result = f(self); self.facet_id = saved; result } #[doc(hidden)] pub fn with_entity(&mut self, r: &Arc>, f: F) -> ActorResult where F: FnOnce(&mut Activation, &mut dyn Entity) -> ActorResult { self.with_facet(r.facet_id, |t| r.internal_with_entity(|e| f(t, e))) } fn active_facet<'a>(&'a mut self) -> Option<&'a mut Facet> { self.get_facet(self.facet_id()) } /// Retrieves the chain of facet IDs, in order, from the currently-active [`Facet`] up to /// and including the root facet of the active actor. Useful for debugging. 
pub fn facet_ids(&mut self) -> Vec {
        if let Some(f) = self.get_facet_immut(self.facet_id()) {
            self.facet_ids_for(f)
        } else {
            Vec::new()
        }
    }

    /// Collects the ID of `f` followed by the IDs of its ancestors, nearest first.
    ///
    /// The walk stops at the first facet without a parent. A parent ID that can no
    /// longer be resolved is still included, and ends the walk.
    fn facet_ids_for(&self, f: &Facet) -> Vec {
        let mut ids = vec![f.facet_id];
        let mut id = f.parent_facet_id;
        while let Some(parent_id) = id {
            ids.push(parent_id);
            id = self.get_facet_immut(parent_id).and_then(|pf| pf.parent_facet_id);
        }
        ids
    }

    /// Records the action description produced by `f` in the current turn description.
    ///
    /// `f` is only called when the turn is being traced.
    #[inline(always)]
    fn trace trace::ActionDescription>(&mut self, f: F) {
        if self.turn_description.is_some() {
            // `f` needs `&mut self`, so the description is re-borrowed afterwards.
            let a = f(self);
            if let Some(d) = self.turn_description.as_mut() {
                d.record(a);
            }
        }
    }

    /// When the turn is being traced, builds a targeted event for `r` from `f`, records
    /// it as an `EnqueueInternal` (if `internal`) or `Enqueue` action, and returns it.
    ///
    /// Returns `None`, without calling `f`, when the turn is not being traced.
    #[inline(always)]
    fn trace_targeted trace::TurnEvent>(
        &mut self,
        internal: bool,
        r: &Arc>,
        f: F,
    ) -> Option {
        self.turn_description.as_mut().map(|d| {
            let event = trace::TargetedTurnEvent {
                target: r.as_ref().into(),
                detail: f(),
            };
            d.record(if internal {
                trace::ActionDescription::EnqueueInternal { event: Box::new(event.clone()) }
            } else {
                trace::ActionDescription::Enqueue { event: Box::new(event.clone()) }
            });
            event
        })
    }

    /// Registers `handle` as an outbound assertion, made by the active facet, at `r`.
    ///
    /// Returns `false`, doing nothing, when the active facet no longer exists. Otherwise
    /// it returns `true` after:
    ///
    /// - adding `handle` to the facet's `outbound_handles`;
    /// - storing an [`OutboundAssertion`] whose retractor calls `retract` on the target entity;
    /// - registering a rollback action that undoes both of the above; and
    /// - registering a commit action that marks the assertion as established.
    ///
    /// `description` is the optional trace event stored alongside the retractor.
    fn insert_outbound_assertion(
        &mut self,
        r: &Arc>,
        handle: Handle,
        description: Option,
    ) -> bool {
        let asserting_facet_id = self.facet_id();
        match self.get_facet(asserting_facet_id) {
            None =>
                false,
            Some(f) => {
                f.outbound_handles.insert(handle);
                let r = Arc::clone(r);
                let details = OutboundAssertion {
                    established: false,
                    asserting_facet_id,
                    peer: Arc::clone(&r.mailbox),
                    retractor: (
                        description,
                        Box::new(move |remote_t| remote_t.with_entity(&r, |t, e| {
                            tracing::trace!(?handle, "retracted");
                            e.retract(t, handle)
                        }))),
                };
                self.outbound_assertions.insert(handle, Arc::new(Mutex::new(Some(details))));
                self.on_rollback(move |t| {
                    if let Some(f) = t.get_facet(asserting_facet_id) {
                        f.outbound_handles.remove(&handle);
                    }
                    t.outbound_assertions.remove(&handle);
                });
                self.on_commit(move |t| {
                    if let Some(oa_handle) = t.outbound_assertions.get_mut(&handle) {
                        oa_handle.lock().as_mut().expect("OutboundAssertion").established = true;
                    }
                });
                true
            }
        }
    }

    /// Core API:
/// assert `a` at recipient `r`.
    ///
    /// Returns the [`Handle`] for the new assertion.
    ///
    /// A fresh handle is returned even when the active facet no longer exists; in that
    /// case nothing is recorded and nothing is queued for `r`.
    pub fn assert(&mut self, r: &Arc>, a: M) -> Handle {
        let handle = next_handle();
        // The description handed to insert_outbound_assertion is the trace event for
        // the *retraction* that will eventually undo this assertion.
        if self.insert_outbound_assertion(r, handle, self.turn_description.as_ref().map(
            enclose!((r) move |_| trace::TargetedTurnEvent {
                target: r.as_ref().into(),
                detail: trace::TurnEvent::Retract {
                    handle: Box::new(protocol::Handle(handle.into())),
                },
            })))
        {
            tracing::trace!(?r, ?handle, ?a, "assert");
            let r = Arc::clone(r);
            let description = self.trace_targeted(false, &r, || trace::TurnEvent::Assert {
                assertion: Box::new((&a).into()),
                handle: Box::new(protocol::Handle(handle.into())),
            });
            // Queue delivery; the entity's `assert` runs later, in the recipient's context.
            self.queue_for(&r).push((
                description,
                Box::new(move |t| t.with_entity(&r, |t, e| {
                    tracing::trace!(?handle, ?a, "asserted");
                    e.assert(t, a, handle)
                }))));
        }
        handle
    }

    /// Establishes one direction of a link from this activation to `t_other`.
    ///
    /// Creates a [`StopOnRetract`] entity in `t_other`, records an outbound assertion
    /// from this activation to it, and delivers that assertion to the entity
    /// immediately through `t_other` instead of queueing it. Returns the assertion's
    /// handle.
    ///
    /// # Panics
    ///
    /// Panics if this activation's active facet no longer exists, or if delivering the
    /// assertion fails.
    fn half_link(&mut self, t_other: &mut Activation) -> Handle {
        let this_actor_id = self.actor_id;
        let entity_ref = t_other.create::(StopOnRetract);
        let handle = next_handle();
        // The registration itself happens inside the assert!; it is not debug-only.
        assert!(self.insert_outbound_assertion(&entity_ref, handle, self.turn_description.as_ref().map(
            enclose!((entity_ref) move |_| trace::TargetedTurnEvent {
                target: entity_ref.as_ref().into(),
                detail: trace::TurnEvent::BreakLink {
                    source: Box::new(this_actor_id.into()),
                    handle: Box::new(protocol::Handle(handle.into())),
                },
            }))));
        tracing::trace!(?handle, ?entity_ref, "half_link");
        t_other.with_entity(&entity_ref, |t, e| e.assert(t, AnyValue::new(true), handle)).unwrap();
        handle
    }

    /// Core API: retract a previously-established assertion.
pub fn retract(&mut self, handle: Handle) {
        tracing::trace!(?handle, "retract");
        // Unknown handles are silently ignored.
        if let Some(oa_handle) = self.outbound_assertions.remove(&handle) {
            // On rollback, put the record back, but only if the assertion had already
            // been established by an earlier committed turn.
            self.on_rollback(enclose!((oa_handle) move |t| {
                if oa_handle.lock().as_mut().expect("OutboundAssertion").established {
                    t.outbound_assertions.insert(handle, oa_handle);
                }
            }));
            let g = oa_handle.lock();
            let oa = g.as_ref().expect("Present OutboundAssertion");
            if let Some(desc) = &oa.retractor.0 {
                self.trace(|_| trace::ActionDescription::Enqueue { event: Box::new(desc.clone()) });
            }
            // Queue the retraction for the peer. The record is only consumed (taken
            // out of its slot) when the queued action actually runs.
            self.queue_for_mailbox(&oa.peer).push((
                oa.retractor.0.clone(),
                Box::new(enclose!((oa_handle) move |remote_t| {
                    let oa = oa_handle.lock().take().expect("Present OutboundAssertion");
                    (oa.retractor.1)(remote_t)
                }))));
            let asserting_facet_id = oa.asserting_facet_id;
            // Once the turn commits, the asserting facet forgets the handle.
            self.on_commit(move |t| {
                if let Some(f) = t.get_facet(asserting_facet_id) {
                    f.outbound_handles.remove(&handle);
                }
            });
        }
    }

    /// Core API: assert, retract, or replace an assertion.
    ///
    /// `handle` holds the handle of the current assertion, if any. With `a` present,
    /// `a` is asserted at `r` and `handle` is updated to the new handle; with `a`
    /// absent, `handle` is left as `None`. Any previous assertion is then retracted —
    /// after the new one has been asserted.
    pub fn update(
        &mut self,
        handle: &mut Option,
        r: &Arc>,
        a: Option,
    ) {
        let saved = handle.take();
        if let Some(a) = a {
            *handle = Some(self.assert(r, a));
        }
        if let Some(h) = saved {
            self.retract(h);
        }
    }

    /// Core API: send message `m` to recipient `r`.
    pub fn message(&mut self, r: &Arc>, m: M) {
        tracing::trace!(?r, ?m, "message");
        let r = Arc::clone(r);
        let description = self.trace_targeted(false, &r, || trace::TurnEvent::Message {
            body: Box::new((&m).into()),
        });
        // Queue delivery; the entity's `message` runs later, in the recipient's context.
        self.queue_for(&r).push((
            description,
            Box::new(move |t| t.with_entity(&r, |t, e| {
                tracing::trace!(?m, "delivered");
                e.message(t, m)
            }))))
    }

    /// Core API: begins a synchronisation with `r`.
    ///
    /// Once the synchronisation request reaches `r`'s actor, it will
    /// send a response to `peer`, which acts as a continuation for
    /// the synchronisation request.
pub fn sync(&mut self, r: &Arc>, peer: Arc>) {
        let r = Arc::clone(r);
        let description = self.trace_targeted(false, &r, || trace::TurnEvent::Sync {
            peer: Box::new(peer.as_ref().into()),
        });
        // Queue the request; the entity's `sync` runs later, in the recipient's context.
        self.queue_for(&r).push((
            description,
            Box::new(move |t| t.with_entity(&r, |t, e| e.sync(t, peer)))))
    }

    /// Queues `action` on this actor's own mailbox, with no trace description attached.
    // NOTE(review): presumably `action` then runs in a later turn of this same actor,
    // once the current turn commits — confirm against `queue_for_mailbox`.
    pub fn later ActorResult>(&mut self, action: F) {
        // TODO: properly describe this in traces
        let mailbox = self.mailbox();
        self.queue_for_mailbox(&mailbox).push((
            None,
            Box::new(action)));
    }

    /// Registers the entity `r` in the list of stop actions for the active facet. If the facet
    /// terminates cleanly, `r`'s [`stop`][Entity::stop] will be called in the context of the
    /// facet's parent.
    ///
    /// **Note.** If the actor crashes, stop actions will *not* be called.
    ///
    /// Use [`Activation::add_exit_hook`] to install a callback that will be called at the
    /// end of the lifetime of the *actor* rather than the facet. (Also, exit hooks are called
    /// no matter whether actor termination was normal or abnormal.)
    ///
    /// Does nothing if the active facet no longer exists.
    pub fn on_stop_notify(&mut self, r: &Arc>) {
        if let Some(f) = self.active_facet() {
            let r = Arc::clone(r);
            f.stop_actions.push(Box::new(move |t| r.internal_with_entity(|e| e.stop(t))));
        }
    }

    /// Registers `action` in the list of stop actions for the active facet. If the facet
    /// terminates cleanly, `action` will be called in the context of the facet's parent. See
    /// also notes against [`on_stop_notify`][Activation::on_stop_notify].
    pub fn on_stop ActorResult>(&mut self, action: F) {
        self.on_facet_stop(self.facet_id(), action)
    }

    /// Appends `action` to the stop actions of facet `facet_id`; does nothing if that
    /// facet no longer exists.
    fn on_facet_stop ActorResult>(
        &mut self,
        facet_id: FacetId,
        action: F,
    ) {
        if let Some(f) = self.get_facet(facet_id) {
            f.stop_actions.push(Box::new(action));
        }
    }

    /// Retrieve the [`Account`] against which actions are recorded.
    ///
    /// # Panics
    ///
    /// Panics when called outside a turn (no account is set).
    pub fn account(&self) -> &Arc {
        self.account.as_ref().expect("Attempt to access active account outside turn")
    }

    /// Delivers all pending actions in this activation and resets it, ready for more.
/// Succeeds iff all [pre-commit][Activation::pre_commit] actions succeed.
///
/// # Commit procedure
///
/// If an [activation][FacetRef::activate]'s `f` function returns successfully, the
/// activation *commits* according to the following procedure:
///
///  1. While the dataflow graph needs repairing or outstanding
///     [pre-commit][Activation::pre_commit] actions exist:
///       1. repair the dataflow graph
///       2. run all pre-commit actions
///     Note that graph repair or a pre-commit action may fail, causing the commit to
///     abort, or may further damage the dataflow graph or schedule another pre-commit
///     action, causing another go around the loop.
///
///  2. The commit becomes final. All queued events are sent; all internal accounting
///     actions are performed.
///
/// # Errors
///
/// Returns the first error produced by dataflow repair or by a pre-commit action. Errors
/// from sending queued events to other actors are deliberately ignored.
pub fn commit(&mut self) -> ActorResult {
    tracing::trace!("Activation::commit");
    // Step 1: iterate until neither dataflow repair nor pre-commit actions did anything.
    loop {
        let mut should_loop = false;

        if self.repair_dataflow()? {
            should_loop = true;
        }

        if !self.pre_commit_actions.is_empty() {
            should_loop = true;
            // `take` empties the list first, so actions scheduled while these run are
            // picked up by the next iteration.
            for ac in std::mem::take(&mut self.pre_commit_actions) {
                ac(self)?
            }
        }

        if !should_loop {
            break;
        }
    }
    // Step 2: from here on the commit can no longer fail.
    tracing::trace!("Commit is final");
    if !self.rollback_actions.is_empty() {
        // just drop 'em so they don't run next time
        std::mem::take(&mut self.rollback_actions);
    }
    if !self.commit_actions.is_empty() {
        for ac in std::mem::take(&mut self.commit_actions) {
            ac(self);
        }
    }
    // Record the finished turn with the trace collector (if any), remembering its ID so
    // that the turns we are about to trigger elsewhere can name it as their cause.
    let mut causing_turn_id: Option = None;
    if let Some(d) = self.turn_description.take() {
        causing_turn_id = Some(d.id.clone());
        if let Some(c) = &self.trace_collector {
            c.record(self.actor_id, trace::ActorActivation::Turn(Box::new(d)));
        }
    }
    // Deliberately ignore send errors here: they indicate that the recipient is no
    // longer alive. But we don't care about that case, since we have to be robust to
    // crashes anyway. (When we were printing errors we saw here, an example of the
    // problems it caused was a relay output_loop that received EPIPE causing the relay
    // to crash, just as it was receiving thousands of messages a second, leading to
    // many, many log reports of failed send_actions from the following line.)
    if let Some((_actor_id, tx, turn)) = std::mem::take(&mut self.single_queue) {
        let desc = causing_turn_id.as_ref().map(|id| trace::TurnCause::Turn {
            id: Box::new(id.clone())
        });
        let _ = send_actions(&tx, desc, &self.account(), turn);
    }
    if let Some(table) = std::mem::take(&mut self.multiple_queues) {
        for (_actor_id, (tx, turn)) in table.into_iter() {
            let desc = causing_turn_id.as_ref().map(|id| trace::TurnCause::Turn {
                id: Box::new(id.clone())
            });
            let _ = send_actions(&tx, desc, &self.account(), turn);
        }
    }
    tracing::trace!("Activation::commit complete");
    Ok(())
}

/// Abandons the effects of the current turn: discards pending pre-commit and commit
/// actions without running them, runs every registered rollback action, and drops all
/// queued outbound events.
fn rollback(&mut self) {
    tracing::trace!("Activation::rollback");
    if !self.pre_commit_actions.is_empty() {
        // just drop 'em so they don't run next time
        std::mem::take(&mut self.pre_commit_actions);
    }
    if !self.rollback_actions.is_empty() {
        for ac in std::mem::take(&mut self.rollback_actions) {
            ac(self)
        }
    }
    if !self.commit_actions.is_empty() {
        // just drop 'em so they don't run next time
        std::mem::take(&mut self.commit_actions);
    }
    // Nothing queued during this turn is ever sent.
    self.single_queue = None;
    self.multiple_queues = None;
    tracing::trace!("Activation::rollback complete");
}

/// Construct an entity with behaviour [`InertEntity`] within the active facet.
pub fn inert_entity(&mut self) -> Arc> {
    self.create(InertEntity)
}

/// Construct an entity with behaviour `e` within the active facet.
///
/// Implemented as [`create_inert`][Activation::create_inert] followed immediately by
/// [`become_entity`][Ref::become_entity].
pub fn create + Send + 'static>(&mut self, e: E) -> Arc> {
    let r = self.create_inert();
    r.become_entity(e);
    r
}

/// Construct an entity (within the active facet) whose behaviour will be specified later
/// via [`become_entity`][Ref::become_entity].
pub fn create_inert(&mut self) -> Arc> {
    // The new `Ref` is addressed via this actor's mailbox and the active facet's ID; its
    // target starts out empty.
    Arc::new(Ref {
        mailbox: self.mailbox(),
        facet_id: self.facet_id(),
        target: Mutex::new(None),
    })
}

/// Start a new [linked task][crate::actor#linked-tasks] attached to the active facet. The
/// task will execute the future "`boot`" to completion unless it is cancelled first (by
/// e.g. termination of the owning facet or crashing of the owning actor). Uses `name` for
/// log messages emitted by the task.
///
/// What happens when the task ends depends on how it ends:
///
///  - If `boot` yields `Ok(LinkedTaskTermination::Normal)`, or the task is cancelled, the
///    facet is stopped.
///  - If `boot` yields any other `Ok(_)` value, the task is merely deregistered and the
///    facet is left running.
///  - If `boot` yields `Err(e)`, a `SystemMessage::Crash(e)` is sent to the owning actor's
///    mailbox and the task ends without running the release turn.
///
/// Does nothing at all (`boot` is dropped unpolled) if there is no active facet.
pub fn linked_task>>(
    &mut self,
    name: Name,
    boot: F,
) {
    let mailbox = self.mailbox();
    let facet = self.facet_ref();
    let trace_collector = self.trace_collector();
    if self.active_facet().is_some() {
        let task_id = NEXT_TASK_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed);
        self.trace(|_| trace::ActionDescription::LinkedTaskStart {
            task_name: Box::new(name.into()),
            id: Box::new(trace::TaskId(AnyValue::new(task_id))),
        });
        let f = self.active_facet().unwrap();
        let token = CancellationToken::new();
        {
            // The spawned task owns a clone of the token; the original is stored in the
            // facet (below) so that the facet can cancel the task.
            let token = token.clone();
            tokio::spawn(async move {
                tracing::trace!(task_id, "linked task start");
                // Race the task body against cancellation.
                let (result, reason) = select! {
                    _ = token.cancelled() => {
                        tracing::trace!(task_id, "linked task cancelled");
                        (LinkedTaskTermination::Normal, trace::LinkedTaskReleaseReason::Cancelled)
                    }
                    result = boot => match result {
                        Ok(t) => {
                            tracing::trace!(task_id, "linked task normal stop");
                            (t, trace::LinkedTaskReleaseReason::Normal)
                        }
                        Err(e) => {
                            tracing::error!(task_id, "linked task error: {}", e);
                            let _ = mailbox.tx.send(SystemMessage::Crash(e.clone()));
                            // Early exit from the spawned future: no release turn runs.
                            Err(e)?
                        }
                    }
                };
                // Release turn: deregister the task from its facet and, for a `Normal`
                // termination, stop the facet.
                let release_account = Account::new(Some(AnyValue::symbol("linked-task-release")),
                                                   trace_collector);
                facet.activate(
                    &release_account,
                    release_account.trace_collector.as_ref().map(
                        |_| trace::TurnCause::LinkedTaskRelease {
                            id: Box::new(trace::TaskId(AnyValue::new(task_id))),
                            reason: Box::new(reason),
                        }),
                    |t| {
                        if let Some(f) = t.active_facet() {
                            tracing::trace!(task_id, "cancellation token removed");
                            f.linked_tasks.remove(&task_id);
                        }
                        if let LinkedTaskTermination::Normal = result {
                            t.stop();
                        }
                        Ok(())
                    });
                Ok::<(), Error>(())
            });
            // }.instrument(tracing::info_span!("task", ?task_id).or_current()));
        }
        f.linked_tasks.insert(task_id, token);
    }
}

/// Executes the given action after the given duration has elapsed (so long as the active
/// facet still exists at that time).
///
/// Implemented as a [linked task][Activation::linked_task] named `delay` that sleeps
/// until the deadline, activates the facet to run `a` (charging the account that was
/// active when `after` was called), and then finishes with
/// `LinkedTaskTermination::KeepFacet`.
pub fn after ActorResult>(
    &mut self,
    duration: time::Duration,
    a: F,
) {
    let account = Arc::clone(self.account());
    let desc = self.turn_description.as_ref().map(|d| trace::TurnCause::Delay {
        causing_turn: Box::new(d.id.clone()),
        amount: duration.as_secs_f64().into(),
    });
    // The deadline is fixed now, at call time, not when the task is first polled.
    let instant = time::Instant::now() + duration;
    let facet = self.facet_ref();
    self.linked_task(Some(AnyValue::symbol("delay")), async move {
        tokio::time::sleep_until(instant.into()).await;
        facet.activate(&account, desc, a);
        Ok(LinkedTaskTermination::KeepFacet)
    });
}

/// Executes the given action immediately, and then every time another multiple of the
/// given duration has elapsed (so long as the active facet still exists at that time).
///
/// Runs as a [linked task][Activation::linked_task] named `periodic-activation`, driven
/// by a `tokio::time::interval`. The loop ends the first time activating the facet
/// returns `false`; the task then finishes with `LinkedTaskTermination::Normal`. This
/// method itself always returns `Ok(())`.
pub fn every ActorResult>(
    &mut self,
    duration: time::Duration,
    mut a: F,
) -> ActorResult {
    let account = Arc::clone(self.account());
    let facet = self.facet_ref();
    let desc = trace::TurnCause::PeriodicActivation { period: duration.as_secs_f64().into() };
    self.linked_task(Some(AnyValue::symbol("periodic-activation")), async move {
        let mut timer = tokio::time::interval(duration);
        loop {
            timer.tick().await;
            if !facet.activate(&account, Some(desc.clone()), |t| a(t)) {
                break;
            }
        }
        Ok(LinkedTaskTermination::Normal)
    });
    Ok(())
}

/// Executes the given action at the given instant (so long as the active facet still
/// exists at that time).
///
/// The instant is converted to a delay relative to now and handed to
/// [`after`][Activation::after]; an instant that is already in the past becomes a zero
/// delay.
pub fn at, F: 'static + Send + FnOnce(&mut Activation) -> ActorResult>(
    &mut self,
    instant: I,
    a: F,
) {
    let delay = instant.into().checked_duration_since(tokio::time::Instant::now())
        .unwrap_or(time::Duration::ZERO);
    self.after(delay, a)
}

/// Schedules the given action to run just prior to [commit][Activation::commit].
pub fn pre_commit ActorResult>(
    &mut self,
    action: F,
) {
    self.pre_commit_actions.push(Box::new(action));
}

/// Registers `action` in the list of rollback actions of this activation.
fn on_rollback(&mut self, action: F) {
    self.rollback_actions.push(Box::new(action));
}

/// Registers `action` in the list of commit actions of this activation.
fn on_commit(&mut self, action: F) {
    self.commit_actions.push(Box::new(action));
}

/// Schedule the creation of a new actor wrapping an entity.
///
/// The new actor's boot function only calls
/// [`prevent_inert_check`][Activation::prevent_inert_check] and throws the disarm
/// function away, so that counter is never decremented again.
///
/// Returns the new actor's [`ActorRef`] together with a `Ref` that addresses `target`
/// via the new actor's mailbox and root facet. The `Ref` is `None` if the new actor's
/// state is already `Terminated` when inspected.
pub fn spawn_for_entity(
    &mut self,
    name: Name,
    link: bool,
    target: Box>,
) -> (Option>>, ActorRef) {
    let ac_ref = self._spawn(name, |t| {
        let _ = t.prevent_inert_check();
        Ok(())
    }, link);
    let mut g = ac_ref.state.lock();
    let r = match &mut *g {
        ActorState::Terminated { .. } =>
            None,
        ActorState::Running(state) =>
            Some(Arc::new(Ref {
                mailbox: state.mailbox(),
                facet_id: state.root,
                target: Mutex::new(Some(target)),
            })),
    };
    // Release the state lock before handing `ac_ref` back to the caller.
    drop(g);
    (r, ac_ref)
}

/// Schedule the creation of a new actor when the Activation commits.
pub fn spawn ActorResult>(
    &mut self,
    name: Name,
    boot: F,
) -> ActorRef {
    self._spawn(name, boot, false)
}

/// Schedule the creation of a new actor when the Activation commits.
///
/// The new actor will be "linked" to the active facet: if the new actor terminates, the
/// active facet is stopped, and if the active facet stops, the new actor's root facet is
/// stopped.
pub fn spawn_link ActorResult>(
    &mut self,
    name: Name,
    boot: F,
) -> ActorRef {
    self._spawn(name, boot, true)
}

/// Shared implementation of the `spawn*` family.
///
/// The [`Actor`] value is constructed (and, if `link` is set, linked to the current
/// activation) right away, but it is only booted by a commit action, i.e. once the
/// current activation commits. The returned [`ActorRef`] is available immediately.
fn _spawn ActorResult>(
    &mut self,
    name: Name,
    boot: F,
    link: bool,
) -> ActorRef {
    let ac = Actor::new(Some(self.actor_id), self.trace_collector());
    let ac_ref = ac.ac_ref.clone();
    self.trace(|_| trace::ActionDescription::Spawn {
        link,
        id: Box::new(ac_ref.actor_id.into()),
    });
    // The new actor's boot turn names the current turn (if described) as its cause.
    let cause = self.turn_description.as_ref().map(
        |d| trace::TurnCause::Turn { id: Box::new(d.id.clone()) });
    let ac = if link { ac.link(self) } else { ac };
    self.on_commit(move |t| {
        ac.boot(name, Arc::clone(t.account()), cause, boot);
    });
    ac_ref
}

/// Create a new subfacet of the currently-active facet. Runs `boot` in the new facet's
/// context. If `boot` returns leaving the new facet [inert][Facet#inert-facets], the new
/// facet is [stopped][Activation::stop_facet].
///
/// The inertness check is not made as soon as `boot` returns: it is registered as a
/// pre-commit action. On success the new facet's ID is returned; an error from `boot`
/// is passed through.
pub fn facet ActorResult>(
    &mut self,
    boot: F,
) -> Result {
    let f = Facet::new(Some(self.facet_id()));
    self.trace(|t| trace::ActionDescription::FacetStart {
        path: t.facet_ids_for(&f).iter().map(|i| (*i).into()).collect(),
    });
    let facet_id = f.facet_id;
    // Register the facet itself, then record it as a child of the active facet.
    self.facet_nodes.insert(facet_id, f);
    tracing::trace!(parent_id = ?self.facet_id,
                    ?facet_id,
                    new_actor_facet_count = ?self.facet_nodes.len());
    self.facet_children.entry(self.facet_id()).or_default().insert(facet_id);
    self._with_facet(facet_id, move |t| {
        boot(t)?;
        t.stop_if_inert();
        Ok(())
    })?;
    Ok(facet_id)
}

/// Useful during facet (and actor) startup, in some situations: when a facet `boot`
/// procedure would return while the facet is inert, but the facet should survive until
/// some subsequent time, call `prevent_inert_check` to increment a counter that prevents
/// inertness-checks from succeeding on the active facet.
///
/// The result of `prevent_inert_check` is a function which, when called, decrements the
/// counter again. After the counter has been decremented, any subsequent inertness checks
/// will no longer be artificially forced to fail.
///
/// An example of when you might want this: creating an actor having only a single
/// Dataspace entity within it, then using the Dataspace from other actors. At the start of
/// its life, the Dataspace actor will have no outbound assertions, no child facets, and no
/// linked tasks, so the only way to prevent it from being prematurely garbage collected is
/// to use `prevent_inert_check` in its boot function.
///
/// If there is no active facet, the counter is left untouched and the returned function
/// does nothing.
pub fn prevent_inert_check(&mut self) -> DisarmFn {
    if let Some(f) = self.active_facet() {
        Box::new(f.prevent_inert_check())
    } else {
        Box::new(|| ())
    }
}

/// If `continuation` is supplied, adds it as a stop action for the [`Facet`] named by
/// `facet_id`. Then, cleanly stops the facet immediately, without waiting for `self` to
/// commit.
///
/// # Errors
///
/// Passes through any error produced while terminating the facet.
pub fn stop_facet_and_continue ActorResult>(
    &mut self,
    facet_id: FacetId,
    continuation: Option,
) -> ActorResult {
    if let Some(k) = continuation {
        self.on_facet_stop(facet_id, k);
    }
    self._terminate_facet(facet_id,
                          TerminationDirection::AtOrAboveStartingPoint,
                          trace::FacetStopReason::ExplicitAction)
}

/// Cleanly stops the [`Facet`] named by `facet_id`.
///
/// Equivalent to `self.stop_facet_and_continue(facet_id, None)`, except that the lack of a
/// continuation means that there's no need for this method to return `ActorResult`.
///
/// # Panics
///
/// Panics if the underlying `stop_facet_and_continue` call returns an error.
// NOTE(review): `_terminate_facet` runs previously-registered stop actions with `?`, so a
// stop action returning `Err` looks able to reach this `expect` -- confirm that this is
// intended to be impossible.
pub fn stop_facet(&mut self, facet_id: FacetId) {
    self.stop_facet_and_continue::(facet_id, None)
        .expect("Non-failing stop_facet_and_continue")
}

/// Cleanly stops the active facet.
///
/// Equivalent to `self.stop_facet(self.facet_id())`.
pub fn stop(&mut self) {
    self.stop_facet(self.facet_id())
}

/// Cleanly stops the active actor's root facet.
/// This is one way to arrange a clean shutdown of the entire actor.
///
/// Equivalent to `self.stop_facet(self.root)`.
pub fn stop_root(&mut self) {
    self.stop_facet(self.root);
}

/// Arranges for the currently-active facet to be stopped, with reason `Inert`, if it is
/// found to be inert when the check runs. The check is deferred to a pre-commit action.
fn stop_if_inert(&mut self) {
    let facet_id = self.facet_id();
    // Registering a pre-commit hook lets this run after the dataflow graph has been repaired.
    self.pre_commit(move |t| {
        tracing::trace!("Checking inertness of facet {} from facet {}", facet_id, t.facet_id());
        if t.facet_exists_and_is_inert(facet_id) {
            tracing::trace!(" - facet {} is inert, stopping it", facet_id);
            t._terminate_facet(facet_id,
                               TerminationDirection::AtOrAboveStartingPoint,
                               trace::FacetStopReason::Inert)?;
        } else {
            tracing::trace!(" - facet {} is not inert", facet_id);
        }
        Ok(())
    })
}

/// Terminates the facet `facet_id`, if it is still registered; otherwise does nothing and
/// returns `Ok(())`.
///
/// In order:
///
///  1. the facet is removed from the facet table;
///  2. all of its children are terminated recursively, with direction
///     `BelowStartingPoint` and reason `ParentStopping`;
///  3. for direction `AtOrAboveStartingPoint` only, the facet is removed from its
///     parent's set of children;
///  4. its stop actions are run via `_with_facet`, targeting the parent facet's ID (or
///     the facet's own ID when it has no parent);
///  5. its outbound assertions are retracted;
///  6. for direction `AtOrAboveStartingPoint` only, the parent is itself terminated (with
///     reason `Inert`) if it has thereby become inert.
///
/// # Errors
///
/// Stops at, and returns, the first error from a nested termination or a stop action.
fn _terminate_facet(
    &mut self,
    facet_id: FacetId,
    direction: TerminationDirection,
    reason: trace::FacetStopReason,
) -> ActorResult {
    if let Some(mut f) = self.facet_nodes.remove(&facet_id) {
        let maybe_parent_id = f.parent_facet_id;
        self.trace(|t| trace::ActionDescription::FacetStop {
            path: t.facet_ids_for(&f).iter().map(|i| (*i).into()).collect(),
            reason: Box::new(reason),
        });
        tracing::trace!(remaining_actor_facet_count = ?self.facet_nodes.len(),
                        ?facet_id,
                        ?direction,
                        "stopping");
        if let Some(children) = self.facet_children.remove(&facet_id) {
            for child_id in children.into_iter() {
                self._terminate_facet(child_id,
                                      TerminationDirection::BelowStartingPoint,
                                      trace::FacetStopReason::ParentStopping)?;
            }
        }
        // Facets reached by recursing downward skip this step: their parent's whole
        // child set was already removed just above, in the parent's own call.
        if let TerminationDirection::AtOrAboveStartingPoint = direction {
            if let Some(p) = maybe_parent_id {
                self.facet_children.get_mut(&p).map(|children| children.remove(&facet_id));
            }
        }
        self._with_facet(maybe_parent_id.unwrap_or(facet_id), |t| {
            for ac in std::mem::take(&mut f.stop_actions).into_iter() {
                ac(t)?
            }
            Ok(())
        })?;
        for handle in std::mem::take(&mut f.outbound_handles).into_iter() {
            tracing::trace!(h = ?handle, "retract on termination");
            self.retract(handle);
        }
        if let TerminationDirection::AtOrAboveStartingPoint = direction {
            match maybe_parent_id {
                Some(p) => {
                    if self.facet_exists_and_is_inert(p) {
                        tracing::trace!("terminating parent {:?} of facet {:?}", p, facet_id);
                        self._terminate_facet(p,
                                              TerminationDirection::AtOrAboveStartingPoint,
                                              trace::FacetStopReason::Inert)?;
                    } else {
                        tracing::trace!("not terminating parent {:?} of facet {:?}", p, facet_id);
                    }
                }
                None => tracing::trace!("no parent of root facet {:?} to terminate", facet_id),
            }
        }
    }
    Ok(())
}

/// Returns the actor's dataflow state, creating it on first use.
fn dataflow_state_mut(&mut self) -> &mut DataflowState {
    if self.dataflow.is_none() {
        self.dataflow = Some(Box::new(DataflowState::new()));
    }
    self.dataflow.as_mut().unwrap()
}

/// Create a new named dataflow variable (field) within the active [`Actor`].
pub fn named_field(&mut self, name: &str, initial_value: T) -> Arc> {
    // The field is handed a clone of this actor's system-message sender.
    let tx = self.tx.clone();
    self.dataflow_state_mut().named_field(name, initial_value, tx)
}

/// Create a new anonymous dataflow variable (field) within the active [`Actor`].
///
/// Equivalent to [`named_field`][Activation::named_field] with an empty name.
pub fn field(&mut self, initial_value: T) -> Arc> {
    self.named_field("", initial_value)
}

/// Retrieve a reference to the current value of a dataflow variable (field); if execution
/// is currently within a [dataflow block][Activation::dataflow], marks the block as
/// *depending upon* the field.
///
pub fn get(&mut self, field: &Field) -> &T {
    self.dataflow_state_mut().get(field)
}

/// Retrieve a mutable reference to the contents of a dataflow variable (field). As for
/// [`get`][Activation::get], if used within a dataflow block, marks the block as
/// *depending upon* the field. In addition, because the caller may mutate the field, this
/// function (pessimistically) marks the field as dirty, which will lead to later
/// reevaluation of dependent blocks.
/// pub fn get_mut(&mut self, field: &Field) -> &mut T { self.dataflow_state_mut().get_mut(field) } /// Overwrite the value of a dataflow variable (field). Marks the field as dirty, even if /// the new value is [`eq`][std::cmp::PartialEq::eq] to the value being overwritten. /// pub fn set(&mut self, field: &Field, value: T) { self.dataflow_state_mut().set(field, value) } fn with_block(&mut self, block_id: BlockId, block: &mut Block) -> ActorResult { let saved = self.dataflow_state_mut().active_block.replace(block_id); let result = block(self); self.dataflow_state_mut().active_block = saved; result } /// Creates (and immediately executes) a new *dataflow block* that will be reexecuted if /// any of its *dependent fields* (accessed via e.g. [`get`][Activation::get] or /// [`get_mut`][Activation::get_mut]) are mutated. /// pub fn dataflow ActorResult>(&mut self, block: F) -> ActorResult { let block_id = BlockId::new(NEXT_BLOCK_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed)) .expect("Internal error: Attempt to allocate BlockId of zero. Too many BlockIds allocated. 
Restart the process."); let mut block: Block = Box::new(block); self.with_block(block_id, &mut block)?; let facet_id = self.facet_id(); self.dataflow_state_mut().blocks.insert(block_id, (facet_id, block)); Ok(()) } fn repair_dataflow(&mut self) -> Result { if self.dataflow.is_none() { return Ok(false); } let mut pass_number = 0; loop { let mut blocks_to_run = HashMap::new(); { let df = self.dataflow.as_mut().unwrap(); if df.graph.is_clean() { break; } pass_number += 1; tracing::trace!(?pass_number, "repair_dataflow"); let damaged_field_ids = df.graph.take_damaged_nodes(); for field_id in damaged_field_ids.into_iter() { let block_ids = df.graph.take_observers_of(&field_id); for block_id in block_ids.into_iter() { if let Some(entry) = df.blocks.remove(&block_id) { blocks_to_run.insert(block_id, entry); } } } } let mut error = None; let mut blocks_to_replace = Vec::with_capacity(blocks_to_run.len()); for (block_id, (facet_id, mut block)) in blocks_to_run.into_iter() { if error.is_none() { if let Err(e) = self.with_facet(facet_id, |t| t.with_block(block_id, &mut block)) { error = Some(e); } } blocks_to_replace.push((block_id, (facet_id, block))); } { let df = self.dataflow.as_mut().unwrap(); for (block_id, entry) in blocks_to_replace.into_iter() { df.blocks.insert(block_id, entry); } } if let Some(e) = error { Err(e)?; } } if pass_number > 0 { tracing::trace!(passes = ?pass_number, "repair_dataflow complete"); } Ok(pass_number > 0) } fn queue_for(&mut self, r: &Arc>) -> &mut PendingEventQueue { self.queue_for_mailbox(&r.mailbox) } fn queue_for_mailbox(&mut self, mailbox: &Arc) -> &mut PendingEventQueue { if self.multiple_queues.is_some() { return &mut self.multiple_queues.as_mut().unwrap().entry(mailbox.actor_id) .or_insert((mailbox.tx.clone(), Vec::with_capacity(3))).1; } if let None = self.single_queue { self.single_queue = Some((mailbox.actor_id, mailbox.tx.clone(), Vec::with_capacity(3))); return &mut self.single_queue.as_mut().unwrap().2; } if 
Some(mailbox.actor_id) == self.single_queue.as_ref().map(|e| e.0) { return &mut self.single_queue.as_mut().unwrap().2; } let (aid, tx, q) = std::mem::take(&mut self.single_queue).unwrap(); let mut table = HashMap::new(); table.insert(aid, (tx, q)); self.multiple_queues = Some(table); &mut self.multiple_queues.as_mut().unwrap().entry(mailbox.actor_id) .or_insert((mailbox.tx.clone(), Vec::with_capacity(3))).1 } /// Retrieve the ID of the current actor. pub fn actor_id(&self) -> ActorId { self.actor_id } #[doc(hidden)] pub fn actor_ref(&self) -> ActorRef { self.actor_ref.clone() } /// Requests a shutdown of the actor. The shutdown request is /// handled by the actor's main loop, causing it to terminate with /// exit status `Ok(())`. pub fn shutdown(&self) { let _ = self.tx.send(SystemMessage::Release); } fn mailbox(&mut self) -> Arc { match self.mailbox.upgrade() { None => { let new_mailbox = Arc::new(Mailbox { actor_id: self.actor_id, tx: self.tx.clone(), }); self.mailbox = Arc::downgrade(&new_mailbox); new_mailbox } Some(m) => m } } /// Registers the entity `r` in the list of exit hooks for this actor. When the actor /// terminates, `r`'s [`exit_hook`][Entity::exit_hook] will be called in the context of the /// actor's root facet. pub fn add_exit_hook(&mut self, r: &Arc>) { let r = Arc::clone(r); self.exit_hooks.push(Box::new( move |t, exit_status| r.internal_with_entity(|e| e.exit_hook(t, &exit_status)))) } fn get_facet(&mut self, facet_id: FacetId) -> Option<&mut Facet> { self.facet_nodes.get_mut(&facet_id) } fn get_facet_immut(&self, facet_id: FacetId) -> Option<&Facet> { self.facet_nodes.get(&facet_id) } /// See the definition of an [inert facet][Facet#inert-facets]. 
///
/// Returns `false` for a facet that is not registered. A registered facet counts as inert
/// when its parent facet is no longer registered, or when *all* of the following hold: it
/// has no child facets; it has no outbound handles (this condition is waived for a root
/// facet); it has no linked tasks; and its inert-check-preventer counter is zero.
fn facet_exists_and_is_inert(&mut self, facet_id: FacetId) -> bool {
    // A missing child-set entry counts the same as an empty one.
    let no_kids = self.facet_children.get(&facet_id).map(|cs| cs.is_empty()).unwrap_or(true);
    if let Some(f) = self.get_facet(facet_id) {
        // The only outbound handle the root facet of an actor may have is a link
        // assertion, from [Activation::link]. This is not to be considered a "real"
        // assertion for purposes of keeping the facet alive!
        let maybe_parent_id = f.parent_facet_id.clone();
        let no_outbound_handles = f.outbound_handles.is_empty();
        let is_root_facet = maybe_parent_id.is_none();
        let no_linked_tasks = f.linked_tasks.is_empty();
        let no_inert_check_preventers = f.inert_check_preventers.load(Ordering::Relaxed) == 0;
        let parent_facet_missing = maybe_parent_id.map_or(false, |p| self.get_facet_immut(p).is_none());
        tracing::trace!(?facet_id, ?no_kids, ?no_outbound_handles, ?is_root_facet, ?no_linked_tasks, ?no_inert_check_preventers, ?parent_facet_missing);
        parent_facet_missing ||
            (no_kids &&
             (no_outbound_handles || is_root_facet) &&
             no_linked_tasks &&
             no_inert_check_preventers)
    } else {
        tracing::trace!(?facet_id, exists = ?false);
        false
    }
}

/// Final turn of a terminating actor. Consumes the activation.
///
/// Retracts every outbound assertion that is still recorded, commits, and then runs the
/// actor's exit hooks, passing them `exit_status`.
///
/// # Panics
///
/// For `ExitStatus::Normal`, asserts that the root facet is already gone.
fn cleanup(
    mut self,
    exit_status: Arc,
    trace_collector: Option,
) {
    match &*exit_status {
        ExitStatus::Normal => assert!(self.get_facet(self.root).is_none()),
        ExitStatus::Dropped => {
            // If we panicked, facet_id will be Some(_), but leaving it this way as we
            // enter take_turn causes a nested panic, so we clear it here.
            if self.facet_id.is_some() {
                tracing::debug!(actor_id=?self.actor_id, facet_id=?self.facet_id, "clearing facet_id (we must have panicked mid-Turn)");
                self.facet_id = None;
            }
        }
        ExitStatus::Error(_) => (),
    }

    let cause = Some(trace::TurnCause::Cleanup);
    let account = Account::new(Some(AnyValue::symbol("cleanup")), trace_collector.clone());
    self.take_turn(
        self.root,
        account,
        cause,
        trace_collector,
        |t| {
            // NB. In descending order so that we retract newer handles first. Since
            // facet-tree-order retraction is children before parents, this isn't quite right - a
            // parent facet could have made an assertion after some child made an assertion. But
            // given that this is for *unclean shutdown*, maybe it's OK?
            //
            let handles_descending: Vec = t.outbound_assertions.keys().rev().cloned().collect();
            tracing::trace!(actor_id=?t.actor_id, handles=?handles_descending, "remaining to retract at cleanup time");
            for handle in handles_descending.into_iter() {
                t.retract(handle);
            }
            if let Err(err) = t.commit() {
                // This can only happen through an internal error in this module
                tracing::error!(?err, "Internal error during Activation::cleanup");
            }
            // Exit hooks run after the commit, and hence outside of it.
            for action in std::mem::take(&mut t.exit_hooks) {
                action(t, &exit_status);
            }
        });
}
}

impl Account {
    /// Construct a new `Account`, storing `name` within it for
    /// debugging use.
    ///
    /// The account receives a fresh ID and a zero debt counter, and is entered (name plus
    /// a shared handle on the counter) into the global `ACCOUNTS` table.
    pub fn new(name: Name, trace_collector: Option) -> Arc {
        let id = NEXT_ACCOUNT_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed);
        let debt = Arc::new(AtomicI64::new(0));
        ACCOUNTS.write().insert(id, (name, Arc::clone(&debt)));
        Arc::new(Account {
            id,
            debt,
            notify: Notify::new(),
            trace_collector,
        })
    }

    /// Retrieve the current account balance: the number of
    /// currently-outstanding work items.
    pub fn balance(&self) -> i64 {
        self.debt.load(Ordering::Relaxed)
    }

    /// Borrow `token_count` work items against this account.
    ///
    /// # Panics
    ///
    /// Panics if `token_count` does not fit into an `i64`.
    pub fn borrow(&self, token_count: usize) {
        let token_count: i64 = token_count.try_into().expect("manageable token count");
        self.debt.fetch_add(token_count, Ordering::Relaxed);
    }

    /// Repay `token_count` work items previously borrowed against this account.
    ///
    /// If the debt after repayment is at or below `SYNDICATE_CREDIT`, one task waiting in
    /// [`ensure_clear_funds`][Account::ensure_clear_funds] is notified.
    ///
    /// # Panics
    ///
    /// Panics if `token_count` does not fit into an `i64`.
    pub fn repay(&self, token_count: usize) {
        let token_count: i64 = token_count.try_into().expect("manageable token count");
        // `fetch_sub` yields the value held *before* the subtraction.
        let _old_debt = self.debt.fetch_sub(token_count, Ordering::Relaxed);
        if _old_debt - token_count <= *SYNDICATE_CREDIT {
            self.notify.notify_one();
        }
    }

    /// Suspend execution until enough "clear funds" exist in this
    /// account for some subsequent activity to be permissible.
    ///
    /// Concretely: waits, re-checking after every notification, until the balance is no
    /// greater than `SYNDICATE_CREDIT`.
    pub async fn ensure_clear_funds(&self) {
        let limit = *SYNDICATE_CREDIT;
        // tokio::task::yield_now().await;
        while self.balance() > limit {
            // tokio::task::yield_now().await;
            self.notify.notified().await;
        }
    }
}

/// Dropping an `Account` removes its entry from the global `ACCOUNTS` table.
impl Drop for Account {
    fn drop(&mut self) {
        ACCOUNTS.write().remove(&self.id);
    }
}

impl LoanedItem {
    /// Construct a new `LoanedItem` containing `item`, recording
    /// `cost` work items against `account`.
    pub fn new(account: &Arc, cost: usize, item: T) -> Self {
        account.borrow(cost);
        LoanedItem { account: Arc::clone(account), cost, item }
    }
}

/// Dropping a `LoanedItem` repays its `cost` to the account it was borrowed against.
impl Drop for LoanedItem {
    fn drop(&mut self) {
        self.account.repay(self.cost);
    }
}

/// Sends the queued events `t` to another actor as a single `SystemMessage::Turn`.
///
/// The queue is wrapped in a [`LoanedItem`] charged to `account`, at a cost of one work
/// item per queued event.
///
/// # Errors
///
/// Fails with "Target actor not running" if the message cannot be sent on `tx`.
#[must_use]
fn send_actions(
    tx: &UnboundedSender,
    caused_by: Option,
    account: &Arc,
    t: PendingEventQueue,
) -> ActorResult {
    let token_count = t.len();
    Ok(tx.send(SystemMessage::Turn(caused_by, LoanedItem::new(account, token_count, t)))
       .map_err(|_| error("Target actor not running", AnyValue::new(false)))?)
}

impl std::fmt::Debug for Mailbox {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
        write!(f, "#", self.actor_id)
    }
}

// Hashing, equality and ordering of mailboxes all look at the `actor_id` only, so they
// stay consistent with one another.

impl std::hash::Hash for Mailbox {
    fn hash(&self, state: &mut H) {
        self.actor_id.hash(state)
    }
}

impl Eq for Mailbox {}
impl PartialEq for Mailbox {
    fn eq(&self, other: &Mailbox) -> bool {
        self.actor_id == other.actor_id
    }
}

impl Ord for Mailbox {
    fn cmp(&self, other: &Mailbox) -> std::cmp::Ordering {
        self.actor_id.cmp(&other.actor_id)
    }
}

impl PartialOrd for Mailbox {
    fn partial_cmp(&self, other: &Mailbox) -> Option {
        Some(self.cmp(&other))
    }
}

/// Dropping a `Mailbox` sends `SystemMessage::Release` to the actor it belongs to; a
/// failure to send is ignored.
impl Drop for Mailbox {
    fn drop(&mut self) {
        tracing::debug!("Last reference to mailbox of actor id {:?} was dropped", self.actor_id);
        let _ = self.tx.send(SystemMessage::Release);
    }
}

impl Actor {
    /// Create and start a new "top-level" actor: an actor not
    /// causally related to another. This is the usual way to start a
    /// Syndicate program.
    ///
    /// The actor has no parent and no name, and boots against a fresh, unnamed
    /// [`Account`]. Its boot turn is only given a cause (`external("top-level actor")`)
    /// when a trace collector is supplied.
    pub fn top ActorResult>(
        trace_collector: Option,
        boot: F,
    ) -> ActorHandle {
        let ac = Actor::new(None, trace_collector.clone());
        let topcause = trace_collector.as_ref().map(|_| trace::TurnCause::external("top-level actor"));
        let account = Account::new(None, trace_collector);
        ac.boot(None, account, topcause, boot)
    }

    /// Create a new actor. It still needs to be [`boot`ed][Self::boot].
    pub fn new(parent_actor_id: Option, trace_collector: Option) -> Self {
        let (tx, rx) = unbounded_channel();
        let actor_id = next_actor_id();
        let root = Facet::new(None);
        tracing::debug!(?actor_id, ?parent_actor_id, root_facet_id = ?root.facet_id, "Actor::new");
        let mut st = Activation {
            facet_id: None,
            account: None,
            turn_description: None,
            trace_collector: None,
            pre_commit_actions: Vec::new(),
            rollback_actions: Vec::new(),
            commit_actions: Vec::new(),
            single_queue: None,
            multiple_queues: None,
            actor_id,
            // The shared state cell cannot hold `st` while `st` is still being built, so
            // it starts out with a placeholder `Terminated` value that is overwritten
            // just below.
            actor_ref: ActorRef {
                actor_id,
                state: Arc::new(Mutex::new(ActorState::Terminated{
                    exit_status: Arc::new(ExitStatus::Normal)
                })),
            },
            tx,
            mailbox: Weak::new(),
            exit_hooks: Vec::new(),
            outbound_assertions: Map::new(),
            facet_nodes: Map::new(),
            facet_children: Map::new(),
            root: root.facet_id,
            dataflow: None,
        };
        st.facet_nodes.insert(root.facet_id, root);
        let ac_ref = st.actor_ref.clone();
        // Move the finished activation into the cell that its own `actor_ref` points at.
        *ac_ref.state.lock() = ActorState::Running(st);
        Actor { rx, trace_collector, ac_ref }
    }

    /// Links this not-yet-booted actor with the facet active in `t_parent`.
    ///
    /// Activates the new actor's root facet and, within that activation, performs a
    /// `half_link` in each direction between the two activations. The link is then
    /// recorded in the parent's trace.
    ///
    /// # Panics
    ///
    /// Panics if `t_parent` has no active facet, or if activating the new actor's root
    /// facet reports that the actor is not alive.
    fn link(self, t_parent: &mut Activation) -> Self {
        if t_parent.active_facet().is_none() {
            panic!("No active facet when calling spawn_link");
        }
        let account = Arc::clone(t_parent.account());
        let mut h_to_child = None;
        let mut h_to_parent = None;
        let is_alive = self.ac_ref.root_facet_ref().activate(
            &account,
            None,
            |t_child| {
                h_to_child = Some(t_parent.half_link(t_child));
                h_to_parent = Some(t_child.half_link(t_parent));
                Ok(())
            });
        if is_alive {
            let parent_actor = t_parent.actor_id;
            t_parent.trace(|_| trace::ActionDescription::Link {
                parent_actor: Box::new(parent_actor.into()),
                parent_to_child: Box::new(protocol::Handle(h_to_child.unwrap().into())),
                child_actor: Box::new(self.ac_ref.actor_id.into()),
                child_to_parent: Box::new(protocol::Handle(h_to_parent.unwrap().into())),
            });
            self
        } else {
            panic!("spawn_link'd actor terminated before link could happen");
        }
    }

    /// Start the actor's mainloop. Takes ownership of `self`. The
    /// `name` is used as context for any log messages emitted by the
    /// actor.
    /// The `boot` function is called in the actor's context,
    /// and then the mainloop is entered.
    ///
    /// Before the task is spawned, the actor is entered into the global `ACTORS` table
    /// and, if `boot_account` carries a trace collector, a `Start` activation is recorded.
    /// `boot` itself runs inside a fresh facet created via [`Activation::facet`]. The
    /// returned handle resolves to the actor's exit status.
    pub fn boot ActorResult>(
        mut self,
        name: Name,
        boot_account: Arc,
        boot_cause: Option,
        boot: F,
    ) -> ActorHandle {
        let actor_id = self.ac_ref.actor_id;
        ACTORS.write().insert(actor_id, (name.clone(), self.ac_ref.clone()));
        let trace_collector = boot_account.trace_collector.clone();
        if let Some(c) = &trace_collector {
            c.record(actor_id, trace::ActorActivation::Start {
                actor_name: Box::new(name.into()),
            });
        }
        tokio::spawn(async move {
            tracing::trace!(?actor_id, "start");
            self.run(boot_account, boot_cause, move |t| {
                t.facet(boot)?;
                Ok(())
            }).await;
            tracing::trace!(?actor_id, "stop");
            // `expect`: the actor is required to be in its terminated state by now.
            self.ac_ref.exit_status().expect("terminated")
        })
        // }.instrument(tracing::info_span!(parent: None, "actor", ?actor_id).or_current()))
    }

    /// The actor's main loop.
    ///
    /// Runs `boot` in an activation of the root facet and then processes system messages
    /// one at a time until the loop is left:
    ///
    ///  - channel closed: terminate with an "Unexpected channel close" error;
    ///  - `Release`: stop the root facet, then leave the loop;
    ///  - `ReleaseField`: drop the field from the dataflow state (if still running);
    ///  - `Turn`: run the carried actions within one activation of the root facet, and
    ///    leave the loop if `activate` returns `false`;
    ///  - `Crash`: terminate with the carried error.
    ///
    /// Returns immediately, without entering the loop, if the boot activation returns
    /// `false`.
    async fn run ActorResult>(
        &mut self,
        boot_account: Arc,
        boot_cause: Option,
        boot: F,
    ) -> () {
        let root_facet_ref = self.ac_ref.root_facet_ref();

        // Terminates the actor with error `e`, by running a turn whose body simply fails
        // with it. The activation is asserted to return `false`.
        let terminate = |e: Error | {
            assert!(!root_facet_ref.activate(&Account::new(None, None), None, |_| Err(e)?));
        };

        if !root_facet_ref.activate(&boot_account, boot_cause, boot) {
            return;
        }

        'mainloop: loop {
            tracing::trace!(actor_id = ?self.ac_ref.actor_id, "mainloop top");
            match self.rx.recv().await {
                None => {
                    terminate(error("Unexpected channel close", AnyValue::new(false)));
                    break 'mainloop;
                }
                Some(m) => match m {
                    SystemMessage::Release => {
                        tracing::trace!(actor_id = ?self.ac_ref.actor_id, "SystemMessage::Release");
                        assert!(!root_facet_ref.activate(&Account::new(None, None), None, |t| {
                            t.stop_root();
                            Ok(())
                        }));
                        break 'mainloop;
                    }
                    SystemMessage::ReleaseField(field_id) => {
                        tracing::trace!(actor_id = ?self.ac_ref.actor_id,
                                        "SystemMessage::ReleaseField({})", field_id);
                        self.ac_ref.access(|s| if let ActorState::Running(ra) = s {
                            ra.dataflow_state_mut().fields.remove(&field_id);
                        })
                    }
                    SystemMessage::Turn(cause, mut loaned_item) => {
                        tracing::trace!(actor_id = ?self.ac_ref.actor_id, "SystemMessage::Turn");
                        // Take the actions out, but keep `loaned_item` (and hence the
                        // loan against its account) alive for the whole turn.
                        let actions = std::mem::take(&mut loaned_item.item);
                        if !root_facet_ref.activate(
                            &loaned_item.account, cause, |t| {
                                for (maybe_desc, action) in actions.into_iter() {
                                    if let Some(desc) = maybe_desc {
                                        t.trace(|_| trace::ActionDescription::Dequeue {
                                            event: Box::new(desc),
                                        });
                                    }
                                    action(t)?;
                                }
                                Ok(())
                            })
                        {
                            break 'mainloop;
                        }
                    }
                    SystemMessage::Crash(e) => {
                        tracing::trace!(actor_id = ?self.ac_ref.actor_id,
                                        "SystemMessage::Crash({:?})", &e);
                        terminate(e);
                        break 'mainloop;
                    }
                }
            }
        }
    }
}

impl Facet {
    /// Creates an empty facet with a fresh ID and the given parent (`None` for a root
    /// facet). The facet is not registered anywhere by this function.
    fn new(parent_facet_id: Option) -> Self {
        Facet {
            facet_id: next_facet_id(),
            parent_facet_id,
            outbound_handles: Set::new(),
            stop_actions: Vec::new(),
            linked_tasks: Map::new(),
            inert_check_preventers: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Increments this facet's inert-check-preventer counter and returns a function that
    /// decrements it again.
    fn prevent_inert_check(&mut self) -> impl FnOnce() {
        let inert_check_preventers = Arc::clone(&self.inert_check_preventers);
        // One-shot latch: the decrement below happens only on the 1 -> 0 transition.
        let armed = AtomicU64::new(1);
        inert_check_preventers.fetch_add(1, Ordering::Relaxed);
        move || {
            match armed.compare_exchange(1, 0, Ordering::SeqCst, Ordering::SeqCst) {
                Ok(_) => {
                    inert_check_preventers.fetch_sub(1, Ordering::Relaxed);
                    ()
                }
                Err(_) => (),
            }
        }
    }
}

/// Token produced by [`ActorRef::keep_alive`]. It holds a reference to the actor's
/// [`Mailbox`] for as long as it exists, or nothing if the actor had already terminated
/// when it was created.
#[derive(Debug, Clone)]
#[allow(dead_code)] // otherwise we get 'warning: field `0` is never read'
pub struct KeepAlive(Option>);

impl ActorRef {
    /// Uses an internal mutex to access the internal state: takes the
    /// lock, calls `f` with the internal state, releases the lock,
    /// and returns the result of `f`.
    pub fn access R>(&self, f: F) -> R {
        f(&mut *self.state.lock())
    }

    /// Retrieves the exit status of the denoted actor. If it is still
    /// running, yields `None`; otherwise, yields `Some(Ok(()))` if it
    /// exited normally, or `Some(Err(_))` if it terminated
    /// abnormally.
pub fn exit_status(&self) -> Option {
        // NOTE(review): the `Option` return type appears to have lost its type argument;
        // left as found.
        self.access(|state| match state {
            ActorState::Running(_) => None,
            // Clone the stored status out of its `Arc` and convert it with `into()`.
            ActorState::Terminated { exit_status } => Some((**exit_status).clone().into()),
        })
    }

    /// Creates a [`KeepAlive`] for (usually temporarily) ensuring an [Actor] does not get
    /// garbage-collected due to no references to its [Mailbox] being held. (It may of course
    /// be terminated for other reasons.)
    pub fn keep_alive(&self) -> KeepAlive {
        KeepAlive(self.access(|s| match s {
            ActorState::Terminated { .. } => None,
            ActorState::Running(ra) => Some(ra.mailbox()),
        }))
    }

    /// Builds a `FacetRef` that pairs a clone of this `ActorRef` with `facet_id`.
    fn facet_ref(&self, facet_id: FacetId) -> FacetRef {
        FacetRef {
            actor: self.clone(),
            facet_id,
        }
    }

    /// Reads the root facet id out of the running actor state.
    ///
    /// # Panics
    ///
    /// Panics with "Actor unexpectedly in terminated state" if the actor has terminated.
    fn root_facet_id(&self) -> FacetId {
        self.access(|s| match s {
            ActorState::Terminated { .. } => panic!("Actor unexpectedly in terminated state"),
            ActorState::Running(ra) => ra.root, // what a lot of work to get this one number
        })
    }

    /// Shorthand for `facet_ref(root_facet_id())`; panics under the same condition as
    /// `root_facet_id`.
    fn root_facet_ref(&self) -> FacetRef {
        self.facet_ref(self.root_facet_id())
    }
}

impl std::fmt::Debug for ActorRef {
    // NOTE(review): the format string has no placeholder for `self.actor_id`; it looks as
    // if angle-bracketed text was lost from the literal. Left byte-identical.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
        write!(f, "#", self.actor_id)
    }
}

impl ActorState {
    /// `true` for `ActorState::Running`, `false` for `ActorState::Terminated`.
    fn is_running(&self) -> bool {
        match self {
            ActorState::Terminated { .. } => false,
            ActorState::Running(_) => true,
        }
    }

    /// Moves a running actor into the `Terminated` state with `exit_status`.
    ///
    /// Does nothing if the state is already `Terminated`. Otherwise the state is swapped
    /// first, then `cleanup` runs on the previous running state, the outcome is logged
    /// (trace for `Normal`, debug for `Dropped`, error for `Error`) and, when a trace
    /// collector is supplied, a `Stop` activation is recorded. `Normal` and `Dropped`
    /// are both recorded as `trace::ExitStatus::Ok`.
    // NOTE(review): `&Option` for `trace_collector` appears to have lost its type
    // argument; left as found.
    fn terminate(
        &mut self,
        exit_status: ExitStatus,
        actor: &ActorRef,
        trace_collector: &Option,
    ) {
        if !self.is_running() {
            return;
        }

        // The status is shared between the new state and the cleanup of the old one.
        let exit_status = Arc::new(exit_status);
        let final_state = ActorState::Terminated { exit_status: Arc::clone(&exit_status) };
        match std::mem::replace(self, final_state) {
            // Excluded by the `is_running` check above.
            ActorState::Terminated { .. } => unreachable!(),
            ActorState::Running(state) =>
                state.cleanup(Arc::clone(&exit_status), trace_collector.clone()),
        }

        match &*exit_status {
            ExitStatus::Normal =>
                tracing::trace!(actor_id=?actor.actor_id, "normal stop"),
            ExitStatus::Dropped =>
                tracing::debug!(actor_id=?actor.actor_id, "force-terminated by Actor::drop"),
            ExitStatus::Error(e) =>
                tracing::error!(actor_id=?actor.actor_id, %e, "error stop"),
        }

        if let Some(c) = trace_collector {
            c.record(actor.actor_id, trace::ActorActivation::Stop {
                status: Box::new(match &*exit_status {
                    ExitStatus::Normal | ExitStatus::Dropped => trace::ExitStatus::Ok,
                    ExitStatus::Error(e) => trace::ExitStatus::Error(Box::new(e.clone())),
                }),
            });
        }
    }
}

// NOTE(review): the impls for `Field` below appear to have lost their generic parameter
// lists, and the `Debug` format string has no placeholders for its two arguments; all
// tokens are left byte-identical.
impl std::fmt::Debug for Field {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
        write!(f, "#", self.name, self.field_id)
    }
}

impl Eq for Field {}

/// Fields are equal exactly when their `field_id`s are equal.
impl PartialEq for Field {
    fn eq(&self, other: &Field) -> bool {
        self.field_id == other.field_id
    }
}

/// Dropping a field sends `SystemMessage::ReleaseField` for its id on `self.tx`.
impl Drop for Field {
    fn drop(&mut self) {
        // Best effort: a failed send is deliberately ignored.
        let _ = self.tx.send(SystemMessage::ReleaseField(self.field_id));
        ()
    }
}

/// Dropping an actor closes its receiver, removes it from `ACTORS`, and terminates it
/// with `ExitStatus::Dropped` if it is still running.
impl Drop for Actor {
    fn drop(&mut self) {
        self.rx.close();
        ACTORS.write().remove(&self.ac_ref.actor_id);
        // let _scope = tracing::info_span!(parent: None, "actor", actor_id = ?self.ac_ref.actor_id).entered();
        let mut g = self.ac_ref.state.lock();
        if g.is_running() {
            g.terminate(ExitStatus::Dropped, &self.ac_ref, &self.trace_collector);
        }
        tracing::debug!("Actor::drop");
    }
}

/// Dropping a facet cancels the tokens of all its linked tasks and warns if outbound
/// handles are still present.
impl Drop for Facet {
    fn drop(&mut self) {
        // Take the map so it is left empty while each token is cancelled.
        for (_task_id, token) in std::mem::take(&mut self.linked_tasks).into_iter() {
            token.cancel();
        }

        if !self.outbound_handles.is_empty() {
            tracing::warn!(
                concat!("outbound_handles for {:?} not empty at drop time; ",
                        "retractions will happen at actor termination, ",
                        "but may not follow facet-tree order"),
                self.facet_id);
        }

        tracing::trace!(facet_id = ?self.facet_id, "Facet::drop");
    }
}

// NOTE(review): the `impl ... Ref` headers below appear to have lost their generic
// parameters; left as found.
impl Ref {
    /// Supplies the behaviour (`e`) for a `Ref` created via
    /// [`create_inert`][Activation::create_inert].
    ///
    /// # Panics
    ///
    /// Panics if this `Ref` has already been given a behaviour.
    pub fn become_entity>(&self, e: E) {
        let mut g = self.target.lock();
        if g.is_some() {
            panic!("Double initialization of Ref");
        }
        *g = Some(Box::new(e));
    }

    /// Locks `self.target` and calls `f` with the entity stored in it.
    ///
    /// # Panics
    ///
    /// Panics with "initialized" if no entity has been stored yet.
    fn internal_with_entity) -> R>(&self, f: F) -> R {
        let mut g = self.target.lock();
        // let _entry = tracing::info_span!("entity", r = ?self).entered();
        f(g.as_mut().expect("initialized").as_mut())
    }
}

impl Ref {
    /// Retrieves a process-unique identifier for the `Ref`; `Ref`s
    /// are compared by this identifier.
    pub fn oid(&self) -> usize {
        // The identifier is the address of `self`.
        std::ptr::addr_of!(*self) as usize
    }

    /// Renders the `Ref` as `<actor id>/<facet id>:<oid>`, with the oid as 16 zero-padded
    /// hex digits.
    pub fn debug_str(&self) -> String {
        format!("{}/{}:{:016x}", self.mailbox.actor_id, self.facet_id, self.oid())
    }
}

// Equality, hashing and ordering of `Ref`s are all defined in terms of `oid()`.
impl PartialEq for Ref {
    fn eq(&self, other: &Self) -> bool {
        self.oid() == other.oid()
    }
}

impl Eq for Ref {}

impl std::hash::Hash for Ref {
    fn hash(&self, hash: &mut H) where H: std::hash::Hasher {
        self.oid().hash(hash)
    }
}

impl PartialOrd for Ref {
    fn partial_cmp(&self, other: &Self) -> Option {
        Some(self.cmp(other))
    }
}

impl Ord for Ref {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.oid().cmp(&other.oid())
    }
}

/// Debug output is `debug_str()` wrapped in `⌜…⌝`.
impl std::fmt::Debug for Ref {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
        write!(f, "⌜{}⌝", self.debug_str())
    }
}

impl Cap {
    /// Given a `Ref`, where `M` is interconvertible with
    /// `AnyValue`, yields a `Cap` for the referenced entity. The
    /// `Cap` automatically decodes presented `AnyValue`s into
    /// instances of `M`.
    // NOTE(review): the generic parameter list and the type arguments of `Arc` look
    // truncated in this signature; left as found.
    pub fn guard(
        literals: &Arc,
        underlying: Arc>,
    ) -> Arc
    where
        M: for<'a> Unparse<&'a L, AnyValue>,
        M: for<'a> Parse<&'a L, AnyValue>,
    {
        let literals = Arc::clone(literals);
        // A new `Ref` that shares the mailbox and facet id of `underlying`, and whose
        // entity is a `Guard` holding `underlying` and `literals`.
        Self::new(&Arc::new(Ref {
            mailbox: Arc::clone(&underlying.mailbox),
            facet_id: underlying.facet_id,
            target: Mutex::new(Some(Box::new(Guard { underlying, literals }))),
        }))
    }

    /// Directly constructs a `Cap` for `underlying`.
pub fn new(underlying: &Arc>) -> Arc { Arc::new(Cap { underlying: Arc::clone(underlying), attenuation: Vec::new(), }) } /// Yields a fresh `Cap` for `self`'s `underlying`, copying the /// existing attenuation of `self` to the new `Cap` and adding the /// `caveats` to it. pub fn attenuate(&self, caveats: &[sturdy::Caveat]) -> Result, CaveatError> { let mut r = Cap { attenuation: self.attenuation.clone(), .. self.clone() }; r.attenuation.extend(sturdy::Caveat::check_many(caveats)?); Ok(Arc::new(r)) } /// Applies the contained attenuation to `a`, yielding `None` if /// `a` is filtered out, or `Some(_)` if it is accepted (and /// possibly transformed). pub fn rewrite(&self, mut a: AnyValue) -> Option { for c in self.attenuation.iter().rev() { match c.rewrite(&a) { Some(v) => a = v, None => return None, } } Some(a) } /// Translates `m` into an `AnyValue`, passes it through /// [`rewrite`][Self::rewrite], and then /// [`assert`s][Activation::assert] it using the activation `t`. pub fn assert>(&self, t: &mut Activation, literals: L, m: &M) -> Option { self.rewrite(m.unparse(literals)).map(|m| t.assert(&self.underlying, m)) } /// `update` is to [`assert`] as [`Activation::update`] is to [`Activation::assert`]. pub fn update>( &self, t: &mut Activation, handle: &mut Option, literals: L, m: Option<&M>, ) { t.update(handle, &self.underlying, m.and_then(|m| self.rewrite(m.unparse(literals)))) } /// Translates `m` into an `AnyValue`, passes it through /// [`rewrite`][Self::rewrite], and then sends it via method /// [`message`][Activation::message] on the activation `t`. pub fn message>(&self, t: &mut Activation, literals: L, m: &M) { if let Some(m) = self.rewrite(m.unparse(literals)) { t.message(&self.underlying, m) } } /// Synchronizes with the reference underlying the cap. 
pub fn sync(&self, t: &mut Activation, peer: Arc>) {
        // NOTE(review): several signatures from here on appear to have lost their
        // angle-bracketed generic arguments (`Arc>`, `-> Result {`, `impl Entity for Guard`,
        // `impl ActorResult> Entity for F`, ...). They are left exactly as found; confirm
        // against the canonical source.
        t.sync(&self.underlying, peer)
    }

    /// Renders the cap for diagnostics: the underlying `Ref`'s `debug_str()`, followed by
    /// a backslash and the `Debug` form of the attenuation when there is any.
    pub fn debug_str(&self) -> String {
        // Reuse the `Ref` rendering instead of repeating its format string here, so the
        // two cannot drift apart.
        let base = self.underlying.debug_str();
        if self.attenuation.is_empty() {
            base
        } else {
            format!("{}\\{:?}", base, self.attenuation)
        }
    }
}

/// Debug output is `debug_str()` wrapped in `⌜…⌝`.
impl std::fmt::Debug for Cap {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
        write!(f, "⌜{}⌝", self.debug_str())
    }
}

impl Domain for Cap {}

/// Decoding a `Cap` from an `IOValue` is not supported.
///
/// # Panics
///
/// Always panics.
impl std::convert::TryFrom<&IOValue> for Cap {
    type Error = preserves_schema::support::ParseError;
    fn try_from(_v: &IOValue) -> Result {
        // `IOValue` -> `Cap` is the decoding direction, so the message says "deserialize".
        panic!("Attempted to deserialize Cap via IOValue");
    }
}

/// Encoding a `Cap` as an `IOValue` is not supported.
///
/// # Panics
///
/// Always panics.
impl std::convert::From<&Cap> for IOValue {
    fn from(_v: &Cap) -> IOValue {
        // `Cap` -> `IOValue` is the encoding direction, so the message says "serialize".
        panic!("Attempted to serialize Cap via IOValue");
    }
}

/// Forwards events to the guarded entity. Assertion and message bodies are first parsed
/// from `AnyValue` into `M` using the guard's literals; bodies that fail to parse are
/// dropped silently.
impl Entity for Guard
where
    M: for<'a> Unparse<&'a L, AnyValue>,
    M: for<'a> Parse<&'a L, AnyValue>,
{
    fn assert(&mut self, t: &mut Activation, a: AnyValue, h: Handle) -> ActorResult {
        match M::parse(&*self.literals, &a) {
            Ok(a) => t.with_entity(&self.underlying, |t, e| e.assert(t, a, h)),
            // Unparseable assertions are ignored rather than reported as errors.
            Err(_) => Ok(()),
        }
    }

    fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult {
        t.with_entity(&self.underlying, |t, e| e.retract(t, h))
    }

    fn message(&mut self, t: &mut Activation, m: AnyValue) -> ActorResult {
        match M::parse(&*self.literals, &m) {
            Ok(m) => t.with_entity(&self.underlying, |t, e| e.message(t, m)),
            // Unparseable messages are ignored rather than reported as errors.
            Err(_) => Ok(()),
        }
    }

    fn sync(&mut self, t: &mut Activation, peer: Arc>) -> ActorResult {
        t.with_entity(&self.underlying, |t, e| e.sync(t, peer))
    }

    fn stop(&mut self, t: &mut Activation) -> ActorResult {
        t.with_entity(&self.underlying, |t, e| e.stop(t))
    }

    fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc) {
        // Unlike the other callbacks, this one reaches the entity through
        // `internal_with_entity` instead of `t.with_entity`.
        self.underlying.internal_with_entity(|e| e.exit_hook(t, exit_status))
    }
}

/// An entity whose only behaviour is to call `t.stop()` when something is retracted.
impl Entity for StopOnRetract {
    fn retract(&mut self, t: &mut Activation, _h: Handle) -> ActorResult {
        Ok(t.stop())
    }
}

/// Lets a closure act as an entity: every `Synced` message calls the closure with the
/// activation.
impl ActorResult> Entity for F {
    fn message(&mut self, t: &mut Activation, _m: Synced) -> ActorResult {
        self(t)
    }
}

/// Polls the global `ACTORS` table every 100 ms until it is empty or `wait_time` has
/// elapsed, whichever comes first.
async fn wait_loop(wait_time: time::Duration) {
    let deadline = time::Instant::now() + wait_time;
    while time::Instant::now() < deadline {
        let remaining_count = ACTORS.read().len();
        if remaining_count == 0 {
            break;
        }
        tracing::debug!("Waiting for {} remaining actors to stop", remaining_count);
        tokio::time::sleep(time::Duration::from_millis(100)).await;
    }
}

/// Waits up to `wait_time` for every registered actor to stop. Actors that remain are
/// asked to shut down and given a second `wait_time`; any still registered after that
/// are logged as errors.
pub async fn wait_for_all_actors_to_stop(wait_time: time::Duration) {
    wait_loop(wait_time).await;

    // Snapshot the table so the lock is not held while shutting actors down or awaiting.
    let remaining = ACTORS.read().clone();
    if remaining.is_empty() {
        tracing::debug!("All remaining actors have stopped.");
        return;
    }

    tracing::warn!("Some actors remain after {:?}:", wait_time);
    for (name, actor) in remaining.into_values() {
        tracing::warn!(?name, ?actor.actor_id, "actor still running, requesting shutdown");
        let g = actor.state.lock();
        if let ActorState::Running(state) = &*g {
            state.shutdown();
        }
    }

    wait_loop(wait_time).await;

    let remaining = ACTORS.read().clone();
    if remaining.is_empty() {
        tracing::debug!("All remaining actors have stopped.");
        return;
    }

    tracing::error!("Some actors failed to stop after being explicitly shut down:");
    for (name, actor) in remaining.into_values() {
        tracing::error!(?name, ?actor.actor_id, "actor failed to stop");
    }
}

/// A convenient way of cloning a bunch of state shared among [entities][Entity], actions,
/// linked tasks, etc.
///
/// Each listed identifier is rebound to a clone of itself in a new block, and the closure
/// expression is then evaluated inside that block.
///
/// Directly drawn from the discussion [here](https://github.com/rust-lang/rfcs/issues/2407).
///
#[macro_export]
macro_rules! enclose {
    ( ( $($name:ident),* ) $closure:expr ) => {
        {
            $(let $name = $name.clone();)*
            $closure
        }
    }
}