From 931c4e5cd196e664469b1e4f6dd8588b2b0b2d39 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Fri, 13 Aug 2021 15:51:11 -0400 Subject: [PATCH] Some documentation; rename Debtor to Account --- syndicate-server/examples/consumer.rs | 6 +- syndicate-server/examples/pingpong.rs | 10 +- syndicate-server/examples/producer.rs | 10 +- syndicate-server/examples/state-consumer.rs | 6 +- syndicate-server/examples/state-producer.rs | 12 +- syndicate-server/src/main.rs | 3 +- syndicate/README.md | 8 + syndicate/benches/bench_dataspace.rs | 12 +- syndicate/doc/actor.md | 16 ++ syndicate/doc/flow-control.md | 3 + syndicate/doc/what-is-an-actor.md | 24 ++ syndicate/src/actor.rs | 280 ++++++++++++++++---- syndicate/src/bag.rs | 4 + syndicate/src/lib.rs | 9 + syndicate/src/relay.rs | 24 +- 15 files changed, 332 insertions(+), 95 deletions(-) create mode 100644 syndicate/README.md create mode 100644 syndicate/doc/actor.md create mode 100644 syndicate/doc/flow-control.md create mode 100644 syndicate/doc/what-is-an-actor.md diff --git a/syndicate-server/examples/consumer.rs b/syndicate-server/examples/consumer.rs index 289229f..376ac34 100644 --- a/syndicate-server/examples/consumer.rs +++ b/syndicate-server/examples/consumer.rs @@ -25,12 +25,12 @@ async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; Actor::new().boot(syndicate::name!("consumer"), |t| { let ac = t.actor.clone(); - let boot_debtor = Arc::clone(t.debtor()); + let boot_account = Arc::clone(t.account()); Ok(t.state.linked_task(tracing::Span::current(), async move { let config = Config::from_args(); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - Activation::for_actor(&ac, boot_debtor, |t| { + Activation::for_actor(&ac, boot_account, |t| { relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| { let consumer = syndicate::entity(0) .on_message(|message_count, _t, m: AnyValue| { @@ -54,7 +54,7 @@ async fn main() -> Result<(), Box> { stats_timer.tick().await; let consumer = Arc::clone(&consumer); external_event(&Arc::clone(&consumer.underlying.mailbox), - &Debtor::new(syndicate::name!("debtor")), + &Account::new(syndicate::name!("account")), Box::new(move |t| consumer.underlying.with_entity( |e| e.message(t, AnyValue::new(true)))))?; } diff --git a/syndicate-server/examples/pingpong.rs b/syndicate-server/examples/pingpong.rs index 152c806..3525465 100644 --- a/syndicate-server/examples/pingpong.rs +++ b/syndicate-server/examples/pingpong.rs @@ -93,12 +93,12 @@ async fn main() -> Result<(), Box> { Actor::new().boot(syndicate::name!("pingpong"), |t| { let ac = t.actor.clone(); - let boot_debtor = Arc::clone(t.debtor()); + let boot_account = Arc::clone(t.account()); Ok(t.state.linked_task(tracing::Span::current(), async move { let config = Config::from_args(); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - Activation::for_actor(&ac, boot_debtor, |t| { + Activation::for_actor(&ac, boot_account, |t| { relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { let (send_label, recv_label, report_latency_every, should_echo, bytes_padding) = @@ -183,7 +183,7 @@ async fn main() -> Result<(), Box> { stats_timer.tick().await; let consumer = Arc::clone(&consumer); external_event(&Arc::clone(&consumer.underlying.mailbox), - &Debtor::new(syndicate::name!("debtor")), + &Account::new(syndicate::name!("account")), Box::new(move |t| consumer.underlying.with_entity( |e| e.message(t, AnyValue::new(true)))))?; } @@ -192,7 +192,7 @@ async fn main() -> Result<(), Box> { if let PingPongMode::Ping(c) = &config.mode { let turn_count = c.turn_count; let action_count = c.action_count; - let debtor = Arc::clone(t.debtor()); + let account = Arc::clone(t.account()); t.state.linked_task(syndicate::name!("boot-ping"), async move { let padding: AnyValue = Value::ByteString(vec![0; bytes_padding]).wrap(); for _ in 0..turn_count { @@ -206,7 +206,7 @@ async fn main() -> Result<(), Box> { events.push(Box::new(move |t| ds.underlying.with_entity( |e| e.message(t, current_rec)))); } - external_events(&ds.underlying.mailbox, &debtor, events)? + external_events(&ds.underlying.mailbox, &account, events)? } Ok(()) }); diff --git a/syndicate-server/examples/producer.rs b/syndicate-server/examples/producer.rs index db89b15..cd9098c 100644 --- a/syndicate-server/examples/producer.rs +++ b/syndicate-server/examples/producer.rs @@ -35,19 +35,19 @@ async fn main() -> Result<(), Box> { syndicate::actor::start_debt_reporter(); Actor::new().boot(syndicate::name!("producer"), |t| { let ac = t.actor.clone(); - let boot_debtor = Arc::clone(t.debtor()); + let boot_account = Arc::clone(t.account()); Ok(t.state.linked_task(tracing::Span::current(), async move { let config = Config::from_args(); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - Activation::for_actor(&ac, boot_debtor, |t| { + Activation::for_actor(&ac, boot_account, |t| { relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { let padding: AnyValue = Value::ByteString(vec![0; config.bytes_padding]).wrap(); let action_count = config.action_count; - let debtor = Debtor::new(syndicate::name!("debtor")); + let account = Account::new(syndicate::name!("account")); t.state.linked_task(syndicate::name!("sender"), async move { loop { - debtor.ensure_clear_funds().await; + account.ensure_clear_funds().await; let mut events: PendingEventQueue = Vec::new(); for _ in 0..action_count { let ds = Arc::clone(&ds); @@ -55,7 +55,7 @@ async fn main() -> Result<(), Box> { events.push(Box::new(move |t| ds.underlying.with_entity( |e| e.message(t, says(Value::from("producer").wrap(), padding))))); } - external_events(&ds.underlying.mailbox, &debtor, events)?; + external_events(&ds.underlying.mailbox, &account, events)?; } }); Ok(None) diff --git a/syndicate-server/examples/state-consumer.rs b/syndicate-server/examples/state-consumer.rs index a5783dd..b0f1019 100644 --- a/syndicate-server/examples/state-consumer.rs +++ b/syndicate-server/examples/state-consumer.rs @@ -25,12 +25,12 @@ async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; Actor::new().boot(syndicate::name!("state-consumer"), |t| { let ac = t.actor.clone(); - let boot_debtor = Arc::clone(t.debtor()); + let boot_account = Arc::clone(t.account()); Ok(t.state.linked_task(tracing::Span::current(), async move { let config = Config::from_args(); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - Activation::for_actor(&ac, boot_debtor, |t| { + Activation::for_actor(&ac, boot_account, |t| { relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| { let consumer = { #[derive(Default)] @@ -75,7 +75,7 @@ async fn main() -> Result<(), Box> { stats_timer.tick().await; let consumer = Arc::clone(&consumer); external_event(&Arc::clone(&consumer.underlying.mailbox), - &Debtor::new(syndicate::name!("debtor")), + &Account::new(syndicate::name!("account")), Box::new(move |t| consumer.underlying.with_entity( |e| e.message(t, AnyValue::new(true)))))?; } diff --git a/syndicate-server/examples/state-producer.rs b/syndicate-server/examples/state-producer.rs index 6531757..013c35b 100644 --- a/syndicate-server/examples/state-producer.rs +++ b/syndicate-server/examples/state-producer.rs @@ -20,14 +20,14 @@ async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; Actor::new().boot(syndicate::name!("state-producer"), |t| { let ac = t.actor.clone(); - let boot_debtor = Arc::clone(t.debtor()); + let boot_account = Arc::clone(t.account()); Ok(t.state.linked_task(tracing::Span::current(), async move { let config = Config::from_args(); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - Activation::for_actor(&ac, boot_debtor, |t| { + Activation::for_actor(&ac, boot_account, |t| { relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { - let debtor = Debtor::new(syndicate::name!("debtor")); + let account = Account::new(syndicate::name!("account")); t.state.linked_task(syndicate::name!("sender"), async move { let presence: AnyValue = Value::simple_record1( "Present", @@ -37,18 +37,18 @@ async fn main() -> Result<(), Box> { let ds = Arc::clone(&ds); let presence = presence.clone(); let handle = handle.clone(); - external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new( + external_event(&Arc::clone(&ds.underlying.mailbox), &account, Box::new( move |t| ds.underlying.with_entity(|e| e.assert(t, presence, handle)))) }; let retract_e = || { let ds = Arc::clone(&ds); let handle = handle.clone(); - external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new( + external_event(&Arc::clone(&ds.underlying.mailbox), &account, Box::new( move |t| ds.underlying.with_entity(|e| e.retract(t, handle)))) }; assert_e()?; loop { - debtor.ensure_clear_funds().await; + account.ensure_clear_funds().await; retract_e()?; assert_e()?; } diff --git a/syndicate-server/src/main.rs b/syndicate-server/src/main.rs index 8a98892..5d3e0d8 100644 --- a/syndicate-server/src/main.rs +++ b/syndicate-server/src/main.rs @@ -144,6 +144,7 @@ fn extract_binary_packets( } } +#[doc(hidden)] struct ExitListener; impl Entity<()> for ExitListener { @@ -159,7 +160,7 @@ fn run_connection( o: relay::Output, gateway: Arc, ) -> ActorResult { - Activation::for_actor(&ac, Debtor::new(syndicate::name!("start-session")), |t| { + Activation::for_actor(&ac, Account::new(syndicate::name!("start-session")), |t| { let exit_listener = t.state.create(ExitListener); t.state.add_exit_hook(&exit_listener); relay::TunnelRelay::run(t, i, o, Some(gateway), None); diff --git a/syndicate/README.md b/syndicate/README.md new file mode 100644 index 0000000..5ca6738 --- /dev/null +++ b/syndicate/README.md @@ -0,0 +1,8 @@ +This crate implements the +[Syndicated Actor model](https://syndicate-lang.org/about/) for Rust, +including + + - intra-process communication (the [actor] module), + - point-to-point links between actor spaces (the [relay] module), + - and Dataspace objects (the [dataspace] module) for replicating + state and messages among interested parties. diff --git a/syndicate/benches/bench_dataspace.rs b/syndicate/benches/bench_dataspace.rs index ba10364..4acb516 100644 --- a/syndicate/benches/bench_dataspace.rs +++ b/syndicate/benches/bench_dataspace.rs @@ -55,16 +55,16 @@ pub fn bench_pub(c: &mut Criterion) { Actor::new().boot(syndicate::name!("dataspace"), move |t| { let ds = t.state.create(Dataspace::new()); let shutdown = t.state.create(ShutdownEntity); - let debtor = Debtor::new(syndicate::name!("sender-debtor")); + let account = Account::new(syndicate::name!("sender-account")); t.state.linked_task(syndicate::name!("sender"), async move { for _ in 0..iters { let ds = Arc::clone(&ds); - external_event(&Arc::clone(&ds.mailbox), &debtor, Box::new( + external_event(&Arc::clone(&ds.mailbox), &account, Box::new( move |t| ds.with_entity( |e| e.message(t, says(AnyValue::new("bench_pub"), Value::ByteString(vec![]).wrap())))))? } - external_event(&Arc::clone(&shutdown.mailbox), &debtor, Box::new( + external_event(&Arc::clone(&shutdown.mailbox), &account, Box::new( move |t| shutdown.with_entity( |e| e.message(t, AnyValue::new(true)))))?; Ok(()) @@ -124,18 +124,18 @@ pub fn bench_pub(c: &mut Criterion) { })), observer: shutdown, }); - let debtor = Arc::clone(t.debtor()); + let account = Arc::clone(t.account()); t.state.linked_task(syndicate::name!("sender"), async move { for _ in 0..iters { let ds = Arc::clone(&ds); - external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new( + external_event(&Arc::clone(&ds.underlying.mailbox), &account, Box::new( move |t| ds.underlying.with_entity( |e| e.message(t, says(AnyValue::new("bench_pub"), Value::ByteString(vec![]).wrap())))))? } { let ds = Arc::clone(&ds); - external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new( + external_event(&Arc::clone(&ds.underlying.mailbox), &account, Box::new( move |t| ds.underlying.with_entity( |e| e.message(t, AnyValue::new(true)))))?; } diff --git a/syndicate/doc/actor.md b/syndicate/doc/actor.md new file mode 100644 index 0000000..fbeb22f --- /dev/null +++ b/syndicate/doc/actor.md @@ -0,0 +1,16 @@ +The [actor][crate::actor] module is the core of the Syndicated Actor model implementation. + +Central features: + + - trait [`Entity`], the core protocol that must be implemented by + every object + - struct [`Activation`], the API for programming a Syndicated Actor + object + - type [`AnyValue`], the type of messages and assertions that can be + exchanged among distributed objects, including via + [dataspace][crate::dataspace] + - struct [`Ref`], a reference to a local or remote object + - struct [`Cap`], a specialization of `Ref` for + messages/assertions of type `AnyValue` + - struct [`Guard`], an adapter for converting an underlying + [`Ref`] to a [`Cap`] diff --git a/syndicate/doc/flow-control.md b/syndicate/doc/flow-control.md new file mode 100644 index 0000000..f1059a5 --- /dev/null +++ b/syndicate/doc/flow-control.md @@ -0,0 +1,3 @@ +# Flow control + + - Account, LoanedItem diff --git a/syndicate/doc/what-is-an-actor.md b/syndicate/doc/what-is-an-actor.md new file mode 100644 index 0000000..10df4b5 --- /dev/null +++ b/syndicate/doc/what-is-an-actor.md @@ -0,0 +1,24 @@ +# What is an Actor? + +A [Syndicated Actor][Actor] is a collection of stateful +[Entities][Entity]. In the taxonomy of De Koster *et al.* +([2016](#DeKoster2016)), the Syndicated Actor model is a +*Communicating Event-Loop* actor model, similar to that offered by the +E programming language +([Wikipedia](https://en.wikipedia.org/wiki/E_(programming_language)); +[erights.org](http://erights.org/)). + +**Note.** In the full Syndicated Actor model, entities are arranged in a tree of +*facets*; the current Rust implementation does not yet include support +for facets. + + - Actor, ActorRef, ActorState, Mailbox + +**References.** + + - De Koster, Joeri, Tom Van Cutsem, and Wolfgang De Meuter. “43 Years of Actors: A Taxonomy of Actor Models + and Their Key Properties.” In Proc. AGERE, 31–40. Amsterdam, + The Netherlands, 2016. + [DOI](https://doi.org/10.1145/3001886.3001890). + [PDF](http://soft.vub.ac.be/Publications/2016/vub-soft-tr-16-11.pdf). diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index a6e7afe..b672cff 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -1,3 +1,9 @@ +#![doc = concat!( + include_str!("../doc/actor.md"), + include_str!("../doc/what-is-an-actor.md"), + include_str!("../doc/flow-control.md"), +)] + use super::schemas::sturdy; use super::error::Error; use super::error::encode_error; @@ -28,38 +34,146 @@ use tokio_util::sync::CancellationToken; use tracing::Instrument; +/// The type of messages and assertions that can be exchanged among +/// distributed objects, including via [dataspace][crate::dataspace]. +/// +/// A Preserves value where embedded references are instances of +/// [`Cap`]. +/// +/// While [`Ref`] can be used within a process, where arbitrary +/// `M`-values can be exchanged among objects, for distributed or +/// polyglot systems a *lingua franca* has to be chosen. `AnyValue` is +/// that language. pub type AnyValue = super::schemas::internal_protocol::_Any; +/// The type of process-unique actor IDs. pub type ActorId = u64; + +/// The type of process-unique assertion handles. +/// +/// Used both as a reference to [retract][Entity::retract] +/// previously-asserted assertions and as an indexing key to associate +/// local state with some incoming assertion in an entity. pub type Handle = u64; +/// Responses to events must have type `ActorResult`. pub type ActorResult = Result<(), Error>; + +/// When integrating actors with [tokio], an `ActorHandle` represents +/// an actor mainloop task. pub type ActorHandle = tokio::task::JoinHandle; +/// A small protocol for indicating successful synchronisation with +/// some peer; see [Entity::sync]. pub struct Synced; +/// The core metaprotocol implemented by every object. +/// +/// Entities communicate with each other by asserting and retracting +/// values and by sending messages (which can be understood very +/// approximately as "infinitesimally brief" assertions of the message +/// body). +/// +/// Every assertion placed at a receiving entity *R* from some sending +/// entity *S* lives so long as *S*'s actor survives, *R*'s actor +/// survives, and *S* does not retract the assertion. Messages, by +/// contrast, are transient. +/// +/// Implementors of [`Entity`] accept assertions from peers in method +/// [`assert`][Entity::assert]; notification of retraction of a +/// previously-asserted value happens in method +/// [`retract`][Entity::retract]; and notification of a message in +/// method [`message`][Entity::message]. +/// +/// In addition, entities may *synchronise* with each other: the +/// [`sync`][Entity::sync] method responds to a synchronisation +/// request. +/// +/// Finally, the Rust implementation of the Syndicated Actor model +/// offers a hook for running some code at the end of an Entity's +/// containing [`Actor`]'s lifetime +/// ([`exit_hook`][Entity::exit_hook]). +/// +/// # What to implement +/// +/// The default implementations of the methods here generally do +/// nothing; override them to add actual behaviour to your entity. +/// +#[allow(unused_variables)] pub trait Entity: Send { - fn assert(&mut self, _t: &mut Activation, _a: M, _h: Handle) -> ActorResult { + /// Receive notification of a new assertion from a peer. + /// + /// The `turn` parameter represents the current + /// [activation][Activation]; `assertion` is the value (of type + /// `M`) asserted; and `handle` is the process-unique name for + /// this particular assertion instance that will be used later + /// when it is [retracted][Entity::retract]. + /// + /// The default implementation does nothing. + fn assert(&mut self, turn: &mut Activation, assertion: M, handle: Handle) -> ActorResult { Ok(()) } - fn retract(&mut self, _t: &mut Activation, _h: Handle) -> ActorResult { + + /// Receive notification of retraction of a previous assertion from a peer. + /// + /// This happens either when the asserting peer explicitly + /// retracts an assertion, or when its animating [`Actor`] + /// terminates. + /// + /// The `turn` parameter represents the current + /// [activation][Activation], and `handle` is the process-unique + /// name for this particular assertion instance being retracted. + /// + /// Note that no `assertion` value is provided: entities needing + /// to know the value that was previously asserted must remember + /// it themselves (perhaps in a [`Map`] keyed by `handle`). + /// + /// The default implementation does nothing. + fn retract(&mut self, turn: &mut Activation, handle: Handle) -> ActorResult { Ok(()) } - fn message(&mut self, _t: &mut Activation, _m: M) -> ActorResult { + + /// Receive notification of a message from a peer. + /// + /// The `turn` parameter represents the current + /// [activation][Activation], and `message` is the body of the + /// message sent. + /// + /// The default implementation does nothing. + fn message(&mut self, turn: &mut Activation, message: M) -> ActorResult { Ok(()) } - fn sync(&mut self, t: &mut Activation, peer: Arc>) -> ActorResult { - t.message(&peer, Synced); + + /// Respond to a synchronisation request from a peer. + /// + /// Implementors of [`Entity`] will seldom override this. The + /// default implementation fulfils the synchronisation protocol by + /// responding to `peer` with a `Synced` message. + /// + /// In special cases, for example when an entity is a proxy for + /// some remote entity, the right thing to do is to forward the + /// synchronisation request on to another entity; in those cases, + /// overriding the default behaviour is appropriate. + fn sync(&mut self, turn: &mut Activation, peer: Arc>) -> ActorResult { + turn.message(&peer, Synced); Ok(()) } - fn turn_end(&mut self, _t: &mut Activation) -> ActorResult { - Ok(()) - } - fn exit_hook(&mut self, _t: &mut Activation, _exit_status: &Arc) -> ActorResult { + + /// Optional callback for running cleanup actions when the + /// entity's animating [Actor] terminates. + /// + /// Programs register an entity's exit hook with + /// [RunningActor::add_exit_hook]. + /// + /// The default implementation does nothing. + fn exit_hook(&mut self, turn: &mut Activation, exit_status: &Arc) -> ActorResult { Ok(()) } } +/// An "inert" entity, that does nothing in response to any event delivered to it. +/// +/// Useful as a placeholder or dummy in various situations. pub struct InertEntity; impl Entity for InertEntity {} @@ -69,34 +183,58 @@ enum CleanupAction { } type CleanupActions = Map; -pub type Action = Box ActorResult>; -pub type PendingEventQueue = Vec; -// This is what other implementations call a "Turn", renamed here to -// avoid conflicts with schemas::internal_protocol::Turn. +type Action = Box ActorResult>; +type PendingEventQueue = Vec; + +/// The main API for programming Syndicated Actor objects. +/// +/// Through `Activation`s, programs can access the state of their +/// animating [`RunningActor`]. +/// +/// Many actions that an entity can perform are methods directly on +/// `Activation`, but methods on the [`RunningActor`] and [`ActorRef`] +/// values contained in an `Activation` are also sometimes useful. +/// +/// This is what other implementations call a "Turn", renamed here to +/// avoid conflicts with [`crate::schemas::internal_protocol::Turn`]. pub struct Activation<'activation> { + /// A reference to the implementation-side of the currently active [`Actor`]. pub actor: ActorRef, + /// A reference to the current state of the active [`Actor`]. pub state: &'activation mut RunningActor, pending: EventBuffer, } struct EventBuffer { - pub debtor: Arc, + pub account: Arc, queues: HashMap, PendingEventQueue)>, for_myself: PendingEventQueue, } +/// An `Account` records a "debt" in terms of outstanding work items. +/// +/// It is part of the flow control mechanism - see [the module-level +/// documentation][crate::actor#flow-control] for more. #[derive(Debug)] -pub struct Debtor { +pub struct Account { id: u64, debt: Arc, notify: Notify, } +/// A `LoanedItem` is a `T` with an associated `cost` recorded +/// against it in the ledger of a given [`Account`]. +/// +/// It is part of the flow control mechanism - see [the module-level +/// documentation][crate::actor#flow-control] for more. #[derive(Debug)] pub struct LoanedItem { - pub debtor: Arc, + /// The account against which this loan is recorded. + pub account: Arc, + /// The cost of this particular `T`. pub cost: usize, + /// The underlying item itself. pub item: T, } @@ -106,25 +244,38 @@ enum SystemMessage { Crash(Error), } +/// The mechanism by which events are delivered to a given [`Actor`]. pub struct Mailbox { + /// The ID of the actor this mailbox corresponds to. pub actor_id: ActorId, tx: UnboundedSender, } +/// Each actor owns an instance of this structure. +/// +/// It holds the receive-half of the actor's mailbox, plus a reference +/// to the actor's private state. pub struct Actor { rx: UnboundedReceiver, ac_ref: ActorRef, } +/// A reference to an actor's private [`ActorState`]. #[derive(Clone)] pub struct ActorRef { + /// The ID of the referenced actor. pub actor_id: ActorId, state: Arc>, } +/// The state of an actor: either `Running` or `Terminated`. pub enum ActorState { + /// A non-terminated actor has an associated [`RunningActor`] state record. Running(RunningActor), + /// A terminated actor has an [`ActorResult`] as its `exit_status`. Terminated { + /// The exit status of the actor: `Ok(())` for normal + /// termination, `Err(_)` for abnormal termination. exit_status: Arc, }, } @@ -139,17 +290,41 @@ pub struct RunningActor { exit_hooks: Vec) -> ActorResult>>, } +/// A reference to an object that expects messages/assertions of type +/// `M`. +/// +/// The object can be in the same actor, in a different local +/// (in-process) actor, or accessible across a network link. pub struct Ref { pub mailbox: Arc, pub target: Mutex>>>, } +/// Specialization of `Ref` for messages/assertions of type +/// [`AnyValue`]. +/// +/// All polyglot and network communication is done in terms of `Cap`s. +/// +/// `Cap`s can also be *attenuated* ([Hardy 2017]; [Miller 2006]) to +/// reduce (or otherwise transform) the range of assertions and +/// messages they can be used to send to their referent. The +/// Syndicated Actor model uses +/// [Macaroon](https://syndicate-lang.org/doc/capabilities/)-style +/// capability attenuation. +/// +/// [Hardy 2017]: http://cap-lore.com/CapTheory/Patterns/Attenuation.html +/// [Miller 2006]: http://www.erights.org/talks/thesis/markm-thesis.pdf #[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct Cap { pub underlying: Arc>, pub attenuation: Vec, } +/// Adapter for converting an underlying [`Ref`] to a [`Cap`]. +/// +/// The [`Entity`] implementation for `Guard` decodes `AnyValue` +/// assertions/messages to type `M` before passing them on to the +/// underlying entity. pub struct Guard where for<'a> &'a M: Into, @@ -172,7 +347,7 @@ pub fn next_handle() -> Handle { NEXT_HANDLE.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed) } -static NEXT_DEBTOR_ID: AtomicU64 = AtomicU64::new(4); +static NEXT_ACCOUNT_ID: AtomicU64 = AtomicU64::new(4); preserves_schema::support::lazy_static! { pub static ref SYNDICATE_CREDIT: i64 = { @@ -183,7 +358,7 @@ preserves_schema::support::lazy_static! { credit }; - pub static ref DEBTORS: RwLock)>> = + pub static ref ACCOUNTS: RwLock)>> = RwLock::new(Map::new()); } @@ -193,7 +368,7 @@ pub fn start_debt_reporter() { let mut timer = tokio::time::interval(core::time::Duration::from_secs(1)); loop { timer.tick().await; - for (id, (name, debt)) in DEBTORS.read().unwrap().iter() { + for (id, (name, debt)) in ACCOUNTS.read().unwrap().iter() { let _enter = name.enter(); tracing::info!(id, debt = debug(debt.load(Ordering::Relaxed))); } @@ -221,22 +396,22 @@ impl From<&Synced> for AnyValue { } impl<'activation> Activation<'activation> { - fn make(actor: &ActorRef, debtor: Arc, state: &'activation mut RunningActor) -> Self { + fn make(actor: &ActorRef, account: Arc, state: &'activation mut RunningActor) -> Self { Activation { actor: actor.clone(), state, - pending: EventBuffer::new(debtor), + pending: EventBuffer::new(account), } } pub fn for_actor( actor: &ActorRef, - debtor: Arc, + account: Arc, f: F, ) -> ActorResult where F: FnOnce(&mut Activation) -> ActorResult, { - match Self::for_actor_exit(actor, debtor, |t| match f(t) { + match Self::for_actor_exit(actor, account, |t| match f(t) { Ok(()) => None, Err(e) => Some(Err(e)), }) { @@ -247,7 +422,7 @@ impl<'activation> Activation<'activation> { pub fn for_actor_exit( actor: &ActorRef, - debtor: Arc, + account: Arc, f: F, ) -> Option where F: FnOnce(&mut Activation) -> Option, @@ -258,11 +433,11 @@ impl<'activation> Activation<'activation> { ActorState::Terminated { exit_status } => Some((**exit_status).clone()), ActorState::Running(state) => - match f(&mut Activation::make(actor, debtor, state)) { + match f(&mut Activation::make(actor, account, state)) { None => None, Some(exit_status) => { let exit_status = Arc::new(exit_status); - let mut t = Activation::make(actor, Debtor::new(crate::name!("shutdown")), state); + let mut t = Activation::make(actor, Account::new(crate::name!("shutdown")), state); for action in std::mem::take(&mut t.state.exit_hooks) { if let Err(err) = action(&mut t, &exit_status) { tracing::error!(err = debug(err), "error in exit hook"); @@ -344,8 +519,8 @@ impl<'activation> Activation<'activation> { move |t| r.with_entity(|e| e.sync(t, peer)))) } - pub fn debtor(&self) -> &Arc { - &self.pending.debtor + pub fn account(&self) -> &Arc { + &self.pending.account } pub fn deliver(&mut self) { @@ -354,9 +529,9 @@ impl<'activation> Activation<'activation> { } impl EventBuffer { - fn new(debtor: Arc) -> Self { + fn new(account: Arc) -> Self { EventBuffer { - debtor, + account, queues: HashMap::new(), for_myself: Vec::new(), } @@ -385,7 +560,7 @@ impl EventBuffer { panic!("Unprocessed for_myself events remain at deliver() time"); } for (_actor_id, (tx, turn)) in std::mem::take(&mut self.queues).into_iter() { - let _ = send_actions(&tx, &self.debtor, turn); + let _ = send_actions(&tx, &self.account, turn); } } } @@ -396,12 +571,12 @@ impl Drop for EventBuffer { } } -impl Debtor { +impl Account { pub fn new(name: tracing::Span) -> Arc { - let id = NEXT_DEBTOR_ID.fetch_add(1, Ordering::Relaxed); + let id = NEXT_ACCOUNT_ID.fetch_add(1, Ordering::Relaxed); let debt = Arc::new(AtomicI64::new(0)); - DEBTORS.write().unwrap().insert(id, (name, Arc::clone(&debt))); - Arc::new(Debtor { + ACCOUNTS.write().unwrap().insert(id, (name, Arc::clone(&debt))); + Arc::new(Account { id, debt, notify: Notify::new(), @@ -435,33 +610,33 @@ impl Debtor { } } -impl Drop for Debtor { +impl Drop for Account { fn drop(&mut self) { - DEBTORS.write().unwrap().remove(&self.id); + ACCOUNTS.write().unwrap().remove(&self.id); } } impl LoanedItem { - pub fn new(debtor: &Arc, cost: usize, item: T) -> Self { - debtor.borrow(cost); - LoanedItem { debtor: Arc::clone(debtor), cost, item } + pub fn new(account: &Arc, cost: usize, item: T) -> Self { + account.borrow(cost); + LoanedItem { account: Arc::clone(account), cost, item } } } impl Drop for LoanedItem { fn drop(&mut self) { - self.debtor.repay(self.cost); + self.account.repay(self.cost); } } #[must_use] fn send_actions( tx: &UnboundedSender, - debtor: &Arc, + account: &Arc, t: PendingEventQueue, ) -> ActorResult { let token_count = t.len(); - tx.send(SystemMessage::Turn(LoanedItem::new(debtor, token_count, t))) + tx.send(SystemMessage::Turn(LoanedItem::new(account, token_count, t))) .map_err(|_| error("Target actor not running", AnyValue::new(false))) } @@ -565,14 +740,14 @@ impl Actor { fn terminate(&mut self, result: ActorResult) { let _ = Activation::for_actor_exit( - &self.ac_ref, Debtor::new(crate::name!("shutdown")), |_| Some(result)); + &self.ac_ref, Account::new(crate::name!("shutdown")), |_| Some(result)); } async fn run ActorResult>( &mut self, boot: F, ) -> () { - if Activation::for_actor(&self.ac_ref, Debtor::new(crate::name!("boot")), boot).is_err() { + if Activation::for_actor(&self.ac_ref, Account::new(crate::name!("boot")), boot).is_err() { return; } @@ -589,7 +764,7 @@ impl Actor { SystemMessage::Turn(mut loaned_item) => { let mut actions = std::mem::take(&mut loaned_item.item); let r = Activation::for_actor( - &self.ac_ref, Arc::clone(&loaned_item.debtor), |t| { + &self.ac_ref, Arc::clone(&loaned_item.account), |t| { loop { for action in actions.into_iter() { action(t)? } actions = std::mem::take(&mut t.pending.for_myself); @@ -737,7 +912,7 @@ impl Drop for RunningActor { let to_clear = std::mem::take(&mut self.cleanup_actions); { - let mut b = EventBuffer::new(Debtor::new(crate::name!("drop"))); + let mut b = EventBuffer::new(Account::new(crate::name!("drop"))); for (_handle, r) in to_clear.into_iter() { tracing::trace!(h = debug(&_handle), "retract on termination"); b.execute_cleanup_action(r); @@ -749,13 +924,13 @@ impl Drop for RunningActor { } #[must_use] -pub fn external_event(mailbox: &Arc, debtor: &Arc, action: Action) -> ActorResult { - send_actions(&mailbox.tx, debtor, vec![action]) +pub fn external_event(mailbox: &Arc, account: &Arc, action: Action) -> ActorResult { + send_actions(&mailbox.tx, account, vec![action]) } #[must_use] -pub fn external_events(mailbox: &Arc, debtor: &Arc, events: PendingEventQueue) -> ActorResult { - send_actions(&mailbox.tx, debtor, events) +pub fn external_events(mailbox: &Arc, account: &Arc, events: PendingEventQueue) -> ActorResult { + send_actions(&mailbox.tx, account, events) } impl Ref { @@ -908,9 +1083,6 @@ where fn sync(&mut self, t: &mut Activation, peer: Arc>) -> ActorResult { self.underlying.with_entity(|e| e.sync(t, peer)) } - fn turn_end(&mut self, t: &mut Activation) -> ActorResult { - self.underlying.with_entity(|e| e.turn_end(t)) - } fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc) -> ActorResult { self.underlying.with_entity(|e| e.exit_hook(t, exit_status)) } diff --git a/syndicate/src/bag.rs b/syndicate/src/bag.rs index b58796a..bf71301 100644 --- a/syndicate/src/bag.rs +++ b/syndicate/src/bag.rs @@ -1,3 +1,7 @@ +//! A "bag" data structure (based on +//! [`BTreeMap`][std::collections::BTreeMap]), used in +//! [dataspace][super::dataspace] indexing. + use std::collections::BTreeMap; use std::collections::btree_map::{Iter, Keys, Entry}; use std::iter::{FromIterator, IntoIterator}; diff --git a/syndicate/src/lib.rs b/syndicate/src/lib.rs index c2abbb8..0413d48 100644 --- a/syndicate/src/lib.rs +++ b/syndicate/src/lib.rs @@ -1,3 +1,6 @@ +#![doc = include_str!("../README.md")] + +#[doc(inline)] pub use preserves::value; pub mod actor; @@ -17,8 +20,14 @@ pub mod skeleton; pub mod sturdy; pub mod tracer; +#[doc(inline)] pub use during::entity; +#[doc(inline)] pub use tracer::tracer; + +#[doc(inline)] pub use tracer::tracer_top; + +#[doc(inline)] pub use tracer::convenient_logging; diff --git a/syndicate/src/relay.rs b/syndicate/src/relay.rs index ef101b5..e8aa517 100644 --- a/syndicate/src/relay.rs +++ b/syndicate/src/relay.rs @@ -382,9 +382,9 @@ impl TunnelRelay { } } - pub fn send_packet(&mut self, debtor: &Arc, cost: usize, p: P::Packet) -> ActorResult { + pub fn send_packet(&mut self, account: &Arc, cost: usize, p: P::Packet) -> ActorResult { let bs = self.encode_packet(p)?; - let _ = self.output.send(LoanedItem::new(debtor, cost, bs)); + let _ = self.output.send(LoanedItem::new(account, cost, bs)); Ok(()) } @@ -519,16 +519,16 @@ async fn input_loop( i: Input, relay: TunnelRelayRef, ) -> ActorResult { - let debtor = Debtor::new(crate::name!("input-loop")); + let account = Account::new(crate::name!("input-loop")); match i { Input::Packets(mut src) => { loop { - debtor.ensure_clear_funds().await; + account.ensure_clear_funds().await; match src.next().await { - None => return Activation::for_actor(&ac, Arc::clone(&debtor), |t| { + None => return Activation::for_actor(&ac, Arc::clone(&account), |t| { Ok(t.state.shutdown()) }), - Some(bs) => Activation::for_actor(&ac, Arc::clone(&debtor), |t| { + Some(bs) => Activation::for_actor(&ac, Arc::clone(&account), |t| { let mut g = relay.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); tr.handle_inbound_datagram(t, &bs?) @@ -540,13 +540,13 @@ async fn input_loop( const BUFSIZE: usize = 65536; let mut buf = BytesMut::with_capacity(BUFSIZE); loop { - debtor.ensure_clear_funds().await; + account.ensure_clear_funds().await; buf.reserve(BUFSIZE); let n = match r.read_buf(&mut buf).await { Ok(n) => n, Err(e) => if e.kind() == io::ErrorKind::ConnectionReset { - return Activation::for_actor(&ac, Arc::clone(&debtor), |t| { + return Activation::for_actor(&ac, Arc::clone(&account), |t| { Ok(t.state.shutdown()) }); } else { @@ -554,10 +554,10 @@ async fn input_loop( }, }; match n { - 0 => return Activation::for_actor(&ac, Arc::clone(&debtor), |t| { + 0 => return Activation::for_actor(&ac, Arc::clone(&account), |t| { Ok(t.state.shutdown()) }), - _ => Activation::for_actor(&ac, Arc::clone(&debtor), |t| { + _ => Activation::for_actor(&ac, Arc::clone(&account), |t| { let mut g = relay.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); tr.handle_inbound_stream(t, &mut buf) @@ -594,7 +594,7 @@ impl Entity<()> for TunnelRefEntity { let mut g = self.relay_ref.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); let events = std::mem::take(&mut tr.pending_outbound); - tr.send_packet(&t.debtor(), events.len(), P::Packet::Turn(Box::new(P::Turn(events)))) + tr.send_packet(&t.account(), events.len(), P::Packet::Turn(Box::new(P::Turn(events)))) } fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc) -> ActorResult { @@ -602,7 +602,7 @@ impl Entity<()> for TunnelRefEntity { let e = e.clone(); let mut g = self.relay_ref.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); - tr.send_packet(&t.debtor(), 1, P::Packet::Error(Box::new(e)))?; + tr.send_packet(&t.account(), 1, P::Packet::Error(Box::new(e)))?; } Ok(()) }