From 5861f91971d4d932744ef093f7b27de641c4f826 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 30 Aug 2021 14:17:40 +0200 Subject: [PATCH] Entity::stop, Activation::on_stop --- syndicate/src/actor.rs | 38 ++++++++++++++++++++++++---- syndicate/src/during.rs | 55 ++++++++++++++++++++++++++++++++++------- 2 files changed, 79 insertions(+), 14 deletions(-) diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index 9a087d1..9a0bb71 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -165,6 +165,16 @@ pub trait Entity: Send { Ok(()) } + /// Optional callback for running actions when the entity's owning [Facet] terminates + /// cleanly. Will not be called in case of abnormal shutdown (crash) of an actor. + /// + /// Programs register an entity's stop hook with [Activation::on_stop]. + /// + /// The default implementation does nothing. + fn stop(&mut self, turn: &mut Activation) -> ActorResult { + Ok(()) + } + /// Optional callback for running cleanup actions when the /// entity's animating [Actor] terminates. /// @@ -510,16 +520,16 @@ impl FacetRef { let mut t = Activation::make(&self.actor.facet_ref(state.root), Account::new(crate::name!("shutdown")), state); + if let Err(err) = t._terminate_facet(t.state.root, exit_status.is_ok()) { + // This can only occur as the result of an internal error in this file's code. + tracing::error!(?err, "unexpected error from terminate_facet"); + panic!("Unexpected error result from terminate_facet"); + } for action in std::mem::take(&mut t.state.exit_hooks) { if let Err(err) = action(&mut t, &exit_status) { tracing::error!(?err, "error in exit hook"); } } - if let Err(err) = t._terminate_facet(t.state.root, false) { - // This can only occur as the result of an internal error in this file's code. - tracing::error!(?err, "unexpected error from disorderly terminate_facet"); - panic!("Unexpected error result from disorderly terminate_facet"); - } *g = ActorState::Terminated { exit_status: Arc::clone(&exit_status), }; @@ -684,6 +694,21 @@ impl<'activation> Activation<'activation> { move |t| t.with_entity(&r, |t, e| e.sync(t, peer)))) } + /// Registers the entity `r` in the list of stop actions for the active facet. If the facet + /// terminates cleanly, `r`'s [`stop`][Entity::stop] will be called. + /// + /// **Note.** If the actor crashes, the stop actions will *not* be called. + /// + /// Use [`RunningActor::add_exit_hook`] to install a callback that will be called at the + /// end of the lifetime of the *actor* rather than the facet. (Also, exit hooks are called + /// no matter whether actor termination was normal or abnormal.) + pub fn on_stop(&mut self, r: &Arc>) { + if let Some(f) = self.active_facet() { + let r = Arc::clone(r); + f.stop_actions.push(Box::new(move |t| r.internal_with_entity(|e| e.stop(t)))); + } + } + /// Retrieve the [`Account`] against which actions are recorded. pub fn account(&self) -> &Arc { &self.pending.account @@ -1558,6 +1583,9 @@ where fn sync(&mut self, t: &mut Activation, peer: Arc>) -> ActorResult { t.with_entity(&self.underlying, |t, e| e.sync(t, peer)) } + fn stop(&mut self, t: &mut Activation) -> ActorResult { + t.with_entity(&self.underlying, |t, e| e.stop(t)) + } fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc) -> ActorResult { self.underlying.internal_with_entity(|e| e.exit_hook(t, exit_status)) } diff --git a/syndicate/src/during.rs b/syndicate/src/during.rs index fd80fd2..cf9aea5 100644 --- a/syndicate/src/during.rs +++ b/syndicate/src/during.rs @@ -10,17 +10,19 @@ pub struct During(Map>); pub type DuringRetractionHandler = Box ActorResult>; pub type DuringResult = Result>, Error>; -pub struct DuringEntity +pub struct DuringEntity where M: 'static + Send, E: 'static + Send, Fa: 'static + Send + FnMut(&mut E, &mut Activation, M) -> DuringResult, Fm: 'static + Send + FnMut(&mut E, &mut Activation, M) -> ActorResult, + Fs: 'static + Send + FnMut(&mut E, &mut Activation) -> ActorResult, Fx: 'static + Send + FnMut(&mut E, &mut Activation, &Arc) -> ActorResult, { state: E, assertion_handler: Option, message_handler: Option, + stop_handler: Option, exit_handler: Option, during: During, phantom: PhantomData, @@ -51,38 +53,42 @@ pub fn entity( E, fn (&mut E, &mut Activation, M) -> DuringResult, fn (&mut E, &mut Activation, M) -> ActorResult, + fn (&mut E, &mut Activation) -> ActorResult, fn (&mut E, &mut Activation, &Arc) -> ActorResult> where E: 'static + Send, { - DuringEntity::new(state, None, None, None) + DuringEntity::new(state, None, None, None, None) } -impl DuringEntity +impl DuringEntity where M: 'static + Send, E: 'static + Send, Fa: 'static + Send + FnMut(&mut E, &mut Activation, M) -> DuringResult, Fm: 'static + Send + FnMut(&mut E, &mut Activation, M) -> ActorResult, + Fs: 'static + Send + FnMut(&mut E, &mut Activation) -> ActorResult, Fx: 'static + Send + FnMut(&mut E, &mut Activation, &Arc) -> ActorResult, { pub fn new( state: E, assertion_handler: Option, message_handler: Option, + stop_handler: Option, exit_handler: Option, ) -> Self { DuringEntity { state, assertion_handler, message_handler, + stop_handler, exit_handler, during: During::new(), phantom: PhantomData, } } - pub fn on_asserted(self, assertion_handler: Fa1) -> DuringEntity + pub fn on_asserted(self, assertion_handler: Fa1) -> DuringEntity where Fa1: 'static + Send + FnMut(&mut E, &mut Activation, M) -> DuringResult, { @@ -90,6 +96,7 @@ where state: self.state, assertion_handler: Some(assertion_handler), message_handler: self.message_handler, + stop_handler: self.stop_handler, exit_handler: self.exit_handler, during: self.during, phantom: self.phantom, @@ -99,7 +106,7 @@ where pub fn on_asserted_facet( self, mut assertion_handler: Fa1, - ) -> DuringEntity DuringResult>, Fm, Fx> + ) -> DuringEntity DuringResult>, Fm, Fs, Fx> where Fa1: 'static + Send + FnMut(&mut E, &mut Activation, M) -> ActorResult { @@ -112,7 +119,7 @@ where })) } - pub fn on_message(self, message_handler: Fm1) -> DuringEntity + pub fn on_message(self, message_handler: Fm1) -> DuringEntity where Fm1: 'static + Send + FnMut(&mut E, &mut Activation, M) -> ActorResult, { @@ -120,13 +127,29 @@ where state: self.state, assertion_handler: self.assertion_handler, message_handler: Some(message_handler), + stop_handler: self.stop_handler, exit_handler: self.exit_handler, during: self.during, phantom: self.phantom, } } - pub fn on_exit(self, exit_handler: Fx1) -> DuringEntity + pub fn on_stop(self, stop_handler: Fs1) -> DuringEntity + where + Fs1: 'static + Send + FnMut(&mut E, &mut Activation) -> ActorResult, + { + DuringEntity { + state: self.state, + assertion_handler: self.assertion_handler, + message_handler: self.message_handler, + stop_handler: Some(stop_handler), + exit_handler: self.exit_handler, + during: self.during, + phantom: self.phantom, + } + } + + pub fn on_exit(self, exit_handler: Fx1) -> DuringEntity where Fx1: 'static + Send + FnMut(&mut E, &mut Activation, &Arc) -> ActorResult, { @@ -134,6 +157,7 @@ where state: self.state, assertion_handler: self.assertion_handler, message_handler: self.message_handler, + stop_handler: self.stop_handler, exit_handler: Some(exit_handler), during: self.during, phantom: self.phantom, @@ -141,8 +165,12 @@ where } pub fn create(self, t: &mut Activation) -> Arc> { + let should_register_stop_action = self.stop_handler.is_some(); let should_register_exit_hook = self.exit_handler.is_some(); let r = t.create(self); + if should_register_stop_action { + t.on_stop(&r); + } if should_register_exit_hook { t.state.add_exit_hook(&r); } @@ -150,11 +178,12 @@ where } } -impl DuringEntity +impl DuringEntity where E: 'static + Send, Fa: 'static + Send + FnMut(&mut E, &mut Activation, AnyValue) -> DuringResult, Fm: 'static + Send + FnMut(&mut E, &mut Activation, AnyValue) -> ActorResult, + Fs: 'static + Send + FnMut(&mut E, &mut Activation) -> ActorResult, Fx: 'static + Send + FnMut(&mut E, &mut Activation, &Arc) -> ActorResult, { pub fn create_cap(self, t: &mut Activation) -> Arc @@ -163,12 +192,13 @@ where } } -impl Entity for DuringEntity +impl Entity for DuringEntity where M: Send, E: 'static + Send, Fa: 'static + Send + FnMut(&mut E, &mut Activation, M) -> DuringResult, Fm: 'static + Send + FnMut(&mut E, &mut Activation, M) -> ActorResult, + Fs: 'static + Send + FnMut(&mut E, &mut Activation) -> ActorResult, Fx: 'static + Send + FnMut(&mut E, &mut Activation, &Arc) -> ActorResult, { fn assert(&mut self, t: &mut Activation, a: M, h: Handle) -> ActorResult { @@ -192,6 +222,13 @@ where } } + fn stop(&mut self, t: &mut Activation) -> ActorResult { + match &mut self.stop_handler { + Some(handler) => handler(&mut self.state, t), + None => Ok(()), + } + } + fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc) -> ActorResult { match &mut self.exit_handler { Some(handler) => handler(&mut self.state, t, exit_status),