Fix up daemon retry logic. Also: named fields; better stop logic.

In particular:

1. The root facet is considered inert even if it has outbound
assertions. This is because the only outbound assertion it can have is
a half-link to a peer actor, which shouldn't prevent the actor from
terminating normally if the user-level "root" facet stops.

2. On stop_facet_and_continue, parent-facet continuations execute
inline rather than at commit time. This is so that a user-level "root"
facet can *replace* itself. Remains to be properly exercised/tested.
This commit is contained in:
Tony Garnock-Jones 2021-09-28 17:10:36 +02:00
parent fe7086b84b
commit de795219af
10 changed files with 205 additions and 134 deletions

View File

@ -13,7 +13,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let _ = t.prevent_inert_check(); let _ = t.prevent_inert_check();
Actor::new().boot(syndicate::name!("box"), enclose!((ds) move |t| { Actor::new().boot(syndicate::name!("box"), enclose!((ds) move |t| {
let current_value = t.field(0u64); let current_value = t.named_field("current_value", 0u64);
t.dataflow({ t.dataflow({
let mut state_assertion_handle = None; let mut state_assertion_handle = None;
@ -41,7 +41,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
t.dataflow(enclose!((current_value) move |t| { t.dataflow(enclose!((current_value) move |t| {
if *t.get(&current_value) == 1000000 { if *t.get(&current_value) == 1000000 {
t.stop(); t.stop()?;
} }
Ok(()) Ok(())
}))?; }))?;
@ -62,7 +62,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
*count = *count - 1; *count = *count - 1;
if *count == 0 { if *count == 0 {
tracing::info!("box state retracted"); tracing::info!("box state retracted");
t.stop(); t.stop()?;
} }
Ok(()) Ok(())
}))) })))

View File

@ -4,13 +4,13 @@ use syndicate::actor::*;
pub fn adjust(t: &mut Activation, f: &Arc<Field<isize>>, delta: isize) { pub fn adjust(t: &mut Activation, f: &Arc<Field<isize>>, delta: isize) {
let f = f.clone(); let f = f.clone();
tracing::trace!(v0 = ?t.get(&f), "adjust"); tracing::trace!(?f, v0 = ?t.get(&f), "adjust");
*t.get_mut(&f) += delta; *t.get_mut(&f) += delta;
tracing::trace!(v1 = ?t.get(&f), "adjust"); tracing::trace!(?f, v1 = ?t.get(&f), "adjust");
t.on_stop(move |t| { t.on_stop(move |t| {
tracing::trace!(v0 = ?t.get(&f), "cleanup"); tracing::trace!(?f, v0 = ?t.get(&f), "cleanup");
*t.get_mut(&f) -= delta; *t.get_mut(&f) -= delta;
tracing::trace!(v1 = ?t.get(&f), "cleanup"); tracing::trace!(?f, v1 = ?t.get(&f), "cleanup");
Ok(()) Ok(())
}); });
} }
@ -18,9 +18,9 @@ pub fn adjust(t: &mut Activation, f: &Arc<Field<isize>>, delta: isize) {
pub fn sync_and_adjust<M: 'static + Send>(t: &mut Activation, r: &Arc<Ref<M>>, f: &Arc<Field<isize>>, delta: isize) { pub fn sync_and_adjust<M: 'static + Send>(t: &mut Activation, r: &Arc<Ref<M>>, f: &Arc<Field<isize>>, delta: isize) {
let f = f.clone(); let f = f.clone();
let sync_handler = t.create(move |t: &mut Activation| { let sync_handler = t.create(move |t: &mut Activation| {
tracing::trace!(v0 = ?t.get(&f), "sync"); tracing::trace!(?f, v0 = ?t.get(&f), "sync");
*t.get_mut(&f) += delta; *t.get_mut(&f) += delta;
tracing::trace!(v1 = ?t.get(&f), "sync"); tracing::trace!(?f, v1 = ?t.get(&f), "sync");
Ok(()) Ok(())
}); });
t.sync(r, sync_handler) t.sync(r, sync_handler)

View File

@ -52,7 +52,7 @@ fn run(t: &mut Activation, ds: Arc<Cap>, service_name: AnyValue) -> ActorResult
}); });
} }
let obstacle_count = t.field(1isize); let obstacle_count = t.named_field("obstacle_count", 1isize);
t.dataflow(enclose!((obstacle_count) move |t| { t.dataflow(enclose!((obstacle_count) move |t| {
tracing::trace!(obstacle_count = ?t.get(&obstacle_count)); tracing::trace!(obstacle_count = ?t.get(&obstacle_count));
Ok(()) Ok(())

View File

@ -202,8 +202,7 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: internal_services::ConfigWatcher)
let _ = facet.activate(Account::new(syndicate::name!("termination")), |t| { let _ = facet.activate(Account::new(syndicate::name!("termination")), |t| {
tracing::trace!("linked thread terminating associated facet"); tracing::trace!("linked thread terminating associated facet");
t.stop(); t.stop()
Ok(())
}); });
tracing::trace!("linked thread done"); tracing::trace!("linked thread done");

View File

@ -64,73 +64,84 @@ impl CommandLine {
struct DaemonProcessInstance { struct DaemonProcessInstance {
name: tracing::Span, name: tracing::Span,
facet: FacetRef,
cmd: process::Command, cmd: process::Command,
announce_presumed_readiness: bool, announce_presumed_readiness: bool,
unready_configs: Arc<Field<isize>>, unready_configs: Arc<Field<isize>>,
completed_processes: Arc<Field<isize>>,
restart_policy: RestartPolicy, restart_policy: RestartPolicy,
} }
impl DaemonProcessInstance { impl DaemonProcessInstance {
async fn handle_exit(self, error_message: Option<String>) -> Result<LinkedTaskTermination, Error> { fn handle_exit(self, t: &mut Activation, error_message: Option<String>) -> ActorResult {
let delay_ms = if let None = error_message { 200 } else { 1000 }; let delay =
let sleep_after_exit = || tokio::time::sleep(std::time::Duration::from_millis(delay_ms)); std::time::Duration::from_millis(if let None = error_message { 200 } else { 1000 });
Ok(match self.restart_policy { t.stop_facet_and_continue(t.facet.facet_id, Some(move |t: &mut Activation| {
RestartPolicy::Always => { enum NextStep {
sleep_after_exit().await; SleepAndRestart,
self.start()?; SignalSuccessfulCompletion,
LinkedTaskTermination::Normal
} }
RestartPolicy::OnError => use NextStep::*;
match error_message {
None => LinkedTaskTermination::KeepFacet, let next_step = match self.restart_policy {
Some(_) => { RestartPolicy::Always => SleepAndRestart,
sleep_after_exit().await; RestartPolicy::OnError =>
self.start()?; match error_message {
LinkedTaskTermination::Normal None => SignalSuccessfulCompletion,
} Some(_) => SleepAndRestart,
}, },
RestartPolicy::All => RestartPolicy::All =>
match error_message { match error_message {
None => LinkedTaskTermination::KeepFacet, None => SignalSuccessfulCompletion,
Some(s) => Err(s.as_str())?, Some(s) => Err(s.as_str())?,
}, },
}) };
match next_step {
SleepAndRestart => t.after(delay, |t| self.start(t)),
SignalSuccessfulCompletion => {
t.facet(|t| {
let _ = t.prevent_inert_check();
counter::adjust(t, &self.completed_processes, 1);
counter::adjust(t, &self.unready_configs, -1);
Ok(())
})?;
()
}
}
Ok(())
}))
} }
fn start(mut self) -> ActorResult { fn start(mut self, t: &mut Activation) -> ActorResult {
tracing::trace!("DaemonProcessInstance start (outer)"); t.facet(|t| {
self.facet.clone().activate( tracing::trace!(cmd = ?self.cmd, "starting");
Account::new(syndicate::name!(parent: self.name.clone(), "instance")), |t| { let mut child = match self.cmd.spawn() {
tracing::trace!("DaemonProcessInstance start (inner)"); Ok(child) => child,
t.facet(|t| { Err(e) => {
tracing::trace!(cmd = ?self.cmd, "starting"); tracing::info!(spawn_err = ?e);
let mut child = match self.cmd.spawn() { return self.handle_exit(t, Some(format!("{}", e)));
Ok(child) => child, }
Err(e) => { };
tracing::info!(spawn_err = ?e); tracing::info!(pid = ?child.id(), cmd = ?self.cmd, "started");
t.linked_task(syndicate::name!(parent: self.name.clone(), "fail"),
self.handle_exit(Some(format!("{}", e))));
return Ok(());
}
};
tracing::info!(pid = ?child.id(), cmd = ?self.cmd, "started");
if self.announce_presumed_readiness { if self.announce_presumed_readiness {
counter::adjust(t, &self.unready_configs, -1); counter::adjust(t, &self.unready_configs, -1);
} }
t.linked_task(syndicate::name!(parent: self.name.clone(), "wait"), async move { let facet = t.facet.clone();
tracing::trace!("waiting for process exit"); t.linked_task(syndicate::name!(parent: self.name.clone(), "wait"), async move {
let status = child.wait().await?; tracing::trace!("waiting for process exit");
tracing::info!(?status); let status = child.wait().await?;
self.handle_exit( tracing::info!(?status);
if status.success() { None } else { Some(format!("{}", status)) }).await facet.activate(Account::new(syndicate::name!("instance-terminated")), |t| {
}); let m = if status.success() { None } else { Some(format!("{}", status)) };
Ok(()) self.handle_exit(t, m)
})?; })?;
Ok(()) Ok(LinkedTaskTermination::Normal)
}) });
Ok(())
})?;
Ok(())
} }
} }
@ -142,7 +153,10 @@ fn run(
) -> ActorResult { ) -> ActorResult {
let spec = language().unparse(&service); let spec = language().unparse(&service);
let unready_configs = t.field(1isize); let total_configs = t.named_field("total_configs", 0isize);
let unready_configs = t.named_field("unready_configs", 1isize);
let completed_processes = t.named_field("completed_processes", 0isize);
t.dataflow({ t.dataflow({
let mut handle = None; let mut handle = None;
let ready = lifecycle::ready(&spec); let ready = lifecycle::ready(&spec);
@ -154,11 +168,21 @@ fn run(
}) })
})?; })?;
enclose!((unready_configs) during!(t, config_ds, language(), <daemon #(service.id.0) $config>, { t.dataflow(enclose!((completed_processes, total_configs) move |t| {
let unready_configs = unready_configs.clone(); let total = *t.get(&total_configs);
|t: &mut Activation| { let completed = *t.get(&completed_processes);
tracing::debug!(?config, "new unready config"); tracing::debug!(total_configs = ?total, completed_processes = ?completed);
if total > 0 && total == completed {
t.stop()?;
}
Ok(())
}))?;
enclose!((unready_configs, completed_processes) during!(t, config_ds, language(), <daemon #(service.id.0) $config>, {
enclose!((unready_configs, completed_processes) |t: &mut Activation| {
tracing::debug!(?config, "new config");
counter::adjust(t, &unready_configs, 1); counter::adjust(t, &unready_configs, 1);
counter::adjust(t, &total_configs, 1);
match language().parse::<DaemonSpec>(&config) { match language().parse::<DaemonSpec>(&config) {
Ok(config) => { Ok(config) => {
@ -238,15 +262,18 @@ fn run(
cmd.stderr(std::process::Stdio::inherit()); cmd.stderr(std::process::Stdio::inherit());
cmd.kill_on_drop(true); cmd.kill_on_drop(true);
(DaemonProcessInstance { let process_instance = DaemonProcessInstance {
name: tracing::Span::current(), name: tracing::Span::current(),
facet,
cmd, cmd,
announce_presumed_readiness, announce_presumed_readiness,
unready_configs, unready_configs,
completed_processes,
restart_policy, restart_policy,
}).start()?; };
facet.activate(Account::new(syndicate::name!("instance-startup")), |t| {
process_instance.start(t)
})?;
Ok(LinkedTaskTermination::KeepFacet) Ok(LinkedTaskTermination::KeepFacet)
}); });
Ok(()) Ok(())
@ -256,7 +283,7 @@ fn run(
return Ok(()); return Ok(());
} }
} }
} })
})); }));
tracing::debug!("syncing to ds"); tracing::debug!("syncing to ds");

View File

@ -33,8 +33,7 @@ struct ShutdownEntity;
impl Entity<AnyValue> for ShutdownEntity { impl Entity<AnyValue> for ShutdownEntity {
fn message(&mut self, t: &mut Activation, _m: AnyValue) -> ActorResult { fn message(&mut self, t: &mut Activation, _m: AnyValue) -> ActorResult {
t.stop(); t.stop()
Ok(())
} }
} }
@ -94,12 +93,7 @@ pub fn bench_pub(c: &mut Criterion) {
let ds = Cap::new(&t.create(Dataspace::new())); let ds = Cap::new(&t.create(Dataspace::new()));
let shutdown = entity(()) let shutdown = entity(())
.on_asserted(|_, _, _| { .on_asserted(|_, _, _| Ok(Some(Box::new(|_, t| t.stop()))))
Ok(Some(Box::new(|_, t| {
t.stop();
Ok(())
})))
})
.create_cap(t); .create_cap(t);
ds.assert(t, language(), &Observe { ds.assert(t, language(), &Observe {

View File

@ -59,7 +59,7 @@ pub fn bench_ring(c: &mut Criterion) {
} }
impl Counter { impl Counter {
fn step(&mut self, t: &mut Activation) { fn step(&mut self, t: &mut Activation) -> ActorResult {
if self.remaining_to_send > 0 { if self.remaining_to_send > 0 {
self.remaining_to_send -= 1; self.remaining_to_send -= 1;
MESSAGES_SENT.fetch_add(1, Ordering::Relaxed); MESSAGES_SENT.fetch_add(1, Ordering::Relaxed);
@ -68,20 +68,21 @@ pub fn bench_ring(c: &mut Criterion) {
tracing::info!(iters = self.iters, tracing::info!(iters = self.iters,
actors_created = ACTORS_CREATED.load(Ordering::SeqCst), actors_created = ACTORS_CREATED.load(Ordering::SeqCst),
messages_sent = MESSAGES_SENT.load(Ordering::SeqCst)); messages_sent = MESSAGES_SENT.load(Ordering::SeqCst));
t.stop(); t.stop()?;
self.tx.send(self.start.elapsed() / ACTOR_COUNT).unwrap() self.tx.send(self.start.elapsed() / ACTOR_COUNT).unwrap()
} }
Ok(())
} }
} }
impl Entity<()> for Counter { impl Entity<()> for Counter {
fn message(&mut self, t: &mut Activation, _message: ()) -> ActorResult { fn message(&mut self, t: &mut Activation, _message: ()) -> ActorResult {
Ok(self.step(t)) self.step(t)
} }
} }
impl Spawner { impl Spawner {
fn step(&mut self, t: &mut Activation, next: Arc<Ref<()>>) { fn step(&mut self, t: &mut Activation, next: Arc<Ref<()>>) -> ActorResult {
if self.i < ACTOR_COUNT { if self.i < ACTOR_COUNT {
let i = self.i; let i = self.i;
self.i += 1; self.i += 1;
@ -103,15 +104,16 @@ pub fn bench_ring(c: &mut Criterion) {
iters: self.iters, iters: self.iters,
next, next,
}; };
c_state.step(t); c_state.step(t)?;
self.c.become_entity(c_state); self.c.become_entity(c_state);
} }
Ok(())
} }
} }
impl Entity<Arc<Ref<()>>> for Spawner { impl Entity<Arc<Ref<()>>> for Spawner {
fn message(&mut self, t: &mut Activation, f: Arc<Ref<()>>) -> ActorResult { fn message(&mut self, t: &mut Activation, f: Arc<Ref<()>>) -> ActorResult {
Ok(self.step(t, f)) self.step(t, f)
} }
} }
@ -125,7 +127,7 @@ pub fn bench_ring(c: &mut Criterion) {
i: 1, i: 1,
c: t.create_inert(), c: t.create_inert(),
}; };
s.step(t, Arc::clone(&s.c)); s.step(t, Arc::clone(&s.c))?;
Arc::clone(&s.self_ref).become_entity(s); Arc::clone(&s.self_ref).become_entity(s);
Ok(()) Ok(())
}).await.unwrap().unwrap(); }).await.unwrap().unwrap();

View File

@ -355,8 +355,8 @@ pub struct RunningActor {
/// values. Use [`Activation::dataflow`] to create a reactive block within a facet that will be /// values. Use [`Activation::dataflow`] to create a reactive block within a facet that will be
/// (re-)executed whenever some dependent field changes value. /// (re-)executed whenever some dependent field changes value.
/// ///
#[derive(Debug)]
pub struct Field<T: Any + Send> { pub struct Field<T: Any + Send> {
pub name: String,
pub field_id: FieldId, pub field_id: FieldId,
tx: UnboundedSender<SystemMessage>, tx: UnboundedSender<SystemMessage>,
phantom: PhantomData<T>, phantom: PhantomData<T>,
@ -925,7 +925,7 @@ impl<'activation> Activation<'activation> {
f.linked_tasks.remove(&task_id); f.linked_tasks.remove(&task_id);
} }
if let LinkedTaskTermination::Normal = result { if let LinkedTaskTermination::Normal = result {
t.stop(); t.stop()?;
} }
Ok(()) Ok(())
}); });
@ -938,18 +938,28 @@ impl<'activation> Activation<'activation> {
/// Executes the given action after the given duration has elapsed (so long as the active /// Executes the given action after the given duration has elapsed (so long as the active
/// facet still exists at that time). /// facet still exists at that time).
pub fn after(&mut self, duration: time::Duration, a: Action) { pub fn after<F: 'static + Send + FnOnce(&mut Activation) -> ActorResult>(
&mut self,
duration: time::Duration,
a: F,
) {
self.at(time::Instant::now() + duration, a) self.at(time::Instant::now() + duration, a)
} }
/// Executes the given action at the given instant (so long as the active facet still /// Executes the given action at the given instant (so long as the active facet still
/// exists at that time). /// exists at that time).
pub fn at<I: Into<tokio::time::Instant>>(&mut self, instant: I, a: Action) { pub fn at<I: Into<tokio::time::Instant>, F: 'static + Send + FnOnce(&mut Activation) -> ActorResult>(
&mut self,
instant: I,
a: F,
) {
let account = Arc::clone(self.account()); let account = Arc::clone(self.account());
let instant = instant.into(); let instant = instant.into();
let facet = self.facet.clone(); let facet = self.facet.clone();
self.linked_task(crate::name!("Activation::at"), async move { let span = tracing::Span::current().clone();
self.linked_task(crate::name!(parent: None, "Activation::at"), async move {
tokio::time::sleep_until(instant.into()).await; tokio::time::sleep_until(instant.into()).await;
let _entry = span.enter();
facet.activate(account, a)?; facet.activate(account, a)?;
Ok(LinkedTaskTermination::KeepFacet) Ok(LinkedTaskTermination::KeepFacet)
}); });
@ -1006,7 +1016,9 @@ impl<'activation> Activation<'activation> {
let f = Facet::new(Some(self.facet.facet_id)); let f = Facet::new(Some(self.facet.facet_id));
let facet_id = f.facet_id; let facet_id = f.facet_id;
self.state.facet_nodes.insert(facet_id, f); self.state.facet_nodes.insert(facet_id, f);
tracing::trace!(?facet_id, facet_count = ?self.state.facet_nodes.len()); tracing::debug!(parent_id = ?self.facet.facet_id,
?facet_id,
actor_facet_count = ?self.state.facet_nodes.len());
self.state.facet_children.entry(self.facet.facet_id).or_default().insert(facet_id); self.state.facet_children.entry(self.facet.facet_id).or_default().insert(facet_id);
self.with_facet(true /* TODO: tiny optimisation: "false" would be safe here */, facet_id, move |t| { self.with_facet(true /* TODO: tiny optimisation: "false" would be safe here */, facet_id, move |t| {
boot(t)?; boot(t)?;
@ -1039,28 +1051,43 @@ impl<'activation> Activation<'activation> {
} }
/// Arranges for the [`Facet`] named by `facet_id` to be stopped cleanly when `self` /// Arranges for the [`Facet`] named by `facet_id` to be stopped cleanly when `self`
/// commits. If `continuation` is supplied, the facet to be stopped hasn't been stopped at /// commits.
/// the time of the `stop_facet` call, none of the shutdown handlers yields an error, and ///
/// the facet's parent facet is alive, executes `continuation` in the parent facet's /// Then,
/// context. /// - if `continuation` is supplied, and
pub fn stop_facet(&mut self, facet_id: FacetId, continuation: Option<Action>) { /// - the facet to be stopped hasn't been stopped at the time of the `stop_facet_and_continue` call, and
/// - none of the shutdown handlers yields an error, and
/// - the facet's parent facet is alive,
/// executes `continuation` (immediately) in the *parent* facet's context.
///
pub fn stop_facet_and_continue<F: 'static + Send + FnOnce(&mut Activation) -> ActorResult>(
&mut self,
facet_id: FacetId,
continuation: Option<F>,
) -> ActorResult {
let maybe_parent_id = self.active_facet().and_then(|f| f.parent_facet_id); let maybe_parent_id = self.active_facet().and_then(|f| f.parent_facet_id);
self.enqueue_for_myself_at_commit(Box::new(move |t| { self.enqueue_for_myself_at_commit(Box::new(move |t| t._terminate_facet(facet_id, true)));
t._terminate_facet(facet_id, true)?; if let Some(k) = continuation {
if let Some(k) = continuation { if let Some(parent_id) = maybe_parent_id {
if let Some(parent_id) = maybe_parent_id { self.with_facet(true, parent_id, k)?;
t.with_facet(true, parent_id, k)?;
}
} }
Ok(()) }
})); Ok(())
}
/// Arranges for the [`Facet`] named by `facet_id` to be stopped cleanly when `self`
/// commits.
///
/// Equivalent to `self.stop_facet_and_continue(facet_id, None)`.
pub fn stop_facet(&mut self, facet_id: FacetId) -> ActorResult {
self.stop_facet_and_continue::<Action>(facet_id, None)
} }
/// Arranges for the active facet to be stopped cleanly when `self` commits. /// Arranges for the active facet to be stopped cleanly when `self` commits.
/// ///
/// Equivalent to `self.stop_facet(self.facet.facet_id, None)`. /// Equivalent to `self.stop_facet(self.facet.facet_id)`.
pub fn stop(&mut self) { pub fn stop(&mut self) -> ActorResult {
self.stop_facet(self.facet.facet_id, None) self.stop_facet(self.facet.facet_id)
} }
fn stop_if_inert(&mut self) { fn stop_if_inert(&mut self) {
@ -1069,7 +1096,7 @@ impl<'activation> Activation<'activation> {
tracing::trace!("Checking inertness of facet {} from facet {}", facet_id, t.facet.facet_id); tracing::trace!("Checking inertness of facet {} from facet {}", facet_id, t.facet.facet_id);
if t.state.facet_exists_and_is_inert(facet_id) { if t.state.facet_exists_and_is_inert(facet_id) {
tracing::trace!(" - facet {} is inert, stopping it", facet_id); tracing::trace!(" - facet {} is inert, stopping it", facet_id);
t.stop_facet(facet_id, None); t.stop_facet(facet_id)?;
} else { } else {
tracing::trace!(" - facet {} is not inert", facet_id); tracing::trace!(" - facet {} is not inert", facet_id);
} }
@ -1079,7 +1106,8 @@ impl<'activation> Activation<'activation> {
fn _terminate_facet(&mut self, facet_id: FacetId, alive: bool) -> ActorResult { fn _terminate_facet(&mut self, facet_id: FacetId, alive: bool) -> ActorResult {
if let Some(mut f) = self.state.facet_nodes.remove(&facet_id) { if let Some(mut f) = self.state.facet_nodes.remove(&facet_id) {
tracing::debug!("{} termination of {:?}", tracing::debug!(actor_facet_count = ?self.state.facet_nodes.len(),
"{} termination of {:?}",
if alive { "living" } else { "post-exit" }, if alive { "living" } else { "post-exit" },
facet_id); facet_id);
if let Some(p) = f.parent_facet_id { if let Some(p) = f.parent_facet_id {
@ -1101,9 +1129,13 @@ impl<'activation> Activation<'activation> {
// cleanup-actions are performed before parent-facet cleanup-actions. // cleanup-actions are performed before parent-facet cleanup-actions.
if let Some(p) = parent_facet_id { if let Some(p) = parent_facet_id {
if t.state.facet_exists_and_is_inert(p) { if t.state.facet_exists_and_is_inert(p) {
tracing::trace!("terminating parent {:?} of facet {:?}", p, facet_id);
t._terminate_facet(p, true)?; t._terminate_facet(p, true)?;
} else {
tracing::trace!("not terminating parent {:?} of facet {:?}", p, facet_id);
} }
} else { } else {
tracing::trace!("terminating actor of root facet {:?}", facet_id);
t.state.shutdown(); t.state.shutdown();
} }
} else { } else {
@ -1116,25 +1148,31 @@ impl<'activation> Activation<'activation> {
} }
} }
/// Create a new dataflow variable (field) within the active [`Actor`]. /// Create a new named dataflow variable (field) within the active [`Actor`].
pub fn field<T: Any + Send>(&mut self, initial_value: T) -> Arc<Field<T>> { pub fn named_field<T: Any + Send>(&mut self, name: &str, initial_value: T) -> Arc<Field<T>> {
let field_id = FieldId::new(NEXT_FIELD_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed)) let field_id = FieldId::new(NEXT_FIELD_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed))
.expect("Internal error: Attempt to allocate FieldId of zero. Too many FieldIds allocated. Restart the process."); .expect("Internal error: Attempt to allocate FieldId of zero. Too many FieldIds allocated. Restart the process.");
self.state.fields.insert(field_id, Box::new(initial_value)); self.state.fields.insert(field_id, Box::new(initial_value));
Arc::new(Field { Arc::new(Field {
name: name.to_owned(),
field_id, field_id,
tx: self.state.tx.clone(), tx: self.state.tx.clone(),
phantom: PhantomData, phantom: PhantomData,
}) })
} }
/// Create a new anonymous dataflow variable (field) within the active [`Actor`].
pub fn field<T: Any + Send>(&mut self, initial_value: T) -> Arc<Field<T>> {
self.named_field("", initial_value)
}
/// Retrieve a reference to the current value of a dataflow variable (field); if execution /// Retrieve a reference to the current value of a dataflow variable (field); if execution
/// is currently within a [dataflow block][Activation::dataflow], marks the block as /// is currently within a [dataflow block][Activation::dataflow], marks the block as
/// *depending upon* the field. /// *depending upon* the field.
/// ///
pub fn get<T: Any + Send>(&mut self, field: &Field<T>) -> &T { pub fn get<T: Any + Send>(&mut self, field: &Field<T>) -> &T {
tracing::trace!(field = ?field.field_id, block = ?self.active_block, "get");
if let Some(block) = self.active_block { if let Some(block) = self.active_block {
tracing::trace!(?field, ?block, action = "get", "observed");
self.state.dataflow.record_observation(block, field.field_id); self.state.dataflow.record_observation(block, field.field_id);
} }
let any = self.state.fields.get(&field.field_id) let any = self.state.fields.get(&field.field_id)
@ -1149,12 +1187,13 @@ impl<'activation> Activation<'activation> {
/// reevaluation of dependent blocks. /// reevaluation of dependent blocks.
/// ///
pub fn get_mut<T: Any + Send>(&mut self, field: &Field<T>) -> &mut T { pub fn get_mut<T: Any + Send>(&mut self, field: &Field<T>) -> &mut T {
tracing::trace!(field = ?field.field_id, block = ?self.active_block, "get_mut");
{ {
// Overapproximation. // Overapproximation.
if let Some(block) = self.active_block { if let Some(block) = self.active_block {
tracing::trace!(?field, ?block, action = "get_mut", "observed");
self.state.dataflow.record_observation(block, field.field_id); self.state.dataflow.record_observation(block, field.field_id);
} }
tracing::trace!(?field, active_block = ?self.active_block, action = "get_mut", "damaged");
self.state.dataflow.record_damage(field.field_id); self.state.dataflow.record_damage(field.field_id);
} }
let any = self.state.fields.get_mut(&field.field_id) let any = self.state.fields.get_mut(&field.field_id)
@ -1166,7 +1205,7 @@ impl<'activation> Activation<'activation> {
/// the new value is [`eq`][std::cmp::PartialEq::eq] to the value being overwritten. /// the new value is [`eq`][std::cmp::PartialEq::eq] to the value being overwritten.
/// ///
pub fn set<T: Any + Send>(&mut self, field: &Field<T>, value: T) { pub fn set<T: Any + Send>(&mut self, field: &Field<T>, value: T) {
tracing::trace!(field = ?field.field_id, block = ?self.active_block, "set"); tracing::trace!(?field, active_block = ?self.active_block, action = "set", "damaged");
// Overapproximation in many cases, since the new value may not produce an // Overapproximation in many cases, since the new value may not produce an
// observable difference (may be equal to the current value). // observable difference (may be equal to the current value).
self.state.dataflow.record_damage(field.field_id); self.state.dataflow.record_damage(field.field_id);
@ -1213,7 +1252,9 @@ impl<'activation> Activation<'activation> {
} }
} }
} }
tracing::trace!(passes = ?pass_number, "repair_dataflow complete"); if pass_number > 0 {
tracing::trace!(passes = ?pass_number, "repair_dataflow complete");
}
Ok(pass_number > 0) Ok(pass_number > 0)
} }
@ -1421,7 +1462,7 @@ impl Actor {
let (tx, rx) = unbounded_channel(); let (tx, rx) = unbounded_channel();
let actor_id = next_actor_id(); let actor_id = next_actor_id();
let root = Facet::new(None); let root = Facet::new(None);
// tracing::trace!(id = actor_id, "Actor::new"); tracing::trace!(?actor_id, root_facet_id = ?root.facet_id, "Actor::new");
let mut st = RunningActor { let mut st = RunningActor {
actor_id, actor_id,
tx, tx,
@ -1656,11 +1697,17 @@ impl RunningActor {
fn facet_exists_and_is_inert(&mut self, facet_id: FacetId) -> bool { fn facet_exists_and_is_inert(&mut self, facet_id: FacetId) -> bool {
let no_kids = self.facet_children.get(&facet_id).map(|cs| cs.is_empty()).unwrap_or(true); let no_kids = self.facet_children.get(&facet_id).map(|cs| cs.is_empty()).unwrap_or(true);
if let Some(f) = self.get_facet(facet_id) { if let Some(f) = self.get_facet(facet_id) {
no_kids && // The only outbound handle the root facet of an actor may have is a link
f.outbound_handles.is_empty() && // assertion, from [Activation::link]. This is not to be considered a "real"
f.linked_tasks.is_empty() && // assertion for purposes of keeping the facet alive!
f.inert_check_preventers.load(Ordering::Relaxed) == 0 let no_outbound_handles = f.outbound_handles.is_empty();
let is_root_facet = f.parent_facet_id.is_none();
let no_linked_tasks = f.linked_tasks.is_empty();
let no_inert_check_preventers = f.inert_check_preventers.load(Ordering::Relaxed) == 0;
tracing::trace!(?facet_id, ?no_kids, ?no_outbound_handles, ?is_root_facet, ?no_linked_tasks, ?no_inert_check_preventers);
no_kids && (no_outbound_handles || is_root_facet) && no_linked_tasks && no_inert_check_preventers
} else { } else {
tracing::trace!(?facet_id, exists = ?false);
false false
} }
} }
@ -1684,6 +1731,12 @@ impl RunningActor {
} }
} }
impl<T: Any + Send> std::fmt::Debug for Field<T> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
write!(f, "#<Field {:?} {}>", self.name, self.field_id)
}
}
impl<T: Any + Send> Eq for Field<T> {} impl<T: Any + Send> Eq for Field<T> {}
impl<T: Any + Send> PartialEq for Field<T> { impl<T: Any + Send> PartialEq for Field<T> {
fn eq(&self, other: &Field<T>) -> bool { fn eq(&self, other: &Field<T>) -> bool {
@ -1943,8 +1996,7 @@ where
impl<M> Entity<M> for StopOnRetract { impl<M> Entity<M> for StopOnRetract {
fn retract(&mut self, t: &mut Activation, _h: Handle) -> ActorResult { fn retract(&mut self, t: &mut Activation, _h: Handle) -> ActorResult {
t.stop(); t.stop()
Ok(())
} }
} }

View File

@ -118,10 +118,7 @@ where
let _ = t.prevent_inert_check(); let _ = t.prevent_inert_check();
assertion_handler(state, t, a) assertion_handler(state, t, a)
})?; })?;
Ok(Some(Box::new(move |_state, t| { Ok(Some(Box::new(move |_state, t| t.stop_facet(facet_id))))
t.stop_facet(facet_id, None);
Ok(())
})))
})) }))
} }

View File

@ -109,11 +109,11 @@ impl Entity<Protocol> for Supervisor
} else { } else {
self.config.pause_time self.config.pause_time
}; };
t.after(wait_time, Box::new(move |t| { t.after(wait_time, move |t| {
tracing::trace!("Sending retry trigger"); tracing::trace!("Sending retry trigger");
t.message(&self_ref, Protocol::Retry); t.message(&self_ref, Protocol::Retry);
Ok(()) Ok(())
})); });
}, },
} }
Ok(()) Ok(())
@ -151,7 +151,7 @@ impl Supervisor {
let _entry = name.enter(); let _entry = name.enter();
tracing::trace!(?config); tracing::trace!(?config);
let self_ref = t.create_inert(); let self_ref = t.create_inert();
let state_field = t.field(State::Started); let state_field = t.named_field("supervisee_state", State::Started);
let mut supervisor = Supervisor { let mut supervisor = Supervisor {
self_ref: Arc::clone(&self_ref), self_ref: Arc::clone(&self_ref),
name: name.clone(), name: name.clone(),