#![doc = concat!( include_str!("../doc/actor.md"), include_str!("../doc/what-is-an-actor.md"), include_str!("../doc/flow-control.md"), include_str!("../doc/linked-tasks.md"), )] use super::schemas::sturdy; use super::error::Error; use super::error::encode_error; use super::error::error; use super::rewrite::CaveatError; use super::rewrite::CheckedCaveat; use preserves::value::Domain; use preserves::value::IOValue; use preserves::value::Map; use preserves::value::NestedValue; use preserves::value::Set; use preserves_schema::support::ParseError; use std::boxed::Box; use std::collections::hash_map::HashMap; use std::convert::TryFrom; use std::convert::TryInto; use std::num::NonZeroU64; use std::sync::Arc; use std::sync::Mutex; use std::sync::RwLock; use std::sync::Weak; use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use tokio::select; use tokio::sync::Notify; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver}; use tokio_util::sync::CancellationToken; use tracing::Instrument; /// The type of messages and assertions that can be exchanged among /// distributed objects, including via [dataspace][crate::dataspace]. /// /// A Preserves value where embedded references are instances of /// [`Cap`]. /// /// While [`Ref`] can be used within a process, where arbitrary /// `M`-values can be exchanged among objects, for distributed or /// polyglot systems a *lingua franca* has to be chosen. `AnyValue` is /// that language. pub type AnyValue = super::schemas::internal_protocol::_Any; /// The type of process-unique actor IDs. pub type ActorId = u64; /// The type of process-unique facet IDs. pub type FacetId = NonZeroU64; /// The type of process-unique assertion handles. /// /// Used both as a reference to [retract][Entity::retract] /// previously-asserted assertions and as an indexing key to associate /// local state with some incoming assertion in an entity. pub type Handle = u64; /// Responses to events must have type `ActorResult`. 
pub type ActorResult = Result<(), Error>; /// The [`Actor::boot`] method returns an `ActorHandle`, representing /// the actor's mainloop task. pub type ActorHandle = tokio::task::JoinHandle; /// A small protocol for indicating successful synchronisation with /// some peer; see [Entity::sync]. pub struct Synced; /// The core metaprotocol implemented by every object. /// /// Entities communicate with each other by asserting and retracting /// values and by sending messages (which can be understood very /// approximately as "infinitesimally brief" assertions of the message /// body). /// /// Every assertion placed at a receiving entity *R* from some sending /// entity *S* lives so long as *S*'s actor survives, *R*'s actor /// survives, and *S* does not retract the assertion. Messages, by /// contrast, are transient. /// /// Implementors of [`Entity`] accept assertions from peers in method /// [`assert`][Entity::assert]; notification of retraction of a /// previously-asserted value happens in method /// [`retract`][Entity::retract]; and notification of a message in /// method [`message`][Entity::message]. /// /// In addition, entities may *synchronise* with each other: the /// [`sync`][Entity::sync] method responds to a synchronisation /// request. /// /// Finally, the Rust implementation of the Syndicated Actor model /// offers a hook for running some code at the end of an Entity's /// containing [`Actor`]'s lifetime /// ([`exit_hook`][Entity::exit_hook]). /// /// # What to implement /// /// The default implementations of the methods here generally do /// nothing; override them to add actual behaviour to your entity. /// #[allow(unused_variables)] pub trait Entity: Send { /// Receive notification of a new assertion from a peer. 
/// /// The `turn` parameter represents the current /// [activation][Activation]; `assertion` is the value (of type /// `M`) asserted; and `handle` is the process-unique name for /// this particular assertion instance that will be used later /// when it is [retracted][Entity::retract]. /// /// The default implementation does nothing. fn assert(&mut self, turn: &mut Activation, assertion: M, handle: Handle) -> ActorResult { Ok(()) } /// Receive notification of retraction of a previous assertion from a peer. /// /// This happens either when the asserting peer explicitly /// retracts an assertion, or when its animating [`Actor`] /// terminates. /// /// The `turn` parameter represents the current /// [activation][Activation], and `handle` is the process-unique /// name for this particular assertion instance being retracted. /// /// Note that no `assertion` value is provided: entities needing /// to know the value that was previously asserted must remember /// it themselves (perhaps in a [`Map`] keyed by `handle`). /// /// The default implementation does nothing. fn retract(&mut self, turn: &mut Activation, handle: Handle) -> ActorResult { Ok(()) } /// Receive notification of a message from a peer. /// /// The `turn` parameter represents the current /// [activation][Activation], and `message` is the body of the /// message sent. /// /// The default implementation does nothing. fn message(&mut self, turn: &mut Activation, message: M) -> ActorResult { Ok(()) } /// Respond to a synchronisation request from a peer. /// /// Implementors of [`Entity`] will seldom override this. The /// default implementation fulfils the synchronisation protocol by /// responding to `peer` with a `Synced` message. /// /// In special cases, for example when an entity is a proxy for /// some remote entity, the right thing to do is to forward the /// synchronisation request on to another entity; in those cases, /// overriding the default behaviour is appropriate. 
fn sync(&mut self, turn: &mut Activation, peer: Arc>) -> ActorResult { turn.message(&peer, Synced); Ok(()) } /// Optional callback for running cleanup actions when the /// entity's animating [Actor] terminates. /// /// Programs register an entity's exit hook with /// [RunningActor::add_exit_hook]. /// /// The default implementation does nothing. fn exit_hook(&mut self, turn: &mut Activation, exit_status: &Arc) -> ActorResult { Ok(()) } } /// An "inert" entity, that does nothing in response to any event delivered to it. /// /// Useful as a placeholder or dummy in various situations. pub struct InertEntity; impl Entity for InertEntity {} enum CleanupAction { ForMyself(Action), ForAnother(Arc, Action), } type CleanupActions = Map; type Action = Box ActorResult>; #[doc(hidden)] pub type PendingEventQueue = Vec; /// The main API for programming Syndicated Actor objects. /// /// Through `Activation`s, programs can access the state of their /// animating [`RunningActor`] and their active [`Facet`]. /// /// Usually, an `Activation` will be supplied to code that needs one; but when non-Actor code /// (such as a [linked task][crate::actor#linked-tasks]) needs to enter an Actor's execution /// context, use [`FacetRef::activate`] to construct one. /// /// Many actions that an entity can perform are methods directly on /// `Activation`, but methods on the [`RunningActor`] and [`FacetRef`] /// values contained in an `Activation` are also sometimes useful. /// /// This is what other implementations call a "Turn", renamed here to /// avoid conflicts with [`crate::schemas::internal_protocol::Turn`]. pub struct Activation<'activation> { /// A reference to the currently active [`Facet`] and the implementation-side state of its /// [`Actor`]. pub facet: FacetRef, /// A reference to the current state of the active [`Actor`]. 
pub state: &'activation mut RunningActor, pending: EventBuffer, } struct EventBuffer { pub account: Arc, queues: HashMap, PendingEventQueue)>, for_myself: PendingEventQueue, } /// An `Account` records a "debt" in terms of outstanding work items. /// /// It is part of the flow control mechanism - see [the module-level /// documentation][crate::actor#flow-control] for more. #[derive(Debug)] pub struct Account { id: u64, debt: Arc, notify: Notify, } /// A `LoanedItem` is a `T` with an associated `cost` recorded /// against it in the ledger of a given [`Account`]. The cost is /// repaid automatically when the `LoanedItem` is `Drop`ped. /// /// `LoanedItem`s are part of the flow control mechanism - see [the /// module-level documentation][crate::actor#flow-control] for more. #[derive(Debug)] pub struct LoanedItem { /// The account against which this loan is recorded. pub account: Arc, /// The cost of this particular `T`. pub cost: usize, /// The underlying item itself. pub item: T, } enum SystemMessage { Release, Turn(LoanedItem), Crash(Error), } /// The mechanism by which events are delivered to a given [`Actor`]. pub struct Mailbox { /// The ID of the actor this mailbox corresponds to. pub actor_id: ActorId, tx: UnboundedSender, } /// Each actor owns an instance of this structure. /// /// It holds the receive-half of the actor's mailbox, plus a reference /// to the actor's private state. pub struct Actor { rx: UnboundedReceiver, ac_ref: ActorRef, } /// A reference to an actor's private [`ActorState`]. #[derive(Clone)] pub struct ActorRef { /// The ID of the referenced actor. pub actor_id: ActorId, state: Arc>, } /// A combination of an [`ActorRef`] with a [`FacetId`], acting as a capability to enter the /// execution context of a facet from a linked task. #[derive(Clone)] pub struct FacetRef { pub actor: ActorRef, pub facet_id: FacetId, } /// The state of an actor: either `Running` or `Terminated`. 
pub enum ActorState { /// A non-terminated actor has an associated [`RunningActor`] state record. Running(RunningActor), /// A terminated actor has an [`ActorResult`] as its `exit_status`. Terminated { /// The exit status of the actor: `Ok(())` for normal /// termination, `Err(_)` for abnormal termination. exit_status: Arc, }, } /// State associated with each non-terminated [`Actor`]. pub struct RunningActor { /// The ID of the actor this state belongs to. pub actor_id: ActorId, tx: UnboundedSender, mailbox: Weak, exit_hooks: Vec) -> ActorResult>>, facet_nodes: Map, facet_children: Map>, root: FacetId, } /// State associated with each facet in an [`Actor`]'s facet tree. /// /// # Inert facets /// /// A facet is considered *inert* if: /// /// 1. it has no child facets; /// 2. it has no cleanup actions (that is, no assertions placed by any of its entities); /// 3. it has no linked tasks; and /// 4. it has no "inert check preventers" (see [Activation::prevent_inert_check]). /// /// If a facet is created and is inert at the moment that its `boot` function returns, it is /// automatically terminated. /// /// When a facet is terminated, if its parent facet is inert, the parent is terminated. /// /// If the root facet in an actor is terminated, the entire actor is terminated (with exit /// status `Ok(())`). /// pub struct Facet { /// The ID of the facet. pub facet_id: FacetId, /// The ID of the facet's parent facet, if any; if None, this facet is the `Actor`'s root facet. pub parent_facet_id: Option, cleanup_actions: CleanupActions, stop_actions: Vec, linked_tasks: Map, inert_check_preventers: Arc, } /// A reference to an object that expects messages/assertions of type /// `M`. /// /// The object can be in the same actor, in a different local /// (in-process) actor, or accessible across a network link. pub struct Ref { /// Mailbox of the actor owning the referenced entity. pub mailbox: Arc, /// ID of the facet (within the actor) owning the referenced entity. 
pub facet_id: FacetId, /// Mutex owning and guarding the state backing the referenced entity. pub target: Mutex>>>, } /// Specialization of `Ref` for messages/assertions of type /// [`AnyValue`]. /// /// All polyglot and network communication is done in terms of `Cap`s. /// /// `Cap`s can also be *attenuated* ([Hardy 2017]; [Miller 2006]) to /// reduce (or otherwise transform) the range of assertions and /// messages they can be used to send to their referent. The /// Syndicated Actor model uses /// [Macaroon](https://syndicate-lang.org/doc/capabilities/)-style /// capability attenuation. /// /// [Hardy 2017]: http://cap-lore.com/CapTheory/Patterns/Attenuation.html /// [Miller 2006]: http://www.erights.org/talks/thesis/markm-thesis.pdf #[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct Cap { #[doc(hidden)] pub underlying: Arc>, #[doc(hidden)] pub attenuation: Vec, } /// Adapter for converting an underlying [`Ref`] to a [`Cap`]. /// /// The [`Entity`] implementation for `Guard` decodes `AnyValue` /// assertions/messages to type `M` before passing them on to the /// underlying entity. pub struct Guard where for<'a> &'a M: Into, for<'a> M: TryFrom<&'a AnyValue>, { underlying: Arc> } /// Simple entity that stops its containing facet when any assertion it receives is /// subsequently retracted. pub struct StopOnRetract; //--------------------------------------------------------------------------- const BUMP_AMOUNT: u8 = 10; static NEXT_ACTOR_ID: AtomicU64 = AtomicU64::new(1); #[doc(hidden)] pub fn next_actor_id() -> ActorId { NEXT_ACTOR_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed) } static NEXT_FACET_ID: AtomicU64 = AtomicU64::new(2); #[doc(hidden)] pub fn next_facet_id() -> FacetId { FacetId::new(NEXT_FACET_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed)) .expect("Internal error: Attempt to allocate FacetId of zero. Too many FacetIds allocated. 
Restart the process.") } static NEXT_HANDLE: AtomicU64 = AtomicU64::new(3); /// Allocate a process-unique `Handle`. pub fn next_handle() -> Handle { NEXT_HANDLE.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed) } static NEXT_ACCOUNT_ID: AtomicU64 = AtomicU64::new(4); static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(5); preserves_schema::support::lazy_static! { #[doc(hidden)] pub static ref SYNDICATE_CREDIT: i64 = { let credit = std::env::var("SYNDICATE_CREDIT").unwrap_or("100".to_owned()) .parse::().expect("Valid SYNDICATE_CREDIT environment variable"); tracing::debug!("Configured SYNDICATE_CREDIT = {}", credit); credit }; #[doc(hidden)] pub static ref ACCOUNTS: RwLock)>> = RwLock::new(Map::new()); } impl TryFrom<&AnyValue> for Synced { type Error = ParseError; fn try_from(value: &AnyValue) -> Result { if let Some(true) = value.value().as_boolean() { Ok(Synced) } else { Err(ParseError::conformance_error("Synced")) } } } impl From<&Synced> for AnyValue { fn from(_value: &Synced) -> Self { AnyValue::new(true) } } impl FacetRef { /// Executes `f` in a new "[turn][Activation]" for `actor`. If `f` returns `Ok(())`, /// [commits the turn][Activation::deliver] and performs the buffered actions; otherwise, /// [abandons the turn][Activation::clear] and discards the buffered actions. /// /// Bills any activity to `account`. pub fn activate( &self, account: Arc, f: F, ) -> ActorResult where F: FnOnce(&mut Activation) -> ActorResult, { match self.activate_exit(account, |t| match f(t) { Ok(()) => None, Err(e) => Some(Err(e)), }) { None => Ok(()), Some(e) => Err(error("Could not activate terminated actor", encode_error(e))), } } /// Executes `f` in a new "[turn][Activation]" for `actor`. If `f` returns /// `Some(exit_status)`, terminates `actor` with that `exit_status`. Otherwise, if `f` /// returns `None`, leaves `actor` in runnable state. [Commits buffered /// actions][Activation::deliver] unless `actor` terminates with an `Err` status. /// /// Bills any activity to `account`. 
pub fn activate_exit( &self, account: Arc, f: F, ) -> Option where F: FnOnce(&mut Activation) -> Option, { match self.actor.state.lock() { Err(_) => panicked_err(), Ok(mut g) => match &mut *g { ActorState::Terminated { exit_status } => Some((**exit_status).clone()), ActorState::Running(state) => { tracing::trace!(actor_id=?self.actor.actor_id, "activate"); let mut activation = Activation::make(self, account, state); let result = match f(&mut activation) { None => None, Some(exit_status) => { if exit_status.is_err() { activation.clear(); } drop(activation); let exit_status = Arc::new(exit_status); let mut t = Activation::make(&self.actor.facet_ref(state.root), Account::new(crate::name!("shutdown")), state); for action in std::mem::take(&mut t.state.exit_hooks) { if let Err(err) = action(&mut t, &exit_status) { tracing::error!(?err, "error in exit hook"); } } if let Err(err) = t._terminate_facet(t.state.root, false) { // This can only occur as the result of an internal error in this file's code. 
tracing::error!(?err, "unexpected error from disorderly terminate_facet"); panic!("Unexpected error result from disorderly terminate_facet"); } *g = ActorState::Terminated { exit_status: Arc::clone(&exit_status), }; Some((*exit_status).clone()) } }; tracing::trace!(actor_id=?self.actor.actor_id, "deactivate"); result } } } } } impl<'activation> Activation<'activation> { fn make( facet: &FacetRef, account: Arc, state: &'activation mut RunningActor, ) -> Self { Activation { facet: facet.clone(), state, pending: EventBuffer::new(account), } } fn immediate_oid(&self, r: &Arc>) { if r.mailbox.actor_id != self.facet.actor.actor_id { panic!("Cannot use for_myself to send to remote peers"); } } fn with_facet(&mut self, check_existence: bool, facet_id: FacetId, f: F) -> ActorResult where F: FnOnce(&mut Activation) -> ActorResult, { if !check_existence || self.state.facet_nodes.contains_key(&facet_id) { tracing::trace!(check_existence, facet_id, "is_alive"); let old_facet_id = self.facet.facet_id; self.facet.facet_id = facet_id; let result = f(self); self.facet.facet_id = old_facet_id; result } else { tracing::trace!(facet_id, "not_alive"); Ok(()) } } #[doc(hidden)] pub fn with_entity(&mut self, r: &Arc>, f: F) -> ActorResult where F: FnOnce(&mut Activation, &mut dyn Entity) -> ActorResult { self.with_facet(true, r.facet_id, |t| r.internal_with_entity(|e| f(t, e))) } fn active_facet<'a>(&'a mut self) -> Option<&'a mut Facet> { self.state.get_facet(self.facet.facet_id) } /// Core API: assert `a` at recipient `r`. /// /// Returns the [`Handle`] for the new assertion. pub fn assert(&mut self, r: &Arc>, a: M) -> Handle { let handle = next_handle(); if let Some(f) = self.active_facet() { f.insert_retract_cleanup_action(&r, handle); drop(f); { let r = Arc::clone(r); self.pending.queue_for(&r).push(Box::new( move |t| t.with_entity(&r, |t, e| e.assert(t, a, handle)))); } } handle } /// Core API: assert `a` at `r`, which must be a `Ref` within the active actor. 
/// /// It's perfectly OK to use method [`assert`][Self::assert] even /// for `Ref`s that are part of the active actor. The difference /// between `assert` and `assert_for_myself` is that `r`'s handler /// for `assert` runs in a separate, later [`Activation`], while /// `r`'s handler for `assert_for_myself` runs in *this* /// [`Activation`], before it commits. /// /// Returns the [`Handle`] for the new assertion. /// /// # Panics /// /// Panics if `r` is not part of the active actor. pub fn assert_for_myself(&mut self, r: &Arc>, a: M) -> Handle { self.immediate_oid(r); let handle = next_handle(); if let Some(f) = self.active_facet() { { let r = Arc::clone(r); f.cleanup_actions.insert( handle, CleanupAction::ForMyself(Box::new( move |t| t.with_entity(&r, |t, e| e.retract(t, handle))))); } drop(f); { let r = Arc::clone(r); self.pending.for_myself.push(Box::new( move |t| t.with_entity(&r, |t, e| e.assert(t, a, handle)))); } } handle } fn half_link(&mut self, t_other: &mut Activation) { let entity_ref = t_other.create::(StopOnRetract); let handle = next_handle(); self.active_facet().unwrap().insert_retract_cleanup_action(&entity_ref, handle); t_other.with_entity(&entity_ref, |t, e| e.assert(t, AnyValue::new(true), handle)).unwrap(); } /// Core API: retract a previously-established assertion. pub fn retract(&mut self, handle: Handle) { if let Some(f) = self.active_facet() { if let Some(d) = f.cleanup_actions.remove(&handle) { self.pending.execute_cleanup_action(d) } } } /// Core API: send message `m` to recipient `r`. pub fn message(&mut self, r: &Arc>, m: M) { let r = Arc::clone(r); self.pending.queue_for(&r).push(Box::new( move |t| t.with_entity(&r, |t, e| e.message(t, m)))) } /// Core API: send message `m` to recipient `r`, which must be a /// `Ref` within the active actor. /// /// Method `message_for_myself` is to [`message`][Self::message] /// as [`assert_for_myself`][Self::assert_for_myself] is to /// [`assert`][Self::assert]. 
/// /// # Panics /// /// Panics if `r` is not part of the active actor. pub fn message_for_myself(&mut self, r: &Arc>, m: M) { self.immediate_oid(r); let r = Arc::clone(r); self.pending.for_myself.push(Box::new( move |t| t.with_entity(&r, |t, e| e.message(t, m)))) } /// Core API: begins a synchronisation with `r`. /// /// Once the synchronisation request reaches `r`'s actor, it will /// send a response to `peer`, which acts as a continuation for /// the synchronisation request. pub fn sync(&mut self, r: &Arc>, peer: Arc>) { let r = Arc::clone(r); self.pending.queue_for(&r).push(Box::new( move |t| t.with_entity(&r, |t, e| e.sync(t, peer)))) } /// Retrieve the [`Account`] against which actions are recorded. pub fn account(&self) -> &Arc { &self.pending.account } /// Discards all pending actions in this activation. pub fn clear(&mut self) { self.pending.clear(); } /// Delivers all pending actions in this activation. /// /// This is called automatically when an `Activation` is /// `Drop`ped. /// /// # Panics /// /// Panics if any pending actions "`for_myself`" (resulting from /// [`assert_for_myself`][Self::assert_for_myself] or /// [`message_for_myself`][Self::message_for_myself]) are /// outstanding at the time of the call. pub fn deliver(&mut self) { self.pending.deliver(); } /// Construct an entity with behaviour [`InertEntity`] within the active facet. pub fn inert_entity(&mut self) -> Arc> { self.create(InertEntity) } /// Construct an entity with behaviour `e` within the active facet. pub fn create + Send + 'static>(&mut self, e: E) -> Arc> { let r = self.create_inert(); r.become_entity(e); r } /// Construct an entity (within the active facet) whose behaviour will be specified later /// via [`become_entity`][Ref::become_entity]. pub fn create_inert(&mut self) -> Arc> { Arc::new(Ref { mailbox: self.state.mailbox(), facet_id: self.facet.facet_id, target: Mutex::new(None), }) } /// Start a new [linked task][crate::actor#linked-tasks] attached to the active facet. 
The /// task will execute the future "`boot`" to completion unless it is cancelled first (by /// e.g. termination of the owning facet or crashing of the owning actor). Uses `name` for /// log messages emitted by the task. pub fn linked_task>( &mut self, name: tracing::Span, boot: F, ) { let mailbox = self.state.mailbox(); if let Some(f) = self.active_facet() { let token = CancellationToken::new(); let task_id = NEXT_TASK_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed); name.record("task_id", &task_id); { let token = token.clone(); tokio::spawn(async move { tracing::trace!(task_id, "linked task start"); select! { _ = token.cancelled() => { tracing::trace!(task_id, "linked task cancelled"); Ok(()) } result = boot => { match &result { Ok(()) => { tracing::trace!(task_id, "linked task normal stop"); () } Err(e) => { tracing::error!(task_id, "linked task error: {}", e); let _ = mailbox.tx.send(SystemMessage::Crash(e.clone())); () } } result } } }.instrument(name)); } f.linked_tasks.insert(task_id, token); } } fn enqueue_for_myself_at_commit(&mut self, action: Action) { let mailbox = self.state.mailbox(); self.pending.queue_for_mailbox(&mailbox).push(action); } /// Schedule the creation of a new actor when the Activation commits. pub fn spawn ActorResult>( &mut self, name: tracing::Span, boot: F, ) { self.enqueue_for_myself_at_commit(Box::new(move |_| { Actor::new().boot(name, boot); Ok(()) })); } /// Schedule the creation of a new actor when the Activation commits. /// /// The new actor will be "linked" to the active facet: if the new actor terminates, the /// active facet is stopped, and if the active facet stops, the new actor's root facet is /// stopped. 
pub fn spawn_link ActorResult>(
        &mut self,
        name: tracing::Span,
        boot: F,
    ) {
        // Capture the facet that is active *now*; the queued action below runs later, at
        // commit, and must link the new actor to this facet rather than to whichever facet
        // happens to be active then.
        let facet_id = self.facet.facet_id;
        self.enqueue_for_myself_at_commit(Box::new(move |t| {
            // `with_facet(true, ..)` returns `Ok(())` without running the closure if the
            // facet no longer exists when this action executes, so in that case no actor
            // is created at all.
            t.with_facet(true, facet_id, move |t| {
                Actor::new().link(t).boot(name, boot);
                Ok(())
            })
        }));
    }

    /// Create a new subfacet of the currently-active facet. Runs `boot` in the new facet's
    /// context. If `boot` returns leaving the new facet [inert][Facet#inert-facets], an
    /// inertness check queued for commit time stops the new facet, provided it is still
    /// inert when that check runs.
    ///
    /// Returns the [`FacetId`] of the new facet.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `boot`.
    ///
    /// NOTE(review): when `boot` fails, the facet node and child-set entry inserted below
    /// are not removed by this function.
    pub fn facet ActorResult>(
        &mut self,
        boot: F,
    ) -> Result {
        let f = Facet::new(Some(self.facet.facet_id));
        let facet_id = f.facet_id;
        // Register the new facet and record it as a child of the active facet before
        // running `boot`, so that `with_facet`'s existence check succeeds.
        self.state.facet_nodes.insert(facet_id, f);
        self.state.facet_children.entry(self.facet.facet_id).or_default().insert(facet_id);
        self.with_facet(true /* TODO: tiny optimisation: "false" would be safe here */, facet_id, move |t| {
            boot(t)?;
            // Only *schedules* the inertness check; the check itself runs at commit.
            t.stop_if_inert();
            Ok(())
        })?;
        Ok(facet_id)
    }

    /// Useful during facet (and actor) startup, in some situations: when a facet `boot`
    /// procedure would return while the facet is inert, but the facet should survive until
    /// some subsequent time, call `prevent_inert_check` to increment a counter that prevents
    /// inertness-checks from succeeding on the active facet.
    ///
    /// The result of `prevent_inert_check` is a function which, when called, decrements the
    /// counter again. After the counter has been decremented, any subsequent inertness checks
    /// will no longer be artificially forced to fail.
    ///
    /// An example of when you might want this: creating an actor having only a single
    /// Dataspace entity within it, then using the Dataspace from other actors. At the start of
    /// its life, the Dataspace actor will have no outbound assertions, no child facets, and no
    /// linked tasks, so the only way to prevent it from being prematurely garbage collected is
    /// to use `prevent_inert_check` in its boot function.
pub fn prevent_inert_check(&mut self) -> Box {
        if let Some(f) = self.active_facet() {
            Box::new(f.prevent_inert_check())
        } else {
            // No active facet: hand back a callable that does nothing.
            Box::new(|| ())
        }
    }

    /// Arranges for the [`Facet`] named by `facet_id` to be stopped cleanly when `self`
    /// commits. If `continuation` is supplied, the facet to be stopped hasn't been stopped
    /// yet, none of the shutdown handlers yields an error, and the facet's parent facet is
    /// alive, executes `continuation` in the parent facet's context.
    ///
    /// NOTE(review): as written, the parent ID is read from the *active* facet at call
    /// time, not from `facet_id`, and the continuation is attempted whenever
    /// `_terminate_facet` returns `Ok` -- which includes the case where `facet_id` no
    /// longer exists. Confirm which behaviour is intended.
    pub fn stop_facet(&mut self, facet_id: FacetId, continuation: Option) {
        let maybe_parent_id = self.active_facet().and_then(|f| f.parent_facet_id);
        self.enqueue_for_myself_at_commit(Box::new(move |t| {
            t._terminate_facet(facet_id, true)?;
            if let Some(k) = continuation {
                if let Some(parent_id) = maybe_parent_id {
                    // `with_facet(true, ..)` skips `k` if the parent is no longer alive.
                    t.with_facet(true, parent_id, k)?;
                }
            }
            Ok(())
        }));
    }

    /// Arranges for the active facet to be stopped cleanly when `self` commits.
    ///
    /// Equivalent to `self.stop_facet(self.facet.facet_id, None)`.
    pub fn stop(&mut self) {
        self.stop_facet(self.facet.facet_id, None)
    }

    /// Queues, for this actor at commit time, a check of the currently-active facet: if it
    /// still exists and is inert when the check runs, it is stopped via `stop_facet`.
    fn stop_if_inert(&mut self) {
        let facet_id = self.facet.facet_id;
        self.enqueue_for_myself_at_commit(Box::new(move |t| {
            tracing::trace!("Checking inertness of facet {} from facet {}", facet_id, t.facet.facet_id);
            if t.state.facet_exists_and_is_inert(facet_id) {
                tracing::trace!(" - facet {} is inert, stopping it", facet_id);
                t.stop_facet(facet_id, None);
            } else {
                tracing::trace!(" - facet {} is not inert", facet_id);
            }
            Ok(())
        }))
    }

    /// Removes the facet `facet_id` from the actor and tears it down, children first.
    ///
    /// Does nothing (returning `Ok(())`) if the facet is not present. Otherwise the facet is
    /// detached from its parent's child set and every child is terminated recursively with
    /// the same `alive` flag. When `alive` is true, the facet's stop actions are then run,
    /// the facet is dropped, and afterwards either its parent is terminated too (if the
    /// parent exists and is inert) or, for a facet without a parent, `shutdown` is called on
    /// the actor state. When `alive` is false none of those last steps happen.
    ///
    /// The first error from a child termination or stop action aborts the procedure.
    fn _terminate_facet(&mut self, facet_id: FacetId, alive: bool) -> ActorResult {
        if let Some(mut f) = self.state.facet_nodes.remove(&facet_id) {
            tracing::debug!("{} termination of {:?}",
                            if alive { "living" } else { "post-exit" },
                            facet_id);
            if let Some(p) = f.parent_facet_id {
                self.state.facet_children.get_mut(&p).map(|children| children.remove(&facet_id));
            }
            // `false`: the node was just removed from `facet_nodes`, so an existence check
            // would fail; we still want the facet to be the active one during teardown.
            self.with_facet(false, facet_id, |t| {
                if let Some(children) = t.state.facet_children.remove(&facet_id) {
                    for child_id in children.into_iter() {
                        t._terminate_facet(child_id, alive)?;
                    }
                }
                if alive {
                    for action in std::mem::take(&mut f.stop_actions) {
                        action(t)?;
                    }
                    let parent_facet_id = f.parent_facet_id;
                    // if !alive, the drop will happen at the end of this function, but we
                    // need it to happen right here so that child-facet cleanup-actions are
                    // performed before parent-facet cleanup-actions.
                    drop(f);
                    if let Some(p) = parent_facet_id {
                        if t.state.facet_exists_and_is_inert(p) {
                            t._terminate_facet(p, true)?;
                        }
                    } else {
                        t.state.shutdown();
                    }
                }
                Ok(())
            })
        } else {
            Ok(())
        }
    }
}

impl EventBuffer {
    /// Creates an empty buffer whose deliveries are billed to `account`.
    fn new(account: Arc) -> Self {
        EventBuffer {
            account,
            queues: HashMap::new(),
            for_myself: Vec::new(),
        }
    }

    /// Queues a cleanup action: on the target mailbox's queue for `ForAnother`, or on the
    /// `for_myself` queue for `ForMyself`.
    fn execute_cleanup_action(&mut self, d: CleanupAction) {
        match d {
            CleanupAction::ForAnother(mailbox, action) =>
                self.queue_for_mailbox(&mailbox).push(action),
            CleanupAction::ForMyself(action) =>
                self.for_myself.push(action),
        }
    }

    /// Returns the pending queue for the actor that owns `r`.
    fn queue_for(&mut self, r: &Arc>) -> &mut PendingEventQueue {
        self.queue_for_mailbox(&r.mailbox)
    }

    /// Returns the pending queue for `mailbox`'s actor, creating it (together with a clone
    /// of the mailbox's sender) on first use. Queues are keyed by actor ID.
    fn queue_for_mailbox(&mut self, mailbox: &Arc) -> &mut PendingEventQueue {
        &mut self.queues.entry(mailbox.actor_id)
            .or_insert((mailbox.tx.clone(), Vec::new())).1
    }

    /// Discards everything queued so far, both per-actor queues and `for_myself`.
    fn clear(&mut self) {
        self.queues = HashMap::new();
        self.for_myself = PendingEventQueue::new();
    }

    /// Sends each per-actor queue to its actor as a single turn and empties the buffer.
    ///
    /// A failed send is logged and otherwise ignored.
    ///
    /// # Panics
    ///
    /// Panics if any `for_myself` actions are still queued.
    fn deliver(&mut self) {
        if !self.for_myself.is_empty() {
            panic!("Unprocessed for_myself events remain at deliver() time");
        }
        for (_actor_id, (tx, turn)) in std::mem::take(&mut self.queues).into_iter() {
            if let Err(e) = send_actions(&tx, &self.account, turn) {
                tracing::error!(actor=?_actor_id, ?e);
            }
        }
    }
}

// Dropping a buffer delivers whatever is still queued; callers that want to abandon the
// queued actions must call `clear` first.
impl Drop for EventBuffer {
    fn drop(&mut self) {
        self.deliver()
    }
}

impl Account {
    /// Construct a new `Account`, storing `name` within it for
    /// debugging use.
pub fn new(name: tracing::Span) -> Arc { let id = NEXT_ACCOUNT_ID.fetch_add(1, Ordering::Relaxed); let debt = Arc::new(AtomicI64::new(0)); ACCOUNTS.write().unwrap().insert(id, (name, Arc::clone(&debt))); Arc::new(Account { id, debt, notify: Notify::new(), }) } /// Retrieve the current account balance: the number of /// currently-outstanding work items. pub fn balance(&self) -> i64 { self.debt.load(Ordering::Relaxed) } /// Borrow `token_count` work items against this account. pub fn borrow(&self, token_count: usize) { let token_count: i64 = token_count.try_into().expect("manageable token count"); self.debt.fetch_add(token_count, Ordering::Relaxed); } /// Repay `token_count` work items previously borrowed against this account. pub fn repay(&self, token_count: usize) { let token_count: i64 = token_count.try_into().expect("manageable token count"); let _old_debt = self.debt.fetch_sub(token_count, Ordering::Relaxed); if _old_debt - token_count <= *SYNDICATE_CREDIT { self.notify.notify_one(); } } /// Suspend execution until enough "clear funds" exist in this /// account for some subsequent activity to be permissible. pub async fn ensure_clear_funds(&self) { let limit = *SYNDICATE_CREDIT; // tokio::task::yield_now().await; while self.balance() > limit { // tokio::task::yield_now().await; self.notify.notified().await; } } } impl Drop for Account { fn drop(&mut self) { ACCOUNTS.write().unwrap().remove(&self.id); } } impl LoanedItem { /// Construct a new `LoanedItem` containing `item`, recording /// `cost` work items against `account`. 
pub fn new(account: &Arc<Account>, cost: usize, item: T) -> Self {
        // Record the debt first; it is repaid when the `LoanedItem` is dropped.
        account.borrow(cost);
        LoanedItem { account: Arc::clone(account), cost, item }
    }
}

impl<T> Drop for LoanedItem<T> {
    /// Repays the item's `cost` to the account it was borrowed against.
    fn drop(&mut self) {
        self.account.repay(self.cost);
    }
}

/// Sends the queued actions `t` to the actor behind `tx` as a single turn.
///
/// One work item per queued action is borrowed against `account` for as long as the turn
/// is outstanding.
///
/// # Errors
///
/// Returns an error if the channel is closed, i.e. the target actor is not running.
#[must_use]
fn send_actions(
    tx: &UnboundedSender<SystemMessage>,
    account: &Arc<Account>,
    t: PendingEventQueue,
) -> ActorResult {
    let token_count = t.len();
    tx.send(SystemMessage::Turn(LoanedItem::new(account, token_count, t)))
        .map_err(|_| error("Target actor not running", AnyValue::new(false)))
}

impl std::fmt::Debug for Mailbox {
    /// Formats the mailbox as `#<Mailbox ACTOR_ID>`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
        write!(f, "#<Mailbox {}>", self.actor_id)
    }
}

// Identity, hashing and ordering of a `Mailbox` are all defined by its actor ID alone, so
// the four impls below stay mutually consistent.

impl std::hash::Hash for Mailbox {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.actor_id.hash(state)
    }
}

impl Eq for Mailbox {}

impl PartialEq for Mailbox {
    fn eq(&self, other: &Mailbox) -> bool {
        self.actor_id == other.actor_id
    }
}

impl Ord for Mailbox {
    fn cmp(&self, other: &Mailbox) -> std::cmp::Ordering {
        self.actor_id.cmp(&other.actor_id)
    }
}

impl PartialOrd for Mailbox {
    fn partial_cmp(&self, other: &Mailbox) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Drop for Mailbox {
    /// Sends `SystemMessage::Release` to the owning actor when the mailbox is dropped.
    fn drop(&mut self) {
        // Best effort: if the receiver is already gone there is nobody left to notify.
        let _ = self.tx.send(SystemMessage::Release);
    }
}

impl Actor {
    /// Create a new actor. It still needs to be [`boot`ed][Self::boot].
pub fn new() -> Self { let (tx, rx) = unbounded_channel(); let actor_id = next_actor_id(); let root = Facet::new(None); // tracing::trace!(id = actor_id, "Actor::new"); let mut st = RunningActor { actor_id, tx, mailbox: Weak::new(), exit_hooks: Vec::new(), facet_nodes: Map::new(), facet_children: Map::new(), root: root.facet_id, }; st.facet_nodes.insert(root.facet_id, root); Actor { rx, ac_ref: ActorRef { actor_id, state: Arc::new(Mutex::new(ActorState::Running(st))), }, } } fn link(self, t_parent: &mut Activation) -> Self { if t_parent.active_facet().is_none() { panic!("No active facet when calling spawn_link"); } self.ac_ref.root_facet_ref().activate(Account::new(crate::name!("link")), |t_child| { t_parent.half_link(t_child); t_child.half_link(t_parent); Ok(()) }).expect("Failed during link"); self } /// Start the actor's mainloop. Takes ownership of `self`. The /// `name` is used as context for any log messages emitted by the /// actor. The `boot` function is called in the actor's context, /// and then the mainloop is entered. 
pub fn boot<F: 'static + Send + FnOnce(&mut Activation) -> ActorResult>(
        mut self,
        name: tracing::Span,
        boot: F,
    ) -> ActorHandle {
        // NOTE(review): the generic parameter list of `boot` and `run`
        // was missing from the text; `'static + Send` is what
        // `tokio::spawn` requires of the captured closure — confirm.
        name.record("actor_id", &self.ac_ref.actor_id);
        tokio::spawn(async move {
            tracing::trace!("start");
            self.run(|t| {
                t.facet(boot)?;
                Ok(())
            }).await;
            let result = self.ac_ref.exit_status().expect("terminated");
            match &result {
                Ok(()) => tracing::trace!("normal stop"),
                Err(e) => tracing::error!("error stop: {}", e),
            }
            result
        }.instrument(name))
    }

    /// The actor's mainloop: runs `boot` in the root facet, then
    /// processes `SystemMessage`s from the mailbox until the actor
    /// is released, crashes, a turn fails, or the channel closes.
    async fn run<F: 'static + Send + FnOnce(&mut Activation) -> ActorResult>(
        &mut self,
        boot: F,
    ) {
        let root_facet_ref = self.ac_ref.root_facet_ref();

        // Records `result` as the actor's exit status via the root facet.
        let terminate = |result: ActorResult| {
            let _ = root_facet_ref.activate_exit(Account::new(crate::name!("shutdown")),
                                                 |_| Some(result));
        };

        if root_facet_ref.activate(Account::new(crate::name!("boot")), boot).is_err() {
            return;
        }

        loop {
            tracing::trace!(actor_id = ?self.ac_ref.actor_id, "mainloop top");
            match self.rx.recv().await {
                None => {
                    return terminate(Err(error("Unexpected channel close", AnyValue::new(false))));
                }
                Some(SystemMessage::Release) => {
                    tracing::trace!(actor_id = ?self.ac_ref.actor_id, "SystemMessage::Release");
                    return terminate(Ok(()));
                }
                Some(SystemMessage::Turn(mut loaned_item)) => {
                    tracing::trace!(actor_id = ?self.ac_ref.actor_id, "SystemMessage::Turn");
                    // `LoanedItem` implements `Drop`, so its payload is
                    // taken out rather than moved out.
                    let mut actions = std::mem::take(&mut loaned_item.item);
                    let r = root_facet_ref.activate(Arc::clone(&loaned_item.account), |t| {
                        // Keep going while running the actions queues
                        // further actions for this same actor.
                        loop {
                            for action in actions.into_iter() { action(t)? }
                            actions = std::mem::take(&mut t.pending.for_myself);
                            if actions.is_empty() { break; }
                        }
                        Ok(())
                    });
                    if r.is_err() { return; }
                }
                Some(SystemMessage::Crash(e)) => {
                    tracing::trace!(actor_id = ?self.ac_ref.actor_id,
                                    "SystemMessage::Crash({:?})", &e);
                    return terminate(Err(e));
                }
            }
        }
    }
}

impl Facet {
    /// Constructs a facet with a fresh ID, the given parent (if any),
    /// and no cleanup actions, stop actions or linked tasks.
    fn new(parent_facet_id: Option<FacetId>) -> Self {
        Facet {
            facet_id: next_facet_id(),
            parent_facet_id,
            cleanup_actions: Map::new(),
            stop_actions: Vec::new(),
            linked_tasks: Map::new(),
            inert_check_preventers: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Increments this facet's `inert_check_preventers` counter and
    /// returns a closure that decrements it again when called.
    fn prevent_inert_check(&mut self) -> impl FnOnce() {
        let inert_check_preventers = Arc::clone(&self.inert_check_preventers);
        let armed = AtomicU64::new(1);
        inert_check_preventers.fetch_add(1, Ordering::Relaxed);
        move || {
            // Only decrement if the guard is still armed.
            if armed.compare_exchange(1, 0, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
                inert_check_preventers.fetch_sub(1, Ordering::Relaxed);
            }
        }
    }

    /// Registers a cleanup action under `handle` that retracts
    /// `handle` at the entity `r`, addressed via `r`'s mailbox.
    // NOTE(review): the bounds on `M` were missing from the text;
    // `'static + Send` is presumed from the boxed closure — confirm.
    fn insert_retract_cleanup_action<M: 'static + Send>(
        &mut self,
        r: &Arc<Ref<M>>,
        handle: Handle,
    ) {
        let r = Arc::clone(r);
        self.cleanup_actions.insert(
            handle,
            CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new(
                move |t| t.with_entity(&r, |t, e| e.retract(t, handle)))));
    }
}

/// The exit status reported when an actor's state mutex is poisoned.
fn panicked_err() -> Option<ActorResult> {
    Some(Err(error("Actor panicked", AnyValue::new(false))))
}

impl ActorRef {
    /// Uses an internal mutex to access the internal state: takes the
    /// lock, calls `f` with the internal state, releases the lock,
    /// and returns the result of `f`.
    ///
    /// If the mutex is poisoned, `f` is called with `None` instead.
    pub fn access<R, F: FnOnce(Option<&mut ActorState>) -> R>(&self, f: F) -> R {
        match self.state.lock() {
            Err(_) => f(None),
            Ok(mut g) => f(Some(&mut *g)),
        }
    }

    /// Retrieves the exit status of the denoted actor. If it is still
    /// running, yields `None`; otherwise, yields `Some(Ok(()))` if it
    /// exited normally, or `Some(Err(_))` if it terminated
    /// abnormally.
pub fn exit_status(&self) -> Option { self.access(|s| s.map_or_else( panicked_err, |state| match state { ActorState::Running(_) => None, ActorState::Terminated { exit_status } => Some((**exit_status).clone()), })) } fn facet_ref(&self, facet_id: FacetId) -> FacetRef { FacetRef { actor: self.clone(), facet_id, } } fn root_facet_id(&self) -> FacetId { self.access(|s| match s.expect("Actor missing its state") { ActorState::Terminated { .. } => panic!("Actor unexpectedly in terminated state"), ActorState::Running(ra) => ra.root, // what a lot of work to get this one number }) } fn root_facet_ref(&self) -> FacetRef { self.facet_ref(self.root_facet_id()) } } impl RunningActor { /// Requests a shutdown of the actor. The shutdown request is /// handled by the actor's main loop, causing it to terminate with /// exit status `Ok(())`. pub fn shutdown(&self) { let _ = self.tx.send(SystemMessage::Release); } fn mailbox(&mut self) -> Arc { match self.mailbox.upgrade() { None => { let new_mailbox = Arc::new(Mailbox { actor_id: self.actor_id, tx: self.tx.clone(), }); self.mailbox = Arc::downgrade(&new_mailbox); new_mailbox } Some(m) => m } } /// Registers the entity `r` in the list of exit hooks for this /// actor. When the actor terminates, `r`'s /// [`exit_hook`][Entity::exit_hook] will be called. pub fn add_exit_hook(&mut self, r: &Arc>) { let r = Arc::clone(r); self.exit_hooks.push(Box::new( move |t, exit_status| r.internal_with_entity(|e| e.exit_hook(t, &exit_status)))) } fn get_facet(&mut self, facet_id: FacetId) -> Option<&mut Facet> { self.facet_nodes.get_mut(&facet_id) } /// See the definition of an [inert facet][Facet#inert-facets]. 
fn facet_exists_and_is_inert(&mut self, facet_id: FacetId) -> bool {
        // A facet with no entry in `facet_children` has no children.
        let no_kids = self.facet_children.get(&facet_id).map_or(true, |cs| cs.is_empty());
        self.get_facet(facet_id).map_or(false, |f| {
            no_kids
                && f.cleanup_actions.is_empty()
                && f.linked_tasks.is_empty()
                && f.inert_check_preventers.load(Ordering::Relaxed) == 0
        })
    }
}

impl Drop for Actor {
    /// Closes the receiving end of the actor's mailbox channel.
    fn drop(&mut self) {
        self.rx.close();
        tracing::trace!(actor_id = ?self.ac_ref.actor_id, "Actor::drop");
    }
}

impl Drop for Facet {
    /// Cancels every linked task and executes every remaining
    /// cleanup action of the facet.
    fn drop(&mut self) {
        for (_task_id, token) in std::mem::take(&mut self.linked_tasks).into_iter() {
            token.cancel();
        }

        let to_clear = std::mem::take(&mut self.cleanup_actions);
        {
            // `b` is scoped so that it is dropped before the final trace.
            let mut b = EventBuffer::new(Account::new(crate::name!("drop")));
            for (handle, r) in to_clear.into_iter() {
                tracing::trace!(h = ?handle, "retract on termination");
                b.execute_cleanup_action(r);
            }
        }

        tracing::trace!(facet_id = ?self.facet_id, "Facet::drop");
    }
}

/// Directly injects `action` into `mailbox`, billing subsequent activity against `account`.
///
/// Primarily for use by [linked tasks][Activation::linked_task].
///
/// # Errors
///
/// Fails if the action cannot be sent to the mailbox's actor.
#[must_use]
pub fn external_event(mailbox: &Arc<Mailbox>, account: &Arc<Account>, action: Action) -> ActorResult {
    send_actions(&mailbox.tx, account, vec![action])
}

/// Directly injects `actions` into `mailbox`, billing subsequent activity against `account`.
///
/// Primarily for use by [linked tasks][Activation::linked_task].
///
/// # Errors
///
/// Fails if the actions cannot be sent to the mailbox's actor.
#[must_use]
pub fn external_events(mailbox: &Arc<Mailbox>, account: &Arc<Account>, actions: PendingEventQueue) -> ActorResult {
    send_actions(&mailbox.tx, account, actions)
}

impl<M> Ref<M> {
    /// Supplies the behaviour (`e`) for a `Ref` created via
    /// [`create_inert`][Activation::create_inert].
    ///
    /// # Panics
    ///
    /// Panics if this `Ref` has already been given a behaviour.
pub fn become_entity>(&self, e: E) { let mut g = self.target.lock().expect("unpoisoned"); if g.is_some() { panic!("Double initialization of Ref"); } *g = Some(Box::new(e)); } fn internal_with_entity) -> R>(&self, f: F) -> R { let mut g = self.target.lock().expect("unpoisoned"); f(g.as_mut().expect("initialized").as_mut()) } } impl Ref { /// Retrieves a process-unique identifier for the `Ref`; `Ref`s /// are compared by this identifier. pub fn oid(&self) -> usize { std::ptr::addr_of!(*self) as usize } } impl PartialEq for Ref { fn eq(&self, other: &Self) -> bool { self.oid() == other.oid() } } impl Eq for Ref {} impl std::hash::Hash for Ref { fn hash(&self, hash: &mut H) where H: std::hash::Hasher { self.oid().hash(hash) } } impl PartialOrd for Ref { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } impl Ord for Ref { fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.oid().cmp(&other.oid()) } } impl std::fmt::Debug for Ref { fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { write!(f, "⌜{}/{}:{:016x}⌝", self.mailbox.actor_id, self.facet_id, self.oid()) } } impl Cap { /// Given a `Ref`, where `M` is interconvertible with /// `AnyValue`, yields a `Cap` for the referenced entity. The /// `Cap` automatically decodes presented `AnyValue`s into /// instances of `M`. pub fn guard(underlying: &Arc>) -> Arc where for<'a> &'a M: Into, for<'a> M: TryFrom<&'a AnyValue>, { Self::new(&Arc::new(Ref { mailbox: Arc::clone(&underlying.mailbox), facet_id: underlying.facet_id, target: Mutex::new(Some(Box::new(Guard { underlying: underlying.clone() }))), })) } /// Directly constructs a `Cap` for `underlying`. pub fn new(underlying: &Arc>) -> Arc { Arc::new(Cap { underlying: Arc::clone(underlying), attenuation: Vec::new(), }) } /// Yields a fresh `Cap` for `self`'s `underlying`, copying the /// existing attenuation of `self` to the new `Cap` and adding /// `attenuation` to it. 
pub fn attenuate(&self, attenuation: &sturdy::Attenuation) -> Result<Arc<Self>, CaveatError> {
        // Check first, so that nothing is cloned when the check fails.
        let checked = attenuation.check()?;
        let mut r = self.clone();
        r.attenuation.extend(checked);
        Ok(Arc::new(r))
    }

    /// Applies the contained attenuation to `a`, yielding `None` if
    /// `a` is filtered out, or `Some(_)` if it is accepted (and
    /// possibly transformed).
    pub fn rewrite(&self, mut a: AnyValue) -> Option<AnyValue> {
        // Each caveat sees the output of the one before it.
        for c in &self.attenuation {
            a = c.rewrite(&a)?;
        }
        Some(a)
    }

    /// Translates `m` into an `AnyValue`, passes it through
    /// [`rewrite`][Self::rewrite], and then
    /// [`assert`s][Activation::assert] it using the activation `t`.
    ///
    /// Yields `None`, asserting nothing, if the value is filtered out.
    pub fn assert<M: Into<AnyValue>>(&self, t: &mut Activation, m: M) -> Option<Handle> {
        self.rewrite(m.into()).map(|m| t.assert(&self.underlying, m))
    }

    /// Translates `m` into an `AnyValue`, passes it through
    /// [`rewrite`][Self::rewrite], and then sends it via method
    /// [`message`][Activation::message] on the activation `t`.
    ///
    /// Sends nothing if the value is filtered out.
    pub fn message<M: Into<AnyValue>>(&self, t: &mut Activation, m: M) {
        if let Some(m) = self.rewrite(m.into()) {
            t.message(&self.underlying, m)
        }
    }
}

impl std::fmt::Debug for Cap {
    /// Formats like the underlying `Ref`, appending the attenuation
    /// after a backslash when there is one.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
        if self.attenuation.is_empty() {
            self.underlying.fmt(f)
        } else {
            write!(f, "⌜{}/{}:{:016x}\\{:?}⌝",
                   self.underlying.mailbox.actor_id,
                   self.underlying.facet_id,
                   self.underlying.oid(),
                   self.attenuation)
        }
    }
}

impl Domain for Cap {}

impl std::convert::TryFrom<&IOValue> for Cap {
    type Error = preserves_schema::support::ParseError;
    /// Always panics: a `Cap` cannot be read from an `IOValue`.
    fn try_from(_v: &IOValue) -> Result<Self, Self::Error> {
        panic!("Attempted to deserialize Cap via IOValue");
    }
}

impl std::convert::From<&Cap> for IOValue {
    /// Always panics: a `Cap` cannot be written as an `IOValue`.
    fn from(_v: &Cap) -> IOValue {
        panic!("Attempted to serialize Cap via IOValue");
    }
}

// NOTE(review): the generic parameter list of this impl was missing
// from the text; `impl<M> Entity<AnyValue> for Guard<M>` is presumed.
impl<M> Entity<AnyValue> for Guard<M>
where
    for<'a> &'a M: Into<AnyValue>,
    for<'a> M: TryFrom<&'a AnyValue>,
{
    /// Forwards the assertion to the underlying entity if `a` decodes
    /// as an `M`; otherwise ignores it.
    fn assert(&mut self, t: &mut Activation, a: AnyValue, h: Handle) -> ActorResult {
        match M::try_from(&a) {
            Ok(a) => t.with_entity(&self.underlying, |t, e| e.assert(t, a, h)),
            Err(_) => Ok(()),
        }
    }

    /// Forwards the retraction to the underlying entity.
    fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult {
        t.with_entity(&self.underlying, |t, e| e.retract(t, h))
    }

    /// Forwards the message to the underlying entity if `m` decodes
    /// as an `M`; otherwise ignores it.
    fn message(&mut self, t: &mut Activation, m: AnyValue) -> ActorResult {
        match M::try_from(&m) {
            Ok(m) => t.with_entity(&self.underlying, |t, e| e.message(t, m)),
            Err(_) => Ok(()),
        }
    }

    /// Forwards the synchronisation request to the underlying entity.
    fn sync(&mut self, t: &mut Activation, peer: Arc<Ref<Synced>>) -> ActorResult {
        t.with_entity(&self.underlying, |t, e| e.sync(t, peer))
    }

    /// Forwards the exit notification to the underlying entity.
    fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc<ActorResult>) -> ActorResult {
        self.underlying.internal_with_entity(|e| e.exit_hook(t, exit_status))
    }
}

impl Entity<AnyValue> for StopOnRetract {
    /// Calls `t.stop()` on any retraction, whatever the handle.
    fn retract(&mut self, t: &mut Activation, _h: Handle) -> ActorResult {
        t.stop();
        Ok(())
    }
}

/// A convenient Syndicate-enhanced variation on
/// [`tracing::info_span`].
///
/// Includes fields `actor_id`, `task_id` and `oid`, so that they show
/// up in those circumstances where they happen to be defined as part
/// of the operation of the [`crate::actor`] module.
// NOTE(review): the zero-argument arm passes no span name to
// `info_span!`; confirm that `name!()` is usable as written.
#[macro_export]
macro_rules! name {
    () => {tracing::info_span!(actor_id = tracing::field::Empty,
                               task_id = tracing::field::Empty,
                               oid = tracing::field::Empty)};
    ($($item:tt)*) => {tracing::info_span!($($item)*,
                                           actor_id = tracing::field::Empty,
                                           task_id = tracing::field::Empty,
                                           oid = tracing::field::Empty)}
}