diff --git a/syndicate-macros/examples/box-and-client.rs b/syndicate-macros/examples/box-and-client.rs index 137eea4..0fe5b97 100644 --- a/syndicate-macros/examples/box-and-client.rs +++ b/syndicate-macros/examples/box-and-client.rs @@ -8,11 +8,11 @@ use syndicate::value::NestedValue; #[tokio::main] async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; - Actor::new().boot(tracing::Span::current(), |t| { + Actor::new(None).boot(tracing::Span::current(), |t| { let ds = Cap::new(&t.create(Dataspace::new())); let _ = t.prevent_inert_check(); - Actor::new().boot(syndicate::name!("box"), enclose!((ds) move |t| { + t.spawn(syndicate::name!("box"), enclose!((ds) move |t| { let current_value = t.named_field("current_value", 0u64); t.dataflow({ @@ -49,7 +49,7 @@ async fn main() -> Result<(), Box> { Ok(()) })); - Actor::new().boot(syndicate::name!("client"), enclose!((ds) move |t| { + t.spawn(syndicate::name!("client"), enclose!((ds) move |t| { let box_state_handler = syndicate::entity(0u32) .on_asserted(enclose!((ds) move |count, t, captures: AnyValue| { *count = *count + 1; diff --git a/syndicate-server/examples/consumer.rs b/syndicate-server/examples/consumer.rs index 58a43ca..cf20836 100644 --- a/syndicate-server/examples/consumer.rs +++ b/syndicate-server/examples/consumer.rs @@ -26,7 +26,7 @@ async fn main() -> Result<(), Box> { let config = Config::from_args(); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - Actor::new().boot(syndicate::name!("consumer"), |t| { + Actor::new(None).boot(syndicate::name!("consumer"), |t| { relay::connect_stream(t, i, o, false, sturdyref, (), |_state, t, ds| { let consumer = syndicate::entity(0) .on_message(|message_count, _t, m: AnyValue| { diff --git a/syndicate-server/examples/pingpong.rs b/syndicate-server/examples/pingpong.rs index 674dec9..3a39801 100644 --- a/syndicate-server/examples/pingpong.rs +++ b/syndicate-server/examples/pingpong.rs @@ -93,7 +93,7 @@ async fn main() -> Result<(), Box> { let config = Config::from_args(); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - Actor::new().boot(syndicate::name!("pingpong"), |t| { + Actor::new(None).boot(syndicate::name!("pingpong"), |t| { relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| { let (send_label, recv_label, report_latency_every, should_echo, bytes_padding) = diff --git a/syndicate-server/examples/producer.rs b/syndicate-server/examples/producer.rs index 4caf289..72e9575 100644 --- a/syndicate-server/examples/producer.rs +++ b/syndicate-server/examples/producer.rs @@ -34,7 +34,7 @@ async fn main() -> Result<(), Box> { let config = Config::from_args(); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - Actor::new().boot(syndicate::name!("producer"), |t| { + Actor::new(None).boot(syndicate::name!("producer"), |t| { relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| { let padding: AnyValue = Value::ByteString(vec![0; config.bytes_padding]).wrap(); let action_count = config.action_count; diff --git a/syndicate-server/examples/state-consumer.rs b/syndicate-server/examples/state-consumer.rs index e45741f..8ebd8bf 100644 --- a/syndicate-server/examples/state-consumer.rs +++ b/syndicate-server/examples/state-consumer.rs @@ -26,7 +26,7 @@ async fn main() -> Result<(), Box> { let config = Config::from_args(); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - Actor::new().boot(syndicate::name!("state-consumer"), |t| { + Actor::new(None).boot(syndicate::name!("state-consumer"), |t| { relay::connect_stream(t, i, o, false, sturdyref, (), |_state, t, ds| { let consumer = { #[derive(Default)] diff --git a/syndicate-server/examples/state-producer.rs b/syndicate-server/examples/state-producer.rs index ee13a5e..1f86d9a 100644 --- a/syndicate-server/examples/state-producer.rs +++ b/syndicate-server/examples/state-producer.rs @@ -22,7 +22,7 @@ async fn main() -> Result<(), Box> { let config = Config::from_args(); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - Actor::new().boot(syndicate::name!("state-producer"), |t| { + Actor::new(None).boot(syndicate::name!("state-producer"), |t| { relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| { let account = Account::new(syndicate::name!("account")); t.linked_task(syndicate::name!("sender"), async move { diff --git a/syndicate-server/src/main.rs b/syndicate-server/src/main.rs index c7fdf66..165975e 100644 --- a/syndicate-server/src/main.rs +++ b/syndicate-server/src/main.rs @@ -83,7 +83,7 @@ async fn main() -> Result<(), Box> { tracing::trace!("startup"); - Actor::new().boot(tracing::Span::current(), move |t| { + Actor::new(None).boot(tracing::Span::current(), move |t| { let server_config_ds = Cap::new(&t.create(Dataspace::new())); let log_ds = Cap::new(&t.create(Dataspace::new())); diff --git a/syndicate-server/src/services/tcp_relay_listener.rs b/syndicate-server/src/services/tcp_relay_listener.rs index 74f6745..40cf177 100644 --- a/syndicate-server/src/services/tcp_relay_listener.rs +++ b/syndicate-server/src/services/tcp_relay_listener.rs @@ -44,15 +44,19 @@ fn run(t: &mut Activation, ds: Arc, spec: TcpRelayListener) -> ActorResult loop { let (stream, addr) = listener.accept().await?; let gatekeeper = spec.gatekeeper.clone(); - Actor::new().boot( - syndicate::name!(parent: parent_span.clone(), "conn"), - move |t| Ok(t.linked_task(tracing::Span::current(), { - let facet = t.facet.clone(); - async move { - detect_protocol(facet, stream, gatekeeper, addr).await?; - Ok(LinkedTaskTermination::KeepFacet) - } - }))); + let name = syndicate::name!(parent: parent_span.clone(), "conn"); + facet.activate(Account::new(name.clone()), move |t| { + t.spawn(name, move |t| { + Ok(t.linked_task(tracing::Span::current(), { + let facet = t.facet.clone(); + async move { + detect_protocol(facet, stream, gatekeeper, addr).await?; + Ok(LinkedTaskTermination::KeepFacet) + } + })) + }); + Ok(()) + })?; } }); Ok(()) diff --git a/syndicate-server/src/services/unix_relay_listener.rs b/syndicate-server/src/services/unix_relay_listener.rs index 156f504..1aedb4e 100644 --- a/syndicate-server/src/services/unix_relay_listener.rs +++ b/syndicate-server/src/services/unix_relay_listener.rs @@ -47,22 +47,26 @@ fn run(t: &mut Activation, ds: Arc, spec: UnixRelayListener) -> ActorResult let (stream, _addr) = listener.accept().await?; let peer = stream.peer_cred()?; let gatekeeper = spec.gatekeeper.clone(); - Actor::new().boot( - syndicate::name!(parent: parent_span.clone(), "conn", - pid = ?peer.pid().unwrap_or(-1), - uid = peer.uid()), - |t| Ok(t.linked_task(tracing::Span::current(), { - let facet = t.facet.clone(); - async move { - tracing::info!(protocol = %"unix"); - let (i, o) = stream.into_split(); - run_connection(facet, - relay::Input::Bytes(Box::pin(i)), - relay::Output::Bytes(Box::pin(o)), - gatekeeper)?; - Ok(LinkedTaskTermination::KeepFacet) - } - }))); + let name = syndicate::name!(parent: parent_span.clone(), "conn", + pid = ?peer.pid().unwrap_or(-1), + uid = peer.uid()); + facet.activate(Account::new(name.clone()), move |t| { + t.spawn(name, |t| { + Ok(t.linked_task(tracing::Span::current(), { + let facet = t.facet.clone(); + async move { + tracing::info!(protocol = %"unix"); + let (i, o) = stream.into_split(); + run_connection(facet, + relay::Input::Bytes(Box::pin(i)), + relay::Output::Bytes(Box::pin(o)), + gatekeeper)?; + Ok(LinkedTaskTermination::KeepFacet) + } + })) + }); + Ok(()) + })?; } }); Ok(()) diff --git a/syndicate/benches/bench_dataspace.rs b/syndicate/benches/bench_dataspace.rs index 810da0b..4304869 100644 --- a/syndicate/benches/bench_dataspace.rs +++ b/syndicate/benches/bench_dataspace.rs @@ -52,7 +52,7 @@ pub fn bench_pub(c: &mut Criterion) { b.iter_custom(|iters| { let start = Instant::now(); rt.block_on(async move { - Actor::new().boot(syndicate::name!("dataspace"), move |t| { + Actor::new(None).boot(syndicate::name!("dataspace"), move |t| { let ds = t.create(Dataspace::new()); let shutdown = t.create(ShutdownEntity); let account = Account::new(syndicate::name!("sender-account")); @@ -83,7 +83,7 @@ pub fn bench_pub(c: &mut Criterion) { rt.block_on(async move { let turn_count = Arc::new(AtomicU64::new(0)); - Actor::new().boot(syndicate::name!("dataspace"), { + Actor::new(None).boot(syndicate::name!("dataspace"), { let iters = iters.clone(); let turn_count = Arc::clone(&turn_count); @@ -103,7 +103,7 @@ pub fn bench_pub(c: &mut Criterion) { observer: shutdown, }); - Actor::new().boot(syndicate::name!("consumer"), move |t| { + t.spawn(syndicate::name!("consumer"), move |t| { struct Receiver(Arc); impl Entity for Receiver { fn message(&mut self, _t: &mut Activation, _m: AnyValue) -> ActorResult { diff --git a/syndicate/benches/ring.rs b/syndicate/benches/ring.rs index e25d2e4..68bbc7e 100644 --- a/syndicate/benches/ring.rs +++ b/syndicate/benches/ring.rs @@ -88,7 +88,7 @@ pub fn bench_ring(c: &mut Criterion) { self.i += 1; let spawner_ref = Arc::clone(&self.self_ref); ACTORS_CREATED.fetch_add(1, Ordering::Relaxed); - Actor::new().boot(syndicate::name!("forwarder", ?i), move |t| { + t.spawn(syndicate::name!("forwarder", ?i), move |t| { let _ = t.prevent_inert_check(); let f = t.create(Forwarder { next, @@ -118,7 +118,7 @@ pub fn bench_ring(c: &mut Criterion) { } ACTORS_CREATED.fetch_add(1, Ordering::Relaxed); - Actor::new().boot(syndicate::name!("counter"), move |t| { + Actor::new(None).boot(syndicate::name!("counter"), move |t| { let _ = t.prevent_inert_check(); let mut s = Spawner { self_ref: t.create_inert(), diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index b8e1b23..b071a12 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -1012,7 +1012,7 @@ impl<'activation> Activation<'activation> { name: tracing::Span, boot: F, ) -> ActorRef { - let ac = Actor::new(); + let ac = Actor::new(Some(self.state.actor_id)); let ac_ref = ac.ac_ref.clone(); self.enqueue_for_myself_at_commit(Box::new(move |_| { ac.boot(name, boot); @@ -1031,7 +1031,7 @@ impl<'activation> Activation<'activation> { name: tracing::Span, boot: F, ) -> ActorRef { - let ac = Actor::new(); + let ac = Actor::new(Some(self.state.actor_id)); let ac_ref = ac.ac_ref.clone(); let facet_id = self.facet.facet_id; self.enqueue_for_myself_at_commit(Box::new(move |t| { @@ -1498,11 +1498,11 @@ impl Drop for Mailbox { impl Actor { /// Create a new actor. It still needs to be [`boot`ed][Self::boot]. - pub fn new() -> Self { + pub fn new(parent_actor_id: Option) -> Self { let (tx, rx) = unbounded_channel(); let actor_id = next_actor_id(); let root = Facet::new(None); - tracing::trace!(?actor_id, root_facet_id = ?root.facet_id, "Actor::new"); + tracing::debug!(?actor_id, ?parent_actor_id, root_facet_id = ?root.facet_id, "Actor::new"); let mut st = RunningActor { actor_id, tx, @@ -1806,7 +1806,10 @@ impl Drop for Field { impl Drop for Actor { fn drop(&mut self) { self.rx.close(); - ACTORS.write().remove(&self.ac_ref.actor_id); + let _name = ACTORS.write().remove(&self.ac_ref.actor_id) + .map_or_else(|| crate::name!(parent: None, "DROPPED", actor_id=?self.ac_ref.actor_id), + |(span, _ac_ref)| span); + let _scope = _name.enter(); let mut g = self.ac_ref.state.lock(); if let ActorState::Running(ref mut state) = *g { tracing::warn!(actor_id = ?self.ac_ref.actor_id, "Force-terminated by Actor::drop"); @@ -1815,7 +1818,7 @@ impl Drop for Actor { state.cleanup(&self.ac_ref, &exit_status); *g = ActorState::Terminated { exit_status }; } - tracing::trace!(actor_id = ?self.ac_ref.actor_id, "Actor::drop"); + tracing::debug!(actor_id = ?self.ac_ref.actor_id, "Actor::drop"); } }