From 9080dc6f1e2841f5d4f866488b41072d3757c1ca Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 20 Jan 2022 10:08:49 +0100 Subject: [PATCH] Fill in the rest of the jolly owl --- syndicate-server/examples/consumer.rs | 19 +- syndicate-server/examples/pingpong.rs | 34 +-- syndicate-server/examples/producer.rs | 20 +- syndicate-server/examples/state-consumer.rs | 19 +- syndicate-server/examples/state-producer.rs | 31 +-- syndicate/benches/bench_dataspace.rs | 49 ++--- syndicate/build.rs | 2 +- syndicate/src/actor.rs | 231 +++++++++++--------- syndicate/src/trace.rs | 2 +- 9 files changed, 189 insertions(+), 218 deletions(-) diff --git a/syndicate-server/examples/consumer.rs b/syndicate-server/examples/consumer.rs index 1509a1b..d4093cf 100644 --- a/syndicate-server/examples/consumer.rs +++ b/syndicate-server/examples/consumer.rs @@ -12,7 +12,6 @@ use syndicate::value::NestedValue; use tokio::net::TcpStream; use core::time::Duration; -use tokio::time::interval; #[derive(Clone, Debug, StructOpt)] pub struct Config { @@ -44,19 +43,11 @@ async fn main() -> Result<(), Box> { observer: Arc::clone(&consumer), }); - t.linked_task(Some(AnyValue::symbol("tick")), async move { - let mut stats_timer = interval(Duration::from_secs(1)); - loop { - stats_timer.tick().await; - let consumer = Arc::clone(&consumer); - external_event(&Arc::clone(&consumer.underlying.mailbox), - None, - &Account::new(None, None), - Box::new(move |t| t.with_entity( - &consumer.underlying, - |t, e| e.message(t, AnyValue::new(true)))))?; - } - }); + t.every(Duration::from_secs(1), move |t| { + consumer.message(t, &(), &AnyValue::new(true)); + Ok(()) + })?; + Ok(None) }); Ok(()) diff --git a/syndicate-server/examples/pingpong.rs b/syndicate-server/examples/pingpong.rs index 0798c35..b9b248f 100644 --- a/syndicate-server/examples/pingpong.rs +++ b/syndicate-server/examples/pingpong.rs @@ -14,7 +14,6 @@ use syndicate::value::Value; use tokio::net::TcpStream; use core::time::Duration; -use tokio::time::interval; #[derive(Clone, Debug, StructOpt)] pub struct PingConfig { @@ -172,39 +171,28 @@ async fn main() -> Result<(), Box> { observer: Arc::clone(&consumer), }); - t.linked_task(Some(AnyValue::symbol("tick")), async move { - let mut stats_timer = interval(Duration::from_secs(1)); - loop { - stats_timer.tick().await; - let consumer = Arc::clone(&consumer); - external_event(&Arc::clone(&consumer.underlying.mailbox), - None, - &Account::new(None, None), - Box::new(move |t| t.with_entity( - &consumer.underlying, - |t, e| e.message(t, AnyValue::new(true)))))?; - } - }); + t.every(Duration::from_secs(1), move |t| { + consumer.message(t, &(), &AnyValue::new(true)); + Ok(()) + })?; if let PingPongMode::Ping(c) = &config.mode { + let facet = t.facet.clone(); let turn_count = c.turn_count; let action_count = c.action_count; let account = Arc::clone(t.account()); t.linked_task(Some(AnyValue::symbol("boot-ping")), async move { let padding = AnyValue::bytestring(vec![0; bytes_padding]); for _ in 0..turn_count { - let mut events: PendingEventQueue = vec![]; let current_rec = simple_record2(send_label, Value::from(now()).wrap(), padding.clone()); - for _ in 0..action_count { - let ds = Arc::clone(&ds); - let current_rec = current_rec.clone(); - events.push(Box::new(move |t| t.with_entity( - &ds.underlying, - |t, e| e.message(t, current_rec)))); - } - external_events(&ds.underlying.mailbox, None, &account, events)? + facet.activate(&account, None, |t| { + for _ in 0..action_count { + ds.message(t, &(), ¤t_rec); + } + Ok(()) + }); } Ok(LinkedTaskTermination::KeepFacet) }); diff --git a/syndicate-server/examples/producer.rs b/syndicate-server/examples/producer.rs index d41032d..353b20a 100644 --- a/syndicate-server/examples/producer.rs +++ b/syndicate-server/examples/producer.rs @@ -1,7 +1,6 @@ use structopt::StructOpt; use syndicate::actor::*; -use syndicate::enclose; use syndicate::preserves::rec; use syndicate::relay; use syndicate::sturdy; @@ -29,22 +28,21 @@ async fn main() -> Result<(), Box> { let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); Actor::top(None, |t| { relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| { + let facet = t.facet.clone(); let padding = AnyValue::new(&vec![0u8; config.bytes_padding][..]); let action_count = config.action_count; let account = Account::new(None, None); t.linked_task(Some(AnyValue::symbol("sender")), async move { loop { account.ensure_clear_funds().await; - let mut events: PendingEventQueue = Vec::new(); - for _ in 0..action_count { - events.push(Box::new(enclose!((ds, padding) move |t| t.with_entity( - &ds.underlying, |t, e| e.message( - t, - rec![AnyValue::symbol("Says"), - AnyValue::new("producer"), - padding]))))); - } - external_events(&ds.underlying.mailbox, None, &account, events)?; + facet.activate(&account, None, |t| { + for _ in 0..action_count { + ds.message(t, &(), &rec![AnyValue::symbol("Says"), + AnyValue::new("producer"), + padding.clone()]); + } + Ok(()) + }); } }); Ok(None) diff --git a/syndicate-server/examples/state-consumer.rs b/syndicate-server/examples/state-consumer.rs index d5fe8dd..6a0cd5d 100644 --- a/syndicate-server/examples/state-consumer.rs +++ b/syndicate-server/examples/state-consumer.rs @@ -12,7 +12,6 @@ use syndicate::value::NestedValue; use tokio::net::TcpStream; use core::time::Duration; -use tokio::time::interval; #[derive(Clone, Debug, StructOpt)] pub struct Config { @@ -65,19 +64,11 @@ async fn main() -> Result<(), Box> { observer: Arc::clone(&consumer), }); - t.linked_task(Some(AnyValue::symbol("tick")), async move { - let mut stats_timer = interval(Duration::from_secs(1)); - loop { - stats_timer.tick().await; - let consumer = Arc::clone(&consumer); - external_event(&Arc::clone(&consumer.underlying.mailbox), - None, - &Account::new(None, None), - Box::new(move |t| t.with_entity( - &consumer.underlying, - |t, e| e.message(t, AnyValue::new(true)))))?; - } - }); + t.every(Duration::from_secs(1), move |t| { + consumer.message(t, &(), &AnyValue::new(true)); + Ok(()) + })?; + Ok(None) }); Ok(()) diff --git a/syndicate-server/examples/state-producer.rs b/syndicate-server/examples/state-producer.rs index 48cdc5e..8861e5e 100644 --- a/syndicate-server/examples/state-producer.rs +++ b/syndicate-server/examples/state-producer.rs @@ -1,9 +1,6 @@ -use std::sync::Arc; - use structopt::StructOpt; use syndicate::actor::*; -use syndicate::enclose; use syndicate::preserves::rec; use syndicate::relay; use syndicate::sturdy; @@ -25,27 +22,23 @@ async fn main() -> Result<(), Box> { let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); Actor::top(None, |t| { relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| { + let facet = t.facet.clone(); let account = Account::new(None, None); t.linked_task(Some(AnyValue::symbol("sender")), async move { let presence = rec![AnyValue::symbol("Present"), AnyValue::new(std::process::id())]; - let handle = syndicate::actor::next_handle(); - let assert_e = || { - external_event( - &Arc::clone(&ds.underlying.mailbox), None, &account, Box::new(enclose!( - (ds, presence, handle) move |t| t.with_entity( - &ds.underlying, |t, e| e.assert(t, presence, handle))))) - }; - let retract_e = || { - external_event( - &Arc::clone(&ds.underlying.mailbox), None, &account, Box::new(enclose!( - (ds, handle) move |t| t.with_entity( - &ds.underlying, |t, e| e.retract(t, handle))))) - }; - assert_e()?; loop { + let mut handle = None; + facet.activate(&account, None, |t| { + handle = ds.assert(t, &(), &presence); + Ok(()) + }); account.ensure_clear_funds().await; - retract_e()?; - assert_e()?; + facet.activate(&account, None, |t| { + if let Some(h) = handle { + t.retract(h); + } + Ok(()) + }); } }); Ok(None) diff --git a/syndicate/benches/bench_dataspace.rs b/syndicate/benches/bench_dataspace.rs index 5095069..be79996 100644 --- a/syndicate/benches/bench_dataspace.rs +++ b/syndicate/benches/bench_dataspace.rs @@ -9,7 +9,6 @@ use syndicate::language; use syndicate::actor::*; use syndicate::during::entity; use syndicate::dataspace::Dataspace; -use syndicate::enclose; use syndicate::schemas::dataspace::Observe; use syndicate::schemas::dataspace_patterns as p; use syndicate::value::NestedValue; @@ -53,23 +52,17 @@ pub fn bench_pub(c: &mut Criterion) { let start = Instant::now(); rt.block_on(async move { Actor::top(None, move |t| { + let _ = t.prevent_inert_check(); + // The reason this works is that all the messages to `ds` will be delivered + // before the message to `shutdown`, because `ds` and `shutdown` are in the + // same Actor. let ds = t.create(Dataspace::new(None)); let shutdown = t.create(ShutdownEntity); - let account = Account::new(None, None); - t.linked_task(Some(AnyValue::symbol("sender")), async move { - for _ in 0..iters { - external_event(&ds.mailbox, None, &account, Box::new( - enclose!((ds) move |t| t.with_entity( - &ds, - |t, e| e.message(t, says(AnyValue::new("bench_pub"), - Value::ByteString(vec![]).wrap()))))))? - } - external_event(&shutdown.mailbox, None, &account, Box::new( - enclose!((shutdown) move |t| t.with_entity( - &shutdown, - |t, e| e.message(t, AnyValue::new(true))))))?; - Ok(LinkedTaskTermination::KeepFacet) - }); + for _ in 0..iters { + t.message(&ds, says(AnyValue::new("bench_pub"), + Value::ByteString(vec![]).wrap())); + } + t.message(&shutdown, AnyValue::new(true)); Ok(()) }).await.unwrap().unwrap(); }); @@ -138,27 +131,15 @@ pub fn bench_pub(c: &mut Criterion) { observer: shutdown, }); - let account = Arc::clone(t.account()); - t.linked_task(Some(AnyValue::symbol("sender")), async move { + t.after(core::time::Duration::from_secs(0), move |t| { for _i in 0..iters { - let ds = Arc::clone(&ds); - external_event( - &Arc::clone(&ds.underlying.mailbox), None, &account, Box::new( - move |t| t.with_entity( - &ds.underlying, - |t, e| e.message(t, says(AnyValue::new("bench_pub"), - Value::ByteString(vec![]).wrap())))))? + ds.message(t, &(), &says(AnyValue::new("bench_pub"), + Value::ByteString(vec![]).wrap())); } - { - let ds = Arc::clone(&ds); - external_event( - &Arc::clone(&ds.underlying.mailbox), None, &account, Box::new( - move |t| t.with_entity( - &ds.underlying, - |t, e| e.message(t, AnyValue::new(true)))))?; - } - Ok(LinkedTaskTermination::KeepFacet) + ds.message(t, &(), &AnyValue::new(true)); + Ok(()) }); + Ok(()) }); Ok(()) diff --git a/syndicate/build.rs b/syndicate/build.rs index c425f6b..92ac4aa 100644 --- a/syndicate/build.rs +++ b/syndicate/build.rs @@ -30,7 +30,7 @@ fn main() -> std::io::Result<()> { c.plugins.push(Box::new(syndicate_plugins::PatternPlugin)); c.add_external_module(ExternalModule::new(vec!["EntityRef".to_owned()], "crate::actor")); - let inputs = expand_inputs(&vec!["protocols/schema-bundle.bin".to_owned()])?; + let inputs = expand_inputs(&vec!["../../syndicate-protocols/schema-bundle.bin".to_owned()])?; c.load_schemas_and_bundles(&inputs)?; compile(&c) } diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index a7e5dc5..188865e 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -218,8 +218,7 @@ pub trait Entity: Send { pub struct InertEntity; impl Entity for InertEntity {} -type ActionDescriber = Box trace::ActionDescription>; -type TracedAction = (Option, Action); +type TracedAction = (Option, Action); enum CleanupAction { ForMyself(TracedAction), @@ -232,7 +231,7 @@ type Action = Box ActorResult>; type Block = Box ActorResult>; #[doc(hidden)] -pub type PendingEventQueue = Vec; +pub type PendingEventQueue = Vec; /// The main API for programming Syndicated Actor objects. /// @@ -266,6 +265,7 @@ struct EventBuffer { pub account: Arc, queues: HashMap, PendingEventQueue)>, for_myself: PendingEventQueue, + final_actions: Vec>, } /// An `Account` records a "debt" in terms of outstanding work items. @@ -745,21 +745,24 @@ impl<'activation> Activation<'activation> { f.outbound_handles.insert(handle); drop(f); self.state.insert_retract_cleanup_action(&r, handle, self.pending.desc.as_ref().map( - enclose!((r) |_| Box::new(move || trace::ActionDescription::Retract { - target: Box::new(r.as_ref().into()), - handle: Box::new(protocol::Handle(handle.into())), - }) as ActionDescriber))); + enclose!((r) move |_| trace::TargetedTurnEvent { + target: r.as_ref().into(), + detail: trace::TurnEvent::Retract { + handle: Box::new(protocol::Handle(handle.into())), + }, + }))); { let r = Arc::clone(r); - self.pending.trace(|| trace::ActionDescription::Assert { - target: Box::new(r.as_ref().into()), + let description = self.pending.trace_targeted(false, &r, || trace::TurnEvent::Assert { assertion: Box::new((&a).into()), handle: Box::new(protocol::Handle(handle.into())), }); - self.pending.queue_for(&r).push(Box::new(move |t| t.with_entity(&r, |t, e| { - tracing::trace!(?handle, ?a, "asserted"); - e.assert(t, a, handle) - }))); + self.pending.queue_for(&r).push(( + description, + Box::new(move |t| t.with_entity(&r, |t, e| { + tracing::trace!(?handle, ?a, "asserted"); + e.assert(t, a, handle) + })))); } } handle @@ -791,11 +794,12 @@ impl<'activation> Activation<'activation> { self.state.cleanup_actions.insert( handle, CleanupAction::ForMyself(( - self.pending.desc.as_ref().map(enclose!((r, handle) move |_| Box::new( - move || trace::ActionDescription::Retract { - target: Box::new(r.as_ref().into()), + self.pending.desc.as_ref().map(enclose!((r, handle) move |_| trace::TargetedTurnEvent { + target: r.as_ref().into(), + detail: trace::TurnEvent::Retract { handle: Box::new(protocol::Handle(handle.into())), - }) as ActionDescriber)), + } + })), Box::new(move |t| t.with_entity(&r, |t, e| { tracing::trace!(?handle, "retracted"); if let Some(f) = t.active_facet() { @@ -806,30 +810,34 @@ impl<'activation> Activation<'activation> { } { let r = Arc::clone(r); - self.pending.trace(|| trace::ActionDescription::Assert { - target: Box::new(r.as_ref().into()), + let description = self.pending.trace_targeted(true, &r, || trace::TurnEvent::Assert { assertion: Box::new((&a).into()), handle: Box::new(protocol::Handle(handle.into())), }); - self.pending.for_myself.push(Box::new(move |t| t.with_entity(&r, |t, e| { - tracing::trace!(?handle, ?a, "asserted"); - e.assert(t, a, handle) - }))); + self.pending.for_myself.push(( + description, + Box::new(move |t| t.with_entity(&r, |t, e| { + tracing::trace!(?handle, ?a, "asserted"); + e.assert(t, a, handle) + })))); } } handle } fn half_link(&mut self, t_other: &mut Activation) -> Handle { - let other_actor_id = t_other.state.actor_id; + let this_actor_id = self.state.actor_id; let entity_ref = t_other.create::(StopOnRetract); let handle = next_handle(); tracing::trace!(?handle, ?entity_ref, "half_link"); self.state.insert_retract_cleanup_action(&entity_ref, handle, self.pending.desc.as_ref().map( - move |_| Box::new(move || trace::ActionDescription::BreakLink { - peer: Box::new(trace::ActorId(AnyValue::new(other_actor_id))), - handle: Box::new(protocol::Handle(handle.into())), - }) as ActionDescriber)); + enclose!((entity_ref) move |_| trace::TargetedTurnEvent { + target: entity_ref.as_ref().into(), + detail: trace::TurnEvent::BreakLink { + source: Box::new(trace::ActorId(AnyValue::new(this_actor_id))), + handle: Box::new(protocol::Handle(handle.into())), + }, + }))); self.active_facet().unwrap().outbound_handles.insert(handle); t_other.with_entity(&entity_ref, |t, e| e.assert(t, AnyValue::new(true), handle)).unwrap(); handle @@ -863,14 +871,15 @@ impl<'activation> Activation<'activation> { pub fn message(&mut self, r: &Arc>, m: M) { tracing::trace!(?r, ?m, "message"); let r = Arc::clone(r); - self.pending.trace(|| trace::ActionDescription::Message { - target: Box::new(r.as_ref().into()), + let description = self.pending.trace_targeted(false, &r, || trace::TurnEvent::Message { body: Box::new((&m).into()), }); - self.pending.queue_for(&r).push(Box::new(move |t| t.with_entity(&r, |t, e| { - tracing::trace!(?m, "delivered"); - e.message(t, m) - }))) + self.pending.queue_for(&r).push(( + description, + Box::new(move |t| t.with_entity(&r, |t, e| { + tracing::trace!(?m, "delivered"); + e.message(t, m) + })))) } /// Core API: send message `m` to recipient `r`, which must be a @@ -886,12 +895,12 @@ impl<'activation> Activation<'activation> { pub fn message_for_myself(&mut self, r: &Arc>, m: M) { self.immediate_oid(r); let r = Arc::clone(r); - self.pending.trace(|| trace::ActionDescription::Message { - target: Box::new(r.as_ref().into()), + let description = self.pending.trace_targeted(true, &r, || trace::TurnEvent::Message { body: Box::new((&m).into()), }); - self.pending.for_myself.push(Box::new( - move |t| t.with_entity(&r, |t, e| e.message(t, m)))) + self.pending.for_myself.push(( + description, + Box::new(move |t| t.with_entity(&r, |t, e| e.message(t, m))))) } /// Core API: begins a synchronisation with `r`. @@ -901,11 +910,12 @@ impl<'activation> Activation<'activation> { /// the synchronisation request. pub fn sync(&mut self, r: &Arc>, peer: Arc>) { let r = Arc::clone(r); - self.pending.trace(|| trace::ActionDescription::Sync { - target: Box::new(r.as_ref().into()), + let description = self.pending.trace_targeted(false, &r, || trace::TurnEvent::Sync { + peer: Box::new(peer.as_ref().into()), }); - self.pending.queue_for(&r).push(Box::new( - move |t| t.with_entity(&r, |t, e| e.sync(t, peer)))) + self.pending.queue_for(&r).push(( + description, + Box::new(move |t| t.with_entity(&r, |t, e| e.sync(t, peer))))) } /// Registers the entity `r` in the list of stop actions for the active facet. If the facet @@ -958,10 +968,10 @@ impl<'activation> Activation<'activation> { /// /// # Panics /// - /// Panics if any pending actions "`for_myself`" (resulting from + /// Panics if any pending "`final_actions`" or actions "`for_myself`" (resulting from /// [`assert_for_myself`][Self::assert_for_myself] or - /// [`message_for_myself`][Self::message_for_myself]) are - /// outstanding at the time of the call. + /// [`message_for_myself`][Self::message_for_myself]) are outstanding at the time of the + /// call. pub fn deliver(&mut self) { self.pending.deliver(); } @@ -1125,9 +1135,10 @@ impl<'activation> Activation<'activation> { link: false, id: Box::new(trace::ActorId(AnyValue::new(ac_ref.actor_id))), }); - self.pending.for_myself.push(Box::new(move |t| { - ac.boot(name, Arc::clone(t.account()), Some(trace::TurnCause::ActorBoot), boot); - Ok(()) + let cause = self.pending.desc.as_ref().map( + |d| trace::TurnCause::Turn { id: Box::new(d.id.clone()) }); + self.pending.final_actions.push(Box::new(move |t| { + ac.boot(name, Arc::clone(t.account()), cause, boot); })); ac_ref } @@ -1149,14 +1160,13 @@ impl<'activation> Activation<'activation> { link: true, id: Box::new(trace::ActorId(AnyValue::new(ac_ref.actor_id))), }); - self.pending.for_myself.push(Box::new(move |t| { + let cause = self.pending.desc.as_ref().map( + |d| trace::TurnCause::Turn { id: Box::new(d.id.clone()) }); + self.pending.final_actions.push(Box::new(move |t| { t.with_facet(true, facet_id, move |t| { - ac.link(t)?.boot(name, - Arc::clone(t.account()), - Some(trace::TurnCause::ActorBoot), - boot); + ac.link(t).boot(name, Arc::clone(t.account()), cause, boot); Ok(()) - }) + }).unwrap() })); ac_ref } @@ -1249,7 +1259,7 @@ impl<'activation> Activation<'activation> { fn stop_if_inert(&mut self) { let facet_id = self.facet.facet_id; - self.pending.for_myself.push(Box::new(move |t| { + self.pending.final_actions.push(Box::new(move |t| { tracing::trace!("Checking inertness of facet {} from facet {}", facet_id, t.facet.facet_id); if t.state.facet_exists_and_is_inert(facet_id) { tracing::trace!(" - facet {} is inert, stopping it", facet_id); @@ -1258,7 +1268,6 @@ impl<'activation> Activation<'activation> { } else { tracing::trace!(" - facet {} is not inert", facet_id); } - Ok(()) })) } @@ -1432,12 +1441,22 @@ impl<'activation> Activation<'activation> { loop { let actions = std::mem::take(&mut self.pending.for_myself); if actions.is_empty() { break; } - for action in actions.into_iter() { action(self)? } + for (maybe_desc, action) in actions.into_iter() { + if let Some(desc) = maybe_desc { + self.pending.trace(|| trace::ActionDescription::DequeueInternal { + event: Box::new(desc), + }); + } + action(self)? + } } if !self.repair_dataflow()? { break; } } + for final_action in std::mem::take(&mut self.pending.final_actions) { + final_action(self); + } Ok(()) } @@ -1477,23 +1496,28 @@ impl EventBuffer { trace_collector, account, queues: HashMap::new(), - for_myself: Vec::new(), + for_myself: PendingEventQueue::new(), + final_actions: Vec::new(), } } fn execute_cleanup_action(&mut self, d: CleanupAction) { match d { - CleanupAction::ForAnother(mailbox, (tracer, action)) => { - if let Some(f) = tracer { - self.trace(f); + CleanupAction::ForAnother(mailbox, (maybe_desc, action)) => { + if let Some(desc) = &maybe_desc { + self.trace(|| trace::ActionDescription::Enqueue { + event: Box::new(desc.clone()), + }) } - self.queue_for_mailbox(&mailbox).push(action); + self.queue_for_mailbox(&mailbox).push((maybe_desc, action)); } - CleanupAction::ForMyself((tracer, action)) => { - if let Some(f) = tracer { - self.trace(f); + CleanupAction::ForMyself((maybe_desc, action)) => { + if let Some(desc) = &maybe_desc { + self.trace(|| trace::ActionDescription::EnqueueInternal { + event: Box::new(desc.clone()), + }) } - self.for_myself.push(action); + self.for_myself.push((maybe_desc, action)); } } } @@ -1510,10 +1534,14 @@ impl EventBuffer { fn clear(&mut self) { self.queues = HashMap::new(); self.for_myself = PendingEventQueue::new(); + self.final_actions = Vec::new(); } fn deliver(&mut self) { tracing::trace!("EventBuffer::deliver"); + if !self.final_actions.is_empty() { + panic!("Unprocessed final_actions at deliver() time"); + } if !self.for_myself.is_empty() { panic!("Unprocessed for_myself events remain at deliver() time"); } @@ -1544,6 +1572,27 @@ impl EventBuffer { d.record(f()); } } + + #[inline(always)] + fn trace_targeted trace::TurnEvent>( + &mut self, + internal: bool, + r: &Arc>, + f: F, + ) -> Option { + self.desc.as_mut().map(|d| { + let event = trace::TargetedTurnEvent { + target: r.as_ref().into(), + detail: f(), + }; + d.record(if internal { + trace::ActionDescription::EnqueueInternal { event: Box::new(event.clone()) } + } else { + trace::ActionDescription::Enqueue { event: Box::new(event.clone()) } + }); + event + }) + } } impl Drop for EventBuffer { @@ -1680,9 +1729,8 @@ impl Actor { boot: F, ) -> ActorHandle { let ac = Actor::new(None, trace_collector.clone()); - let cause = trace_collector.as_ref().map(|_| trace::TurnCause::ActorBoot); let account = Account::new(None, trace_collector); - ac.boot(None, account, cause, boot) + ac.boot(None, account, Some(trace::TurnCause::external("top-level actor")), boot) } /// Create a new actor. It still needs to be [`boot`ed][Self::boot]. @@ -1709,7 +1757,7 @@ impl Actor { Actor { rx, trace_collector, ac_ref } } - fn link(self, t_parent: &mut Activation) -> Result { + fn link(self, t_parent: &mut Activation) -> Self { if t_parent.active_facet().is_none() { panic!("No active facet when calling spawn_link"); } @@ -1732,9 +1780,9 @@ impl Actor { child_actor: Box::new(trace::ActorId(AnyValue::new(self.ac_ref.actor_id))), child_to_parent: Box::new(protocol::Handle(h_to_parent.unwrap().into())), }); - Ok(self) + self } else { - Err(error("spawn_link'd actor terminated before link could happen", AnyValue::new(false))) + panic!("spawn_link'd actor terminated before link could happen"); } } @@ -1822,7 +1870,14 @@ impl Actor { let actions = std::mem::take(&mut loaned_item.item); if !root_facet_ref.activate( &loaned_item.account, cause, |t| { - for action in actions.into_iter() { action(t)? } + for (maybe_desc, action) in actions.into_iter() { + if let Some(desc) = maybe_desc { + t.pending.trace(|| trace::ActionDescription::Dequeue { + event: Box::new(desc), + }); + } + action(t)?; + } Ok(()) }) { @@ -1977,13 +2032,13 @@ impl RunningActor { &mut self, r: &Arc>, handle: Handle, - describer: Option, + description: Option, ) { let r = Arc::clone(r); self.cleanup_actions.insert( handle, CleanupAction::ForAnother(Arc::clone(&r.mailbox), ( - describer, + description, Box::new(move |t| t.with_entity(&r, |t, e| { tracing::trace!(?handle, "retracted"); if let Some(f) = t.active_facet() { @@ -1999,7 +2054,7 @@ impl RunningActor { exit_status: &Arc, trace_collector: Option, ) { - let cause = trace_collector.as_ref().map(|_| trace::TurnCause::ActorCleanup); + let cause = Some(trace::TurnCause::Cleanup); let account = Account::new(Some(AnyValue::symbol("cleanup")), trace_collector.clone()); let mut t = Activation::make(&ac_ref.facet_ref(self.root), account, cause, trace_collector, self); if let Err(err) = t._terminate_facet(t.state.root, exit_status.is_ok(), trace::FacetStopReason::ActorStopping) { @@ -2069,32 +2124,6 @@ impl Drop for Facet { } } -/// Directly injects `action` into `mailbox`, billing subsequent activity against `account`. -/// -/// Primarily for use by [linked tasks][Activation::linked_task]. -#[must_use] -pub fn external_event( - mailbox: &Arc, - cause: Option, - account: &Arc, - action: Action, -) -> ActorResult { - send_actions(&mailbox.tx, cause, account, vec![action]) -} - -/// Directly injects `actions` into `mailbox`, billing subsequent activity against `account`. -/// -/// Primarily for use by [linked tasks][Activation::linked_task]. -#[must_use] -pub fn external_events( - mailbox: &Arc, - cause: Option, - account: &Arc, - actions: PendingEventQueue, -) -> ActorResult { - send_actions(&mailbox.tx, cause, account, actions) -} - impl Ref { /// Supplies the behaviour (`e`) for a `Ref` created via /// [`create_inert`][Activation::create_inert]. diff --git a/syndicate/src/trace.rs b/syndicate/src/trace.rs index 534299d..51d9a63 100644 --- a/syndicate/src/trace.rs +++ b/syndicate/src/trace.rs @@ -132,7 +132,7 @@ impl TraceCollector { pub fn new(mut f: F) -> TraceCollector { let (tx, mut rx) = unbounded_channel::(); tokio::spawn(async move { - let mut timer = tokio::time::interval(std::time::Duration::from_secs(1)); + let mut timer = tokio::time::interval(std::time::Duration::from_millis(100)); loop { select! { maybe_entry = rx.recv() => {