Finish actor.rs docs

This commit is contained in:
Tony Garnock-Jones 2021-08-13 20:12:11 -04:00
parent 931c4e5cd1
commit aee65ea029
3 changed files with 185 additions and 9 deletions

View File

@ -1,3 +1,4 @@
# Flow control
- Account, LoanedItem
- start_debt_reporter

View File

@ -0,0 +1,3 @@
# Linked Tasks
- linked_task

View File

@ -2,6 +2,7 @@
include_str!("../doc/actor.md"),
include_str!("../doc/what-is-an-actor.md"),
include_str!("../doc/flow-control.md"),
include_str!("../doc/linked-tasks.md"),
)]
use super::schemas::sturdy;
@ -185,7 +186,9 @@ enum CleanupAction {
type CleanupActions = Map<Handle, CleanupAction>;
type Action = Box<dyn Send + FnOnce(&mut Activation) -> ActorResult>;
type PendingEventQueue = Vec<Action>;
#[doc(hidden)]
pub type PendingEventQueue = Vec<Action>;
/// The main API for programming Syndicated Actor objects.
///
@ -224,10 +227,11 @@ pub struct Account {
}
/// A `LoanedItem<T>` is a `T` with an associated `cost` recorded
/// against it in the ledger of a given [`Account`].
/// against it in the ledger of a given [`Account`]. The cost is
/// repaid automatically when the `LoanedItem<T>` is `Drop`ped.
///
/// It is part of the flow control mechanism - see [the module-level
/// documentation][crate::actor#flow-control] for more.
/// `LoanedItem`s are part of the flow control mechanism - see [the
/// module-level documentation][crate::actor#flow-control] for more.
#[derive(Debug)]
pub struct LoanedItem<T> {
/// The account against which this loan is recorded.
@ -280,7 +284,9 @@ pub enum ActorState {
},
}
/// State associated with each non-terminated [`Actor`].
pub struct RunningActor {
/// The ID of the actor this state belongs to.
pub actor_id: ActorId,
tx: UnboundedSender<SystemMessage>,
mailbox: Weak<Mailbox>,
@ -296,7 +302,9 @@ pub struct RunningActor {
/// The object can be in the same actor, in a different local
/// (in-process) actor, or accessible across a network link.
pub struct Ref<M> {
/// Mailbox of the actor owning the referenced entity.
pub mailbox: Arc<Mailbox>,
/// Mutex owning and guarding the state backing the referenced entity.
pub target: Mutex<Option<Box<dyn Entity<M>>>>,
}
@ -316,7 +324,9 @@ pub struct Ref<M> {
/// [Miller 2006]: http://www.erights.org/talks/thesis/markm-thesis.pdf
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Cap {
#[doc(hidden)]
pub underlying: Arc<Ref<AnyValue>>,
#[doc(hidden)]
pub attenuation: Vec<CheckedCaveat>,
}
@ -338,11 +348,13 @@ where
const BUMP_AMOUNT: u8 = 10;
static NEXT_ACTOR_ID: AtomicU64 = AtomicU64::new(1);
#[doc(hidden)]
pub fn next_actor_id() -> ActorId {
NEXT_ACTOR_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed)
}
static NEXT_HANDLE: AtomicU64 = AtomicU64::new(3);
/// Allocate a process-unique `Handle`.
pub fn next_handle() -> Handle {
NEXT_HANDLE.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed)
}
@ -350,6 +362,7 @@ pub fn next_handle() -> Handle {
static NEXT_ACCOUNT_ID: AtomicU64 = AtomicU64::new(4);
preserves_schema::support::lazy_static! {
#[doc(hidden)]
pub static ref SYNDICATE_CREDIT: i64 = {
let credit =
std::env::var("SYNDICATE_CREDIT").unwrap_or("100".to_owned())
@ -358,10 +371,12 @@ preserves_schema::support::lazy_static! {
credit
};
#[doc(hidden)]
pub static ref ACCOUNTS: RwLock<Map<u64, (tracing::Span, Arc<AtomicI64>)>> =
RwLock::new(Map::new());
}
/// Starts a "debt reporter" actor which periodically logs information about active [`Account`]s.
pub fn start_debt_reporter() {
Actor::new().boot(crate::name!("debt-reporter"), |t| {
t.state.linked_task(crate::name!("tick"), async {
@ -404,6 +419,12 @@ impl<'activation> Activation<'activation> {
}
}
/// Constructs and executes `f` in a new "turn" for `actor`. If
/// `f` returns `Ok(())`, [commits the turn][Self::deliver] and
/// performs the buffered actions; otherwise, [abandons the
/// turn][Self::clear] and discards the buffered actions.
///
/// Bills any activity to `account`.
pub fn for_actor<F>(
actor: &ActorRef,
account: Arc<Account>,
@ -420,6 +441,14 @@ impl<'activation> Activation<'activation> {
}
}
/// Constructs and executes `f` in a new "turn" for `actor`. If
/// `f` returns `Some(exit_status)`, terminates `actor` with that
/// `exit_status`. Otherwise, if `f` returns `None`, leaves
/// `actor` in runnable state. [Commits buffered
/// actions][Self::deliver] unless `actor` terminates with an
/// `Err` status.
///
/// Bills any activity to `account`.
pub fn for_actor_exit<F>(
actor: &ActorRef,
account: Arc<Account>,
@ -432,10 +461,15 @@ impl<'activation> Activation<'activation> {
Ok(mut g) => match &mut *g {
ActorState::Terminated { exit_status } =>
Some((**exit_status).clone()),
ActorState::Running(state) =>
match f(&mut Activation::make(actor, account, state)) {
ActorState::Running(state) => {
let mut activation = Activation::make(actor, account, state);
match f(&mut activation) {
None => None,
Some(exit_status) => {
if exit_status.is_err() {
activation.clear();
}
drop(activation);
let exit_status = Arc::new(exit_status);
let mut t = Activation::make(actor, Account::new(crate::name!("shutdown")), state);
for action in std::mem::take(&mut t.state.exit_hooks) {
@ -448,7 +482,8 @@ impl<'activation> Activation<'activation> {
};
Some((*exit_status).clone())
}
},
}
}
}
}
}
@ -459,6 +494,9 @@ impl<'activation> Activation<'activation> {
}
}
/// Core API: assert `a` at recipient `r`.
///
/// Returns the [`Handle`] for the new assertion.
pub fn assert<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, a: M) -> Handle {
let handle = next_handle();
{
@ -476,6 +514,20 @@ impl<'activation> Activation<'activation> {
handle
}
/// Core API: assert `a` at `r`, which must be a `Ref<M>` within the active actor.
///
/// It's perfectly OK to use method [`assert`][Self::assert] even
/// for `Ref`s that are part of the active actor. The difference
/// between `assert` and `assert_for_myself` is that `r`'s handler
/// for `assert` runs in a separate, later [`Activation`], while
/// `r`'s handler for `assert_for_myself` runs in *this*
/// [`Activation`], before it commits.
///
/// Returns the [`Handle`] for the new assertion.
///
/// # Panics
///
/// Panics if `r` is not part of the active actor.
pub fn assert_for_myself<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, a: M) -> Handle {
self.immediate_oid(r);
let handle = next_handle();
@ -494,18 +546,30 @@ impl<'activation> Activation<'activation> {
handle
}
/// Core API: retract a previously-established assertion.
pub fn retract(&mut self, handle: Handle) {
if let Some(d) = self.state.cleanup_actions.remove(&handle) {
self.pending.execute_cleanup_action(d)
}
}
/// Core API: send message `m` to recipient `r`.
pub fn message<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, m: M) {
let r = Arc::clone(r);
self.pending.queue_for(&r).push(Box::new(
move |t| r.with_entity(|e| e.message(t, m))))
}
/// Core API: send message `m` to recipient `r`, which must be a
/// `Ref<M>` within the active actor.
///
/// Method `message_for_myself` is to [`message`][Self::message]
/// as [`assert_for_myself`][Self::assert_for_myself] is to
/// [`assert`][Self::assert].
///
/// # Panics
///
/// Panics if `r` is not part of the active actor.
pub fn message_for_myself<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, m: M) {
self.immediate_oid(r);
let r = Arc::clone(r);
@ -513,16 +577,38 @@ impl<'activation> Activation<'activation> {
move |t| r.with_entity(|e| e.message(t, m))))
}
/// Core API: begins a synchronisation with `r`.
///
/// Once the synchronisation request reaches `r`'s actor, it will
/// send a response to `peer`, which acts as a continuation for
/// the synchronisation request.
pub fn sync<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, peer: Arc<Ref<Synced>>) {
let r = Arc::clone(r);
self.pending.queue_for(&r).push(Box::new(
move |t| r.with_entity(|e| e.sync(t, peer))))
}
/// Retrieve the [`Account`] against which actions are recorded.
pub fn account(&self) -> &Arc<Account> {
&self.pending.account
}
/// Discards all pending actions in this activation.
pub fn clear(&mut self) {
self.pending.clear();
}
/// Delivers all pending actions in this activation.
///
/// This is called automatically when an `Activation` is
/// `Drop`ped.
///
/// # Panics
///
/// Panics if any pending actions "`for_myself`" (resulting from
/// [`assert_for_myself`][Self::assert_for_myself] or
/// [`message_for_myself`][Self::message_for_myself]) are
/// outstanding at the time of the call.
pub fn deliver(&mut self) {
self.pending.deliver();
}
@ -555,6 +641,11 @@ impl EventBuffer {
.or_insert((mailbox.tx.clone(), Vec::new())).1
}
fn clear(&mut self) {
self.queues = HashMap::new();
self.for_myself = PendingEventQueue::new();
}
fn deliver(&mut self) {
if !self.for_myself.is_empty() {
panic!("Unprocessed for_myself events remain at deliver() time");
@ -572,6 +663,8 @@ impl Drop for EventBuffer {
}
impl Account {
/// Construct a new `Account`, storing `name` within it for
/// debugging use.
pub fn new(name: tracing::Span) -> Arc<Self> {
let id = NEXT_ACCOUNT_ID.fetch_add(1, Ordering::Relaxed);
let debt = Arc::new(AtomicI64::new(0));
@ -583,15 +676,19 @@ impl Account {
})
}
/// Retrieve the current account balance: the number of
/// currently-outstanding work items.
pub fn balance(&self) -> i64 {
self.debt.load(Ordering::Relaxed)
}
/// Borrow `token_count` work items against this account.
pub fn borrow(&self, token_count: usize) {
let token_count: i64 = token_count.try_into().expect("manageable token count");
self.debt.fetch_add(token_count, Ordering::Relaxed);
}
/// Repay `token_count` work items previously borrowed against this account.
pub fn repay(&self, token_count: usize) {
let token_count: i64 = token_count.try_into().expect("manageable token count");
let _old_debt = self.debt.fetch_sub(token_count, Ordering::Relaxed);
@ -600,6 +697,8 @@ impl Account {
}
}
/// Suspend execution until enough "clear funds" exist in this
/// account for some subsequent activity to be permissible.
pub async fn ensure_clear_funds(&self) {
let limit = *SYNDICATE_CREDIT;
// tokio::task::yield_now().await;
@ -617,6 +716,8 @@ impl Drop for Account {
}
impl<T> LoanedItem<T> {
/// Construct a new `LoanedItem<T>` containing `item`, recording
/// `cost` work items against `account`.
pub fn new(account: &Arc<Account>, cost: usize, item: T) -> Self {
account.borrow(cost);
LoanedItem { account: Arc::clone(account), cost, item }
@ -679,6 +780,8 @@ impl Drop for Mailbox {
}
impl Actor {
/// Create a new actor. It still needs to be
/// [`start`ed][Self::start]/[`boot`ed][Self::boot].
pub fn new() -> Self {
let (tx, rx) = unbounded_channel();
let actor_id = next_actor_id();
@ -700,6 +803,9 @@ impl Actor {
}
}
/// Create and start a new actor to own entity `e`. Returns a
/// `Ref` to the new entity. The `name` is used as context for any
/// log messages emitted by the new actor.
pub fn create_and_start<M, E: Entity<M> + Send + 'static>(
name: tracing::Span,
e: E,
@ -709,6 +815,11 @@ impl Actor {
r
}
/// Create and start a new actor, returning a `Ref` to a fresh
/// entity contained within it. Before using the `Ref`, its
/// initialization must be completed by calling
/// [`become_entity`][Ref::become_entity] on it. The `name` is
/// used as context for any log messages emitted by the new actor.
pub fn create_and_start_inert<M>(name: tracing::Span) -> Arc<Ref<M>> {
let ac = Self::new();
let r = ac.ac_ref.access(|s| s.unwrap().expect_running().create_inert());
@ -716,6 +827,10 @@ impl Actor {
r
}
/// Start the actor's mainloop. Takes ownership of `self`. The
/// `name` is used as context for any log messages emitted by the
/// actor. The `boot` function is called in the actor's context,
/// and then the mainloop is entered.
pub fn boot<F: 'static + Send + FnOnce(&mut Activation) -> ActorResult>(
mut self,
name: tracing::Span,
@ -734,6 +849,10 @@ impl Actor {
}.instrument(name))
}
/// Start the actor's mainloop. Takes ownership of `self`. The
/// `name` is used as context for any log messages emitted by the
/// actor. Delegates to [`boot`][Self::boot], with a no-op `boot`
/// function.
pub fn start(self, name: tracing::Span) -> ActorHandle {
self.boot(name, |_ac| Ok(()))
}
@ -789,6 +908,9 @@ fn panicked_err() -> Option<ActorResult> {
}
impl ActorRef {
/// Uses an internal mutex to access the internal state: takes the
/// lock, calls `f` with the internal state, releases the lock,
/// and returns the result of `f`.
pub fn access<R, F: FnOnce(Option<&mut ActorState>) -> R>(&self, f: F) -> R {
match self.state.lock() {
Err(_) => f(None),
@ -796,6 +918,10 @@ impl ActorRef {
}
}
/// Retrieves the exit status of the denoted actor. If it is still
/// running, yields `None`; otherwise, yields `Some(Ok(()))` if it
/// exited normally, or `Some(Err(_))` if it terminated
/// abnormally.
pub fn exit_status(&self) -> Option<ActorResult> {
self.access(|s| s.map_or_else(
panicked_err,
@ -816,6 +942,9 @@ impl ActorState {
}
impl RunningActor {
/// Requests a shutdown of the actor. The shutdown request is
/// handled by the actor's main loop, causing it to terminate with
/// exit status `Ok(())`.
pub fn shutdown(&self) {
let _ = self.tx.send(SystemMessage::Release);
}
@ -834,16 +963,21 @@ impl RunningActor {
}
}
/// Construct an entity with behaviour [`InertEntity`] within this
/// actor.
pub fn inert_entity<M>(&mut self) -> Arc<Ref<M>> {
self.create(InertEntity)
}
/// Construct an entity with behaviour `e` within this actor.
pub fn create<M, E: Entity<M> + Send + 'static>(&mut self, e: E) -> Arc<Ref<M>> {
let r = self.create_inert();
r.become_entity(e);
r
}
/// Construct an entity whose behaviour will be specified later
/// (via [`become_entity`][Ref::become_entity]).
pub fn create_inert<M>(&mut self) -> Arc<Ref<M>> {
Arc::new(Ref {
mailbox: self.mailbox(),
@ -851,6 +985,9 @@ impl RunningActor {
})
}
/// Registers the entity `r` in the list of exit hooks for this
/// actor. When the actor terminates, `r`'s
/// [`exit_hook`][Entity::exit_hook] will be called.
pub fn add_exit_hook<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>) {
let r = Arc::clone(r);
self.exit_hooks.push(Box::new(move |t, exit_status| {
@ -858,6 +995,9 @@ impl RunningActor {
}))
}
/// Start a new [linked task][crate::actor#linked-tasks] attached
/// to this actor. The function `boot` is the main function of the
/// new task. Uses `name` for log messages emitted by the task.
pub fn linked_task<F: 'static + Send + futures::Future<Output = ActorResult>>(
&mut self,
name: tracing::Span,
@ -923,17 +1063,29 @@ impl Drop for RunningActor {
}
}
/// Directly injects `action` into `mailbox`, billing subsequent activity against `account`.
///
/// Primarily for use by [linked tasks][RunningActor::linked_task].
#[must_use]
pub fn external_event(mailbox: &Arc<Mailbox>, account: &Arc<Account>, action: Action) -> ActorResult {
send_actions(&mailbox.tx, account, vec![action])
}
/// Directly injects `actions` into `mailbox`, billing subsequent activity against `account`.
///
/// Primarily for use by [linked tasks][RunningActor::linked_task].
#[must_use]
pub fn external_events(mailbox: &Arc<Mailbox>, account: &Arc<Account>, events: PendingEventQueue) -> ActorResult {
send_actions(&mailbox.tx, account, events)
pub fn external_events(mailbox: &Arc<Mailbox>, account: &Arc<Account>, actions: PendingEventQueue) -> ActorResult {
send_actions(&mailbox.tx, account, actions)
}
impl<M> Ref<M> {
/// Supplies the behaviour (`e`) for a `Ref` created via
/// [`create_inert`][RunningActor::create_inert].
///
/// # Panics
///
/// Panics if this `Ref` has already been given a behaviour.
pub fn become_entity<E: 'static + Entity<M>>(&self, e: E) {
let mut g = self.target.lock().expect("unpoisoned");
if g.is_some() {
@ -942,6 +1094,7 @@ impl<M> Ref<M> {
*g = Some(Box::new(e));
}
#[doc(hidden)]
pub fn with_entity<R, F: FnOnce(&mut dyn Entity<M>) -> R>(&self, f: F) -> R {
let mut g = self.target.lock().expect("unpoisoned");
f(g.as_mut().expect("initialized").as_mut())
@ -949,6 +1102,8 @@ impl<M> Ref<M> {
}
impl<M> Ref<M> {
/// Retrieves a process-unique identifier for the `Ref`; `Ref`s
/// are compared by this identifier.
pub fn oid(&self) -> usize {
std::ptr::addr_of!(*self) as usize
}
@ -987,6 +1142,10 @@ impl<M> std::fmt::Debug for Ref<M> {
}
impl Cap {
/// Given a `Ref<M>`, where `M` is interconvertible with
/// `AnyValue`, yields a `Cap` for the referenced entity. The
/// `Cap` automatically decodes presented `AnyValue`s into
/// instances of `M`.
pub fn guard<M: 'static + Send>(underlying: &Arc<Ref<M>>) -> Arc<Self>
where
for<'a> &'a M: Into<AnyValue>,
@ -998,6 +1157,7 @@ impl Cap {
}))
}
/// Directly constructs a `Cap` for `underlying`.
pub fn new(underlying: &Arc<Ref<AnyValue>>) -> Arc<Self> {
Arc::new(Cap {
underlying: Arc::clone(underlying),
@ -1005,12 +1165,18 @@ impl Cap {
})
}
/// Yields a fresh `Cap` for `self`'s `underlying`, copying the
/// existing attenuation of `self` to the new `Cap` and adding
/// `attenuation` to it.
pub fn attenuate(&self, attenuation: &sturdy::Attenuation) -> Result<Arc<Self>, CaveatError> {
let mut r = Cap { attenuation: self.attenuation.clone(), .. self.clone() };
r.attenuation.extend(attenuation.check()?);
Ok(Arc::new(r))
}
/// Applies the contained attenuation to `a`, yielding `None` if
/// `a` is filtered out, or `Some(_)` if it is accepted (and
/// possibly transformed).
pub fn rewrite(&self, mut a: AnyValue) -> Option<AnyValue> {
for c in &self.attenuation {
match c.rewrite(&a) {
@ -1021,10 +1187,16 @@ impl Cap {
Some(a)
}
/// Translates `m` into an `AnyValue`, passes it through
/// [`rewrite`][Self::rewrite], and then
/// [`assert`s][Activation::assert] it using the activation `t`.
pub fn assert<M: Into<AnyValue>>(&self, t: &mut Activation, m: M) -> Option<Handle> {
self.rewrite(m.into()).map(|m| t.assert(&self.underlying, m))
}
/// Translates `m` into an `AnyValue`, passes it through
/// [`rewrite`][Self::rewrite], and then sends it via method
/// [`message`][Activation::message] on the activation `t`.
pub fn message<M: Into<AnyValue>>(&self, t: &mut Activation, m: M) {
if let Some(m) = self.rewrite(m.into()) {
t.message(&self.underlying, m)