Fill in the rest of the jolly owl

This commit is contained in:
Tony Garnock-Jones 2022-01-20 10:08:49 +01:00
parent a9f83e0a9d
commit 9080dc6f1e
9 changed files with 189 additions and 218 deletions

View File

@ -12,7 +12,6 @@ use syndicate::value::NestedValue;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use core::time::Duration; use core::time::Duration;
use tokio::time::interval;
#[derive(Clone, Debug, StructOpt)] #[derive(Clone, Debug, StructOpt)]
pub struct Config { pub struct Config {
@ -44,19 +43,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
observer: Arc::clone(&consumer), observer: Arc::clone(&consumer),
}); });
t.linked_task(Some(AnyValue::symbol("tick")), async move { t.every(Duration::from_secs(1), move |t| {
let mut stats_timer = interval(Duration::from_secs(1)); consumer.message(t, &(), &AnyValue::new(true));
loop { Ok(())
stats_timer.tick().await; })?;
let consumer = Arc::clone(&consumer);
external_event(&Arc::clone(&consumer.underlying.mailbox),
None,
&Account::new(None, None),
Box::new(move |t| t.with_entity(
&consumer.underlying,
|t, e| e.message(t, AnyValue::new(true)))))?;
}
});
Ok(None) Ok(None)
}); });
Ok(()) Ok(())

View File

@ -14,7 +14,6 @@ use syndicate::value::Value;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use core::time::Duration; use core::time::Duration;
use tokio::time::interval;
#[derive(Clone, Debug, StructOpt)] #[derive(Clone, Debug, StructOpt)]
pub struct PingConfig { pub struct PingConfig {
@ -172,39 +171,28 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
observer: Arc::clone(&consumer), observer: Arc::clone(&consumer),
}); });
t.linked_task(Some(AnyValue::symbol("tick")), async move { t.every(Duration::from_secs(1), move |t| {
let mut stats_timer = interval(Duration::from_secs(1)); consumer.message(t, &(), &AnyValue::new(true));
loop { Ok(())
stats_timer.tick().await; })?;
let consumer = Arc::clone(&consumer);
external_event(&Arc::clone(&consumer.underlying.mailbox),
None,
&Account::new(None, None),
Box::new(move |t| t.with_entity(
&consumer.underlying,
|t, e| e.message(t, AnyValue::new(true)))))?;
}
});
if let PingPongMode::Ping(c) = &config.mode { if let PingPongMode::Ping(c) = &config.mode {
let facet = t.facet.clone();
let turn_count = c.turn_count; let turn_count = c.turn_count;
let action_count = c.action_count; let action_count = c.action_count;
let account = Arc::clone(t.account()); let account = Arc::clone(t.account());
t.linked_task(Some(AnyValue::symbol("boot-ping")), async move { t.linked_task(Some(AnyValue::symbol("boot-ping")), async move {
let padding = AnyValue::bytestring(vec![0; bytes_padding]); let padding = AnyValue::bytestring(vec![0; bytes_padding]);
for _ in 0..turn_count { for _ in 0..turn_count {
let mut events: PendingEventQueue = vec![];
let current_rec = simple_record2(send_label, let current_rec = simple_record2(send_label,
Value::from(now()).wrap(), Value::from(now()).wrap(),
padding.clone()); padding.clone());
for _ in 0..action_count { facet.activate(&account, None, |t| {
let ds = Arc::clone(&ds); for _ in 0..action_count {
let current_rec = current_rec.clone(); ds.message(t, &(), &current_rec);
events.push(Box::new(move |t| t.with_entity( }
&ds.underlying, Ok(())
|t, e| e.message(t, current_rec)))); });
}
external_events(&ds.underlying.mailbox, None, &account, events)?
} }
Ok(LinkedTaskTermination::KeepFacet) Ok(LinkedTaskTermination::KeepFacet)
}); });

View File

@ -1,7 +1,6 @@
use structopt::StructOpt; use structopt::StructOpt;
use syndicate::actor::*; use syndicate::actor::*;
use syndicate::enclose;
use syndicate::preserves::rec; use syndicate::preserves::rec;
use syndicate::relay; use syndicate::relay;
use syndicate::sturdy; use syndicate::sturdy;
@ -29,22 +28,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Actor::top(None, |t| { Actor::top(None, |t| {
relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| { relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| {
let facet = t.facet.clone();
let padding = AnyValue::new(&vec![0u8; config.bytes_padding][..]); let padding = AnyValue::new(&vec![0u8; config.bytes_padding][..]);
let action_count = config.action_count; let action_count = config.action_count;
let account = Account::new(None, None); let account = Account::new(None, None);
t.linked_task(Some(AnyValue::symbol("sender")), async move { t.linked_task(Some(AnyValue::symbol("sender")), async move {
loop { loop {
account.ensure_clear_funds().await; account.ensure_clear_funds().await;
let mut events: PendingEventQueue = Vec::new(); facet.activate(&account, None, |t| {
for _ in 0..action_count { for _ in 0..action_count {
events.push(Box::new(enclose!((ds, padding) move |t| t.with_entity( ds.message(t, &(), &rec![AnyValue::symbol("Says"),
&ds.underlying, |t, e| e.message( AnyValue::new("producer"),
t, padding.clone()]);
rec![AnyValue::symbol("Says"), }
AnyValue::new("producer"), Ok(())
padding]))))); });
}
external_events(&ds.underlying.mailbox, None, &account, events)?;
} }
}); });
Ok(None) Ok(None)

View File

@ -12,7 +12,6 @@ use syndicate::value::NestedValue;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use core::time::Duration; use core::time::Duration;
use tokio::time::interval;
#[derive(Clone, Debug, StructOpt)] #[derive(Clone, Debug, StructOpt)]
pub struct Config { pub struct Config {
@ -65,19 +64,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
observer: Arc::clone(&consumer), observer: Arc::clone(&consumer),
}); });
t.linked_task(Some(AnyValue::symbol("tick")), async move { t.every(Duration::from_secs(1), move |t| {
let mut stats_timer = interval(Duration::from_secs(1)); consumer.message(t, &(), &AnyValue::new(true));
loop { Ok(())
stats_timer.tick().await; })?;
let consumer = Arc::clone(&consumer);
external_event(&Arc::clone(&consumer.underlying.mailbox),
None,
&Account::new(None, None),
Box::new(move |t| t.with_entity(
&consumer.underlying,
|t, e| e.message(t, AnyValue::new(true)))))?;
}
});
Ok(None) Ok(None)
}); });
Ok(()) Ok(())

View File

@ -1,9 +1,6 @@
use std::sync::Arc;
use structopt::StructOpt; use structopt::StructOpt;
use syndicate::actor::*; use syndicate::actor::*;
use syndicate::enclose;
use syndicate::preserves::rec; use syndicate::preserves::rec;
use syndicate::relay; use syndicate::relay;
use syndicate::sturdy; use syndicate::sturdy;
@ -25,27 +22,23 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Actor::top(None, |t| { Actor::top(None, |t| {
relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| { relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| {
let facet = t.facet.clone();
let account = Account::new(None, None); let account = Account::new(None, None);
t.linked_task(Some(AnyValue::symbol("sender")), async move { t.linked_task(Some(AnyValue::symbol("sender")), async move {
let presence = rec![AnyValue::symbol("Present"), AnyValue::new(std::process::id())]; let presence = rec![AnyValue::symbol("Present"), AnyValue::new(std::process::id())];
let handle = syndicate::actor::next_handle();
let assert_e = || {
external_event(
&Arc::clone(&ds.underlying.mailbox), None, &account, Box::new(enclose!(
(ds, presence, handle) move |t| t.with_entity(
&ds.underlying, |t, e| e.assert(t, presence, handle)))))
};
let retract_e = || {
external_event(
&Arc::clone(&ds.underlying.mailbox), None, &account, Box::new(enclose!(
(ds, handle) move |t| t.with_entity(
&ds.underlying, |t, e| e.retract(t, handle)))))
};
assert_e()?;
loop { loop {
let mut handle = None;
facet.activate(&account, None, |t| {
handle = ds.assert(t, &(), &presence);
Ok(())
});
account.ensure_clear_funds().await; account.ensure_clear_funds().await;
retract_e()?; facet.activate(&account, None, |t| {
assert_e()?; if let Some(h) = handle {
t.retract(h);
}
Ok(())
});
} }
}); });
Ok(None) Ok(None)

View File

@ -9,7 +9,6 @@ use syndicate::language;
use syndicate::actor::*; use syndicate::actor::*;
use syndicate::during::entity; use syndicate::during::entity;
use syndicate::dataspace::Dataspace; use syndicate::dataspace::Dataspace;
use syndicate::enclose;
use syndicate::schemas::dataspace::Observe; use syndicate::schemas::dataspace::Observe;
use syndicate::schemas::dataspace_patterns as p; use syndicate::schemas::dataspace_patterns as p;
use syndicate::value::NestedValue; use syndicate::value::NestedValue;
@ -53,23 +52,17 @@ pub fn bench_pub(c: &mut Criterion) {
let start = Instant::now(); let start = Instant::now();
rt.block_on(async move { rt.block_on(async move {
Actor::top(None, move |t| { Actor::top(None, move |t| {
let _ = t.prevent_inert_check();
// The reason this works is that all the messages to `ds` will be delivered
// before the message to `shutdown`, because `ds` and `shutdown` are in the
// same Actor.
let ds = t.create(Dataspace::new(None)); let ds = t.create(Dataspace::new(None));
let shutdown = t.create(ShutdownEntity); let shutdown = t.create(ShutdownEntity);
let account = Account::new(None, None); for _ in 0..iters {
t.linked_task(Some(AnyValue::symbol("sender")), async move { t.message(&ds, says(AnyValue::new("bench_pub"),
for _ in 0..iters { Value::ByteString(vec![]).wrap()));
external_event(&ds.mailbox, None, &account, Box::new( }
enclose!((ds) move |t| t.with_entity( t.message(&shutdown, AnyValue::new(true));
&ds,
|t, e| e.message(t, says(AnyValue::new("bench_pub"),
Value::ByteString(vec![]).wrap()))))))?
}
external_event(&shutdown.mailbox, None, &account, Box::new(
enclose!((shutdown) move |t| t.with_entity(
&shutdown,
|t, e| e.message(t, AnyValue::new(true))))))?;
Ok(LinkedTaskTermination::KeepFacet)
});
Ok(()) Ok(())
}).await.unwrap().unwrap(); }).await.unwrap().unwrap();
}); });
@ -138,27 +131,15 @@ pub fn bench_pub(c: &mut Criterion) {
observer: shutdown, observer: shutdown,
}); });
let account = Arc::clone(t.account()); t.after(core::time::Duration::from_secs(0), move |t| {
t.linked_task(Some(AnyValue::symbol("sender")), async move {
for _i in 0..iters { for _i in 0..iters {
let ds = Arc::clone(&ds); ds.message(t, &(), &says(AnyValue::new("bench_pub"),
external_event( Value::ByteString(vec![]).wrap()));
&Arc::clone(&ds.underlying.mailbox), None, &account, Box::new(
move |t| t.with_entity(
&ds.underlying,
|t, e| e.message(t, says(AnyValue::new("bench_pub"),
Value::ByteString(vec![]).wrap())))))?
} }
{ ds.message(t, &(), &AnyValue::new(true));
let ds = Arc::clone(&ds); Ok(())
external_event(
&Arc::clone(&ds.underlying.mailbox), None, &account, Box::new(
move |t| t.with_entity(
&ds.underlying,
|t, e| e.message(t, AnyValue::new(true)))))?;
}
Ok(LinkedTaskTermination::KeepFacet)
}); });
Ok(()) Ok(())
}); });
Ok(()) Ok(())

View File

@ -30,7 +30,7 @@ fn main() -> std::io::Result<()> {
c.plugins.push(Box::new(syndicate_plugins::PatternPlugin)); c.plugins.push(Box::new(syndicate_plugins::PatternPlugin));
c.add_external_module(ExternalModule::new(vec!["EntityRef".to_owned()], "crate::actor")); c.add_external_module(ExternalModule::new(vec!["EntityRef".to_owned()], "crate::actor"));
let inputs = expand_inputs(&vec!["protocols/schema-bundle.bin".to_owned()])?; let inputs = expand_inputs(&vec!["../../syndicate-protocols/schema-bundle.bin".to_owned()])?;
c.load_schemas_and_bundles(&inputs)?; c.load_schemas_and_bundles(&inputs)?;
compile(&c) compile(&c)
} }

View File

@ -218,8 +218,7 @@ pub trait Entity<M>: Send {
pub struct InertEntity; pub struct InertEntity;
impl<M> Entity<M> for InertEntity {} impl<M> Entity<M> for InertEntity {}
type ActionDescriber = Box<dyn Send + FnOnce() -> trace::ActionDescription>; type TracedAction = (Option<trace::TargetedTurnEvent>, Action);
type TracedAction = (Option<ActionDescriber>, Action);
enum CleanupAction { enum CleanupAction {
ForMyself(TracedAction), ForMyself(TracedAction),
@ -232,7 +231,7 @@ type Action = Box<dyn Send + FnOnce(&mut Activation) -> ActorResult>;
type Block = Box<dyn Send + FnMut(&mut Activation) -> ActorResult>; type Block = Box<dyn Send + FnMut(&mut Activation) -> ActorResult>;
#[doc(hidden)] #[doc(hidden)]
pub type PendingEventQueue = Vec<Action>; pub type PendingEventQueue = Vec<TracedAction>;
/// The main API for programming Syndicated Actor objects. /// The main API for programming Syndicated Actor objects.
/// ///
@ -266,6 +265,7 @@ struct EventBuffer {
pub account: Arc<Account>, pub account: Arc<Account>,
queues: HashMap<ActorId, (UnboundedSender<SystemMessage>, PendingEventQueue)>, queues: HashMap<ActorId, (UnboundedSender<SystemMessage>, PendingEventQueue)>,
for_myself: PendingEventQueue, for_myself: PendingEventQueue,
final_actions: Vec<Box<dyn Send + FnOnce(&mut Activation)>>,
} }
/// An `Account` records a "debt" in terms of outstanding work items. /// An `Account` records a "debt" in terms of outstanding work items.
@ -745,21 +745,24 @@ impl<'activation> Activation<'activation> {
f.outbound_handles.insert(handle); f.outbound_handles.insert(handle);
drop(f); drop(f);
self.state.insert_retract_cleanup_action(&r, handle, self.pending.desc.as_ref().map( self.state.insert_retract_cleanup_action(&r, handle, self.pending.desc.as_ref().map(
enclose!((r) |_| Box::new(move || trace::ActionDescription::Retract { enclose!((r) move |_| trace::TargetedTurnEvent {
target: Box::new(r.as_ref().into()), target: r.as_ref().into(),
handle: Box::new(protocol::Handle(handle.into())), detail: trace::TurnEvent::Retract {
}) as ActionDescriber))); handle: Box::new(protocol::Handle(handle.into())),
},
})));
{ {
let r = Arc::clone(r); let r = Arc::clone(r);
self.pending.trace(|| trace::ActionDescription::Assert { let description = self.pending.trace_targeted(false, &r, || trace::TurnEvent::Assert {
target: Box::new(r.as_ref().into()),
assertion: Box::new((&a).into()), assertion: Box::new((&a).into()),
handle: Box::new(protocol::Handle(handle.into())), handle: Box::new(protocol::Handle(handle.into())),
}); });
self.pending.queue_for(&r).push(Box::new(move |t| t.with_entity(&r, |t, e| { self.pending.queue_for(&r).push((
tracing::trace!(?handle, ?a, "asserted"); description,
e.assert(t, a, handle) Box::new(move |t| t.with_entity(&r, |t, e| {
}))); tracing::trace!(?handle, ?a, "asserted");
e.assert(t, a, handle)
}))));
} }
} }
handle handle
@ -791,11 +794,12 @@ impl<'activation> Activation<'activation> {
self.state.cleanup_actions.insert( self.state.cleanup_actions.insert(
handle, handle,
CleanupAction::ForMyself(( CleanupAction::ForMyself((
self.pending.desc.as_ref().map(enclose!((r, handle) move |_| Box::new( self.pending.desc.as_ref().map(enclose!((r, handle) move |_| trace::TargetedTurnEvent {
move || trace::ActionDescription::Retract { target: r.as_ref().into(),
target: Box::new(r.as_ref().into()), detail: trace::TurnEvent::Retract {
handle: Box::new(protocol::Handle(handle.into())), handle: Box::new(protocol::Handle(handle.into())),
}) as ActionDescriber)), }
})),
Box::new(move |t| t.with_entity(&r, |t, e| { Box::new(move |t| t.with_entity(&r, |t, e| {
tracing::trace!(?handle, "retracted"); tracing::trace!(?handle, "retracted");
if let Some(f) = t.active_facet() { if let Some(f) = t.active_facet() {
@ -806,30 +810,34 @@ impl<'activation> Activation<'activation> {
} }
{ {
let r = Arc::clone(r); let r = Arc::clone(r);
self.pending.trace(|| trace::ActionDescription::Assert { let description = self.pending.trace_targeted(true, &r, || trace::TurnEvent::Assert {
target: Box::new(r.as_ref().into()),
assertion: Box::new((&a).into()), assertion: Box::new((&a).into()),
handle: Box::new(protocol::Handle(handle.into())), handle: Box::new(protocol::Handle(handle.into())),
}); });
self.pending.for_myself.push(Box::new(move |t| t.with_entity(&r, |t, e| { self.pending.for_myself.push((
tracing::trace!(?handle, ?a, "asserted"); description,
e.assert(t, a, handle) Box::new(move |t| t.with_entity(&r, |t, e| {
}))); tracing::trace!(?handle, ?a, "asserted");
e.assert(t, a, handle)
}))));
} }
} }
handle handle
} }
fn half_link(&mut self, t_other: &mut Activation) -> Handle { fn half_link(&mut self, t_other: &mut Activation) -> Handle {
let other_actor_id = t_other.state.actor_id; let this_actor_id = self.state.actor_id;
let entity_ref = t_other.create::<AnyValue, _>(StopOnRetract); let entity_ref = t_other.create::<AnyValue, _>(StopOnRetract);
let handle = next_handle(); let handle = next_handle();
tracing::trace!(?handle, ?entity_ref, "half_link"); tracing::trace!(?handle, ?entity_ref, "half_link");
self.state.insert_retract_cleanup_action(&entity_ref, handle, self.pending.desc.as_ref().map( self.state.insert_retract_cleanup_action(&entity_ref, handle, self.pending.desc.as_ref().map(
move |_| Box::new(move || trace::ActionDescription::BreakLink { enclose!((entity_ref) move |_| trace::TargetedTurnEvent {
peer: Box::new(trace::ActorId(AnyValue::new(other_actor_id))), target: entity_ref.as_ref().into(),
handle: Box::new(protocol::Handle(handle.into())), detail: trace::TurnEvent::BreakLink {
}) as ActionDescriber)); source: Box::new(trace::ActorId(AnyValue::new(this_actor_id))),
handle: Box::new(protocol::Handle(handle.into())),
},
})));
self.active_facet().unwrap().outbound_handles.insert(handle); self.active_facet().unwrap().outbound_handles.insert(handle);
t_other.with_entity(&entity_ref, |t, e| e.assert(t, AnyValue::new(true), handle)).unwrap(); t_other.with_entity(&entity_ref, |t, e| e.assert(t, AnyValue::new(true), handle)).unwrap();
handle handle
@ -863,14 +871,15 @@ impl<'activation> Activation<'activation> {
pub fn message<M: 'static + Send + std::fmt::Debug>(&mut self, r: &Arc<Ref<M>>, m: M) { pub fn message<M: 'static + Send + std::fmt::Debug>(&mut self, r: &Arc<Ref<M>>, m: M) {
tracing::trace!(?r, ?m, "message"); tracing::trace!(?r, ?m, "message");
let r = Arc::clone(r); let r = Arc::clone(r);
self.pending.trace(|| trace::ActionDescription::Message { let description = self.pending.trace_targeted(false, &r, || trace::TurnEvent::Message {
target: Box::new(r.as_ref().into()),
body: Box::new((&m).into()), body: Box::new((&m).into()),
}); });
self.pending.queue_for(&r).push(Box::new(move |t| t.with_entity(&r, |t, e| { self.pending.queue_for(&r).push((
tracing::trace!(?m, "delivered"); description,
e.message(t, m) Box::new(move |t| t.with_entity(&r, |t, e| {
}))) tracing::trace!(?m, "delivered");
e.message(t, m)
}))))
} }
/// Core API: send message `m` to recipient `r`, which must be a /// Core API: send message `m` to recipient `r`, which must be a
@ -886,12 +895,12 @@ impl<'activation> Activation<'activation> {
pub fn message_for_myself<M: 'static + Send + std::fmt::Debug>(&mut self, r: &Arc<Ref<M>>, m: M) { pub fn message_for_myself<M: 'static + Send + std::fmt::Debug>(&mut self, r: &Arc<Ref<M>>, m: M) {
self.immediate_oid(r); self.immediate_oid(r);
let r = Arc::clone(r); let r = Arc::clone(r);
self.pending.trace(|| trace::ActionDescription::Message { let description = self.pending.trace_targeted(true, &r, || trace::TurnEvent::Message {
target: Box::new(r.as_ref().into()),
body: Box::new((&m).into()), body: Box::new((&m).into()),
}); });
self.pending.for_myself.push(Box::new( self.pending.for_myself.push((
move |t| t.with_entity(&r, |t, e| e.message(t, m)))) description,
Box::new(move |t| t.with_entity(&r, |t, e| e.message(t, m)))))
} }
/// Core API: begins a synchronisation with `r`. /// Core API: begins a synchronisation with `r`.
@ -901,11 +910,12 @@ impl<'activation> Activation<'activation> {
/// the synchronisation request. /// the synchronisation request.
pub fn sync<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, peer: Arc<Ref<Synced>>) { pub fn sync<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, peer: Arc<Ref<Synced>>) {
let r = Arc::clone(r); let r = Arc::clone(r);
self.pending.trace(|| trace::ActionDescription::Sync { let description = self.pending.trace_targeted(false, &r, || trace::TurnEvent::Sync {
target: Box::new(r.as_ref().into()), peer: Box::new(peer.as_ref().into()),
}); });
self.pending.queue_for(&r).push(Box::new( self.pending.queue_for(&r).push((
move |t| t.with_entity(&r, |t, e| e.sync(t, peer)))) description,
Box::new(move |t| t.with_entity(&r, |t, e| e.sync(t, peer)))))
} }
/// Registers the entity `r` in the list of stop actions for the active facet. If the facet /// Registers the entity `r` in the list of stop actions for the active facet. If the facet
@ -958,10 +968,10 @@ impl<'activation> Activation<'activation> {
/// ///
/// # Panics /// # Panics
/// ///
/// Panics if any pending actions "`for_myself`" (resulting from /// Panics if any pending "`final_actions`" or actions "`for_myself`" (resulting from
/// [`assert_for_myself`][Self::assert_for_myself] or /// [`assert_for_myself`][Self::assert_for_myself] or
/// [`message_for_myself`][Self::message_for_myself]) are /// [`message_for_myself`][Self::message_for_myself]) are outstanding at the time of the
/// outstanding at the time of the call. /// call.
pub fn deliver(&mut self) { pub fn deliver(&mut self) {
self.pending.deliver(); self.pending.deliver();
} }
@ -1125,9 +1135,10 @@ impl<'activation> Activation<'activation> {
link: false, link: false,
id: Box::new(trace::ActorId(AnyValue::new(ac_ref.actor_id))), id: Box::new(trace::ActorId(AnyValue::new(ac_ref.actor_id))),
}); });
self.pending.for_myself.push(Box::new(move |t| { let cause = self.pending.desc.as_ref().map(
ac.boot(name, Arc::clone(t.account()), Some(trace::TurnCause::ActorBoot), boot); |d| trace::TurnCause::Turn { id: Box::new(d.id.clone()) });
Ok(()) self.pending.final_actions.push(Box::new(move |t| {
ac.boot(name, Arc::clone(t.account()), cause, boot);
})); }));
ac_ref ac_ref
} }
@ -1149,14 +1160,13 @@ impl<'activation> Activation<'activation> {
link: true, link: true,
id: Box::new(trace::ActorId(AnyValue::new(ac_ref.actor_id))), id: Box::new(trace::ActorId(AnyValue::new(ac_ref.actor_id))),
}); });
self.pending.for_myself.push(Box::new(move |t| { let cause = self.pending.desc.as_ref().map(
|d| trace::TurnCause::Turn { id: Box::new(d.id.clone()) });
self.pending.final_actions.push(Box::new(move |t| {
t.with_facet(true, facet_id, move |t| { t.with_facet(true, facet_id, move |t| {
ac.link(t)?.boot(name, ac.link(t).boot(name, Arc::clone(t.account()), cause, boot);
Arc::clone(t.account()),
Some(trace::TurnCause::ActorBoot),
boot);
Ok(()) Ok(())
}) }).unwrap()
})); }));
ac_ref ac_ref
} }
@ -1249,7 +1259,7 @@ impl<'activation> Activation<'activation> {
fn stop_if_inert(&mut self) { fn stop_if_inert(&mut self) {
let facet_id = self.facet.facet_id; let facet_id = self.facet.facet_id;
self.pending.for_myself.push(Box::new(move |t| { self.pending.final_actions.push(Box::new(move |t| {
tracing::trace!("Checking inertness of facet {} from facet {}", facet_id, t.facet.facet_id); tracing::trace!("Checking inertness of facet {} from facet {}", facet_id, t.facet.facet_id);
if t.state.facet_exists_and_is_inert(facet_id) { if t.state.facet_exists_and_is_inert(facet_id) {
tracing::trace!(" - facet {} is inert, stopping it", facet_id); tracing::trace!(" - facet {} is inert, stopping it", facet_id);
@ -1258,7 +1268,6 @@ impl<'activation> Activation<'activation> {
} else { } else {
tracing::trace!(" - facet {} is not inert", facet_id); tracing::trace!(" - facet {} is not inert", facet_id);
} }
Ok(())
})) }))
} }
@ -1432,12 +1441,22 @@ impl<'activation> Activation<'activation> {
loop { loop {
let actions = std::mem::take(&mut self.pending.for_myself); let actions = std::mem::take(&mut self.pending.for_myself);
if actions.is_empty() { break; } if actions.is_empty() { break; }
for action in actions.into_iter() { action(self)? } for (maybe_desc, action) in actions.into_iter() {
if let Some(desc) = maybe_desc {
self.pending.trace(|| trace::ActionDescription::DequeueInternal {
event: Box::new(desc),
});
}
action(self)?
}
} }
if !self.repair_dataflow()? { if !self.repair_dataflow()? {
break; break;
} }
} }
for final_action in std::mem::take(&mut self.pending.final_actions) {
final_action(self);
}
Ok(()) Ok(())
} }
@ -1477,23 +1496,28 @@ impl EventBuffer {
trace_collector, trace_collector,
account, account,
queues: HashMap::new(), queues: HashMap::new(),
for_myself: Vec::new(), for_myself: PendingEventQueue::new(),
final_actions: Vec::new(),
} }
} }
fn execute_cleanup_action(&mut self, d: CleanupAction) { fn execute_cleanup_action(&mut self, d: CleanupAction) {
match d { match d {
CleanupAction::ForAnother(mailbox, (tracer, action)) => { CleanupAction::ForAnother(mailbox, (maybe_desc, action)) => {
if let Some(f) = tracer { if let Some(desc) = &maybe_desc {
self.trace(f); self.trace(|| trace::ActionDescription::Enqueue {
event: Box::new(desc.clone()),
})
} }
self.queue_for_mailbox(&mailbox).push(action); self.queue_for_mailbox(&mailbox).push((maybe_desc, action));
} }
CleanupAction::ForMyself((tracer, action)) => { CleanupAction::ForMyself((maybe_desc, action)) => {
if let Some(f) = tracer { if let Some(desc) = &maybe_desc {
self.trace(f); self.trace(|| trace::ActionDescription::EnqueueInternal {
event: Box::new(desc.clone()),
})
} }
self.for_myself.push(action); self.for_myself.push((maybe_desc, action));
} }
} }
} }
@ -1510,10 +1534,14 @@ impl EventBuffer {
fn clear(&mut self) { fn clear(&mut self) {
self.queues = HashMap::new(); self.queues = HashMap::new();
self.for_myself = PendingEventQueue::new(); self.for_myself = PendingEventQueue::new();
self.final_actions = Vec::new();
} }
fn deliver(&mut self) { fn deliver(&mut self) {
tracing::trace!("EventBuffer::deliver"); tracing::trace!("EventBuffer::deliver");
if !self.final_actions.is_empty() {
panic!("Unprocessed final_actions at deliver() time");
}
if !self.for_myself.is_empty() { if !self.for_myself.is_empty() {
panic!("Unprocessed for_myself events remain at deliver() time"); panic!("Unprocessed for_myself events remain at deliver() time");
} }
@ -1544,6 +1572,27 @@ impl EventBuffer {
d.record(f()); d.record(f());
} }
} }
#[inline(always)]
fn trace_targeted<M, F: FnOnce() -> trace::TurnEvent>(
&mut self,
internal: bool,
r: &Arc<Ref<M>>,
f: F,
) -> Option<trace::TargetedTurnEvent> {
self.desc.as_mut().map(|d| {
let event = trace::TargetedTurnEvent {
target: r.as_ref().into(),
detail: f(),
};
d.record(if internal {
trace::ActionDescription::EnqueueInternal { event: Box::new(event.clone()) }
} else {
trace::ActionDescription::Enqueue { event: Box::new(event.clone()) }
});
event
})
}
} }
impl Drop for EventBuffer { impl Drop for EventBuffer {
@ -1680,9 +1729,8 @@ impl Actor {
boot: F, boot: F,
) -> ActorHandle { ) -> ActorHandle {
let ac = Actor::new(None, trace_collector.clone()); let ac = Actor::new(None, trace_collector.clone());
let cause = trace_collector.as_ref().map(|_| trace::TurnCause::ActorBoot);
let account = Account::new(None, trace_collector); let account = Account::new(None, trace_collector);
ac.boot(None, account, cause, boot) ac.boot(None, account, Some(trace::TurnCause::external("top-level actor")), boot)
} }
/// Create a new actor. It still needs to be [`boot`ed][Self::boot]. /// Create a new actor. It still needs to be [`boot`ed][Self::boot].
@ -1709,7 +1757,7 @@ impl Actor {
Actor { rx, trace_collector, ac_ref } Actor { rx, trace_collector, ac_ref }
} }
fn link(self, t_parent: &mut Activation) -> Result<Self, Error> { fn link(self, t_parent: &mut Activation) -> Self {
if t_parent.active_facet().is_none() { if t_parent.active_facet().is_none() {
panic!("No active facet when calling spawn_link"); panic!("No active facet when calling spawn_link");
} }
@ -1732,9 +1780,9 @@ impl Actor {
child_actor: Box::new(trace::ActorId(AnyValue::new(self.ac_ref.actor_id))), child_actor: Box::new(trace::ActorId(AnyValue::new(self.ac_ref.actor_id))),
child_to_parent: Box::new(protocol::Handle(h_to_parent.unwrap().into())), child_to_parent: Box::new(protocol::Handle(h_to_parent.unwrap().into())),
}); });
Ok(self) self
} else { } else {
Err(error("spawn_link'd actor terminated before link could happen", AnyValue::new(false))) panic!("spawn_link'd actor terminated before link could happen");
} }
} }
@ -1822,7 +1870,14 @@ impl Actor {
let actions = std::mem::take(&mut loaned_item.item); let actions = std::mem::take(&mut loaned_item.item);
if !root_facet_ref.activate( if !root_facet_ref.activate(
&loaned_item.account, cause, |t| { &loaned_item.account, cause, |t| {
for action in actions.into_iter() { action(t)? } for (maybe_desc, action) in actions.into_iter() {
if let Some(desc) = maybe_desc {
t.pending.trace(|| trace::ActionDescription::Dequeue {
event: Box::new(desc),
});
}
action(t)?;
}
Ok(()) Ok(())
}) })
{ {
@ -1977,13 +2032,13 @@ impl RunningActor {
&mut self, &mut self,
r: &Arc<Ref<M>>, r: &Arc<Ref<M>>,
handle: Handle, handle: Handle,
describer: Option<ActionDescriber>, description: Option<trace::TargetedTurnEvent>,
) { ) {
let r = Arc::clone(r); let r = Arc::clone(r);
self.cleanup_actions.insert( self.cleanup_actions.insert(
handle, handle,
CleanupAction::ForAnother(Arc::clone(&r.mailbox), ( CleanupAction::ForAnother(Arc::clone(&r.mailbox), (
describer, description,
Box::new(move |t| t.with_entity(&r, |t, e| { Box::new(move |t| t.with_entity(&r, |t, e| {
tracing::trace!(?handle, "retracted"); tracing::trace!(?handle, "retracted");
if let Some(f) = t.active_facet() { if let Some(f) = t.active_facet() {
@ -1999,7 +2054,7 @@ impl RunningActor {
exit_status: &Arc<ActorResult>, exit_status: &Arc<ActorResult>,
trace_collector: Option<trace::TraceCollector>, trace_collector: Option<trace::TraceCollector>,
) { ) {
let cause = trace_collector.as_ref().map(|_| trace::TurnCause::ActorCleanup); let cause = Some(trace::TurnCause::Cleanup);
let account = Account::new(Some(AnyValue::symbol("cleanup")), trace_collector.clone()); let account = Account::new(Some(AnyValue::symbol("cleanup")), trace_collector.clone());
let mut t = Activation::make(&ac_ref.facet_ref(self.root), account, cause, trace_collector, self); let mut t = Activation::make(&ac_ref.facet_ref(self.root), account, cause, trace_collector, self);
if let Err(err) = t._terminate_facet(t.state.root, exit_status.is_ok(), trace::FacetStopReason::ActorStopping) { if let Err(err) = t._terminate_facet(t.state.root, exit_status.is_ok(), trace::FacetStopReason::ActorStopping) {
@ -2069,32 +2124,6 @@ impl Drop for Facet {
} }
} }
/// Directly injects `action` into `mailbox`, billing subsequent activity against `account`.
///
/// Primarily for use by [linked tasks][Activation::linked_task].
#[must_use]
pub fn external_event(
mailbox: &Arc<Mailbox>,
cause: Option<trace::TurnCause>,
account: &Arc<Account>,
action: Action,
) -> ActorResult {
send_actions(&mailbox.tx, cause, account, vec![action])
}
/// Directly injects `actions` into `mailbox`, billing subsequent activity against `account`.
///
/// Primarily for use by [linked tasks][Activation::linked_task].
#[must_use]
pub fn external_events(
mailbox: &Arc<Mailbox>,
cause: Option<trace::TurnCause>,
account: &Arc<Account>,
actions: PendingEventQueue,
) -> ActorResult {
send_actions(&mailbox.tx, cause, account, actions)
}
impl<M> Ref<M> { impl<M> Ref<M> {
/// Supplies the behaviour (`e`) for a `Ref` created via /// Supplies the behaviour (`e`) for a `Ref` created via
/// [`create_inert`][Activation::create_inert]. /// [`create_inert`][Activation::create_inert].

View File

@ -132,7 +132,7 @@ impl TraceCollector {
pub fn new<F: 'static + Send + FnMut(CollectorEvent)>(mut f: F) -> TraceCollector { pub fn new<F: 'static + Send + FnMut(CollectorEvent)>(mut f: F) -> TraceCollector {
let (tx, mut rx) = unbounded_channel::<TraceEntry>(); let (tx, mut rx) = unbounded_channel::<TraceEntry>();
tokio::spawn(async move { tokio::spawn(async move {
let mut timer = tokio::time::interval(std::time::Duration::from_secs(1)); let mut timer = tokio::time::interval(std::time::Duration::from_millis(100));
loop { loop {
select! { select! {
maybe_entry = rx.recv() => { maybe_entry = rx.recv() => {