diff --git a/syndicate-macros/examples/box-and-client.rs b/syndicate-macros/examples/box-and-client.rs index 5387c52..d400f9b 100644 --- a/syndicate-macros/examples/box-and-client.rs +++ b/syndicate-macros/examples/box-and-client.rs @@ -13,7 +13,7 @@ async fn main() -> Result<(), Box> { let _ = t.prevent_inert_check(); Actor::new().boot(syndicate::name!("box"), enclose!((ds) move |t| { - let current_value = t.field(0u64); + let current_value = t.named_field("current_value", 0u64); t.dataflow({ let mut state_assertion_handle = None; @@ -41,7 +41,7 @@ async fn main() -> Result<(), Box> { t.dataflow(enclose!((current_value) move |t| { if *t.get(¤t_value) == 1000000 { - t.stop(); + t.stop()?; } Ok(()) }))?; @@ -62,7 +62,7 @@ async fn main() -> Result<(), Box> { *count = *count - 1; if *count == 0 { tracing::info!("box state retracted"); - t.stop(); + t.stop()?; } Ok(()) }))) diff --git a/syndicate-server/src/counter.rs b/syndicate-server/src/counter.rs index c15b8dd..8ad14bb 100644 --- a/syndicate-server/src/counter.rs +++ b/syndicate-server/src/counter.rs @@ -4,13 +4,13 @@ use syndicate::actor::*; pub fn adjust(t: &mut Activation, f: &Arc>, delta: isize) { let f = f.clone(); - tracing::trace!(v0 = ?t.get(&f), "adjust"); + tracing::trace!(?f, v0 = ?t.get(&f), "adjust"); *t.get_mut(&f) += delta; - tracing::trace!(v1 = ?t.get(&f), "adjust"); + tracing::trace!(?f, v1 = ?t.get(&f), "adjust"); t.on_stop(move |t| { - tracing::trace!(v0 = ?t.get(&f), "cleanup"); + tracing::trace!(?f, v0 = ?t.get(&f), "cleanup"); *t.get_mut(&f) -= delta; - tracing::trace!(v1 = ?t.get(&f), "cleanup"); + tracing::trace!(?f, v1 = ?t.get(&f), "cleanup"); Ok(()) }); } @@ -18,9 +18,9 @@ pub fn adjust(t: &mut Activation, f: &Arc>, delta: isize) { pub fn sync_and_adjust(t: &mut Activation, r: &Arc>, f: &Arc>, delta: isize) { let f = f.clone(); let sync_handler = t.create(move |t: &mut Activation| { - tracing::trace!(v0 = ?t.get(&f), "sync"); + tracing::trace!(?f, v0 = ?t.get(&f), "sync"); *t.get_mut(&f) += delta; - tracing::trace!(v1 = ?t.get(&f), "sync"); + tracing::trace!(?f, v1 = ?t.get(&f), "sync"); Ok(()) }); t.sync(r, sync_handler) diff --git a/syndicate-server/src/dependencies.rs b/syndicate-server/src/dependencies.rs index 662282b..a109862 100644 --- a/syndicate-server/src/dependencies.rs +++ b/syndicate-server/src/dependencies.rs @@ -52,7 +52,7 @@ fn run(t: &mut Activation, ds: Arc, service_name: AnyValue) -> ActorResult }); } - let obstacle_count = t.field(1isize); + let obstacle_count = t.named_field("obstacle_count", 1isize); t.dataflow(enclose!((obstacle_count) move |t| { tracing::trace!(obstacle_count = ?t.get(&obstacle_count)); Ok(()) diff --git a/syndicate-server/src/services/config_watcher.rs b/syndicate-server/src/services/config_watcher.rs index 3433418..669c364 100644 --- a/syndicate-server/src/services/config_watcher.rs +++ b/syndicate-server/src/services/config_watcher.rs @@ -202,8 +202,7 @@ fn run(t: &mut Activation, ds: Arc, spec: internal_services::ConfigWatcher) let _ = facet.activate(Account::new(syndicate::name!("termination")), |t| { tracing::trace!("linked thread terminating associated facet"); - t.stop(); - Ok(()) + t.stop() }); tracing::trace!("linked thread done"); diff --git a/syndicate-server/src/services/daemon.rs b/syndicate-server/src/services/daemon.rs index 92af22b..7d48baa 100644 --- a/syndicate-server/src/services/daemon.rs +++ b/syndicate-server/src/services/daemon.rs @@ -64,73 +64,84 @@ impl CommandLine { struct DaemonProcessInstance { name: tracing::Span, - facet: FacetRef, cmd: process::Command, announce_presumed_readiness: bool, unready_configs: Arc>, + completed_processes: Arc>, restart_policy: RestartPolicy, } impl DaemonProcessInstance { - async fn handle_exit(self, error_message: Option) -> Result { - let delay_ms = if let None = error_message { 200 } else { 1000 }; - let sleep_after_exit = || tokio::time::sleep(std::time::Duration::from_millis(delay_ms)); - Ok(match self.restart_policy { - RestartPolicy::Always => { - sleep_after_exit().await; - self.start()?; - LinkedTaskTermination::Normal + fn handle_exit(self, t: &mut Activation, error_message: Option) -> ActorResult { + let delay = + std::time::Duration::from_millis(if let None = error_message { 200 } else { 1000 }); + t.stop_facet_and_continue(t.facet.facet_id, Some(move |t: &mut Activation| { + enum NextStep { + SleepAndRestart, + SignalSuccessfulCompletion, } - RestartPolicy::OnError => - match error_message { - None => LinkedTaskTermination::KeepFacet, - Some(_) => { - sleep_after_exit().await; - self.start()?; - LinkedTaskTermination::Normal - } - }, - RestartPolicy::All => - match error_message { - None => LinkedTaskTermination::KeepFacet, - Some(s) => Err(s.as_str())?, - }, - }) + use NextStep::*; + + let next_step = match self.restart_policy { + RestartPolicy::Always => SleepAndRestart, + RestartPolicy::OnError => + match error_message { + None => SignalSuccessfulCompletion, + Some(_) => SleepAndRestart, + }, + RestartPolicy::All => + match error_message { + None => SignalSuccessfulCompletion, + Some(s) => Err(s.as_str())?, + }, + }; + + match next_step { + SleepAndRestart => t.after(delay, |t| self.start(t)), + SignalSuccessfulCompletion => { + t.facet(|t| { + let _ = t.prevent_inert_check(); + counter::adjust(t, &self.completed_processes, 1); + counter::adjust(t, &self.unready_configs, -1); + Ok(()) + })?; + () + } + } + Ok(()) + })) } - fn start(mut self) -> ActorResult { - tracing::trace!("DaemonProcessInstance start (outer)"); - self.facet.clone().activate( - Account::new(syndicate::name!(parent: self.name.clone(), "instance")), |t| { - tracing::trace!("DaemonProcessInstance start (inner)"); - t.facet(|t| { - tracing::trace!(cmd = ?self.cmd, "starting"); - let mut child = match self.cmd.spawn() { - Ok(child) => child, - Err(e) => { - tracing::info!(spawn_err = ?e); - t.linked_task(syndicate::name!(parent: self.name.clone(), "fail"), - self.handle_exit(Some(format!("{}", e)))); - return Ok(()); - } - }; - tracing::info!(pid = ?child.id(), cmd = ?self.cmd, "started"); + fn start(mut self, t: &mut Activation) -> ActorResult { + t.facet(|t| { + tracing::trace!(cmd = ?self.cmd, "starting"); + let mut child = match self.cmd.spawn() { + Ok(child) => child, + Err(e) => { + tracing::info!(spawn_err = ?e); + return self.handle_exit(t, Some(format!("{}", e))); + } + }; + tracing::info!(pid = ?child.id(), cmd = ?self.cmd, "started"); - if self.announce_presumed_readiness { - counter::adjust(t, &self.unready_configs, -1); - } + if self.announce_presumed_readiness { + counter::adjust(t, &self.unready_configs, -1); + } - t.linked_task(syndicate::name!(parent: self.name.clone(), "wait"), async move { - tracing::trace!("waiting for process exit"); - let status = child.wait().await?; - tracing::info!(?status); - self.handle_exit( - if status.success() { None } else { Some(format!("{}", status)) }).await - }); - Ok(()) + let facet = t.facet.clone(); + t.linked_task(syndicate::name!(parent: self.name.clone(), "wait"), async move { + tracing::trace!("waiting for process exit"); + let status = child.wait().await?; + tracing::info!(?status); + facet.activate(Account::new(syndicate::name!("instance-terminated")), |t| { + let m = if status.success() { None } else { Some(format!("{}", status)) }; + self.handle_exit(t, m) })?; - Ok(()) - }) + Ok(LinkedTaskTermination::Normal) + }); + Ok(()) + })?; + Ok(()) } } @@ -142,7 +153,10 @@ fn run( ) -> ActorResult { let spec = language().unparse(&service); - let unready_configs = t.field(1isize); + let total_configs = t.named_field("total_configs", 0isize); + let unready_configs = t.named_field("unready_configs", 1isize); + let completed_processes = t.named_field("completed_processes", 0isize); + t.dataflow({ let mut handle = None; let ready = lifecycle::ready(&spec); @@ -154,11 +168,21 @@ fn run( }) })?; - enclose!((unready_configs) during!(t, config_ds, language(), , { - let unready_configs = unready_configs.clone(); - |t: &mut Activation| { - tracing::debug!(?config, "new unready config"); + t.dataflow(enclose!((completed_processes, total_configs) move |t| { + let total = *t.get(&total_configs); + let completed = *t.get(&completed_processes); + tracing::debug!(total_configs = ?total, completed_processes = ?completed); + if total > 0 && total == completed { + t.stop()?; + } + Ok(()) + }))?; + + enclose!((unready_configs, completed_processes) during!(t, config_ds, language(), , { + enclose!((unready_configs, completed_processes) |t: &mut Activation| { + tracing::debug!(?config, "new config"); counter::adjust(t, &unready_configs, 1); + counter::adjust(t, &total_configs, 1); match language().parse::(&config) { Ok(config) => { @@ -238,15 +262,18 @@ fn run( cmd.stderr(std::process::Stdio::inherit()); cmd.kill_on_drop(true); - (DaemonProcessInstance { + let process_instance = DaemonProcessInstance { name: tracing::Span::current(), - facet, cmd, announce_presumed_readiness, unready_configs, + completed_processes, restart_policy, - }).start()?; + }; + facet.activate(Account::new(syndicate::name!("instance-startup")), |t| { + process_instance.start(t) + })?; Ok(LinkedTaskTermination::KeepFacet) }); Ok(()) @@ -256,7 +283,7 @@ fn run( return Ok(()); } } - } + }) })); tracing::debug!("syncing to ds"); diff --git a/syndicate/benches/bench_dataspace.rs b/syndicate/benches/bench_dataspace.rs index b08b580..a6b84eb 100644 --- a/syndicate/benches/bench_dataspace.rs +++ b/syndicate/benches/bench_dataspace.rs @@ -33,8 +33,7 @@ struct ShutdownEntity; impl Entity for ShutdownEntity { fn message(&mut self, t: &mut Activation, _m: AnyValue) -> ActorResult { - t.stop(); - Ok(()) + t.stop() } } @@ -94,12 +93,7 @@ pub fn bench_pub(c: &mut Criterion) { let ds = Cap::new(&t.create(Dataspace::new())); let shutdown = entity(()) - .on_asserted(|_, _, _| { - Ok(Some(Box::new(|_, t| { - t.stop(); - Ok(()) - }))) - }) + .on_asserted(|_, _, _| Ok(Some(Box::new(|_, t| t.stop())))) .create_cap(t); ds.assert(t, language(), &Observe { diff --git a/syndicate/benches/ring.rs b/syndicate/benches/ring.rs index 5d672da..5af2344 100644 --- a/syndicate/benches/ring.rs +++ b/syndicate/benches/ring.rs @@ -59,7 +59,7 @@ pub fn bench_ring(c: &mut Criterion) { } impl Counter { - fn step(&mut self, t: &mut Activation) { + fn step(&mut self, t: &mut Activation) -> ActorResult { if self.remaining_to_send > 0 { self.remaining_to_send -= 1; MESSAGES_SENT.fetch_add(1, Ordering::Relaxed); @@ -68,20 +68,21 @@ pub fn bench_ring(c: &mut Criterion) { tracing::info!(iters = self.iters, actors_created = ACTORS_CREATED.load(Ordering::SeqCst), messages_sent = MESSAGES_SENT.load(Ordering::SeqCst)); - t.stop(); + t.stop()?; self.tx.send(self.start.elapsed() / ACTOR_COUNT).unwrap() } + Ok(()) } } impl Entity<()> for Counter { fn message(&mut self, t: &mut Activation, _message: ()) -> ActorResult { - Ok(self.step(t)) + self.step(t) } } impl Spawner { - fn step(&mut self, t: &mut Activation, next: Arc>) { + fn step(&mut self, t: &mut Activation, next: Arc>) -> ActorResult { if self.i < ACTOR_COUNT { let i = self.i; self.i += 1; @@ -103,15 +104,16 @@ pub fn bench_ring(c: &mut Criterion) { iters: self.iters, next, }; - c_state.step(t); + c_state.step(t)?; self.c.become_entity(c_state); } + Ok(()) } } impl Entity>> for Spawner { fn message(&mut self, t: &mut Activation, f: Arc>) -> ActorResult { - Ok(self.step(t, f)) + self.step(t, f) } } @@ -125,7 +127,7 @@ pub fn bench_ring(c: &mut Criterion) { i: 1, c: t.create_inert(), }; - s.step(t, Arc::clone(&s.c)); + s.step(t, Arc::clone(&s.c))?; Arc::clone(&s.self_ref).become_entity(s); Ok(()) }).await.unwrap().unwrap(); diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index a7e5332..a2405f7 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -355,8 +355,8 @@ pub struct RunningActor { /// values. Use [`Activation::dataflow`] to create a reactive block within a facet that will be /// (re-)executed whenever some dependent field changes value. /// -#[derive(Debug)] pub struct Field { + pub name: String, pub field_id: FieldId, tx: UnboundedSender, phantom: PhantomData, @@ -925,7 +925,7 @@ impl<'activation> Activation<'activation> { f.linked_tasks.remove(&task_id); } if let LinkedTaskTermination::Normal = result { - t.stop(); + t.stop()?; } Ok(()) }); @@ -938,18 +938,28 @@ impl<'activation> Activation<'activation> { /// Executes the given action after the given duration has elapsed (so long as the active /// facet still exists at that time). - pub fn after(&mut self, duration: time::Duration, a: Action) { + pub fn after ActorResult>( + &mut self, + duration: time::Duration, + a: F, + ) { self.at(time::Instant::now() + duration, a) } /// Executes the given action at the given instant (so long as the active facet still /// exists at that time). - pub fn at>(&mut self, instant: I, a: Action) { + pub fn at, F: 'static + Send + FnOnce(&mut Activation) -> ActorResult>( + &mut self, + instant: I, + a: F, + ) { let account = Arc::clone(self.account()); let instant = instant.into(); let facet = self.facet.clone(); - self.linked_task(crate::name!("Activation::at"), async move { + let span = tracing::Span::current().clone(); + self.linked_task(crate::name!(parent: None, "Activation::at"), async move { tokio::time::sleep_until(instant.into()).await; + let _entry = span.enter(); facet.activate(account, a)?; Ok(LinkedTaskTermination::KeepFacet) }); @@ -1006,7 +1016,9 @@ impl<'activation> Activation<'activation> { let f = Facet::new(Some(self.facet.facet_id)); let facet_id = f.facet_id; self.state.facet_nodes.insert(facet_id, f); - tracing::trace!(?facet_id, facet_count = ?self.state.facet_nodes.len()); + tracing::debug!(parent_id = ?self.facet.facet_id, + ?facet_id, + actor_facet_count = ?self.state.facet_nodes.len()); self.state.facet_children.entry(self.facet.facet_id).or_default().insert(facet_id); self.with_facet(true /* TODO: tiny optimisation: "false" would be safe here */, facet_id, move |t| { boot(t)?; @@ -1039,28 +1051,43 @@ impl<'activation> Activation<'activation> { } /// Arranges for the [`Facet`] named by `facet_id` to be stopped cleanly when `self` - /// commits. If `continuation` is supplied, the facet to be stopped hasn't been stopped at - /// the time of the `stop_facet` call, none of the shutdown handlers yields an error, and - /// the facet's parent facet is alive, executes `continuation` in the parent facet's - /// context. - pub fn stop_facet(&mut self, facet_id: FacetId, continuation: Option) { + /// commits. + /// + /// Then, + /// - if `continuation` is supplied, and + /// - the facet to be stopped hasn't been stopped at the time of the `stop_facet_and_continue` call, and + /// - none of the shutdown handlers yields an error, and + /// - the facet's parent facet is alive, + /// executes `continuation` (immediately) in the *parent* facet's context. + /// + pub fn stop_facet_and_continue ActorResult>( + &mut self, + facet_id: FacetId, + continuation: Option, + ) -> ActorResult { let maybe_parent_id = self.active_facet().and_then(|f| f.parent_facet_id); - self.enqueue_for_myself_at_commit(Box::new(move |t| { - t._terminate_facet(facet_id, true)?; - if let Some(k) = continuation { - if let Some(parent_id) = maybe_parent_id { - t.with_facet(true, parent_id, k)?; - } + self.enqueue_for_myself_at_commit(Box::new(move |t| t._terminate_facet(facet_id, true))); + if let Some(k) = continuation { + if let Some(parent_id) = maybe_parent_id { + self.with_facet(true, parent_id, k)?; } - Ok(()) - })); + } + Ok(()) + } + + /// Arranges for the [`Facet`] named by `facet_id` to be stopped cleanly when `self` + /// commits. + /// + /// Equivalent to `self.stop_facet_and_continue(facet_id, None)`. + pub fn stop_facet(&mut self, facet_id: FacetId) -> ActorResult { + self.stop_facet_and_continue::(facet_id, None) } /// Arranges for the active facet to be stopped cleanly when `self` commits. /// - /// Equivalent to `self.stop_facet(self.facet.facet_id, None)`. - pub fn stop(&mut self) { - self.stop_facet(self.facet.facet_id, None) + /// Equivalent to `self.stop_facet(self.facet.facet_id)`. + pub fn stop(&mut self) -> ActorResult { + self.stop_facet(self.facet.facet_id) } fn stop_if_inert(&mut self) { @@ -1069,7 +1096,7 @@ impl<'activation> Activation<'activation> { tracing::trace!("Checking inertness of facet {} from facet {}", facet_id, t.facet.facet_id); if t.state.facet_exists_and_is_inert(facet_id) { tracing::trace!(" - facet {} is inert, stopping it", facet_id); - t.stop_facet(facet_id, None); + t.stop_facet(facet_id)?; } else { tracing::trace!(" - facet {} is not inert", facet_id); } @@ -1079,7 +1106,8 @@ impl<'activation> Activation<'activation> { fn _terminate_facet(&mut self, facet_id: FacetId, alive: bool) -> ActorResult { if let Some(mut f) = self.state.facet_nodes.remove(&facet_id) { - tracing::debug!("{} termination of {:?}", + tracing::debug!(actor_facet_count = ?self.state.facet_nodes.len(), + "{} termination of {:?}", if alive { "living" } else { "post-exit" }, facet_id); if let Some(p) = f.parent_facet_id { @@ -1101,9 +1129,13 @@ impl<'activation> Activation<'activation> { // cleanup-actions are performed before parent-facet cleanup-actions. if let Some(p) = parent_facet_id { if t.state.facet_exists_and_is_inert(p) { + tracing::trace!("terminating parent {:?} of facet {:?}", p, facet_id); t._terminate_facet(p, true)?; + } else { + tracing::trace!("not terminating parent {:?} of facet {:?}", p, facet_id); } } else { + tracing::trace!("terminating actor of root facet {:?}", facet_id); t.state.shutdown(); } } else { @@ -1116,25 +1148,31 @@ impl<'activation> Activation<'activation> { } } - /// Create a new dataflow variable (field) within the active [`Actor`]. - pub fn field(&mut self, initial_value: T) -> Arc> { + /// Create a new named dataflow variable (field) within the active [`Actor`]. + pub fn named_field(&mut self, name: &str, initial_value: T) -> Arc> { let field_id = FieldId::new(NEXT_FIELD_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed)) .expect("Internal error: Attempt to allocate FieldId of zero. Too many FieldIds allocated. Restart the process."); self.state.fields.insert(field_id, Box::new(initial_value)); Arc::new(Field { + name: name.to_owned(), field_id, tx: self.state.tx.clone(), phantom: PhantomData, }) } + /// Create a new anonymous dataflow variable (field) within the active [`Actor`]. + pub fn field(&mut self, initial_value: T) -> Arc> { + self.named_field("", initial_value) + } + /// Retrieve a reference to the current value of a dataflow variable (field); if execution /// is currently within a [dataflow block][Activation::dataflow], marks the block as /// *depending upon* the field. /// pub fn get(&mut self, field: &Field) -> &T { - tracing::trace!(field = ?field.field_id, block = ?self.active_block, "get"); if let Some(block) = self.active_block { + tracing::trace!(?field, ?block, action = "get", "observed"); self.state.dataflow.record_observation(block, field.field_id); } let any = self.state.fields.get(&field.field_id) @@ -1149,12 +1187,13 @@ impl<'activation> Activation<'activation> { /// reevaluation of dependent blocks. /// pub fn get_mut(&mut self, field: &Field) -> &mut T { - tracing::trace!(field = ?field.field_id, block = ?self.active_block, "get_mut"); { // Overapproximation. if let Some(block) = self.active_block { + tracing::trace!(?field, ?block, action = "get_mut", "observed"); self.state.dataflow.record_observation(block, field.field_id); } + tracing::trace!(?field, active_block = ?self.active_block, action = "get_mut", "damaged"); self.state.dataflow.record_damage(field.field_id); } let any = self.state.fields.get_mut(&field.field_id) @@ -1166,7 +1205,7 @@ impl<'activation> Activation<'activation> { /// the new value is [`eq`][std::cmp::PartialEq::eq] to the value being overwritten. /// pub fn set(&mut self, field: &Field, value: T) { - tracing::trace!(field = ?field.field_id, block = ?self.active_block, "set"); + tracing::trace!(?field, active_block = ?self.active_block, action = "set", "damaged"); // Overapproximation in many cases, since the new value may not produce an // observable difference (may be equal to the current value). self.state.dataflow.record_damage(field.field_id); @@ -1213,7 +1252,9 @@ impl<'activation> Activation<'activation> { } } } - tracing::trace!(passes = ?pass_number, "repair_dataflow complete"); + if pass_number > 0 { + tracing::trace!(passes = ?pass_number, "repair_dataflow complete"); + } Ok(pass_number > 0) } @@ -1421,7 +1462,7 @@ impl Actor { let (tx, rx) = unbounded_channel(); let actor_id = next_actor_id(); let root = Facet::new(None); - // tracing::trace!(id = actor_id, "Actor::new"); + tracing::trace!(?actor_id, root_facet_id = ?root.facet_id, "Actor::new"); let mut st = RunningActor { actor_id, tx, @@ -1656,11 +1697,17 @@ impl RunningActor { fn facet_exists_and_is_inert(&mut self, facet_id: FacetId) -> bool { let no_kids = self.facet_children.get(&facet_id).map(|cs| cs.is_empty()).unwrap_or(true); if let Some(f) = self.get_facet(facet_id) { - no_kids && - f.outbound_handles.is_empty() && - f.linked_tasks.is_empty() && - f.inert_check_preventers.load(Ordering::Relaxed) == 0 + // The only outbound handle the root facet of an actor may have is a link + // assertion, from [Activation::link]. This is not to be considered a "real" + // assertion for purposes of keeping the facet alive! + let no_outbound_handles = f.outbound_handles.is_empty(); + let is_root_facet = f.parent_facet_id.is_none(); + let no_linked_tasks = f.linked_tasks.is_empty(); + let no_inert_check_preventers = f.inert_check_preventers.load(Ordering::Relaxed) == 0; + tracing::trace!(?facet_id, ?no_kids, ?no_outbound_handles, ?is_root_facet, ?no_linked_tasks, ?no_inert_check_preventers); + no_kids && (no_outbound_handles || is_root_facet) && no_linked_tasks && no_inert_check_preventers } else { + tracing::trace!(?facet_id, exists = ?false); false } } @@ -1684,6 +1731,12 @@ impl RunningActor { } } +impl std::fmt::Debug for Field { + fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { + write!(f, "#", self.name, self.field_id) + } +} + impl Eq for Field {} impl PartialEq for Field { fn eq(&self, other: &Field) -> bool { @@ -1943,8 +1996,7 @@ where impl Entity for StopOnRetract { fn retract(&mut self, t: &mut Activation, _h: Handle) -> ActorResult { - t.stop(); - Ok(()) + t.stop() } } diff --git a/syndicate/src/during.rs b/syndicate/src/during.rs index 048be81..b439f42 100644 --- a/syndicate/src/during.rs +++ b/syndicate/src/during.rs @@ -118,10 +118,7 @@ where let _ = t.prevent_inert_check(); assertion_handler(state, t, a) })?; - Ok(Some(Box::new(move |_state, t| { - t.stop_facet(facet_id, None); - Ok(()) - }))) + Ok(Some(Box::new(move |_state, t| t.stop_facet(facet_id)))) })) } diff --git a/syndicate/src/supervise.rs b/syndicate/src/supervise.rs index fe2976b..707bad3 100644 --- a/syndicate/src/supervise.rs +++ b/syndicate/src/supervise.rs @@ -109,11 +109,11 @@ impl Entity for Supervisor } else { self.config.pause_time }; - t.after(wait_time, Box::new(move |t| { + t.after(wait_time, move |t| { tracing::trace!("Sending retry trigger"); t.message(&self_ref, Protocol::Retry); Ok(()) - })); + }); }, } Ok(()) @@ -151,7 +151,7 @@ impl Supervisor { let _entry = name.enter(); tracing::trace!(?config); let self_ref = t.create_inert(); - let state_field = t.field(State::Started); + let state_field = t.named_field("supervisee_state", State::Started); let mut supervisor = Supervisor { self_ref: Arc::clone(&self_ref), name: name.clone(),