From e90fe2c41ebf2c450bf42703e9094f693d0ce3e5 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 1 Sep 2021 17:31:01 +0200 Subject: [PATCH] Supervisor RestartPolicy --- .../src/services/config_watcher.rs | 2 +- .../src/services/debt_reporter.rs | 2 +- .../src/services/tcp_relay_listener.rs | 2 +- .../src/services/unix_relay_listener.rs | 2 +- syndicate/src/actor.rs | 14 ++- syndicate/src/supervise.rs | 112 ++++++++++++------ 6 files changed, 89 insertions(+), 45 deletions(-) diff --git a/syndicate-server/src/services/config_watcher.rs b/syndicate-server/src/services/config_watcher.rs index f6803c9..fab1d6a 100644 --- a/syndicate-server/src/services/config_watcher.rs +++ b/syndicate-server/src/services/config_watcher.rs @@ -46,7 +46,7 @@ pub fn on_demand(t: &mut Activation, ds: Arc) { observer: monitor, }); Ok(()) - }) + }); } fn convert_notify_error(e: notify::Error) -> syndicate::error::Error { diff --git a/syndicate-server/src/services/debt_reporter.rs b/syndicate-server/src/services/debt_reporter.rs index ff3d07a..7a63628 100644 --- a/syndicate-server/src/services/debt_reporter.rs +++ b/syndicate-server/src/services/debt_reporter.rs @@ -25,7 +25,7 @@ pub fn on_demand(t: &mut Activation, ds: Arc) { observer: monitor, }); Ok(()) - }) + }); } fn run(t: &mut Activation, ds: Arc) -> ActorResult { diff --git a/syndicate-server/src/services/tcp_relay_listener.rs b/syndicate-server/src/services/tcp_relay_listener.rs index f9aa63b..91376e9 100644 --- a/syndicate-server/src/services/tcp_relay_listener.rs +++ b/syndicate-server/src/services/tcp_relay_listener.rs @@ -35,7 +35,7 @@ pub fn on_demand(t: &mut Activation, ds: Arc, gateway: Arc) { observer: monitor, }); Ok(()) - }) + }); } fn run( diff --git a/syndicate-server/src/services/unix_relay_listener.rs b/syndicate-server/src/services/unix_relay_listener.rs index 617a9f6..c6a3683 100644 --- a/syndicate-server/src/services/unix_relay_listener.rs +++ b/syndicate-server/src/services/unix_relay_listener.rs @@ -36,7 +36,7 @@ pub fn on_demand(t: &mut Activation, ds: Arc, gateway: Arc) { observer: monitor, }); Ok(()) - }) + }); } fn run( diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index 2fe861f..d38552c 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -859,11 +859,14 @@ impl<'activation> Activation<'activation> { &mut self, name: tracing::Span, boot: F, - ) { + ) -> ActorRef { + let ac = Actor::new(); + let ac_ref = ac.ac_ref.clone(); self.enqueue_for_myself_at_commit(Box::new(move |_| { - Actor::new().boot(name, boot); + ac.boot(name, boot); Ok(()) })); + ac_ref } /// Schedule the creation of a new actor when the Activation commits. @@ -875,14 +878,17 @@ impl<'activation> Activation<'activation> { &mut self, name: tracing::Span, boot: F, - ) { + ) -> ActorRef { + let ac = Actor::new(); + let ac_ref = ac.ac_ref.clone(); let facet_id = self.facet.facet_id; self.enqueue_for_myself_at_commit(Box::new(move |t| { t.with_facet(true, facet_id, move |t| { - Actor::new().link(t).boot(name, boot); + ac.link(t).boot(name, boot); Ok(()) }) })); + ac_ref } /// Create a new subfacet of the currently-active facet. Runs `boot` in the new facet's diff --git a/syndicate/src/supervise.rs b/syndicate/src/supervise.rs index 8851fe3..bec2f9b 100644 --- a/syndicate/src/supervise.rs +++ b/syndicate/src/supervise.rs @@ -15,12 +15,19 @@ enum Protocol ActorResult> { Retry, // message } -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum RestartPolicy { + Always, + OnErrorOnly, +} + +#[derive(Debug, Clone)] pub struct SupervisorConfiguration { pub intensity: usize, pub period: Duration, pub pause_time: Duration, pub sleep_time: Duration, + pub restart_policy: RestartPolicy, } pub struct Supervisor ActorResult> { @@ -30,6 +37,7 @@ pub struct Supervisor ActorResu boot_fn: Option, restarts: VecDeque, supervisee: Supervisee, + ac_ref: Option, } #[derive(Debug, PartialEq, Eq)] @@ -57,6 +65,7 @@ impl Default for SupervisorConfiguration { period: Duration::from_secs(5), pause_time: Duration::from_millis(200), sleep_time: Duration::from_secs(10), + restart_policy: RestartPolicy::Always, } } } @@ -73,27 +82,43 @@ impl ActorResult> } fn retract(&mut self, t: &mut Activation, _h: Handle) -> ActorResult { - self.enter_state(Supervisee::Recovering); - let now = Instant::now(); - let oldest_to_keep = now - self.config.period; - self.restarts.push_back(now); - while let Some(stamp) = self.restarts.front() { - if stamp < &oldest_to_keep { - self.restarts.pop_front(); - } else { - break; - } + let _name = self.name.clone(); + let _entry = _name.enter(); + let exit_status = + self.ac_ref.take().expect("valid supervisee ActorRef") + .exit_status() + .expect("supervisee to have terminated"); + tracing::debug!(?exit_status); + match exit_status { + Ok(()) if self.config.restart_policy == RestartPolicy::OnErrorOnly => { + tracing::trace!("Not restarting: normal exit, restart_policy is OnErrorOnly"); + self.enter_state(Supervisee::NotRunning); + }, + _ => { + tracing::trace!("Restarting: restart_policy is Always or exit was abnormal"); + self.enter_state(Supervisee::Recovering); + let now = Instant::now(); + let oldest_to_keep = now - self.config.period; + self.restarts.push_back(now); + while let Some(stamp) = self.restarts.front() { + if stamp < &oldest_to_keep { + self.restarts.pop_front(); + } else { + break; + } + } + let self_ref = Arc::clone(&self.self_ref); + let wait_time = if self.restarts.len() > self.config.intensity { + self.config.sleep_time + } else { + self.config.pause_time + }; + t.after(wait_time, Box::new(move |t| { + t.message(&self_ref, Protocol::Retry); + Ok(()) + })); + }, } - let self_ref = Arc::clone(&self.self_ref); - let wait_time = if self.restarts.len() > self.config.intensity { - self.config.sleep_time - } else { - self.config.pause_time - }; - t.after(wait_time, Box::new(move |t| { - t.message(&self_ref, Protocol::Retry); - Ok(()) - })); Ok(()) } @@ -111,6 +136,12 @@ impl ActorResult> _ => Ok(()) } } + + fn stop(&mut self, _t: &mut Activation) -> ActorResult { + let _entry = self.name.enter(); + tracing::info!("Supervisor terminating"); + Ok(()) + } } impl ActorResult> Supervisor { @@ -120,18 +151,21 @@ impl ActorResult> Supervisor ActorResult> Supervisor ActorResult { match self.boot_fn.take() { - None => - tracing::error!("Cannot restart supervisee, because it panicked at startup"), + None => { + let _entry = self.name.enter(); + tracing::error!("Cannot restart supervisee, because it panicked at startup") + } Some(mut boot_fn) => { let self_ref = Arc::clone(&self.self_ref); t.facet(|t: &mut Activation| { t.assert(&self.self_ref, Protocol::SuperviseeStarted); - t.spawn_link(crate::name!(parent: &self.name, "supervisee"), move |t| { - match boot_fn(t) { - Ok(()) => { - t.message(&self_ref, Protocol::BootFunction(boot_fn)); - Ok(()) + self.ac_ref = Some(t.spawn_link( + crate::name!(parent: &self.name, "supervisee"), + move |t| { + match boot_fn(t) { + Ok(()) => { + t.message(&self_ref, Protocol::BootFunction(boot_fn)); + Ok(()) + } + Err(e) => { + t.clear(); + t.message(&self_ref, Protocol::BootFunction(boot_fn)); + t.deliver(); + Err(e) + } } - Err(e) => { - t.clear(); - t.message(&self_ref, Protocol::BootFunction(boot_fn)); - t.deliver(); - Err(e) - } - } - }); + })); Ok(()) })?; }