diff --git a/syndicate/doc/flow-control.md b/syndicate/doc/flow-control.md index f1059a5..447cb68 100644 --- a/syndicate/doc/flow-control.md +++ b/syndicate/doc/flow-control.md @@ -1,3 +1,4 @@ # Flow control - Account, LoanedItem + - start_debt_reporter diff --git a/syndicate/doc/linked-tasks.md b/syndicate/doc/linked-tasks.md new file mode 100644 index 0000000..28e9738 --- /dev/null +++ b/syndicate/doc/linked-tasks.md @@ -0,0 +1,3 @@ +# Linked Tasks + + - linked_task diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index b672cff..45d87b6 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -2,6 +2,7 @@ include_str!("../doc/actor.md"), include_str!("../doc/what-is-an-actor.md"), include_str!("../doc/flow-control.md"), + include_str!("../doc/linked-tasks.md"), )] use super::schemas::sturdy; @@ -185,7 +186,9 @@ enum CleanupAction { type CleanupActions = Map; type Action = Box ActorResult>; -type PendingEventQueue = Vec; + +#[doc(hidden)] +pub type PendingEventQueue = Vec; /// The main API for programming Syndicated Actor objects. /// @@ -224,10 +227,11 @@ pub struct Account { } /// A `LoanedItem` is a `T` with an associated `cost` recorded -/// against it in the ledger of a given [`Account`]. +/// against it in the ledger of a given [`Account`]. The cost is +/// repaid automatically when the `LoanedItem` is `Drop`ped. /// -/// It is part of the flow control mechanism - see [the module-level -/// documentation][crate::actor#flow-control] for more. +/// `LoanedItem`s are part of the flow control mechanism - see [the +/// module-level documentation][crate::actor#flow-control] for more. #[derive(Debug)] pub struct LoanedItem { /// The account against which this loan is recorded. @@ -280,7 +284,9 @@ pub enum ActorState { }, } +/// State associated with each non-terminated [`Actor`]. pub struct RunningActor { + /// The ID of the actor this state belongs to. pub actor_id: ActorId, tx: UnboundedSender, mailbox: Weak, @@ -296,7 +302,9 @@ pub struct RunningActor { /// The object can be in the same actor, in a different local /// (in-process) actor, or accessible across a network link. pub struct Ref { + /// Mailbox of the actor owning the referenced entity. pub mailbox: Arc, + /// Mutex owning and guarding the state backing the referenced entity. pub target: Mutex>>>, } @@ -316,7 +324,9 @@ pub struct Ref { /// [Miller 2006]: http://www.erights.org/talks/thesis/markm-thesis.pdf #[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct Cap { + #[doc(hidden)] pub underlying: Arc>, + #[doc(hidden)] pub attenuation: Vec, } @@ -338,11 +348,13 @@ where const BUMP_AMOUNT: u8 = 10; static NEXT_ACTOR_ID: AtomicU64 = AtomicU64::new(1); +#[doc(hidden)] pub fn next_actor_id() -> ActorId { NEXT_ACTOR_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed) } static NEXT_HANDLE: AtomicU64 = AtomicU64::new(3); +/// Allocate a process-unique `Handle`. pub fn next_handle() -> Handle { NEXT_HANDLE.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed) } @@ -350,6 +362,7 @@ pub fn next_handle() -> Handle { static NEXT_ACCOUNT_ID: AtomicU64 = AtomicU64::new(4); preserves_schema::support::lazy_static! { + #[doc(hidden)] pub static ref SYNDICATE_CREDIT: i64 = { let credit = std::env::var("SYNDICATE_CREDIT").unwrap_or("100".to_owned()) @@ -358,10 +371,12 @@ preserves_schema::support::lazy_static! { credit }; + #[doc(hidden)] pub static ref ACCOUNTS: RwLock)>> = RwLock::new(Map::new()); } +/// Starts a "debt reporter" actor which periodically logs information about active [`Account`]s. pub fn start_debt_reporter() { Actor::new().boot(crate::name!("debt-reporter"), |t| { t.state.linked_task(crate::name!("tick"), async { @@ -404,6 +419,12 @@ impl<'activation> Activation<'activation> { } } + /// Constructs and executes `f` in a new "turn" for `actor`. If + /// `f` returns `Ok(())`, [commits the turn][Self::deliver] and + /// performs the buffered actions; otherwise, [abandons the + /// turn][Self::clear] and discards the buffered actions. + /// + /// Bills any activity to `account`. pub fn for_actor( actor: &ActorRef, account: Arc, @@ -420,6 +441,14 @@ impl<'activation> Activation<'activation> { } } + /// Constructs and executes `f` in a new "turn" for `actor`. If + /// `f` returns `Some(exit_status)`, terminates `actor` with that + /// `exit_status`. Otherwise, if `f` returns `None`, leaves + /// `actor` in runnable state. [Commits buffered + /// actions][Self::deliver] unless `actor` terminates with an + /// `Err` status. + /// + /// Bills any activity to `account`. pub fn for_actor_exit( actor: &ActorRef, account: Arc, @@ -432,10 +461,15 @@ impl<'activation> Activation<'activation> { Ok(mut g) => match &mut *g { ActorState::Terminated { exit_status } => Some((**exit_status).clone()), - ActorState::Running(state) => - match f(&mut Activation::make(actor, account, state)) { + ActorState::Running(state) => { + let mut activation = Activation::make(actor, account, state); + match f(&mut activation) { None => None, Some(exit_status) => { + if exit_status.is_err() { + activation.clear(); + } + drop(activation); let exit_status = Arc::new(exit_status); let mut t = Activation::make(actor, Account::new(crate::name!("shutdown")), state); for action in std::mem::take(&mut t.state.exit_hooks) { @@ -448,7 +482,8 @@ impl<'activation> Activation<'activation> { }; Some((*exit_status).clone()) } - }, + } + } } } } @@ -459,6 +494,9 @@ impl<'activation> Activation<'activation> { } } + /// Core API: assert `a` at recipient `r`. + /// + /// Returns the [`Handle`] for the new assertion. pub fn assert(&mut self, r: &Arc>, a: M) -> Handle { let handle = next_handle(); { @@ -476,6 +514,20 @@ impl<'activation> Activation<'activation> { handle } + /// Core API: assert `a` at `r`, which must be a `Ref` within the active actor. + /// + /// It's perfectly OK to use method [`assert`][Self::assert] even + /// for `Ref`s that are part of the active actor. The difference + /// between `assert` and `assert_for_myself` is that `r`'s handler + /// for `assert` runs in a separate, later [`Activation`], while + /// `r`'s handler for `assert_for_myself` runs in *this* + /// [`Activation`], before it commits. + /// + /// Returns the [`Handle`] for the new assertion. + /// + /// # Panics + /// + /// Panics if `r` is not part of the active actor. pub fn assert_for_myself(&mut self, r: &Arc>, a: M) -> Handle { self.immediate_oid(r); let handle = next_handle(); @@ -494,18 +546,30 @@ impl<'activation> Activation<'activation> { handle } + /// Core API: retract a previously-established assertion. pub fn retract(&mut self, handle: Handle) { if let Some(d) = self.state.cleanup_actions.remove(&handle) { self.pending.execute_cleanup_action(d) } } + /// Core API: send message `m` to recipient `r`. pub fn message(&mut self, r: &Arc>, m: M) { let r = Arc::clone(r); self.pending.queue_for(&r).push(Box::new( move |t| r.with_entity(|e| e.message(t, m)))) } + /// Core API: send message `m` to recipient `r`, which must be a + /// `Ref` within the active actor. + /// + /// Method `message_for_myself` is to [`message`][Self::message] + /// as [`assert_for_myself`][Self::assert_for_myself] is to + /// [`assert`][Self::assert]. + /// + /// # Panics + /// + /// Panics if `r` is not part of the active actor. pub fn message_for_myself(&mut self, r: &Arc>, m: M) { self.immediate_oid(r); let r = Arc::clone(r); @@ -513,16 +577,38 @@ impl<'activation> Activation<'activation> { move |t| r.with_entity(|e| e.message(t, m)))) } + /// Core API: begins a synchronisation with `r`. + /// + /// Once the synchronisation request reaches `r`'s actor, it will + /// send a response to `peer`, which acts as a continuation for + /// the synchronisation request. pub fn sync(&mut self, r: &Arc>, peer: Arc>) { let r = Arc::clone(r); self.pending.queue_for(&r).push(Box::new( move |t| r.with_entity(|e| e.sync(t, peer)))) } + /// Retrieve the [`Account`] against which actions are recorded. pub fn account(&self) -> &Arc { &self.pending.account } + /// Discards all pending actions in this activation. + pub fn clear(&mut self) { + self.pending.clear(); + } + + /// Delivers all pending actions in this activation. + /// + /// This is called automatically when an `Activation` is + /// `Drop`ped. + /// + /// # Panics + /// + /// Panics if any pending actions "`for_myself`" (resulting from + /// [`assert_for_myself`][Self::assert_for_myself] or + /// [`message_for_myself`][Self::message_for_myself]) are + /// outstanding at the time of the call. pub fn deliver(&mut self) { self.pending.deliver(); } @@ -555,6 +641,11 @@ impl EventBuffer { .or_insert((mailbox.tx.clone(), Vec::new())).1 } + fn clear(&mut self) { + self.queues = HashMap::new(); + self.for_myself = PendingEventQueue::new(); + } + fn deliver(&mut self) { if !self.for_myself.is_empty() { panic!("Unprocessed for_myself events remain at deliver() time"); @@ -572,6 +663,8 @@ impl Drop for EventBuffer { } impl Account { + /// Construct a new `Account`, storing `name` within it for + /// debugging use. pub fn new(name: tracing::Span) -> Arc { let id = NEXT_ACCOUNT_ID.fetch_add(1, Ordering::Relaxed); let debt = Arc::new(AtomicI64::new(0)); @@ -583,15 +676,19 @@ impl Account { }) } + /// Retrieve the current account balance: the number of + /// currently-outstanding work items. pub fn balance(&self) -> i64 { self.debt.load(Ordering::Relaxed) } + /// Borrow `token_count` work items against this account. pub fn borrow(&self, token_count: usize) { let token_count: i64 = token_count.try_into().expect("manageable token count"); self.debt.fetch_add(token_count, Ordering::Relaxed); } + /// Repay `token_count` work items previously borrowed against this account. pub fn repay(&self, token_count: usize) { let token_count: i64 = token_count.try_into().expect("manageable token count"); let _old_debt = self.debt.fetch_sub(token_count, Ordering::Relaxed); @@ -600,6 +697,8 @@ impl Account { } } + /// Suspend execution until enough "clear funds" exist in this + /// account for some subsequent activity to be permissible. pub async fn ensure_clear_funds(&self) { let limit = *SYNDICATE_CREDIT; // tokio::task::yield_now().await; @@ -617,6 +716,8 @@ impl Drop for Account { } impl LoanedItem { + /// Construct a new `LoanedItem` containing `item`, recording + /// `cost` work items against `account`. pub fn new(account: &Arc, cost: usize, item: T) -> Self { account.borrow(cost); LoanedItem { account: Arc::clone(account), cost, item } @@ -679,6 +780,8 @@ impl Drop for Mailbox { } impl Actor { + /// Create a new actor. It still needs to be + /// [`start`ed][Self::start]/[`boot`ed][Self::boot]. pub fn new() -> Self { let (tx, rx) = unbounded_channel(); let actor_id = next_actor_id(); @@ -700,6 +803,9 @@ impl Actor { } } + /// Create and start a new actor to own entity `e`. Returns a + /// `Ref` to the new entity. The `name` is used as context for any + /// log messages emitted by the new actor. pub fn create_and_start + Send + 'static>( name: tracing::Span, e: E, @@ -709,6 +815,11 @@ impl Actor { r } + /// Create and start a new actor, returning a `Ref` to a fresh + /// entity contained within it. Before using the `Ref`, its + /// initialization must be completed by calling + /// [`become_entity`][Ref::become_entity] on it. The `name` is + /// used as context for any log messages emitted by the new actor. pub fn create_and_start_inert(name: tracing::Span) -> Arc> { let ac = Self::new(); let r = ac.ac_ref.access(|s| s.unwrap().expect_running().create_inert()); @@ -716,6 +827,10 @@ impl Actor { r } + /// Start the actor's mainloop. Takes ownership of `self`. The + /// `name` is used as context for any log messages emitted by the + /// actor. The `boot` function is called in the actor's context, + /// and then the mainloop is entered. pub fn boot ActorResult>( mut self, name: tracing::Span, @@ -734,6 +849,10 @@ impl Actor { }.instrument(name)) } + /// Start the actor's mainloop. Takes ownership of `self`. The + /// `name` is used as context for any log messages emitted by the + /// actor. Delegates to [`boot`][Self::boot], with a no-op `boot` + /// function. pub fn start(self, name: tracing::Span) -> ActorHandle { self.boot(name, |_ac| Ok(())) } @@ -789,6 +908,9 @@ fn panicked_err() -> Option { } impl ActorRef { + /// Uses an internal mutex to access the internal state: takes the + /// lock, calls `f` with the internal state, releases the lock, + /// and returns the result of `f`. pub fn access) -> R>(&self, f: F) -> R { match self.state.lock() { Err(_) => f(None), @@ -796,6 +918,10 @@ impl ActorRef { } } + /// Retrieves the exit status of the denoted actor. If it is still + /// running, yields `None`; otherwise, yields `Some(Ok(()))` if it + /// exited normally, or `Some(Err(_))` if it terminated + /// abnormally. pub fn exit_status(&self) -> Option { self.access(|s| s.map_or_else( panicked_err, @@ -816,6 +942,9 @@ impl ActorState { } impl RunningActor { + /// Requests a shutdown of the actor. The shutdown request is + /// handled by the actor's main loop, causing it to terminate with + /// exit status `Ok(())`. pub fn shutdown(&self) { let _ = self.tx.send(SystemMessage::Release); } @@ -834,16 +963,21 @@ impl RunningActor { } } + /// Construct an entity with behaviour [`InertEntity`] within this + /// actor. pub fn inert_entity(&mut self) -> Arc> { self.create(InertEntity) } + /// Construct an entity with behaviour `e` within this actor. pub fn create + Send + 'static>(&mut self, e: E) -> Arc> { let r = self.create_inert(); r.become_entity(e); r } + /// Construct an entity whose behaviour will be specified later + /// (via [`become_entity`][Ref::become_entity]). pub fn create_inert(&mut self) -> Arc> { Arc::new(Ref { mailbox: self.mailbox(), @@ -851,6 +985,9 @@ impl RunningActor { }) } + /// Registers the entity `r` in the list of exit hooks for this + /// actor. When the actor terminates, `r`'s + /// [`exit_hook`][Entity::exit_hook] will be called. pub fn add_exit_hook(&mut self, r: &Arc>) { let r = Arc::clone(r); self.exit_hooks.push(Box::new(move |t, exit_status| { @@ -858,6 +995,9 @@ impl RunningActor { })) } + /// Start a new [linked task][crate::actor#linked-tasks] attached + /// to this actor. The function `boot` is the main function of the + /// new task. Uses `name` for log messages emitted by the task. pub fn linked_task>( &mut self, name: tracing::Span, @@ -923,17 +1063,29 @@ impl Drop for RunningActor { } } +/// Directly injects `action` into `mailbox`, billing subsequent activity against `account`. +/// +/// Primarily for use by [linked tasks][RunningActor::linked_task]. #[must_use] pub fn external_event(mailbox: &Arc, account: &Arc, action: Action) -> ActorResult { send_actions(&mailbox.tx, account, vec![action]) } +/// Directly injects `actions` into `mailbox`, billing subsequent activity against `account`. +/// +/// Primarily for use by [linked tasks][RunningActor::linked_task]. #[must_use] -pub fn external_events(mailbox: &Arc, account: &Arc, events: PendingEventQueue) -> ActorResult { - send_actions(&mailbox.tx, account, events) +pub fn external_events(mailbox: &Arc, account: &Arc, actions: PendingEventQueue) -> ActorResult { + send_actions(&mailbox.tx, account, actions) } impl Ref { + /// Supplies the behaviour (`e`) for a `Ref` created via + /// [`create_inert`][RunningActor::create_inert]. + /// + /// # Panics + /// + /// Panics if this `Ref` has already been given a behaviour. pub fn become_entity>(&self, e: E) { let mut g = self.target.lock().expect("unpoisoned"); if g.is_some() { @@ -942,6 +1094,7 @@ impl Ref { *g = Some(Box::new(e)); } + #[doc(hidden)] pub fn with_entity) -> R>(&self, f: F) -> R { let mut g = self.target.lock().expect("unpoisoned"); f(g.as_mut().expect("initialized").as_mut()) @@ -949,6 +1102,8 @@ impl Ref { } impl Ref { + /// Retrieves a process-unique identifier for the `Ref`; `Ref`s + /// are compared by this identifier. pub fn oid(&self) -> usize { std::ptr::addr_of!(*self) as usize } @@ -987,6 +1142,10 @@ impl std::fmt::Debug for Ref { } impl Cap { + /// Given a `Ref`, where `M` is interconvertible with + /// `AnyValue`, yields a `Cap` for the referenced entity. The + /// `Cap` automatically decodes presented `AnyValue`s into + /// instances of `M`. pub fn guard(underlying: &Arc>) -> Arc where for<'a> &'a M: Into, @@ -998,6 +1157,7 @@ impl Cap { })) } + /// Directly constructs a `Cap` for `underlying`. pub fn new(underlying: &Arc>) -> Arc { Arc::new(Cap { underlying: Arc::clone(underlying), @@ -1005,12 +1165,18 @@ impl Cap { }) } + /// Yields a fresh `Cap` for `self`'s `underlying`, copying the + /// existing attenuation of `self` to the new `Cap` and adding + /// `attenuation` to it. pub fn attenuate(&self, attenuation: &sturdy::Attenuation) -> Result, CaveatError> { let mut r = Cap { attenuation: self.attenuation.clone(), .. self.clone() }; r.attenuation.extend(attenuation.check()?); Ok(Arc::new(r)) } + /// Applies the contained attenuation to `a`, yielding `None` if + /// `a` is filtered out, or `Some(_)` if it is accepted (and + /// possibly transformed). pub fn rewrite(&self, mut a: AnyValue) -> Option { for c in &self.attenuation { match c.rewrite(&a) { @@ -1021,10 +1187,16 @@ impl Cap { Some(a) } + /// Translates `m` into an `AnyValue`, passes it through + /// [`rewrite`][Self::rewrite], and then + /// [`assert`s][Activation::assert] it using the activation `t`. pub fn assert>(&self, t: &mut Activation, m: M) -> Option { self.rewrite(m.into()).map(|m| t.assert(&self.underlying, m)) } + /// Translates `m` into an `AnyValue`, passes it through + /// [`rewrite`][Self::rewrite], and then sends it via method + /// [`message`][Activation::message] on the activation `t`. pub fn message>(&self, t: &mut Activation, m: M) { if let Some(m) = self.rewrite(m.into()) { t.message(&self.underlying, m)