relay-external-protocol
Tony Garnock-Jones 2 years ago
parent ae46e42539
commit f56c0df10f

@ -24,13 +24,13 @@ pub struct Config {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
syndicate::convenient_logging()?;
Actor::new().boot(syndicate::name!("consumer"), |t| {
let ac = t.actor.clone();
let facet = t.facet.clone();
let boot_account = Arc::clone(t.account());
Ok(t.state.linked_task(tracing::Span::current(), async move {
Ok(t.linked_task(tracing::Span::current(), async move {
let config = Config::from_args();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Activation::for_actor(&ac, boot_account, |t| {
facet.activate(boot_account, |t| {
relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| {
let consumer = syndicate::entity(0)
.on_message(|message_count, _t, m: AnyValue| {
@ -42,21 +42,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
Ok(())
})
.create_cap(t.state);
.create_cap(t);
ds.assert(t, &Observe {
pattern: syndicate_macros::pattern!("<Says $ $>"),
observer: Arc::clone(&consumer),
});
t.state.linked_task(syndicate::name!("tick"), async move {
t.linked_task(syndicate::name!("tick"), async move {
let mut stats_timer = interval(Duration::from_secs(1));
loop {
stats_timer.tick().await;
let consumer = Arc::clone(&consumer);
external_event(&Arc::clone(&consumer.underlying.mailbox),
&Account::new(syndicate::name!("account")),
Box::new(move |t| consumer.underlying.with_entity(
|e| e.message(t, AnyValue::new(true)))))?;
Box::new(move |t| t.with_entity(
&consumer.underlying,
|t, e| e.message(t, AnyValue::new(true)))))?;
}
});
Ok(None)

@ -92,13 +92,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
syndicate::convenient_logging()?;
Actor::new().boot(syndicate::name!("pingpong"), |t| {
let ac = t.actor.clone();
let facet = t.facet.clone();
let boot_account = Arc::clone(t.account());
Ok(t.state.linked_task(tracing::Span::current(), async move {
Ok(t.linked_task(tracing::Span::current(), async move {
let config = Config::from_args();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Activation::for_actor(&ac, boot_account, |t| {
facet.activate(boot_account, |t| {
relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| {
let (send_label, recv_label, report_latency_every, should_echo, bytes_padding) =
@ -116,7 +116,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut rtt_ns_samples: Vec<u64> = vec![0; report_latency_every];
let mut rtt_batch_count: usize = 0;
let mut current_reply = None;
let self_ref = t.state.create_inert();
let self_ref = t.create_inert();
self_ref.become_entity(
syndicate::entity(Arc::clone(&self_ref))
.on_message(move |self_ref, t, m: AnyValue| {
@ -177,15 +177,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
observer: Arc::clone(&consumer),
});
t.state.linked_task(syndicate::name!("tick"), async move {
t.linked_task(syndicate::name!("tick"), async move {
let mut stats_timer = interval(Duration::from_secs(1));
loop {
stats_timer.tick().await;
let consumer = Arc::clone(&consumer);
external_event(&Arc::clone(&consumer.underlying.mailbox),
&Account::new(syndicate::name!("account")),
Box::new(move |t| consumer.underlying.with_entity(
|e| e.message(t, AnyValue::new(true)))))?;
Box::new(move |t| t.with_entity(
&consumer.underlying,
|t, e| e.message(t, AnyValue::new(true)))))?;
}
});
@ -193,7 +194,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let turn_count = c.turn_count;
let action_count = c.action_count;
let account = Arc::clone(t.account());
t.state.linked_task(syndicate::name!("boot-ping"), async move {
t.linked_task(syndicate::name!("boot-ping"), async move {
let padding: AnyValue = Value::ByteString(vec![0; bytes_padding]).wrap();
for _ in 0..turn_count {
let mut events: PendingEventQueue = vec![];
@ -203,8 +204,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
for _ in 0..action_count {
let ds = Arc::clone(&ds);
let current_rec = current_rec.clone();
events.push(Box::new(move |t| ds.underlying.with_entity(
|e| e.message(t, current_rec))));
events.push(Box::new(move |t| t.with_entity(
&ds.underlying,
|t, e| e.message(t, current_rec))));
}
external_events(&ds.underlying.mailbox, &account, events)?
}

@ -33,26 +33,27 @@ fn says(who: AnyValue, what: AnyValue) -> AnyValue {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
syndicate::convenient_logging()?;
Actor::new().boot(syndicate::name!("producer"), |t| {
let ac = t.actor.clone();
let facet = t.facet.clone();
let boot_account = Arc::clone(t.account());
Ok(t.state.linked_task(tracing::Span::current(), async move {
Ok(t.linked_task(tracing::Span::current(), async move {
let config = Config::from_args();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Activation::for_actor(&ac, boot_account, |t| {
facet.activate(boot_account, |t| {
relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| {
let padding: AnyValue = Value::ByteString(vec![0; config.bytes_padding]).wrap();
let action_count = config.action_count;
let account = Account::new(syndicate::name!("account"));
t.state.linked_task(syndicate::name!("sender"), async move {
t.linked_task(syndicate::name!("sender"), async move {
loop {
account.ensure_clear_funds().await;
let mut events: PendingEventQueue = Vec::new();
for _ in 0..action_count {
let ds = Arc::clone(&ds);
let padding = padding.clone();
events.push(Box::new(move |t| ds.underlying.with_entity(
|e| e.message(t, says(Value::from("producer").wrap(), padding)))));
events.push(Box::new(move |t| t.with_entity(
&ds.underlying,
|t, e| e.message(t, says(Value::from("producer").wrap(), padding)))));
}
external_events(&ds.underlying.mailbox, &account, events)?;
}

@ -24,13 +24,13 @@ pub struct Config {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
syndicate::convenient_logging()?;
Actor::new().boot(syndicate::name!("state-consumer"), |t| {
let ac = t.actor.clone();
let facet = t.facet.clone();
let boot_account = Arc::clone(t.account());
Ok(t.state.linked_task(tracing::Span::current(), async move {
Ok(t.linked_task(tracing::Span::current(), async move {
let config = Config::from_args();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Activation::for_actor(&ac, boot_account, |t| {
facet.activate(boot_account, |t| {
relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| {
let consumer = {
#[derive(Default)]
@ -61,7 +61,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
s.arrival_counter = 0;
s.departure_counter = 0;
Ok(())
}).create_cap(t.state)
}).create_cap(t)
};
ds.assert(t, &Observe {
@ -69,15 +69,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
observer: Arc::clone(&consumer),
});
t.state.linked_task(syndicate::name!("tick"), async move {
t.linked_task(syndicate::name!("tick"), async move {
let mut stats_timer = interval(Duration::from_secs(1));
loop {
stats_timer.tick().await;
let consumer = Arc::clone(&consumer);
external_event(&Arc::clone(&consumer.underlying.mailbox),
&Account::new(syndicate::name!("account")),
Box::new(move |t| consumer.underlying.with_entity(
|e| e.message(t, AnyValue::new(true)))))?;
Box::new(move |t| t.with_entity(
&consumer.underlying,
|t, e| e.message(t, AnyValue::new(true)))))?;
}
});
Ok(None)

@ -19,16 +19,16 @@ pub struct Config {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
syndicate::convenient_logging()?;
Actor::new().boot(syndicate::name!("state-producer"), |t| {
let ac = t.actor.clone();
let facet = t.facet.clone();
let boot_account = Arc::clone(t.account());
Ok(t.state.linked_task(tracing::Span::current(), async move {
Ok(t.linked_task(tracing::Span::current(), async move {
let config = Config::from_args();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Activation::for_actor(&ac, boot_account, |t| {
facet.activate(boot_account, |t| {
relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| {
let account = Account::new(syndicate::name!("account"));
t.state.linked_task(syndicate::name!("sender"), async move {
t.linked_task(syndicate::name!("sender"), async move {
let presence: AnyValue = Value::simple_record1(
"Present",
Value::from(std::process::id()).wrap()).wrap();
@ -38,13 +38,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let presence = presence.clone();
let handle = handle.clone();
external_event(&Arc::clone(&ds.underlying.mailbox), &account, Box::new(
move |t| ds.underlying.with_entity(|e| e.assert(t, presence, handle))))
move |t| t.with_entity(&ds.underlying, |t, e| e.assert(t, presence, handle))))
};
let retract_e = || {
let ds = Arc::clone(&ds);
let handle = handle.clone();
external_event(&Arc::clone(&ds.underlying.mailbox), &account, Box::new(
move |t| ds.underlying.with_entity(|e| e.retract(t, handle))))
move |t| t.with_entity(&ds.underlying, |t, e| e.retract(t, handle))))
};
assert_e()?;
loop {

@ -72,13 +72,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing::info!(r"");
}
let mut non_daemons = Vec::new();
tracing::trace!("startup");
if config.debt_reporter {
Actor::new().boot(syndicate::name!("debt-reporter"), |t| {
t.state.linked_task(syndicate::name!("tick"), async {
t.linked_task(syndicate::name!("tick"), async {
let mut timer = tokio::time::interval(core::time::Duration::from_secs(1));
loop {
timer.tick().await;
@ -93,51 +91,49 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});
}
let ds = Cap::new(&Actor::create_and_start(syndicate::name!("dataspace"), Dataspace::new()));
if config.inferior {
let ds = Arc::clone(&ds);
non_daemons.push(
Actor::new().boot(syndicate::name!("parent"), move |t| run_io_relay(
t,
relay::Input::Bytes(Box::pin(tokio::io::stdin())),
relay::Output::Bytes(Box::pin(tokio::io::stdout())),
ds)));
}
Actor::new().boot(syndicate::name!("dataspace"), move |t| {
let ds = Cap::new(&t.create(Dataspace::new()));
let gateway = Cap::guard(&Actor::create_and_start(
syndicate::name!("gateway"),
syndicate::entity(Arc::clone(&ds)).on_asserted(handle_resolve)));
{
let ds = Arc::clone(&ds);
Actor::new().boot(syndicate::name!("rootcap"), move |t| {
{
use syndicate::schemas::gatekeeper;
let key = vec![0; 16];
let sr = sturdy::SturdyRef::mint(_Any::new("syndicate"), &key);
tracing::info!(rootcap = debug(&_Any::from(&sr)));
tracing::info!(rootcap = display(sr.to_hex()));
ds.assert(t, &gatekeeper::Bind { oid: sr.oid.clone(), key, target: ds.clone() });
Ok(())
});
}
}
for port in config.ports.clone() {
let gateway = Arc::clone(&gateway);
non_daemons.push(Actor::new().boot(
syndicate::name!("tcp", port),
move |t| Ok(t.state.linked_task(syndicate::name!("listener"),
run_tcp_listener(gateway, port)))));
}
if config.inferior {
let ds = Arc::clone(&ds);
Actor::new().boot(syndicate::name!("parent"), move |t| run_io_relay(
t,
relay::Input::Bytes(Box::pin(tokio::io::stdin())),
relay::Output::Bytes(Box::pin(tokio::io::stdout())),
ds));
}
for path in config.sockets.clone() {
let gateway = Arc::clone(&gateway);
non_daemons.push(Actor::new().boot(
syndicate::name!("unix", socket = debug(path.to_str().expect("representable UnixListener path"))),
move |t| Ok(t.state.linked_task(syndicate::name!("listener"),
run_unix_listener(gateway, path)))));
}
let gateway = Cap::guard(&t.create(
syndicate::entity(Arc::clone(&ds)).on_asserted(handle_resolve)));
for port in config.ports.clone() {
let gateway = Arc::clone(&gateway);
Actor::new().boot(
syndicate::name!("tcp", port),
move |t| Ok(t.linked_task(syndicate::name!("listener"),
run_tcp_listener(gateway, port))));
}
for path in config.sockets.clone() {
let gateway = Arc::clone(&gateway);
Actor::new().boot(
syndicate::name!("unix", socket = debug(path.to_str().expect("representable UnixListener path"))),
move |t| Ok(t.linked_task(syndicate::name!("listener"),
run_unix_listener(gateway, path))));
}
Ok(())
}).await??;
futures::future::join_all(non_daemons).await;
Ok(())
}
@ -183,24 +179,24 @@ fn run_io_relay(
o: relay::Output,
initial_ref: Arc<Cap>,
) -> ActorResult {
let exit_listener = t.state.create(ExitListener);
let exit_listener = t.create(ExitListener);
t.state.add_exit_hook(&exit_listener);
relay::TunnelRelay::run(t, i, o, Some(initial_ref), None);
Ok(())
}
fn run_connection(
ac: ActorRef,
facet: FacetRef,
i: relay::Input,
o: relay::Output,
initial_ref: Arc<Cap>,
) -> ActorResult {
Activation::for_actor(&ac, Account::new(syndicate::name!("start-session")),
|t| run_io_relay(t, i, o, initial_ref))
facet.activate(Account::new(syndicate::name!("start-session")),
|t| run_io_relay(t, i, o, initial_ref))
}
async fn detect_protocol(
ac: ActorRef,
facet: FacetRef,
stream: TcpStream,
gateway: Arc<Cap>,
addr: std::net::SocketAddr,
@ -229,7 +225,7 @@ async fn detect_protocol(
_ => unreachable!()
}
};
run_connection(ac, i, o, gateway)
run_connection(facet, i, o, gateway)
}
async fn run_tcp_listener(
@ -244,9 +240,9 @@ async fn run_tcp_listener(
let gateway = Arc::clone(&gateway);
let ac = Actor::new();
ac.boot(syndicate::name!(parent: None, "tcp"),
move |t| Ok(t.state.linked_task(
move |t| Ok(t.linked_task(
tracing::Span::current(),
detect_protocol(t.actor.clone(), stream, gateway, addr))));
detect_protocol(t.facet.clone(), stream, gateway, addr))));
}
}
@ -261,24 +257,21 @@ async fn run_unix_listener(
let (stream, _addr) = listener.accept().await?;
let peer = stream.peer_cred()?;
let gateway = Arc::clone(&gateway);
let ac = Actor::new();
ac.boot(syndicate::name!(parent: None,
"unix",
pid = debug(peer.pid().unwrap_or(-1)),
uid = peer.uid()),
move |t| Ok(t.state.linked_task(
tracing::Span::current(),
{
let ac = t.actor.clone();
async move {
tracing::info!(protocol = display("unix"));
let (i, o) = stream.into_split();
run_connection(ac,
relay::Input::Bytes(Box::pin(i)),
relay::Output::Bytes(Box::pin(o)),
gateway)
}
})));
Actor::new().boot(
syndicate::name!(parent: None, "unix", pid = debug(peer.pid().unwrap_or(-1)), uid = peer.uid()),
|t| Ok(t.linked_task(
tracing::Span::current(),
{
let facet = t.facet.clone();
async move {
tracing::info!(protocol = display("unix"));
let (i, o) = stream.into_split();
run_connection(facet,
relay::Input::Bytes(Box::pin(i)),
relay::Output::Bytes(Box::pin(o)),
gateway)
}
})));
}
}
@ -342,7 +335,7 @@ fn handle_resolve(
}
}
})
.create_cap(t.state);
.create_cap(t);
if let Some(oh) = ds.assert(t, &dataspace::Observe {
// TODO: codegen plugin to generate pattern constructors
pattern: syndicate_macros::pattern!("<bind =queried_oid $ $>"),

@ -7,6 +7,7 @@ use std::sync::atomic::Ordering;
use std::time::Instant;
use syndicate::actor::*;
use syndicate::during::entity;
use syndicate::dataspace::Dataspace;
use syndicate::schemas::dataspace::Observe;
use syndicate::schemas::dataspace_patterns as p;
@ -30,7 +31,7 @@ struct ShutdownEntity;
impl Entity<AnyValue> for ShutdownEntity {
fn message(&mut self, t: &mut Activation, _m: AnyValue) -> ActorResult {
t.state.shutdown();
t.stop();
Ok(())
}
}
@ -53,20 +54,22 @@ pub fn bench_pub(c: &mut Criterion) {
let start = Instant::now();
rt.block_on(async move {
Actor::new().boot(syndicate::name!("dataspace"), move |t| {
let ds = t.state.create(Dataspace::new());
let shutdown = t.state.create(ShutdownEntity);
let ds = t.create(Dataspace::new());
let shutdown = t.create(ShutdownEntity);
let account = Account::new(syndicate::name!("sender-account"));
t.state.linked_task(syndicate::name!("sender"), async move {
t.linked_task(syndicate::name!("sender"), async move {
for _ in 0..iters {
let ds = Arc::clone(&ds);
external_event(&Arc::clone(&ds.mailbox), &account, Box::new(
move |t| ds.with_entity(
|e| e.message(t, says(AnyValue::new("bench_pub"),
Value::ByteString(vec![]).wrap())))))?
move |t| t.with_entity(
&ds,
|t, e| e.message(t, says(AnyValue::new("bench_pub"),
Value::ByteString(vec![]).wrap())))))?
}
external_event(&Arc::clone(&shutdown.mailbox), &account, Box::new(
move |t| shutdown.with_entity(
|e| e.message(t, AnyValue::new(true)))))?;
move |t| t.with_entity(
&shutdown,
|t, e| e.message(t, AnyValue::new(true)))))?;
Ok(())
});
Ok(())
@ -80,70 +83,96 @@ pub fn bench_pub(c: &mut Criterion) {
b.iter_custom(|iters| {
let start = Instant::now();
rt.block_on(async move {
let ds = Cap::new(
&Actor::create_and_start(syndicate::name!("dataspace"), Dataspace::new()));
let turn_count = Arc::new(AtomicU64::new(0));
{
Actor::new().boot(syndicate::name!("dataspace"), {
let iters = iters.clone();
let turn_count = Arc::clone(&turn_count);
Actor::new().boot(syndicate::name!("consumer"), move |t| {
struct Receiver(Arc<AtomicU64>);
impl Entity<AnyValue> for Receiver {
fn message(&mut self, _t: &mut Activation, _m: AnyValue) -> ActorResult {
self.0.fetch_add(1, Ordering::Relaxed);
Ok(())
}
}
let shutdown = Cap::new(&t.state.create(ShutdownEntity));
let receiver = Cap::new(&t.state.create(Receiver(Arc::clone(&turn_count))));
move |t| {
let ds = Cap::new(&t.create(Dataspace::new()));
let shutdown = entity(())
.on_asserted(|_, _, _| {
Ok(Some(Box::new(|_, t| {
t.stop();
Ok(())
})))
})
.create_cap(t);
ds.assert(t, &Observe {
pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec {
ctor: Box::new(p::CRec {
label: Value::symbol("Says").wrap(),
arity: 2.into(),
}),
members: Map::from_iter(vec![
(0.into(), p::Pattern::DLit(Box::new(p::DLit {
value: AnyValue::new("bench_pub"),
}))),
(1.into(), p::Pattern::DBind(Box::new(p::DBind {
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)),
}))),
].into_iter()),
})),
observer: receiver,
});
ds.assert(t, &Observe {
pattern: p::Pattern::DBind(Box::new(p::DBind {
pattern: p::Pattern::DLit(Box::new(p::DLit {
value: AnyValue::new(true),
value: Value::symbol("consumer").wrap(),
})),
})),
observer: shutdown,
});
let account = Arc::clone(t.account());
t.state.linked_task(syndicate::name!("sender"), async move {
for _ in 0..iters {
let ds = Arc::clone(&ds);
external_event(&Arc::clone(&ds.underlying.mailbox), &account, Box::new(
move |t| ds.underlying.with_entity(
|e| e.message(t, says(AnyValue::new("bench_pub"),
Value::ByteString(vec![]).wrap())))))?
}
{
let ds = Arc::clone(&ds);
external_event(&Arc::clone(&ds.underlying.mailbox), &account, Box::new(
move |t| ds.underlying.with_entity(
|e| e.message(t, AnyValue::new(true)))))?;
Actor::new().boot(syndicate::name!("consumer"), move |t| {
struct Receiver(Arc<AtomicU64>);
impl Entity<AnyValue> for Receiver {
fn message(&mut self, _t: &mut Activation, _m: AnyValue) -> ActorResult {
self.0.fetch_add(1, Ordering::Relaxed);
Ok(())
}
}
let shutdown = Cap::new(&t.create(ShutdownEntity));
let receiver = Cap::new(&t.create(Receiver(Arc::clone(&turn_count))));
ds.assert(t, Value::<AnyValue, _>::symbol("consumer").wrap());
ds.assert(t, &Observe {
pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec {
ctor: Box::new(p::CRec {
label: Value::symbol("Says").wrap(),
arity: 2.into(),
}),
members: Map::from_iter(vec![
(0.into(), p::Pattern::DLit(Box::new(p::DLit {
value: AnyValue::new("bench_pub"),
}))),
(1.into(), p::Pattern::DBind(Box::new(p::DBind {
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)),
}))),
].into_iter()),
})),
observer: receiver,
});
ds.assert(t, &Observe {
pattern: p::Pattern::DBind(Box::new(p::DBind {
pattern: p::Pattern::DLit(Box::new(p::DLit {
value: AnyValue::new(true),
})),
})),
observer: shutdown,
});
let account = Arc::clone(t.account());
t.linked_task(syndicate::name!("sender"), async move {
for _i in 0..iters {
let ds = Arc::clone(&ds);
external_event(&Arc::clone(&ds.underlying.mailbox), &account, Box::new(
move |t| t.with_entity(
&ds.underlying,
|t, e| e.message(t, says(AnyValue::new("bench_pub"),
Value::ByteString(vec![]).wrap())))))?
}
{
let ds = Arc::clone(&ds);
external_event(&Arc::clone(&ds.underlying.mailbox), &account, Box::new(
move |t| t.with_entity(
&ds.underlying,
|t, e| e.message(t, AnyValue::new(true)))))?;
}
Ok(())
});
Ok(())
});
Ok(())
}).await.unwrap().unwrap();
}
}
}).await.unwrap().unwrap();
let actual_turns = turn_count.load(Ordering::SeqCst);
if actual_turns != iters {

@ -2,10 +2,12 @@ The [actor][crate::actor] module is the core of the Syndicated Actor model imple
Central features:
- trait [`Entity`], the core protocol that must be implemented by
every object
- struct [`Activation`], the API for programming a Syndicated Actor
object
- trait [`Entity`], the core protocol that must be implemented by
every object
- struct [`Facet`], a node in the tree of nested conversations that
an Actor is participating in
- type [`AnyValue`], the type of messages and assertions that can be
exchanged among distributed objects, including via
[dataspace][crate::dataspace]

@ -1,4 +1,4 @@
# Flow control
- Account, LoanedItem
- start_debt_reporter
- [Account]
- [LoanedItem]

@ -1,3 +1,3 @@
# Linked Tasks
- linked_task
- [Activation::linked_task]

@ -1,18 +1,23 @@
# What is an Actor?
A [Syndicated Actor][Actor] is a collection of stateful
[Entities][Entity]. In the taxonomy of De Koster *et al.*
([2016](#DeKoster2016)), the Syndicated Actor model is a
*Communicating Event-Loop* actor model, similar to that offered by the
E programming language
[Entities][Entity], organised in a tree of [Facets][Facet], with each
facet representing a
[(sub)conversation](https://syndicate-lang.org/about/#conversational-concurrency-1)
that the Actor is engaged in. Each entity belongs to exactly one
facet; each facet has exactly one parent and zero or more children;
each actor has exactly one associated root facet. When a facet is its
actor's root facet, its parent is the actor itself; otherwise, its
parent is always another facet.
In the taxonomy of De Koster *et al.* ([2016](#DeKoster2016)), the
Syndicated Actor model is a *Communicating Event-Loop* actor model,
similar to that offered by the E programming language
([Wikipedia](https://en.wikipedia.org/wiki/E_(programming_language));
[erights.org](http://erights.org/)).
**Note.** In the full Syndicated Actor model, entities are arranged in a tree of
*facets*; the current Rust implementation does not yet include support
for facets.
- Actor, ActorRef, ActorState, Mailbox
- [Actor], [ActorRef], [Facet], [FacetRef], [ActorState], [Mailbox],
[Activation]
**References.**

@ -16,12 +16,14 @@ use preserves::value::Domain;
use preserves::value::IOValue;
use preserves::value::Map;
use preserves::value::NestedValue;
use preserves::value::Set;
use preserves_schema::support::ParseError;
use std::boxed::Box;
use std::collections::hash_map::HashMap;
use std::convert::TryFrom;
use std::convert::TryInto;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;
@ -50,6 +52,9 @@ pub type AnyValue = super::schemas::internal_protocol::_Any;
/// The type of process-unique actor IDs.
pub type ActorId = u64;
/// The type of process-unique facet IDs.
pub type FacetId = NonZeroU64;
/// The type of process-unique assertion handles.
///
/// Used both as a reference to [retract][Entity::retract]
@ -60,8 +65,8 @@ pub type Handle = u64;
/// Responses to events must have type `ActorResult`.
pub type ActorResult = Result<(), Error>;
/// Methods [`Actor::boot`] and [`Actor::start`] return an
/// `ActorHandle`, representing the actor's mainloop task.
/// The [`Actor::boot`] method returns an `ActorHandle`, representing
/// the actor's mainloop task.
pub type ActorHandle = tokio::task::JoinHandle<ActorResult>;
/// A small protocol for indicating successful synchronisation with
@ -193,17 +198,22 @@ pub type PendingEventQueue = Vec<Action>;
/// The main API for programming Syndicated Actor objects.
///
/// Through `Activation`s, programs can access the state of their
/// animating [`RunningActor`].
/// animating [`RunningActor`] and their active [`Facet`].
///
/// Usually, an `Activation` will be supplied to code that needs one; but when non-Actor code
/// (such as a [linked task][crate::actor#linked-tasks]) needs to enter an Actor's execution
/// context, use [`FacetRef::activate`] to construct one.
///
/// Many actions that an entity can perform are methods directly on
/// `Activation`, but methods on the [`RunningActor`] and [`ActorRef`]
/// `Activation`, but methods on the [`RunningActor`] and [`FacetRef`]
/// values contained in an `Activation` are also sometimes useful.
///
/// This is what other implementations call a "Turn", renamed here to
/// avoid conflicts with [`crate::schemas::internal_protocol::Turn`].
pub struct Activation<'activation> {
/// A reference to the implementation-side of the currently active [`Actor`].
pub actor: ActorRef,
/// A reference to the currently active [`Facet`] and the implementation-side state of its
/// [`Actor`].
pub facet: FacetRef,
/// A reference to the current state of the active [`Actor`].
pub state: &'activation mut RunningActor,
pending: EventBuffer,
@ -272,6 +282,14 @@ pub struct ActorRef {
state: Arc<Mutex<ActorState>>,
}
/// A combination of an [`ActorRef`] with a [`FacetId`], acting as a capability to enter the
/// execution context of a facet from a linked task.
#[derive(Clone)]
pub struct FacetRef {
pub actor: ActorRef,
pub facet_id: FacetId,
}
/// The state of an actor: either `Running` or `Terminated`.
pub enum ActorState {
/// A non-terminated actor has an associated [`RunningActor`] state record.
@ -290,9 +308,40 @@ pub struct RunningActor {
pub actor_id: ActorId,
tx: UnboundedSender<SystemMessage>,
mailbox: Weak<Mailbox>,
exit_hooks: Vec<Box<dyn Send + FnOnce(&mut Activation, &Arc<ActorResult>) -> ActorResult>>,
facet_nodes: Map<FacetId, Facet>,
facet_children: Map<FacetId, Set<FacetId>>,
root: FacetId,
}
/// State associated with each facet in an [`Actor`]'s facet tree.
///
/// # Inert facets
///
/// A facet is considered *inert* if:
///
/// 1. it has no child facets;
/// 2. it has no cleanup actions (that is, no assertions placed by any of its entities);
/// 3. it has no linked tasks; and
/// 4. it has no "inert check preventers" (see [Activation::prevent_inert_check]).
///
/// If a facet is created and is inert at the moment that its `boot` function returns, it is
/// automatically terminated.
///
/// When a facet is terminated, if its parent facet is inert, the parent is terminated.
///
/// If the root facet in an actor is terminated, the entire actor is terminated (with exit
/// status `Ok(())`).
///
pub struct Facet {
/// The ID of the facet.
pub facet_id: FacetId,
/// The ID of the facet's parent facet, if any; if None, this facet is the `Actor`'s root facet.
pub parent_facet_id: Option<FacetId>,
cleanup_actions: CleanupActions,
stop_actions: Vec<Action>,
linked_tasks: Map<u64, CancellationToken>,
exit_hooks: Vec<Box<dyn Send + FnOnce(&mut Activation, &Arc<ActorResult>) -> ActorResult>>,
inert_check_preventers: Arc<AtomicU64>,
}
/// A reference to an object that expects messages/assertions of type
@ -303,6 +352,8 @@ pub struct RunningActor {
pub struct Ref<M> {
/// Mailbox of the actor owning the referenced entity.
pub mailbox: Arc<Mailbox>,
/// ID of the facet (within the actor) owning the referenced entity.
pub facet_id: FacetId,
/// Mutex owning and guarding the state backing the referenced entity.
pub target: Mutex<Option<Box<dyn Entity<M>>>>,
}
@ -352,15 +403,22 @@ pub fn next_actor_id() -> ActorId {
NEXT_ACTOR_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed)
}
static NEXT_HANDLE: AtomicU64 = AtomicU64::new(2);
static NEXT_FACET_ID: AtomicU64 = AtomicU64::new(2);
#[doc(hidden)]
pub fn next_facet_id() -> FacetId {
FacetId::new(NEXT_FACET_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed))
.expect("Internal error: Attempt to allocate FacetId of zero. Too many FacetIds allocated. Restart the process.")
}
static NEXT_HANDLE: AtomicU64 = AtomicU64::new(3);
/// Allocate a process-unique `Handle`.
pub fn next_handle() -> Handle {
NEXT_HANDLE.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed)
}
static NEXT_ACCOUNT_ID: AtomicU64 = AtomicU64::new(3);
static NEXT_ACCOUNT_ID: AtomicU64 = AtomicU64::new(4);
static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(4);
static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(5);
preserves_schema::support::lazy_static! {
#[doc(hidden)]
@ -394,29 +452,20 @@ impl From<&Synced> for AnyValue {
}
}
impl<'activation> Activation<'activation> {
fn make(actor: &ActorRef, account: Arc<Account>, state: &'activation mut RunningActor) -> Self {
Activation {
actor: actor.clone(),
state,
pending: EventBuffer::new(account),
}
}
/// Constructs and executes `f` in a new "turn" for `actor`. If
/// `f` returns `Ok(())`, [commits the turn][Self::deliver] and
/// performs the buffered actions; otherwise, [abandons the
/// turn][Self::clear] and discards the buffered actions.
impl FacetRef {
/// Executes `f` in a new "[turn][Activation]" for `actor`. If `f` returns `Ok(())`,
/// [commits the turn][Activation::deliver] and performs the buffered actions; otherwise,
/// [abandons the turn][Activation::clear] and discards the buffered actions.
///
/// Bills any activity to `account`.
pub fn for_actor<F>(
actor: &ActorRef,
pub fn activate<F>(
&self,
account: Arc<Account>,
f: F,
) -> ActorResult where
F: FnOnce(&mut Activation) -> ActorResult,
{
match Self::for_actor_exit(actor, account, |t| match f(t) {
match self.activate_exit(account, |t| match f(t) {
Ok(()) => None,
Err(e) => Some(Err(e)),
}) {
@ -425,28 +474,26 @@ impl<'activation> Activation<'activation> {
}
}
/// Constructs and executes `f` in a new "turn" for `actor`. If
/// `f` returns `Some(exit_status)`, terminates `actor` with that
/// `exit_status`. Otherwise, if `f` returns `None`, leaves
/// `actor` in runnable state. [Commits buffered
/// actions][Self::deliver] unless `actor` terminates with an
/// `Err` status.
/// Executes `f` in a new "[turn][Activation]" for `actor`. If `f` returns
/// `Some(exit_status)`, terminates `actor` with that `exit_status`. Otherwise, if `f`
/// returns `None`, leaves `actor` in runnable state. [Commits buffered
/// actions][Activation::deliver] unless `actor` terminates with an `Err` status.
///
/// Bills any activity to `account`.
pub fn for_actor_exit<F>(
actor: &ActorRef,
pub fn activate_exit<F>(
&self,
account: Arc<Account>,
f: F,
) -> Option<ActorResult> where
F: FnOnce(&mut Activation) -> Option<ActorResult>,
{
match actor.state.lock() {
match self.actor.state.lock() {
Err(_) => panicked_err(),
Ok(mut g) => match &mut *g {
ActorState::Terminated { exit_status } =>
Some((**exit_status).clone()),
ActorState::Running(state) => {
let mut activation = Activation::make(actor, account, state);
let mut activation = Activation::make(self, account, state);
match f(&mut activation) {
None => None,
Some(exit_status) => {
@ -455,12 +502,19 @@ impl<'activation> Activation<'activation> {
}
drop(activation);
let exit_status = Arc::new(exit_status);
let mut t = Activation::make(actor, Account::new(crate::name!("shutdown")), state);
let mut t = Activation::make(&self.actor.facet_ref(state.root),
Account::new(crate::name!("shutdown")),
state);
for action in std::mem::take(&mut t.state.exit_hooks) {
if let Err(err) = action(&mut t, &exit_status) {
tracing::error!(err = debug(err), "error in exit hook");
}
}
if let Err(err) = t._terminate_facet(t.state.root, false) {
// This can only occur as the result of an internal error in this file's code.
tracing::error!(err = debug(err), "unexpected error from disorderly terminate_facet");
panic!("Unexpected error result from disorderly terminate_facet");
}
*g = ActorState::Terminated {
exit_status: Arc::clone(&exit_status),
};
@ -471,29 +525,72 @@ impl<'activation> Activation<'activation> {
}
}
}
}
impl<'activation> Activation<'activation> {
fn make(
facet: &FacetRef,
account: Arc<Account>,
state: &'activation mut RunningActor,
) -> Self {
Activation {
facet: facet.clone(),
state,
pending: EventBuffer::new(account),
}
}
fn immediate_oid<M>(&self, r: &Arc<Ref<M>>) {
if r.mailbox.actor_id != self.actor.actor_id {
if r.mailbox.actor_id != self.facet.actor.actor_id {
panic!("Cannot use for_myself to send to remote peers");
}
}
fn with_facet<F>(&mut self, check_existence: bool, facet_id: FacetId, f: F) -> ActorResult
where
F: FnOnce(&mut Activation) -> ActorResult,
{
if !check_existence || self.state.facet_nodes.contains_key(&facet_id) {
let old_facet_id = self.facet.facet_id;
self.facet.facet_id = facet_id;
let result = f(self);
self.facet.facet_id = old_facet_id;
result
} else {
Ok(())
}
}
#[doc(hidden)]
pub fn with_entity<M, F>(&mut self, r: &Arc<Ref<M>>, f: F) -> ActorResult where
F: FnOnce(&mut Activation, &mut dyn Entity<M>) -> ActorResult
{
self.with_facet(true, r.facet_id, |t| r.internal_with_entity(|e| f(t, e)))
}
fn active_facet<'a>(&'a mut self) -> Option<&'a mut Facet> {
self.state.get_facet(self.facet.facet_id)
}
/// Core API: assert `a` at recipient `r`.
///
/// Returns the [`Handle`] for the new assertion.
pub fn assert<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, a: M) -> Handle {
let handle = next_handle();
{
let r = Arc::clone(r);
self.pending.queue_for(&r).push(Box::new(
move |t| r.with_entity(|e| e.assert(t, a, handle))));
}
{
let r = Arc::clone(r);
self.state.cleanup_actions.insert(
handle,
CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new(
move |t| r.with_entity(|e| e.retract(t, handle)))));
if let Some(f) = self.active_facet() {
{
let r = Arc::clone(r);
f.cleanup_actions.insert(
handle,
CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new(
move |t| t.with_entity(&r, |t, e| e.retract(t, handle)))));
}
drop(f);
{
let r = Arc::clone(r);
self.pending.queue_for(&r).push(Box::new(
move |t| t.with_entity(&r, |t, e| e.assert(t, a, handle))));
}
}
handle
}
@ -515,25 +612,30 @@ impl<'activation> Activation<'activation> {
pub fn assert_for_myself<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, a: M) -> Handle {
self.immediate_oid(r);
let handle = next_handle();
{
let r = Arc::clone(r);
self.pending.for_myself.push(Box::new(
move |t| r.with_entity(|e| e.assert(t, a, handle))));
}
{
let r = Arc::clone(r);
self.state.cleanup_actions.insert(
handle,
CleanupAction::ForMyself(Box::new(
move |t| r.with_entity(|e| e.retract(t, handle)))));
if let Some(f) = self.active_facet() {
{
let r = Arc::clone(r);
f.cleanup_actions.insert(
handle,
CleanupAction::ForMyself(Box::new(
move |t| t.with_entity(&r, |t, e| e.retract(t, handle)))));
}
drop(f);
{
let r = Arc::clone(r);
self.pending.for_myself.push(Box::new(
move |t| t.with_entity(&r, |t, e| e.assert(t, a, handle))));
}
}
handle
}
/// Core API: retract a previously-established assertion.
pub fn retract(&mut self, handle: Handle) {
if let Some(d) = self.state.cleanup_actions.remove(&handle) {
self.pending.execute_cleanup_action(d)
if let Some(f) = self.active_facet() {
if let Some(d) = f.cleanup_actions.remove(&handle) {
self.pending.execute_cleanup_action(d)
}
}
}
@ -541,7 +643,7 @@ impl<'activation> Activation<'activation> {
pub fn message<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, m: M) {
let r = Arc::clone(r);
self.pending.queue_for(&r).push(Box::new(
move |t| r.with_entity(|e| e.message(t, m))))
move |t| t.with_entity(&r, |t, e| e.message(t, m))))
}
/// Core API: send message `m` to recipient `r`, which must be a
@ -558,7 +660,7 @@ impl<'activation> Activation<'activation> {
self.immediate_oid(r);
let r = Arc::clone(r);
self.pending.for_myself.push(Box::new(
move |t| r.with_entity(|e| e.message(t, m))))
move |t| t.with_entity(&r, |t, e| e.message(t, m))))
}
/// Core API: begins a synchronisation with `r`.
@ -569,7 +671,7 @@ impl<'activation> Activation<'activation> {
pub fn sync<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, peer: Arc<Ref<Synced>>) {
let r = Arc::clone(r);
self.pending.queue_for(&r).push(Box::new(
move |t| r.with_entity(|e| e.sync(t, peer))))
move |t| t.with_entity(&r, |t, e| e.sync(t, peer))))
}
/// Retrieve the [`Account`] against which actions are recorded.
@ -596,6 +698,182 @@ impl<'activation> Activation<'activation> {
pub fn deliver(&mut self) {
self.pending.deliver();
}
/// Construct an entity with behaviour [`InertEntity`] within the active facet.
pub fn inert_entity<M>(&mut self) -> Arc<Ref<M>> {
self.create(InertEntity)
}
/// Construct an entity with behaviour `e` within the active facet.
pub fn create<M, E: Entity<M> + Send + 'static>(&mut self, e: E) -> Arc<Ref<M>> {
let r = self.create_inert();
r.become_entity(e);
r
}
/// Construct an entity (within the active facet) whose behaviour will be specified later
/// via [`become_entity`][Ref::become_entity].
pub fn create_inert<M>(&mut self) -> Arc<Ref<M>> {
Arc::new(Ref {
mailbox: self.state.mailbox(),
facet_id: self.facet.facet_id,
target: Mutex::new(None),
})
}
/// Start a new [linked task][crate::actor#linked-tasks] attached to the active facet. The
/// task will execute the future "`boot`" to completion unless it is cancelled first (by
/// e.g. termination of the owning facet or crashing of the owning actor). Uses `name` for
/// log messages emitted by the task.
pub fn linked_task<F: 'static + Send + futures::Future<Output = ActorResult>>(
&mut self,
name: tracing::Span,
boot: F,
) {
let mailbox = self.state.mailbox();
if let Some(f) = self.active_facet() {
let token = CancellationToken::new();
let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed);
name.record("task_id", &task_id);
{
let token = token.clone();
tokio::spawn(async move {
tracing::trace!(task_id, "linked task start");
select! {
_ = token.cancelled() => {
tracing::trace!(task_id, "linked task cancelled");
Ok(())
}
result = boot => {
match &result {
Ok(()) => {
tracing::trace!(task_id, "linked task normal stop");
()
}
Err(e) => {
tracing::error!(task_id, "linked task error: {}", e);
let _ = mailbox.tx.send(SystemMessage::Crash(e.clone()));
()
}
}
result
}
}
}.instrument(name));
}
f.linked_tasks.insert(task_id, token);
}
}
/// Create a new subfacet of the currently-active facet. Runs `boot` in the new facet's
/// context. If `boot` returns leaving the new facet [inert][Facet#inert-facets],
pub fn facet<F: 'static + Send + FnOnce(&mut Activation) -> ActorResult>(
&mut self,
boot: F,
) -> Result<FacetId, Error> {
let f = Facet::new(Some(self.facet.facet_id));
let facet_id = f.facet_id;
self.state.facet_nodes.insert(facet_id, f);
self.state.facet_children.entry(self.facet.facet_id).or_default().insert(facet_id);