diff --git a/syndicate-server/examples/consumer.rs b/syndicate-server/examples/consumer.rs index 376ac34..6d93b63 100644 --- a/syndicate-server/examples/consumer.rs +++ b/syndicate-server/examples/consumer.rs @@ -24,13 +24,13 @@ pub struct Config { async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; Actor::new().boot(syndicate::name!("consumer"), |t| { - let ac = t.actor.clone(); + let facet = t.facet.clone(); let boot_account = Arc::clone(t.account()); - Ok(t.state.linked_task(tracing::Span::current(), async move { + Ok(t.linked_task(tracing::Span::current(), async move { let config = Config::from_args(); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - Activation::for_actor(&ac, boot_account, |t| { + facet.activate(boot_account, |t| { relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| { let consumer = syndicate::entity(0) .on_message(|message_count, _t, m: AnyValue| { @@ -42,21 +42,22 @@ async fn main() -> Result<(), Box> { } Ok(()) }) - .create_cap(t.state); + .create_cap(t); ds.assert(t, &Observe { pattern: syndicate_macros::pattern!(""), observer: Arc::clone(&consumer), }); - t.state.linked_task(syndicate::name!("tick"), async move { + t.linked_task(syndicate::name!("tick"), async move { let mut stats_timer = interval(Duration::from_secs(1)); loop { stats_timer.tick().await; let consumer = Arc::clone(&consumer); external_event(&Arc::clone(&consumer.underlying.mailbox), &Account::new(syndicate::name!("account")), - Box::new(move |t| consumer.underlying.with_entity( - |e| e.message(t, AnyValue::new(true)))))?; + Box::new(move |t| t.with_entity( + &consumer.underlying, + |t, e| e.message(t, AnyValue::new(true)))))?; } }); Ok(None) diff --git a/syndicate-server/examples/pingpong.rs b/syndicate-server/examples/pingpong.rs index 3525465..f579645 100644 --- a/syndicate-server/examples/pingpong.rs +++ b/syndicate-server/examples/pingpong.rs @@ -92,13 +92,13 @@ async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; Actor::new().boot(syndicate::name!("pingpong"), |t| { - let ac = t.actor.clone(); + let facet = t.facet.clone(); let boot_account = Arc::clone(t.account()); - Ok(t.state.linked_task(tracing::Span::current(), async move { + Ok(t.linked_task(tracing::Span::current(), async move { let config = Config::from_args(); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - Activation::for_actor(&ac, boot_account, |t| { + facet.activate(boot_account, |t| { relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { let (send_label, recv_label, report_latency_every, should_echo, bytes_padding) = @@ -116,7 +116,7 @@ async fn main() -> Result<(), Box> { let mut rtt_ns_samples: Vec = vec![0; report_latency_every]; let mut rtt_batch_count: usize = 0; let mut current_reply = None; - let self_ref = t.state.create_inert(); + let self_ref = t.create_inert(); self_ref.become_entity( syndicate::entity(Arc::clone(&self_ref)) .on_message(move |self_ref, t, m: AnyValue| { @@ -177,15 +177,16 @@ async fn main() -> Result<(), Box> { observer: Arc::clone(&consumer), }); - t.state.linked_task(syndicate::name!("tick"), async move { + t.linked_task(syndicate::name!("tick"), async move { let mut stats_timer = interval(Duration::from_secs(1)); loop { stats_timer.tick().await; let consumer = Arc::clone(&consumer); external_event(&Arc::clone(&consumer.underlying.mailbox), &Account::new(syndicate::name!("account")), - Box::new(move |t| consumer.underlying.with_entity( - |e| e.message(t, AnyValue::new(true)))))?; + Box::new(move |t| t.with_entity( + &consumer.underlying, + |t, e| e.message(t, AnyValue::new(true)))))?; } }); @@ -193,7 +194,7 @@ async fn main() -> Result<(), Box> { let turn_count = c.turn_count; let action_count = c.action_count; let account = Arc::clone(t.account()); - t.state.linked_task(syndicate::name!("boot-ping"), async move { + t.linked_task(syndicate::name!("boot-ping"), async move { let padding: AnyValue = Value::ByteString(vec![0; bytes_padding]).wrap(); for _ in 0..turn_count { let mut events: PendingEventQueue = vec![]; @@ -203,8 +204,9 @@ async fn main() -> Result<(), Box> { for _ in 0..action_count { let ds = Arc::clone(&ds); let current_rec = current_rec.clone(); - events.push(Box::new(move |t| ds.underlying.with_entity( - |e| e.message(t, current_rec)))); + events.push(Box::new(move |t| t.with_entity( + &ds.underlying, + |t, e| e.message(t, current_rec)))); } external_events(&ds.underlying.mailbox, &account, events)? } diff --git a/syndicate-server/examples/producer.rs b/syndicate-server/examples/producer.rs index 598ff22..f8ecc18 100644 --- a/syndicate-server/examples/producer.rs +++ b/syndicate-server/examples/producer.rs @@ -33,26 +33,27 @@ fn says(who: AnyValue, what: AnyValue) -> AnyValue { async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; Actor::new().boot(syndicate::name!("producer"), |t| { - let ac = t.actor.clone(); + let facet = t.facet.clone(); let boot_account = Arc::clone(t.account()); - Ok(t.state.linked_task(tracing::Span::current(), async move { + Ok(t.linked_task(tracing::Span::current(), async move { let config = Config::from_args(); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - Activation::for_actor(&ac, boot_account, |t| { + facet.activate(boot_account, |t| { relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { let padding: AnyValue = Value::ByteString(vec![0; config.bytes_padding]).wrap(); let action_count = config.action_count; let account = Account::new(syndicate::name!("account")); - t.state.linked_task(syndicate::name!("sender"), async move { + t.linked_task(syndicate::name!("sender"), async move { loop { account.ensure_clear_funds().await; let mut events: PendingEventQueue = Vec::new(); for _ in 0..action_count { let ds = Arc::clone(&ds); let padding = padding.clone(); - events.push(Box::new(move |t| ds.underlying.with_entity( - |e| e.message(t, says(Value::from("producer").wrap(), padding))))); + events.push(Box::new(move |t| t.with_entity( + &ds.underlying, + |t, e| e.message(t, says(Value::from("producer").wrap(), padding))))); } external_events(&ds.underlying.mailbox, &account, events)?; } diff --git a/syndicate-server/examples/state-consumer.rs b/syndicate-server/examples/state-consumer.rs index b0f1019..fdbc01c 100644 --- a/syndicate-server/examples/state-consumer.rs +++ b/syndicate-server/examples/state-consumer.rs @@ -24,13 +24,13 @@ pub struct Config { async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; Actor::new().boot(syndicate::name!("state-consumer"), |t| { - let ac = t.actor.clone(); + let facet = t.facet.clone(); let boot_account = Arc::clone(t.account()); - Ok(t.state.linked_task(tracing::Span::current(), async move { + Ok(t.linked_task(tracing::Span::current(), async move { let config = Config::from_args(); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - Activation::for_actor(&ac, boot_account, |t| { + facet.activate(boot_account, |t| { relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| { let consumer = { #[derive(Default)] @@ -61,7 +61,7 @@ async fn main() -> Result<(), Box> { s.arrival_counter = 0; s.departure_counter = 0; Ok(()) - }).create_cap(t.state) + }).create_cap(t) }; ds.assert(t, &Observe { @@ -69,15 +69,16 @@ async fn main() -> Result<(), Box> { observer: Arc::clone(&consumer), }); - t.state.linked_task(syndicate::name!("tick"), async move { + t.linked_task(syndicate::name!("tick"), async move { let mut stats_timer = interval(Duration::from_secs(1)); loop { stats_timer.tick().await; let consumer = Arc::clone(&consumer); external_event(&Arc::clone(&consumer.underlying.mailbox), &Account::new(syndicate::name!("account")), - Box::new(move |t| consumer.underlying.with_entity( - |e| e.message(t, AnyValue::new(true)))))?; + Box::new(move |t| t.with_entity( + &consumer.underlying, + |t, e| e.message(t, AnyValue::new(true)))))?; } }); Ok(None) diff --git a/syndicate-server/examples/state-producer.rs b/syndicate-server/examples/state-producer.rs index 013c35b..6376ed4 100644 --- a/syndicate-server/examples/state-producer.rs +++ b/syndicate-server/examples/state-producer.rs @@ -19,16 +19,16 @@ pub struct Config { async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; Actor::new().boot(syndicate::name!("state-producer"), |t| { - let ac = t.actor.clone(); + let facet = t.facet.clone(); let boot_account = Arc::clone(t.account()); - Ok(t.state.linked_task(tracing::Span::current(), async move { + Ok(t.linked_task(tracing::Span::current(), async move { let config = Config::from_args(); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - Activation::for_actor(&ac, boot_account, |t| { + facet.activate(boot_account, |t| { relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { let account = Account::new(syndicate::name!("account")); - t.state.linked_task(syndicate::name!("sender"), async move { + t.linked_task(syndicate::name!("sender"), async move { let presence: AnyValue = Value::simple_record1( "Present", Value::from(std::process::id()).wrap()).wrap(); @@ -38,13 +38,13 @@ async fn main() -> Result<(), Box> { let presence = presence.clone(); let handle = handle.clone(); external_event(&Arc::clone(&ds.underlying.mailbox), &account, Box::new( - move |t| ds.underlying.with_entity(|e| e.assert(t, presence, handle)))) + move |t| t.with_entity(&ds.underlying, |t, e| e.assert(t, presence, handle)))) }; let retract_e = || { let ds = Arc::clone(&ds); let handle = handle.clone(); external_event(&Arc::clone(&ds.underlying.mailbox), &account, Box::new( - move |t| ds.underlying.with_entity(|e| e.retract(t, handle)))) + move |t| t.with_entity(&ds.underlying, |t, e| e.retract(t, handle)))) }; assert_e()?; loop { diff --git a/syndicate-server/src/main.rs b/syndicate-server/src/main.rs index 5abd757..53d1c1f 100644 --- a/syndicate-server/src/main.rs +++ b/syndicate-server/src/main.rs @@ -72,13 +72,11 @@ async fn main() -> Result<(), Box> { tracing::info!(r""); } - let mut non_daemons = Vec::new(); - tracing::trace!("startup"); if config.debt_reporter { Actor::new().boot(syndicate::name!("debt-reporter"), |t| { - t.state.linked_task(syndicate::name!("tick"), async { + t.linked_task(syndicate::name!("tick"), async { let mut timer = tokio::time::interval(core::time::Duration::from_secs(1)); loop { timer.tick().await; @@ -93,51 +91,49 @@ async fn main() -> Result<(), Box> { }); } - let ds = Cap::new(&Actor::create_and_start(syndicate::name!("dataspace"), Dataspace::new())); + Actor::new().boot(syndicate::name!("dataspace"), move |t| { + let ds = Cap::new(&t.create(Dataspace::new())); - if config.inferior { - let ds = Arc::clone(&ds); - non_daemons.push( - Actor::new().boot(syndicate::name!("parent"), move |t| run_io_relay( - t, - relay::Input::Bytes(Box::pin(tokio::io::stdin())), - relay::Output::Bytes(Box::pin(tokio::io::stdout())), - ds))); - } - - let gateway = Cap::guard(&Actor::create_and_start( - syndicate::name!("gateway"), - syndicate::entity(Arc::clone(&ds)).on_asserted(handle_resolve))); - { - let ds = Arc::clone(&ds); - Actor::new().boot(syndicate::name!("rootcap"), move |t| { + { use syndicate::schemas::gatekeeper; let key = vec![0; 16]; let sr = sturdy::SturdyRef::mint(_Any::new("syndicate"), &key); tracing::info!(rootcap = debug(&_Any::from(&sr))); tracing::info!(rootcap = display(sr.to_hex())); ds.assert(t, &gatekeeper::Bind { oid: sr.oid.clone(), key, target: ds.clone() }); - Ok(()) - }); - } + } - for port in config.ports.clone() { - let gateway = Arc::clone(&gateway); - non_daemons.push(Actor::new().boot( - syndicate::name!("tcp", port), - move |t| Ok(t.state.linked_task(syndicate::name!("listener"), - run_tcp_listener(gateway, port))))); - } + if config.inferior { + let ds = Arc::clone(&ds); + Actor::new().boot(syndicate::name!("parent"), move |t| run_io_relay( + t, + relay::Input::Bytes(Box::pin(tokio::io::stdin())), + relay::Output::Bytes(Box::pin(tokio::io::stdout())), + ds)); + } - for path in config.sockets.clone() { - let gateway = Arc::clone(&gateway); - non_daemons.push(Actor::new().boot( - syndicate::name!("unix", socket = debug(path.to_str().expect("representable UnixListener path"))), - move |t| Ok(t.state.linked_task(syndicate::name!("listener"), - run_unix_listener(gateway, path))))); - } + let gateway = Cap::guard(&t.create( + syndicate::entity(Arc::clone(&ds)).on_asserted(handle_resolve))); + + for port in config.ports.clone() { + let gateway = Arc::clone(&gateway); + Actor::new().boot( + syndicate::name!("tcp", port), + move |t| Ok(t.linked_task(syndicate::name!("listener"), + run_tcp_listener(gateway, port)))); + } + + for path in config.sockets.clone() { + let gateway = Arc::clone(&gateway); + Actor::new().boot( + syndicate::name!("unix", socket = debug(path.to_str().expect("representable UnixListener path"))), + move |t| Ok(t.linked_task(syndicate::name!("listener"), + run_unix_listener(gateway, path)))); + } + + Ok(()) + }).await??; - futures::future::join_all(non_daemons).await; Ok(()) } @@ -183,24 +179,24 @@ fn run_io_relay( o: relay::Output, initial_ref: Arc, ) -> ActorResult { - let exit_listener = t.state.create(ExitListener); + let exit_listener = t.create(ExitListener); t.state.add_exit_hook(&exit_listener); relay::TunnelRelay::run(t, i, o, Some(initial_ref), None); Ok(()) } fn run_connection( - ac: ActorRef, + facet: FacetRef, i: relay::Input, o: relay::Output, initial_ref: Arc, ) -> ActorResult { - Activation::for_actor(&ac, Account::new(syndicate::name!("start-session")), - |t| run_io_relay(t, i, o, initial_ref)) + facet.activate(Account::new(syndicate::name!("start-session")), + |t| run_io_relay(t, i, o, initial_ref)) } async fn detect_protocol( - ac: ActorRef, + facet: FacetRef, stream: TcpStream, gateway: Arc, addr: std::net::SocketAddr, @@ -229,7 +225,7 @@ async fn detect_protocol( _ => unreachable!() } }; - run_connection(ac, i, o, gateway) + run_connection(facet, i, o, gateway) } async fn run_tcp_listener( @@ -244,9 +240,9 @@ async fn run_tcp_listener( let gateway = Arc::clone(&gateway); let ac = Actor::new(); ac.boot(syndicate::name!(parent: None, "tcp"), - move |t| Ok(t.state.linked_task( + move |t| Ok(t.linked_task( tracing::Span::current(), - detect_protocol(t.actor.clone(), stream, gateway, addr)))); + detect_protocol(t.facet.clone(), stream, gateway, addr)))); } } @@ -261,24 +257,21 @@ async fn run_unix_listener( let (stream, _addr) = listener.accept().await?; let peer = stream.peer_cred()?; let gateway = Arc::clone(&gateway); - let ac = Actor::new(); - ac.boot(syndicate::name!(parent: None, - "unix", - pid = debug(peer.pid().unwrap_or(-1)), - uid = peer.uid()), - move |t| Ok(t.state.linked_task( - tracing::Span::current(), - { - let ac = t.actor.clone(); - async move { - tracing::info!(protocol = display("unix")); - let (i, o) = stream.into_split(); - run_connection(ac, - relay::Input::Bytes(Box::pin(i)), - relay::Output::Bytes(Box::pin(o)), - gateway) - } - }))); + Actor::new().boot( + syndicate::name!(parent: None, "unix", pid = debug(peer.pid().unwrap_or(-1)), uid = peer.uid()), + |t| Ok(t.linked_task( + tracing::Span::current(), + { + let facet = t.facet.clone(); + async move { + tracing::info!(protocol = display("unix")); + let (i, o) = stream.into_split(); + run_connection(facet, + relay::Input::Bytes(Box::pin(i)), + relay::Output::Bytes(Box::pin(o)), + gateway) + } + }))); } } @@ -342,7 +335,7 @@ fn handle_resolve( } } }) - .create_cap(t.state); + .create_cap(t); if let Some(oh) = ds.assert(t, &dataspace::Observe { // TODO: codegen plugin to generate pattern constructors pattern: syndicate_macros::pattern!(""), diff --git a/syndicate/benches/bench_dataspace.rs b/syndicate/benches/bench_dataspace.rs index 4acb516..43b0231 100644 --- a/syndicate/benches/bench_dataspace.rs +++ b/syndicate/benches/bench_dataspace.rs @@ -7,6 +7,7 @@ use std::sync::atomic::Ordering; use std::time::Instant; use syndicate::actor::*; +use syndicate::during::entity; use syndicate::dataspace::Dataspace; use syndicate::schemas::dataspace::Observe; use syndicate::schemas::dataspace_patterns as p; @@ -30,7 +31,7 @@ struct ShutdownEntity; impl Entity for ShutdownEntity { fn message(&mut self, t: &mut Activation, _m: AnyValue) -> ActorResult { - t.state.shutdown(); + t.stop(); Ok(()) } } @@ -53,20 +54,22 @@ pub fn bench_pub(c: &mut Criterion) { let start = Instant::now(); rt.block_on(async move { Actor::new().boot(syndicate::name!("dataspace"), move |t| { - let ds = t.state.create(Dataspace::new()); - let shutdown = t.state.create(ShutdownEntity); + let ds = t.create(Dataspace::new()); + let shutdown = t.create(ShutdownEntity); let account = Account::new(syndicate::name!("sender-account")); - t.state.linked_task(syndicate::name!("sender"), async move { + t.linked_task(syndicate::name!("sender"), async move { for _ in 0..iters { let ds = Arc::clone(&ds); external_event(&Arc::clone(&ds.mailbox), &account, Box::new( - move |t| ds.with_entity( - |e| e.message(t, says(AnyValue::new("bench_pub"), - Value::ByteString(vec![]).wrap())))))? + move |t| t.with_entity( + &ds, + |t, e| e.message(t, says(AnyValue::new("bench_pub"), + Value::ByteString(vec![]).wrap())))))? } external_event(&Arc::clone(&shutdown.mailbox), &account, Box::new( - move |t| shutdown.with_entity( - |e| e.message(t, AnyValue::new(true)))))?; + move |t| t.with_entity( + &shutdown, + |t, e| e.message(t, AnyValue::new(true)))))?; Ok(()) }); Ok(()) @@ -80,70 +83,96 @@ pub fn bench_pub(c: &mut Criterion) { b.iter_custom(|iters| { let start = Instant::now(); rt.block_on(async move { - let ds = Cap::new( - &Actor::create_and_start(syndicate::name!("dataspace"), Dataspace::new())); let turn_count = Arc::new(AtomicU64::new(0)); - { + Actor::new().boot(syndicate::name!("dataspace"), { let iters = iters.clone(); let turn_count = Arc::clone(&turn_count); - Actor::new().boot(syndicate::name!("consumer"), move |t| { - struct Receiver(Arc); - impl Entity for Receiver { - fn message(&mut self, _t: &mut Activation, _m: AnyValue) -> ActorResult { - self.0.fetch_add(1, Ordering::Relaxed); - Ok(()) - } - } - let shutdown = Cap::new(&t.state.create(ShutdownEntity)); - let receiver = Cap::new(&t.state.create(Receiver(Arc::clone(&turn_count)))); + move |t| { + let ds = Cap::new(&t.create(Dataspace::new())); + + let shutdown = entity(()) + .on_asserted(|_, _, _| { + Ok(Some(Box::new(|_, t| { + t.stop(); + Ok(()) + }))) + }) + .create_cap(t); - ds.assert(t, &Observe { - pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { - ctor: Box::new(p::CRec { - label: Value::symbol("Says").wrap(), - arity: 2.into(), - }), - members: Map::from_iter(vec![ - (0.into(), p::Pattern::DLit(Box::new(p::DLit { - value: AnyValue::new("bench_pub"), - }))), - (1.into(), p::Pattern::DBind(Box::new(p::DBind { - pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)), - }))), - ].into_iter()), - })), - observer: receiver, - }); ds.assert(t, &Observe { pattern: p::Pattern::DBind(Box::new(p::DBind { pattern: p::Pattern::DLit(Box::new(p::DLit { - value: AnyValue::new(true), + value: Value::symbol("consumer").wrap(), })), })), observer: shutdown, }); - let account = Arc::clone(t.account()); - t.state.linked_task(syndicate::name!("sender"), async move { - for _ in 0..iters { - let ds = Arc::clone(&ds); - external_event(&Arc::clone(&ds.underlying.mailbox), &account, Box::new( - move |t| ds.underlying.with_entity( - |e| e.message(t, says(AnyValue::new("bench_pub"), - Value::ByteString(vec![]).wrap())))))? - } - { - let ds = Arc::clone(&ds); - external_event(&Arc::clone(&ds.underlying.mailbox), &account, Box::new( - move |t| ds.underlying.with_entity( - |e| e.message(t, AnyValue::new(true)))))?; + + Actor::new().boot(syndicate::name!("consumer"), move |t| { + struct Receiver(Arc); + impl Entity for Receiver { + fn message(&mut self, _t: &mut Activation, _m: AnyValue) -> ActorResult { + self.0.fetch_add(1, Ordering::Relaxed); + Ok(()) + } } + + let shutdown = Cap::new(&t.create(ShutdownEntity)); + let receiver = Cap::new(&t.create(Receiver(Arc::clone(&turn_count)))); + + ds.assert(t, Value::::symbol("consumer").wrap()); + ds.assert(t, &Observe { + pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { + ctor: Box::new(p::CRec { + label: Value::symbol("Says").wrap(), + arity: 2.into(), + }), + members: Map::from_iter(vec![ + (0.into(), p::Pattern::DLit(Box::new(p::DLit { + value: AnyValue::new("bench_pub"), + }))), + (1.into(), p::Pattern::DBind(Box::new(p::DBind { + pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)), + }))), + ].into_iter()), + })), + observer: receiver, + }); + ds.assert(t, &Observe { + pattern: p::Pattern::DBind(Box::new(p::DBind { + pattern: p::Pattern::DLit(Box::new(p::DLit { + value: AnyValue::new(true), + })), + })), + observer: shutdown, + }); + + let account = Arc::clone(t.account()); + t.linked_task(syndicate::name!("sender"), async move { + for _i in 0..iters { + let ds = Arc::clone(&ds); + external_event(&Arc::clone(&ds.underlying.mailbox), &account, Box::new( + move |t| t.with_entity( + &ds.underlying, + |t, e| e.message(t, says(AnyValue::new("bench_pub"), + Value::ByteString(vec![]).wrap())))))? + } + { + let ds = Arc::clone(&ds); + external_event(&Arc::clone(&ds.underlying.mailbox), &account, Box::new( + move |t| t.with_entity( + &ds.underlying, + |t, e| e.message(t, AnyValue::new(true)))))?; + } + Ok(()) + }); Ok(()) }); Ok(()) - }).await.unwrap().unwrap(); - } + } + }).await.unwrap().unwrap(); let actual_turns = turn_count.load(Ordering::SeqCst); if actual_turns != iters { diff --git a/syndicate/doc/actor.md b/syndicate/doc/actor.md index fbeb22f..8d141b0 100644 --- a/syndicate/doc/actor.md +++ b/syndicate/doc/actor.md @@ -2,10 +2,12 @@ The [actor][crate::actor] module is the core of the Syndicated Actor model imple Central features: - - trait [`Entity`], the core protocol that must be implemented by - every object - struct [`Activation`], the API for programming a Syndicated Actor object + - trait [`Entity`], the core protocol that must be implemented by + every object + - struct [`Facet`], a node in the tree of nested conversations that + an Actor is participating in - type [`AnyValue`], the type of messages and assertions that can be exchanged among distributed objects, including via [dataspace][crate::dataspace] diff --git a/syndicate/doc/flow-control.md b/syndicate/doc/flow-control.md index 447cb68..ef3d674 100644 --- a/syndicate/doc/flow-control.md +++ b/syndicate/doc/flow-control.md @@ -1,4 +1,4 @@ # Flow control - - Account, LoanedItem - - start_debt_reporter + - [Account] + - [LoanedItem] diff --git a/syndicate/doc/linked-tasks.md b/syndicate/doc/linked-tasks.md index 28e9738..56fb133 100644 --- a/syndicate/doc/linked-tasks.md +++ b/syndicate/doc/linked-tasks.md @@ -1,3 +1,3 @@ # Linked Tasks - - linked_task + - [Activation::linked_task] diff --git a/syndicate/doc/what-is-an-actor.md b/syndicate/doc/what-is-an-actor.md index 10df4b5..08b0e6e 100644 --- a/syndicate/doc/what-is-an-actor.md +++ b/syndicate/doc/what-is-an-actor.md @@ -1,18 +1,23 @@ # What is an Actor? A [Syndicated Actor][Actor] is a collection of stateful -[Entities][Entity]. In the taxonomy of De Koster *et al.* -([2016](#DeKoster2016)), the Syndicated Actor model is a -*Communicating Event-Loop* actor model, similar to that offered by the -E programming language +[Entities][Entity], organised in a tree of [Facets][Facet], with each +facet representing a +[(sub)conversation](https://syndicate-lang.org/about/#conversational-concurrency-1) +that the Actor is engaged in. Each entity belongs to exactly one +facet; each facet has exactly one parent and zero or more children; +each actor has exactly one associated root facet. When a facet is its +actor's root facet, its parent is the actor itself; otherwise, its +parent is always another facet. + +In the taxonomy of De Koster *et al.* ([2016](#DeKoster2016)), the +Syndicated Actor model is a *Communicating Event-Loop* actor model, +similar to that offered by the E programming language ([Wikipedia](https://en.wikipedia.org/wiki/E_(programming_language)); [erights.org](http://erights.org/)). -**Note.** In the full Syndicated Actor model, entities are arranged in a tree of -*facets*; the current Rust implementation does not yet include support -for facets. - - - Actor, ActorRef, ActorState, Mailbox + - [Actor], [ActorRef], [Facet], [FacetRef], [ActorState], [Mailbox], + [Activation] **References.** diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index 9e46b95..4651b1b 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -16,12 +16,14 @@ use preserves::value::Domain; use preserves::value::IOValue; use preserves::value::Map; use preserves::value::NestedValue; +use preserves::value::Set; use preserves_schema::support::ParseError; use std::boxed::Box; use std::collections::hash_map::HashMap; use std::convert::TryFrom; use std::convert::TryInto; +use std::num::NonZeroU64; use std::sync::Arc; use std::sync::Mutex; use std::sync::RwLock; @@ -50,6 +52,9 @@ pub type AnyValue = super::schemas::internal_protocol::_Any; /// The type of process-unique actor IDs. pub type ActorId = u64; +/// The type of process-unique facet IDs. +pub type FacetId = NonZeroU64; + /// The type of process-unique assertion handles. /// /// Used both as a reference to [retract][Entity::retract] @@ -60,8 +65,8 @@ pub type Handle = u64; /// Responses to events must have type `ActorResult`. pub type ActorResult = Result<(), Error>; -/// Methods [`Actor::boot`] and [`Actor::start`] return an -/// `ActorHandle`, representing the actor's mainloop task. +/// The [`Actor::boot`] method returns an `ActorHandle`, representing +/// the actor's mainloop task. pub type ActorHandle = tokio::task::JoinHandle; /// A small protocol for indicating successful synchronisation with @@ -193,17 +198,22 @@ pub type PendingEventQueue = Vec; /// The main API for programming Syndicated Actor objects. /// /// Through `Activation`s, programs can access the state of their -/// animating [`RunningActor`]. +/// animating [`RunningActor`] and their active [`Facet`]. +/// +/// Usually, an `Activation` will be supplied to code that needs one; but when non-Actor code +/// (such as a [linked task][crate::actor#linked-tasks]) needs to enter an Actor's execution +/// context, use [`FacetRef::activate`] to construct one. /// /// Many actions that an entity can perform are methods directly on -/// `Activation`, but methods on the [`RunningActor`] and [`ActorRef`] +/// `Activation`, but methods on the [`RunningActor`] and [`FacetRef`] /// values contained in an `Activation` are also sometimes useful. /// /// This is what other implementations call a "Turn", renamed here to /// avoid conflicts with [`crate::schemas::internal_protocol::Turn`]. pub struct Activation<'activation> { - /// A reference to the implementation-side of the currently active [`Actor`]. - pub actor: ActorRef, + /// A reference to the currently active [`Facet`] and the implementation-side state of its + /// [`Actor`]. + pub facet: FacetRef, /// A reference to the current state of the active [`Actor`]. pub state: &'activation mut RunningActor, pending: EventBuffer, @@ -272,6 +282,14 @@ pub struct ActorRef { state: Arc>, } +/// A combination of an [`ActorRef`] with a [`FacetId`], acting as a capability to enter the +/// execution context of a facet from a linked task. +#[derive(Clone)] +pub struct FacetRef { + pub actor: ActorRef, + pub facet_id: FacetId, +} + /// The state of an actor: either `Running` or `Terminated`. pub enum ActorState { /// A non-terminated actor has an associated [`RunningActor`] state record. @@ -290,9 +308,40 @@ pub struct RunningActor { pub actor_id: ActorId, tx: UnboundedSender, mailbox: Weak, - cleanup_actions: CleanupActions, - linked_tasks: Map, exit_hooks: Vec) -> ActorResult>>, + facet_nodes: Map, + facet_children: Map>, + root: FacetId, +} + +/// State associated with each facet in an [`Actor`]'s facet tree. +/// +/// # Inert facets +/// +/// A facet is considered *inert* if: +/// +/// 1. it has no child facets; +/// 2. it has no cleanup actions (that is, no assertions placed by any of its entities); +/// 3. it has no linked tasks; and +/// 4. it has no "inert check preventers" (see [Activation::prevent_inert_check]). +/// +/// If a facet is created and is inert at the moment that its `boot` function returns, it is +/// automatically terminated. +/// +/// When a facet is terminated, if its parent facet is inert, the parent is terminated. +/// +/// If the root facet in an actor is terminated, the entire actor is terminated (with exit +/// status `Ok(())`). +/// +pub struct Facet { + /// The ID of the facet. + pub facet_id: FacetId, + /// The ID of the facet's parent facet, if any; if None, this facet is the `Actor`'s root facet. + pub parent_facet_id: Option, + cleanup_actions: CleanupActions, + stop_actions: Vec, + linked_tasks: Map, + inert_check_preventers: Arc, } /// A reference to an object that expects messages/assertions of type @@ -303,6 +352,8 @@ pub struct RunningActor { pub struct Ref { /// Mailbox of the actor owning the referenced entity. pub mailbox: Arc, + /// ID of the facet (within the actor) owning the referenced entity. + pub facet_id: FacetId, /// Mutex owning and guarding the state backing the referenced entity. pub target: Mutex>>>, } @@ -352,15 +403,22 @@ pub fn next_actor_id() -> ActorId { NEXT_ACTOR_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed) } -static NEXT_HANDLE: AtomicU64 = AtomicU64::new(2); +static NEXT_FACET_ID: AtomicU64 = AtomicU64::new(2); +#[doc(hidden)] +pub fn next_facet_id() -> FacetId { + FacetId::new(NEXT_FACET_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed)) + .expect("Internal error: Attempt to allocate FacetId of zero. Too many FacetIds allocated. Restart the process.") +} + +static NEXT_HANDLE: AtomicU64 = AtomicU64::new(3); /// Allocate a process-unique `Handle`. pub fn next_handle() -> Handle { NEXT_HANDLE.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed) } -static NEXT_ACCOUNT_ID: AtomicU64 = AtomicU64::new(3); +static NEXT_ACCOUNT_ID: AtomicU64 = AtomicU64::new(4); -static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(4); +static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(5); preserves_schema::support::lazy_static! { #[doc(hidden)] @@ -394,29 +452,20 @@ impl From<&Synced> for AnyValue { } } -impl<'activation> Activation<'activation> { - fn make(actor: &ActorRef, account: Arc, state: &'activation mut RunningActor) -> Self { - Activation { - actor: actor.clone(), - state, - pending: EventBuffer::new(account), - } - } - - /// Constructs and executes `f` in a new "turn" for `actor`. If - /// `f` returns `Ok(())`, [commits the turn][Self::deliver] and - /// performs the buffered actions; otherwise, [abandons the - /// turn][Self::clear] and discards the buffered actions. +impl FacetRef { + /// Executes `f` in a new "[turn][Activation]" for `actor`. If `f` returns `Ok(())`, + /// [commits the turn][Activation::deliver] and performs the buffered actions; otherwise, + /// [abandons the turn][Activation::clear] and discards the buffered actions. /// /// Bills any activity to `account`. - pub fn for_actor( - actor: &ActorRef, + pub fn activate( + &self, account: Arc, f: F, ) -> ActorResult where F: FnOnce(&mut Activation) -> ActorResult, { - match Self::for_actor_exit(actor, account, |t| match f(t) { + match self.activate_exit(account, |t| match f(t) { Ok(()) => None, Err(e) => Some(Err(e)), }) { @@ -425,28 +474,26 @@ impl<'activation> Activation<'activation> { } } - /// Constructs and executes `f` in a new "turn" for `actor`. If - /// `f` returns `Some(exit_status)`, terminates `actor` with that - /// `exit_status`. Otherwise, if `f` returns `None`, leaves - /// `actor` in runnable state. [Commits buffered - /// actions][Self::deliver] unless `actor` terminates with an - /// `Err` status. + /// Executes `f` in a new "[turn][Activation]" for `actor`. If `f` returns + /// `Some(exit_status)`, terminates `actor` with that `exit_status`. Otherwise, if `f` + /// returns `None`, leaves `actor` in runnable state. [Commits buffered + /// actions][Activation::deliver] unless `actor` terminates with an `Err` status. /// /// Bills any activity to `account`. - pub fn for_actor_exit( - actor: &ActorRef, + pub fn activate_exit( + &self, account: Arc, f: F, ) -> Option where F: FnOnce(&mut Activation) -> Option, { - match actor.state.lock() { + match self.actor.state.lock() { Err(_) => panicked_err(), Ok(mut g) => match &mut *g { ActorState::Terminated { exit_status } => Some((**exit_status).clone()), ActorState::Running(state) => { - let mut activation = Activation::make(actor, account, state); + let mut activation = Activation::make(self, account, state); match f(&mut activation) { None => None, Some(exit_status) => { @@ -455,12 +502,19 @@ impl<'activation> Activation<'activation> { } drop(activation); let exit_status = Arc::new(exit_status); - let mut t = Activation::make(actor, Account::new(crate::name!("shutdown")), state); + let mut t = Activation::make(&self.actor.facet_ref(state.root), + Account::new(crate::name!("shutdown")), + state); for action in std::mem::take(&mut t.state.exit_hooks) { if let Err(err) = action(&mut t, &exit_status) { tracing::error!(err = debug(err), "error in exit hook"); } } + if let Err(err) = t._terminate_facet(t.state.root, false) { + // This can only occur as the result of an internal error in this file's code. + tracing::error!(err = debug(err), "unexpected error from disorderly terminate_facet"); + panic!("Unexpected error result from disorderly terminate_facet"); + } *g = ActorState::Terminated { exit_status: Arc::clone(&exit_status), }; @@ -471,29 +525,72 @@ impl<'activation> Activation<'activation> { } } } +} + +impl<'activation> Activation<'activation> { + fn make( + facet: &FacetRef, + account: Arc, + state: &'activation mut RunningActor, + ) -> Self { + Activation { + facet: facet.clone(), + state, + pending: EventBuffer::new(account), + } + } fn immediate_oid(&self, r: &Arc>) { - if r.mailbox.actor_id != self.actor.actor_id { + if r.mailbox.actor_id != self.facet.actor.actor_id { panic!("Cannot use for_myself to send to remote peers"); } } + fn with_facet(&mut self, check_existence: bool, facet_id: FacetId, f: F) -> ActorResult + where + F: FnOnce(&mut Activation) -> ActorResult, + { + if !check_existence || self.state.facet_nodes.contains_key(&facet_id) { + let old_facet_id = self.facet.facet_id; + self.facet.facet_id = facet_id; + let result = f(self); + self.facet.facet_id = old_facet_id; + result + } else { + Ok(()) + } + } + + #[doc(hidden)] + pub fn with_entity(&mut self, r: &Arc>, f: F) -> ActorResult where + F: FnOnce(&mut Activation, &mut dyn Entity) -> ActorResult + { + self.with_facet(true, r.facet_id, |t| r.internal_with_entity(|e| f(t, e))) + } + + fn active_facet<'a>(&'a mut self) -> Option<&'a mut Facet> { + self.state.get_facet(self.facet.facet_id) + } + /// Core API: assert `a` at recipient `r`. /// /// Returns the [`Handle`] for the new assertion. pub fn assert(&mut self, r: &Arc>, a: M) -> Handle { let handle = next_handle(); - { - let r = Arc::clone(r); - self.pending.queue_for(&r).push(Box::new( - move |t| r.with_entity(|e| e.assert(t, a, handle)))); - } - { - let r = Arc::clone(r); - self.state.cleanup_actions.insert( - handle, - CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new( - move |t| r.with_entity(|e| e.retract(t, handle))))); + if let Some(f) = self.active_facet() { + { + let r = Arc::clone(r); + f.cleanup_actions.insert( + handle, + CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new( + move |t| t.with_entity(&r, |t, e| e.retract(t, handle))))); + } + drop(f); + { + let r = Arc::clone(r); + self.pending.queue_for(&r).push(Box::new( + move |t| t.with_entity(&r, |t, e| e.assert(t, a, handle)))); + } } handle } @@ -515,25 +612,30 @@ impl<'activation> Activation<'activation> { pub fn assert_for_myself(&mut self, r: &Arc>, a: M) -> Handle { self.immediate_oid(r); let handle = next_handle(); - { - let r = Arc::clone(r); - self.pending.for_myself.push(Box::new( - move |t| r.with_entity(|e| e.assert(t, a, handle)))); - } - { - let r = Arc::clone(r); - self.state.cleanup_actions.insert( - handle, - CleanupAction::ForMyself(Box::new( - move |t| r.with_entity(|e| e.retract(t, handle))))); + if let Some(f) = self.active_facet() { + { + let r = Arc::clone(r); + f.cleanup_actions.insert( + handle, + CleanupAction::ForMyself(Box::new( + move |t| t.with_entity(&r, |t, e| e.retract(t, handle))))); + } + drop(f); + { + let r = Arc::clone(r); + self.pending.for_myself.push(Box::new( + move |t| t.with_entity(&r, |t, e| e.assert(t, a, handle)))); + } } handle } /// Core API: retract a previously-established assertion. pub fn retract(&mut self, handle: Handle) { - if let Some(d) = self.state.cleanup_actions.remove(&handle) { - self.pending.execute_cleanup_action(d) + if let Some(f) = self.active_facet() { + if let Some(d) = f.cleanup_actions.remove(&handle) { + self.pending.execute_cleanup_action(d) + } } } @@ -541,7 +643,7 @@ impl<'activation> Activation<'activation> { pub fn message(&mut self, r: &Arc>, m: M) { let r = Arc::clone(r); self.pending.queue_for(&r).push(Box::new( - move |t| r.with_entity(|e| e.message(t, m)))) + move |t| t.with_entity(&r, |t, e| e.message(t, m)))) } /// Core API: send message `m` to recipient `r`, which must be a @@ -558,7 +660,7 @@ impl<'activation> Activation<'activation> { self.immediate_oid(r); let r = Arc::clone(r); self.pending.for_myself.push(Box::new( - move |t| r.with_entity(|e| e.message(t, m)))) + move |t| t.with_entity(&r, |t, e| e.message(t, m)))) } /// Core API: begins a synchronisation with `r`. @@ -569,7 +671,7 @@ impl<'activation> Activation<'activation> { pub fn sync(&mut self, r: &Arc>, peer: Arc>) { let r = Arc::clone(r); self.pending.queue_for(&r).push(Box::new( - move |t| r.with_entity(|e| e.sync(t, peer)))) + move |t| t.with_entity(&r, |t, e| e.sync(t, peer)))) } /// Retrieve the [`Account`] against which actions are recorded. @@ -596,6 +698,182 @@ impl<'activation> Activation<'activation> { pub fn deliver(&mut self) { self.pending.deliver(); } + + /// Construct an entity with behaviour [`InertEntity`] within the active facet. + pub fn inert_entity(&mut self) -> Arc> { + self.create(InertEntity) + } + + /// Construct an entity with behaviour `e` within the active facet. + pub fn create + Send + 'static>(&mut self, e: E) -> Arc> { + let r = self.create_inert(); + r.become_entity(e); + r + } + + /// Construct an entity (within the active facet) whose behaviour will be specified later + /// via [`become_entity`][Ref::become_entity]. + pub fn create_inert(&mut self) -> Arc> { + Arc::new(Ref { + mailbox: self.state.mailbox(), + facet_id: self.facet.facet_id, + target: Mutex::new(None), + }) + } + + /// Start a new [linked task][crate::actor#linked-tasks] attached to the active facet. The + /// task will execute the future "`boot`" to completion unless it is cancelled first (by + /// e.g. termination of the owning facet or crashing of the owning actor). Uses `name` for + /// log messages emitted by the task. + pub fn linked_task>( + &mut self, + name: tracing::Span, + boot: F, + ) { + let mailbox = self.state.mailbox(); + if let Some(f) = self.active_facet() { + let token = CancellationToken::new(); + let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed); + name.record("task_id", &task_id); + { + let token = token.clone(); + tokio::spawn(async move { + tracing::trace!(task_id, "linked task start"); + select! { + _ = token.cancelled() => { + tracing::trace!(task_id, "linked task cancelled"); + Ok(()) + } + result = boot => { + match &result { + Ok(()) => { + tracing::trace!(task_id, "linked task normal stop"); + () + } + Err(e) => { + tracing::error!(task_id, "linked task error: {}", e); + let _ = mailbox.tx.send(SystemMessage::Crash(e.clone())); + () + } + } + result + } + } + }.instrument(name)); + } + f.linked_tasks.insert(task_id, token); + } + } + + /// Create a new subfacet of the currently-active facet. Runs `boot` in the new facet's + /// context. If `boot` returns leaving the new facet [inert][Facet#inert-facets], + pub fn facet ActorResult>( + &mut self, + boot: F, + ) -> Result { + let f = Facet::new(Some(self.facet.facet_id)); + let facet_id = f.facet_id; + self.state.facet_nodes.insert(facet_id, f); + self.state.facet_children.entry(self.facet.facet_id).or_default().insert(facet_id); + self.with_facet(true /* TODO: tiny optimisation: "false" would be safe here */, facet_id, move |t| { + boot(t)?; + t.stop_if_inert(); + Ok(()) + })?; + Ok(facet_id) + } + + /// Useful during facet (and actor) startup, in some situations: when a facet `boot` + /// procedure would return while the facet is inert, but the facet should survive until + /// some subsequent time, call `prevent_inert_check` to increment a counter that prevents + /// inertness-checks from succeeding on the active facet. + /// + /// The result of `prevent_inert_check` is a function which, when called, decrements the + /// counter again. After the counter has been decremented, any subsequent inertness checks + /// will no longer be artificially forced to fail. + /// + /// An example of when you might want this: creating an actor having only a single + /// Dataspace entity within it, then using the Dataspace from other actors. At the start of + /// its life, the Dataspace actor will have no outbound assertions, no child facets, and no + /// linked tasks, so the only way to prevent it from being prematurely garbage collected is + /// to use `prevent_inert_check` in its boot function. + pub fn prevent_inert_check(&mut self) -> Box { + if let Some(f) = self.active_facet() { + Box::new(f.prevent_inert_check()) + } else { + Box::new(|| ()) + } + } + + /// Arranges for the [`Facet`] named by `facet_id` to be stopped cleanly when `self` + /// commits. If `continuation` is supplied, the facet to be stopped hasn't been stopped + /// yet, none of the shutdown handlers yields an error, and the facet's parent facet is + /// alive, executes `continuation` in the parent facet's context. + pub fn stop_facet(&mut self, facet_id: FacetId, continuation: Option) { + let mailbox = self.state.mailbox(); + let maybe_parent_id = self.active_facet().and_then(|f| f.parent_facet_id); + self.pending.queue_for_mailbox(&mailbox).push(Box::new( + move |t| { + t._terminate_facet(facet_id, true)?; + if let Some(k) = continuation { + if let Some(parent_id) = maybe_parent_id { + t.with_facet(true, parent_id, k)?; + } + } + Ok(()) + })); + } + + /// Arranges for the active facet to be stopped cleanly when `self` commits. + /// + /// Equivalent to `self.stop_facet(self.facet_id.unwrap(), None)`. + pub fn stop(&mut self) { + self.stop_facet(self.facet.facet_id, None) + } + + fn stop_if_inert(&mut self) { + if self.state.facet_exists_and_is_inert(self.facet.facet_id) { + self.stop_facet(self.facet.facet_id, None); + } + } + + fn _terminate_facet(&mut self, facet_id: FacetId, orderly: bool) -> ActorResult { + if let Some(mut f) = self.state.facet_nodes.remove(&facet_id) { + tracing::debug!("{} terminating {:?}", + if orderly { "orderly" } else { "disorderly" }, + facet_id); + if let Some(p) = f.parent_facet_id { + self.state.facet_children.get_mut(&p).map(|children| children.remove(&facet_id)); + } + self.with_facet(false, facet_id, |t| { + if let Some(children) = t.state.facet_children.remove(&facet_id) { + for child_id in children.into_iter() { + t._terminate_facet(child_id, orderly)?; + } + } + if orderly { + for action in std::mem::take(&mut f.stop_actions) { + action(t)?; + } + let parent_facet_id = f.parent_facet_id; + // if !orderly, the drop will happen at the end of this function, but we + // need it to happen right here so that child-facet cleanup-actions are + // performed before parent-facet cleanup-actions. + drop(f); + if let Some(p) = parent_facet_id { + if t.state.facet_exists_and_is_inert(p) { + t._terminate_facet(p, true)?; + } + } else { + t.state.shutdown(); + } + } + Ok(()) + }) + } else { + Ok(()) + } + } } impl EventBuffer { @@ -764,52 +1042,31 @@ impl Drop for Mailbox { } impl Actor { - /// Create a new actor. It still needs to be - /// [`start`ed][Self::start]/[`boot`ed][Self::boot]. + /// Create a new actor. It still needs to be [`boot`ed][Self::boot]. pub fn new() -> Self { let (tx, rx) = unbounded_channel(); let actor_id = next_actor_id(); + let root = Facet::new(None); // tracing::trace!(id = actor_id, "Actor::new"); + let mut st = RunningActor { + actor_id, + tx, + mailbox: Weak::new(), + exit_hooks: Vec::new(), + facet_nodes: Map::new(), + facet_children: Map::new(), + root: root.facet_id, + }; + st.facet_nodes.insert(root.facet_id, root); Actor { rx, ac_ref: ActorRef { actor_id, - state: Arc::new(Mutex::new(ActorState::Running(RunningActor { - actor_id, - tx, - mailbox: Weak::new(), - cleanup_actions: Map::new(), - linked_tasks: Map::new(), - exit_hooks: Vec::new(), - }))), + state: Arc::new(Mutex::new(ActorState::Running(st))), }, } } - /// Create and start a new actor to own entity `e`. Returns a - /// `Ref` to the new entity. The `name` is used as context for any - /// log messages emitted by the new actor. - pub fn create_and_start + Send + 'static>( - name: tracing::Span, - e: E, - ) -> Arc> { - let r = Self::create_and_start_inert(name); - r.become_entity(e); - r - } - - /// Create and start a new actor, returning a `Ref` to a fresh - /// entity contained within it. Before using the `Ref`, its - /// initialization must be completed by calling - /// [`become_entity`][Ref::become_entity] on it. The `name` is - /// used as context for any log messages emitted by the new actor. - pub fn create_and_start_inert(name: tracing::Span) -> Arc> { - let ac = Self::new(); - let r = ac.ac_ref.access(|s| s.unwrap().expect_running().create_inert()); - ac.start(name); - r - } - /// Start the actor's mainloop. Takes ownership of `self`. The /// `name` is used as context for any log messages emitted by the /// actor. The `boot` function is called in the actor's context, @@ -822,7 +1079,11 @@ impl Actor { name.record("actor_id", &self.ac_ref.actor_id); tokio::spawn(async move { tracing::trace!("start"); - self.run(boot).await; + self.run(|t| { + boot(t)?; + t.stop_if_inert(); + Ok(()) + }).await; let result = self.ac_ref.exit_status().expect("terminated"); match &result { Ok(()) => tracing::trace!("normal stop"), @@ -832,53 +1093,51 @@ impl Actor { }.instrument(name)) } - /// Start the actor's mainloop. Takes ownership of `self`. The - /// `name` is used as context for any log messages emitted by the - /// actor. Delegates to [`boot`][Self::boot], with a no-op `boot` - /// function. - pub fn start(self, name: tracing::Span) -> ActorHandle { - self.boot(name, |_ac| Ok(())) - } - - fn terminate(&mut self, result: ActorResult) { - let _ = Activation::for_actor_exit( - &self.ac_ref, Account::new(crate::name!("shutdown")), |_| Some(result)); - } - async fn run ActorResult>( &mut self, boot: F, ) -> () { - if Activation::for_actor(&self.ac_ref, Account::new(crate::name!("boot")), boot).is_err() { + let root = self.ac_ref.access(|s| match s.expect("New actor missing its state") { + ActorState::Terminated { .. } => panic!("New actor unexpectedly in terminated state"), + ActorState::Running(ra) => ra.root, // what a lot of work to get this one number + }); + + let root_facet_ref = self.ac_ref.facet_ref(root); + + let terminate = |result: ActorResult| { + let _ = root_facet_ref.activate_exit(Account::new(crate::name!("shutdown")), + |_| Some(result)); + }; + + if root_facet_ref.activate(Account::new(crate::name!("boot")), boot).is_err() { return; } loop { match self.rx.recv().await { None => { - return self.terminate(Err(error("Unexpected channel close", AnyValue::new(false)))); + return terminate(Err(error("Unexpected channel close", AnyValue::new(false)))); } Some(m) => match m { SystemMessage::Release => { tracing::trace!("SystemMessage::Release"); - return self.terminate(Ok(())); + return terminate(Ok(())); } SystemMessage::Turn(mut loaned_item) => { let mut actions = std::mem::take(&mut loaned_item.item); - let r = Activation::for_actor( - &self.ac_ref, Arc::clone(&loaned_item.account), |t| { - loop { - for action in actions.into_iter() { action(t)? } - actions = std::mem::take(&mut t.pending.for_myself); - if actions.is_empty() { break; } - } - Ok(()) - }); + let r = root_facet_ref.activate(Arc::clone(&loaned_item.account), |t| { + loop { + for action in actions.into_iter() { action(t)? } + actions = std::mem::take(&mut t.pending.for_myself); + if actions.is_empty() { break; } + } + Ok(()) + }); if r.is_err() { return; } } SystemMessage::Crash(e) => { tracing::trace!("SystemMessage::Crash({:?})", &e); - return self.terminate(Err(e)); + return terminate(Err(e)); } } } @@ -886,6 +1145,34 @@ impl Actor { } } +impl Facet { + fn new(parent_facet_id: Option) -> Self { + Facet { + facet_id: next_facet_id(), + parent_facet_id, + cleanup_actions: Map::new(), + stop_actions: Vec::new(), + linked_tasks: Map::new(), + inert_check_preventers: Arc::new(AtomicU64::new(0)), + } + } + + fn prevent_inert_check(&mut self) -> impl FnOnce() { + let inert_check_preventers = Arc::clone(&self.inert_check_preventers); + let armed = AtomicU64::new(1); + inert_check_preventers.fetch_add(1, Ordering::Relaxed); + move || { + match armed.compare_exchange(1, 0, Ordering::SeqCst, Ordering::SeqCst) { + Ok(_) => { + inert_check_preventers.fetch_sub(1, Ordering::Relaxed); + () + } + Err(_) => (), + } + } + } +} + fn panicked_err() -> Option { Some(Err(error("Actor panicked", AnyValue::new(false)))) } @@ -913,13 +1200,11 @@ impl ActorRef { ActorState::Terminated { exit_status } => Some((**exit_status).clone()), })) } -} -impl ActorState { - fn expect_running(&mut self) -> &mut RunningActor { - match self { - ActorState::Terminated { .. } => panic!("Expected a running actor"), - ActorState::Running(r) => r, + fn facet_ref(&self, facet_id: FacetId) -> FacetRef { + FacetRef { + actor: self.clone(), + facet_id, } } } @@ -946,77 +1231,30 @@ impl RunningActor { } } - /// Construct an entity with behaviour [`InertEntity`] within this - /// actor. - pub fn inert_entity(&mut self) -> Arc> { - self.create(InertEntity) - } - - /// Construct an entity with behaviour `e` within this actor. - pub fn create + Send + 'static>(&mut self, e: E) -> Arc> { - let r = self.create_inert(); - r.become_entity(e); - r - } - - /// Construct an entity whose behaviour will be specified later - /// (via [`become_entity`][Ref::become_entity]). - pub fn create_inert(&mut self) -> Arc> { - Arc::new(Ref { - mailbox: self.mailbox(), - target: Mutex::new(None), - }) - } - /// Registers the entity `r` in the list of exit hooks for this /// actor. When the actor terminates, `r`'s /// [`exit_hook`][Entity::exit_hook] will be called. pub fn add_exit_hook(&mut self, r: &Arc>) { let r = Arc::clone(r); - self.exit_hooks.push(Box::new(move |t, exit_status| { - r.with_entity(|e| e.exit_hook(t, &exit_status)) - })) + self.exit_hooks.push(Box::new( + move |t, exit_status| r.internal_with_entity(|e| e.exit_hook(t, &exit_status)))) } - /// Start a new [linked task][crate::actor#linked-tasks] attached - /// to this actor. The function `boot` is the main function of the - /// new task. Uses `name` for log messages emitted by the task. - pub fn linked_task>( - &mut self, - name: tracing::Span, - boot: F, - ) { - let mailbox = self.mailbox(); - let token = CancellationToken::new(); - let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed); - name.record("task_id", &task_id); - { - let token = token.clone(); - tokio::spawn(async move { - tracing::trace!(task_id, "linked task start"); - select! { - _ = token.cancelled() => { - tracing::trace!(task_id, "linked task cancelled"); - Ok(()) - } - result = boot => { - match &result { - Ok(()) => { - tracing::trace!(task_id, "linked task normal stop"); - () - } - Err(e) => { - tracing::error!(task_id, "linked task error: {}", e); - let _ = mailbox.tx.send(SystemMessage::Crash(e.clone())); - () - } - } - result - } - } - }.instrument(name)); + fn get_facet(&mut self, facet_id: FacetId) -> Option<&mut Facet> { + self.facet_nodes.get_mut(&facet_id) + } + + /// See the definition of an [inert facet][Facet#inert-facets]. + fn facet_exists_and_is_inert(&mut self, facet_id: FacetId) -> bool { + let no_kids = self.facet_children.get(&facet_id).map(|cs| cs.is_empty()).unwrap_or(true); + if let Some(f) = self.get_facet(facet_id) { + no_kids && + f.cleanup_actions.is_empty() && + f.linked_tasks.is_empty() && + f.inert_check_preventers.load(Ordering::Relaxed) == 0 + } else { + false } - self.linked_tasks.insert(task_id, token); } } @@ -1026,7 +1264,7 @@ impl Drop for Actor { } } -impl Drop for RunningActor { +impl Drop for Facet { fn drop(&mut self) { for (_task_id, token) in std::mem::take(&mut self.linked_tasks).into_iter() { token.cancel(); @@ -1041,13 +1279,13 @@ impl Drop for RunningActor { } } - tracing::trace!("Actor::drop"); + tracing::trace!(facet_id = debug(self.facet_id), "Facet::drop"); } } /// Directly injects `action` into `mailbox`, billing subsequent activity against `account`. /// -/// Primarily for use by [linked tasks][RunningActor::linked_task]. +/// Primarily for use by [linked tasks][Activation::linked_task]. #[must_use] pub fn external_event(mailbox: &Arc, account: &Arc, action: Action) -> ActorResult { send_actions(&mailbox.tx, account, vec![action]) @@ -1055,7 +1293,7 @@ pub fn external_event(mailbox: &Arc, account: &Arc, action: Ac /// Directly injects `actions` into `mailbox`, billing subsequent activity against `account`. /// -/// Primarily for use by [linked tasks][RunningActor::linked_task]. +/// Primarily for use by [linked tasks][Activation::linked_task]. #[must_use] pub fn external_events(mailbox: &Arc, account: &Arc, actions: PendingEventQueue) -> ActorResult { send_actions(&mailbox.tx, account, actions) @@ -1063,7 +1301,7 @@ pub fn external_events(mailbox: &Arc, account: &Arc, actions: impl Ref { /// Supplies the behaviour (`e`) for a `Ref` created via - /// [`create_inert`][RunningActor::create_inert]. + /// [`create_inert`][Activation::create_inert]. /// /// # Panics /// @@ -1076,8 +1314,7 @@ impl Ref { *g = Some(Box::new(e)); } - #[doc(hidden)] - pub fn with_entity) -> R>(&self, f: F) -> R { + fn internal_with_entity) -> R>(&self, f: F) -> R { let mut g = self.target.lock().expect("unpoisoned"); f(g.as_mut().expect("initialized").as_mut()) } @@ -1135,6 +1372,7 @@ impl Cap { { Self::new(&Arc::new(Ref { mailbox: Arc::clone(&underlying.mailbox), + facet_id: underlying.facet_id, target: Mutex::new(Some(Box::new(Guard { underlying: underlying.clone() }))), })) } @@ -1221,24 +1459,24 @@ where { fn assert(&mut self, t: &mut Activation, a: AnyValue, h: Handle) -> ActorResult { match M::try_from(&a) { - Ok(a) => self.underlying.with_entity(|e| e.assert(t, a, h)), + Ok(a) => t.with_entity(&self.underlying, |t, e| e.assert(t, a, h)), Err(_) => Ok(()), } } fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult { - self.underlying.with_entity(|e| e.retract(t, h)) + t.with_entity(&self.underlying, |t, e| e.retract(t, h)) } fn message(&mut self, t: &mut Activation, m: AnyValue) -> ActorResult { match M::try_from(&m) { - Ok(m) => self.underlying.with_entity(|e| e.message(t, m)), + Ok(m) => t.with_entity(&self.underlying, |t, e| e.message(t, m)), Err(_) => Ok(()), } } fn sync(&mut self, t: &mut Activation, peer: Arc>) -> ActorResult { - self.underlying.with_entity(|e| e.sync(t, peer)) + t.with_entity(&self.underlying, |t, e| e.sync(t, peer)) } fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc) -> ActorResult { - self.underlying.with_entity(|e| e.exit_hook(t, exit_status)) + self.underlying.internal_with_entity(|e| e.exit_hook(t, exit_status)) } } diff --git a/syndicate/src/during.rs b/syndicate/src/during.rs index d2d43eb..911c3eb 100644 --- a/syndicate/src/during.rs +++ b/syndicate/src/during.rs @@ -98,8 +98,8 @@ where } } - pub fn create(self, ac: &mut RunningActor) -> Arc> { - ac.create(self) + pub fn create(self, t: &mut Activation) -> Arc> { + t.create(self) } } @@ -109,9 +109,9 @@ where Fa: 'static + Send + FnMut(&mut E, &mut Activation, AnyValue) -> DuringResult, Fm: 'static + Send + FnMut(&mut E, &mut Activation, AnyValue) -> ActorResult, { - pub fn create_cap(self, ac: &mut RunningActor) -> Arc + pub fn create_cap(self, t: &mut Activation) -> Arc { - Cap::new(&self.create(ac)) + Cap::new(&self.create(t)) } } diff --git a/syndicate/src/relay.rs b/syndicate/src/relay.rs index e8aa517..44a2703 100644 --- a/syndicate/src/relay.rs +++ b/syndicate/src/relay.rs @@ -162,7 +162,7 @@ pub fn connect_stream( let i = Input::Bytes(Box::pin(i)); let o = Output::Bytes(Box::pin(o)); let gatekeeper = TunnelRelay::run(t, i, o, None, Some(sturdy::Oid(0.into()))).unwrap(); - let main_entity = t.state.create(during::entity(initial_state).on_asserted(move |state, t, a: AnyValue| { + let main_entity = t.create(during::entity(initial_state).on_asserted(move |state, t, a: AnyValue| { let denotation = a.value().to_embedded()?; f(state, t, Arc::clone(denotation)) })); @@ -182,7 +182,7 @@ impl TunnelRelay { ) -> Option> { let (output_tx, output_rx) = unbounded_channel(); let tr_ref = Arc::new(Mutex::new(None)); - let self_entity = t.state.create(TunnelRefEntity { + let self_entity = t.create(TunnelRefEntity { relay_ref: Arc::clone(&tr_ref), }); let mut tr = TunnelRelay { @@ -203,10 +203,10 @@ impl TunnelRelay { tr.membranes.export_ref(ir, true); } let result = initial_oid.map( - |io| Arc::clone(&tr.membranes.import_oid(t.state, &tr_ref, io).obj)); + |io| Arc::clone(&tr.membranes.import_oid(t, &tr_ref, io).obj)); *tr_ref.lock().unwrap() = Some(tr); - t.state.linked_task(crate::name!("writer"), output_loop(o, output_rx)); - t.state.linked_task(crate::name!("reader"), input_loop(t.actor.clone(), i, tr_ref)); + t.linked_task(crate::name!("writer"), output_loop(o, output_rx)); + t.linked_task(crate::name!("reader"), input_loop(t.facet.clone(), i, tr_ref)); t.state.add_exit_hook(&self_entity); result } @@ -324,7 +324,7 @@ impl TunnelRelay { Ok(()) } } - let k = t.state.create(SyncPeer { + let k = t.create(SyncPeer { relay_ref: Arc::clone(&self.self_ref), peer: Arc::clone(&peer), }); @@ -419,11 +419,11 @@ impl Membranes { fn import_oid( &mut self, - ac: &mut RunningActor, + t: &mut Activation, relay_ref: &TunnelRelayRef, oid: sturdy::Oid, ) -> Arc { - let obj = ac.create(RelayEntity { relay_ref: Arc::clone(relay_ref), oid: oid.clone() }); + let obj = t.create(RelayEntity { relay_ref: Arc::clone(relay_ref), oid: oid.clone() }); self.imported.insert(oid, Cap::new(&obj)) } @@ -440,7 +440,7 @@ impl Membranes { let oid = *b; match self.imported.oid_map.get(&oid) { Some(ws) => Ok(Arc::clone(&ws.obj)), - None => Ok(Arc::clone(&self.import_oid(t.state, relay_ref, oid).obj)), + None => Ok(Arc::clone(&self.import_oid(t, relay_ref, oid).obj)), } } sturdy::WireRef::Yours { oid: b, attenuation } => { @@ -458,7 +458,7 @@ impl Membranes { })?) } } - None => Ok(Cap::new(&t.state.inert_entity())), + None => Ok(Cap::new(&t.inert_entity())), } } } @@ -515,7 +515,7 @@ impl DomainEncode for Membranes { } async fn input_loop( - ac: ActorRef, + facet: FacetRef, i: Input, relay: TunnelRelayRef, ) -> ActorResult { @@ -525,10 +525,8 @@ async fn input_loop( loop { account.ensure_clear_funds().await; match src.next().await { - None => return Activation::for_actor(&ac, Arc::clone(&account), |t| { - Ok(t.state.shutdown()) - }), - Some(bs) => Activation::for_actor(&ac, Arc::clone(&account), |t| { + None => return facet.activate(Arc::clone(&account), |t| Ok(t.state.shutdown())), + Some(bs) => facet.activate(Arc::clone(&account), |t| { let mut g = relay.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); tr.handle_inbound_datagram(t, &bs?) @@ -546,18 +544,14 @@ async fn input_loop( Ok(n) => n, Err(e) => if e.kind() == io::ErrorKind::ConnectionReset { - return Activation::for_actor(&ac, Arc::clone(&account), |t| { - Ok(t.state.shutdown()) - }); + return facet.activate(Arc::clone(&account), |t| Ok(t.state.shutdown())); } else { return Err(e)?; }, }; match n { - 0 => return Activation::for_actor(&ac, Arc::clone(&account), |t| { - Ok(t.state.shutdown()) - }), - _ => Activation::for_actor(&ac, Arc::clone(&account), |t| { + 0 => return facet.activate(Arc::clone(&account), |t| Ok(t.state.shutdown())), + _ => facet.activate(Arc::clone(&account), |t| { let mut g = relay.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); tr.handle_inbound_stream(t, &mut buf) diff --git a/syndicate/src/tracer.rs b/syndicate/src/tracer.rs index ada5a1e..b932a69 100644 --- a/syndicate/src/tracer.rs +++ b/syndicate/src/tracer.rs @@ -10,17 +10,9 @@ fn set_name_oid(t: &mut Tracer, r: &Arc>) { t.0.record("oid", &tracing::field::display(&r.oid())); } -pub fn tracer(ac: &mut RunningActor, name: tracing::Span) -> Arc> { +pub fn tracer(t: &mut Activation, name: tracing::Span) -> Arc> { let mut e = Tracer(name); - let r = ac.create_inert(); - set_name_oid(&mut e, &r); - r.become_entity(e); - r -} - -pub fn tracer_top(name: tracing::Span) -> Arc> { - let mut e = Tracer(name); - let r = Actor::create_and_start_inert(crate::name!(parent: None, "tracer")); + let r = t.create_inert(); set_name_oid(&mut e, &r); r.become_entity(e); r