// syndicate-rs/syndicate/src/supervise.rs

//! Extremely simple single-actor supervision. Vastly simplified compared to the available
//! options in [Erlang/OTP](https://erlang.org/doc/man/supervisor.html).
2022-01-19 13:40:50 +00:00
use preserves::value::NestedValue;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use tokio::time::Instant;
use crate::actor::*;
use crate::schemas::service::State;
/// Shared handle to a supervisee's boot closure: a boxed, `Send`
/// `FnMut(&mut Activation) -> ActorResult` behind an `Arc<Mutex<..>>`.
///
/// `Supervisor::start_now` clones the `Arc` and locks the mutex to invoke the closure as the
/// body of each supervisee it spawns.
pub type Boot = Arc<Mutex<Box<dyn Send + FnMut(&mut Activation) -> ActorResult>>>;
2021-09-01 15:31:01 +00:00
/// Decides which supervisee exits lead to a restart.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RestartPolicy {
    /// Restart the supervisee after any exit, whether it succeeded or failed.
    Always,
    /// Restart the supervisee only after an abnormal exit; a normal (`Ok`) exit is final.
    OnErrorOnly,
}
#[derive(Debug, Clone)]
pub struct SupervisorConfiguration {
pub intensity: usize,
pub period: Duration,
pub pause_time: Duration,
pub sleep_time: Duration,
2021-09-01 15:31:01 +00:00
pub restart_policy: RestartPolicy,
}
2022-01-26 22:37:43 +00:00
/// Message the supervisor sends to its own entity (after the restart delay has elapsed) to
/// trigger a call to `Supervisor::start_now`.
#[derive(Debug)]
struct StartNow;
pub struct Supervisor {
2022-01-26 22:37:43 +00:00
self_ref: Arc<Ref<StartNow>>,
2022-01-19 13:40:50 +00:00
child_name: Name,
config: SupervisorConfiguration,
boot_fn: Boot,
restarts: VecDeque<Instant>,
state: Arc<Field<State>>,
2021-09-01 15:31:01 +00:00
ac_ref: Option<ActorRef>,
}
impl Default for SupervisorConfiguration {
fn default() -> Self {
Self {
intensity: 1,
period: Duration::from_secs(5),
pause_time: Duration::from_millis(200),
sleep_time: Duration::from_secs(10),
restart_policy: RestartPolicy::Always,
}
}
}
impl SupervisorConfiguration {
    /// Returns the default configuration with the restart policy switched to
    /// `RestartPolicy::OnErrorOnly`; every other setting keeps its default value.
    pub fn on_error_only() -> Self {
        let defaults = Self::default();
        Self {
            restart_policy: RestartPolicy::OnErrorOnly,
            ..defaults
        }
    }
}
2022-01-26 22:37:43 +00:00
impl Entity<StartNow> for Supervisor
{
2022-01-26 22:37:43 +00:00
fn message(&mut self, t: &mut Activation, _m: StartNow) -> ActorResult {
self.start_now(t)
}
2022-01-26 22:37:43 +00:00
fn stop(&mut self, t: &mut Activation) -> ActorResult {
2022-01-19 13:40:50 +00:00
let _entry = tracing::info_span!("supervisor", name = ?self.child_name).entered();
MAJOR REFACTORING OF CORE ASSERTION-TRACKING STRUCTURES. Little impact on API. Read on for details. 2022-02-01 15:22:30 Two problems. - If a stop action panics (in `_terminate_facet`), the Facet is dropped before its outbound handles are removed. With the code as it stands, this leaks assertions (!!). - The logic for removing an outbound handle seems to be running in the wrong facet context??? (See `f.outbound_handles.remove(&handle)` in the cleanup actions - I think I need to remove the for_myself mechanism - and add some callbacks to run only on successful commit 2022-02-02 12:12:33 This is hard. Here's the current implementation: - assert - inserts into outbound_handles of active facet - adds cleanup action describing how to do the retraction - enqueues the assert action, which - calls e.assert() - retract - looks up & removes the cleanup action, which - enqueues the retract action, which - removes from outbound_handles of the WRONG facet in the WRONG actor - calls e.retract() - _terminate_facet - uses outbound_handles to retract the facet's assertions - doesn't directly touch cleanup actions, relying on retract to do that - if one of a facet's stop actions panics, will drop the facet, leaking its assertions - actually, even if a stop action yields `Err`, it will drop the facet and leak assertions - yikes - facet drop - panics if outbound_handles is nonempty - actor cleanup - relies on facet tree to find assertions to retract Revised plan: - ✓ revise Activation/PendingEvents structures - rename `cleanup_actions` to `outbound_assertions` - remove `for_myself` queues and `final_actions` - add `pre_commit_actions`, `rollback_actions` and `commit_actions` - ✓ assert - as before - but on rollback, removes from `outbound_handles` (if the facet still exists) and `outbound_assertions` (always) - marks the new assertion as "established" on commit - ✓ retract - lookup in `outbound_assertions` by handle, using presence as indication it hasn't been scheduled in this turn - 
on rollback, put it back in `outbound_assertions` ONLY IF IT IS MARKED ESTABLISHED - otherwise it is a retraction of an `assert` that has *also* been rolled back in this turn - on commit, remove it from `outbound_handles` - enqueue the retract action, which just calls e.retract() - ✓ _terminate_facet - revised quite a bit now we rely on `RunningActor::cleanup` to use `outbound_assertions` rather than the facet tree. - still drops Facets on panic, but this is now mostly harmless (reorders retractions a bit) - handles `Err` from a stop action more gracefully - slightly cleverer tracking of what needs doing based on a `TerminationDirection` - now ONLY applies to ORDERLY cleanup of the facet tree. Disorderly cleanup ignores the facet tree and just retracts the assertions willy-nilly. - ✓ facet drop - warn if outbound_handles is nonempty, but don't do anything about it - ✓ actor cleanup - doesn't use the facet tree at all. - cleanly shutting down is done elsewhere - uses the remaining entries in `outbound_assertions` (previously `cleanup_actions`) to deal with retractions for dropped facets as well as any other facets that haven't been cleanly shut down - ✓ activate - now has a panic_guard::PanicGuard RAII for conveying a crash to an actor in case the activation is happening from a linked task or another thread (this wasn't the case in the examples that provoked this work, though) - simplified - explicit commit/rollback decision - ✓ Actor::run - no longer uses the same path for crash-termination and success-termination - instead, for success-termination, takes a turn that calls Activation::stop_root - this cleans up the facet tree using _terminate_facet - when the turn ends, it notices that the root facet is gone and shuts down the actor - so in principle there will be nothing for actor cleanup to do 2022-02-04 13:52:34 This took days. :-(
2022-02-04 12:59:37 +00:00
match self.ac_ref.take().expect("valid supervisee ActorRef").exit_status() {
None =>
tracing::debug!("Supervisor shut down; supervisee will exit soon"),
Some(Ok(())) if self.config.restart_policy == RestartPolicy::OnErrorOnly => {
2021-09-01 15:31:01 +00:00
tracing::trace!("Not restarting: normal exit, restart_policy is OnErrorOnly");
t.set(&self.state, State::Complete);
2021-09-01 15:31:01 +00:00
},
MAJOR REFACTORING OF CORE ASSERTION-TRACKING STRUCTURES. Little impact on API. Read on for details. 2022-02-01 15:22:30 Two problems. - If a stop action panics (in `_terminate_facet`), the Facet is dropped before its outbound handles are removed. With the code as it stands, this leaks assertions (!!). - The logic for removing an outbound handle seems to be running in the wrong facet context??? (See `f.outbound_handles.remove(&handle)` in the cleanup actions - I think I need to remove the for_myself mechanism - and add some callbacks to run only on successful commit 2022-02-02 12:12:33 This is hard. Here's the current implementation: - assert - inserts into outbound_handles of active facet - adds cleanup action describing how to do the retraction - enqueues the assert action, which - calls e.assert() - retract - looks up & removes the cleanup action, which - enqueues the retract action, which - removes from outbound_handles of the WRONG facet in the WRONG actor - calls e.retract() - _terminate_facet - uses outbound_handles to retract the facet's assertions - doesn't directly touch cleanup actions, relying on retract to do that - if one of a facet's stop actions panics, will drop the facet, leaking its assertions - actually, even if a stop action yields `Err`, it will drop the facet and leak assertions - yikes - facet drop - panics if outbound_handles is nonempty - actor cleanup - relies on facet tree to find assertions to retract Revised plan: - ✓ revise Activation/PendingEvents structures - rename `cleanup_actions` to `outbound_assertions` - remove `for_myself` queues and `final_actions` - add `pre_commit_actions`, `rollback_actions` and `commit_actions` - ✓ assert - as before - but on rollback, removes from `outbound_handles` (if the facet still exists) and `outbound_assertions` (always) - marks the new assertion as "established" on commit - ✓ retract - lookup in `outbound_assertions` by handle, using presence as indication it hasn't been scheduled in this turn - 
on rollback, put it back in `outbound_assertions` ONLY IF IT IS MARKED ESTABLISHED - otherwise it is a retraction of an `assert` that has *also* been rolled back in this turn - on commit, remove it from `outbound_handles` - enqueue the retract action, which just calls e.retract() - ✓ _terminate_facet - revised quite a bit now we rely on `RunningActor::cleanup` to use `outbound_assertions` rather than the facet tree. - still drops Facets on panic, but this is now mostly harmless (reorders retractions a bit) - handles `Err` from a stop action more gracefully - slightly cleverer tracking of what needs doing based on a `TerminationDirection` - now ONLY applies to ORDERLY cleanup of the facet tree. Disorderly cleanup ignores the facet tree and just retracts the assertions willy-nilly. - ✓ facet drop - warn if outbound_handles is nonempty, but don't do anything about it - ✓ actor cleanup - doesn't use the facet tree at all. - cleanly shutting down is done elsewhere - uses the remaining entries in `outbound_assertions` (previously `cleanup_actions`) to deal with retractions for dropped facets as well as any other facets that haven't been cleanly shut down - ✓ activate - now has a panic_guard::PanicGuard RAII for conveying a crash to an actor in case the activation is happening from a linked task or another thread (this wasn't the case in the examples that provoked this work, though) - simplified - explicit commit/rollback decision - ✓ Actor::run - no longer uses the same path for crash-termination and success-termination - instead, for success-termination, takes a turn that calls Activation::stop_root - this cleans up the facet tree using _terminate_facet - when the turn ends, it notices that the root facet is gone and shuts down the actor - so in principle there will be nothing for actor cleanup to do 2022-02-04 13:52:34 This took days. :-(
2022-02-04 12:59:37 +00:00
Some(exit_status) => {
tracing::debug!(?exit_status);
2021-09-01 15:31:01 +00:00
tracing::trace!("Restarting: restart_policy is Always or exit was abnormal");
t.set(&self.state,
if exit_status.is_ok() { State::Complete } else { State::Failed });
2021-09-01 15:31:01 +00:00
let now = Instant::now();
let oldest_to_keep = now - self.config.period;
self.restarts.push_back(now);
while let Some(stamp) = self.restarts.front() {
if stamp < &oldest_to_keep {
self.restarts.pop_front();
} else {
break;
}
}
let self_ref = Arc::clone(&self.self_ref);
let wait_time = if self.restarts.len() > self.config.intensity {
2022-01-07 16:16:20 +00:00
tracing::warn!(?self.config.sleep_time, "Restart intensity exceeded; sleeping");
2021-09-01 15:31:01 +00:00
self.config.sleep_time
} else {
2022-01-07 16:16:20 +00:00
tracing::trace!(?self.config.pause_time, "pausing");
2021-09-01 15:31:01 +00:00
self.config.pause_time
};
t.after(wait_time, move |t| {
tracing::trace!("Sending retry trigger");
2022-01-26 22:37:43 +00:00
t.message(&self_ref, StartNow);
2021-09-01 15:31:01 +00:00
Ok(())
});
2021-09-01 15:31:01 +00:00
},
}
Ok(())
}
}
impl Supervisor {
pub fn start<C: 'static + Send + FnMut(&mut Activation, State) -> ActorResult,
B: 'static + Send + FnMut(&mut Activation) -> ActorResult>(
t: &mut Activation,
2022-01-19 13:40:50 +00:00
name: Name,
config: SupervisorConfiguration,
mut state_cb: C,
boot_fn: B,
) -> ActorResult {
2022-01-19 13:40:50 +00:00
let _entry = tracing::info_span!("supervisor", ?name).entered();
tracing::trace!(?config);
let self_ref = t.create_inert();
let state_field = t.named_field("supervisee_state", State::Started);
2022-01-19 13:40:50 +00:00
let my_name = name.as_ref().map(
|n| preserves::rec![AnyValue::symbol("supervisor"), n.clone()]);
let mut supervisor = Supervisor {
self_ref: Arc::clone(&self_ref),
2022-01-19 13:40:50 +00:00
child_name: name,
config,
boot_fn: Arc::new(Mutex::new(Box::new(boot_fn))),
restarts: VecDeque::new(),
state: Arc::clone(&state_field),
2021-09-01 15:31:01 +00:00
ac_ref: None,
};
2022-01-15 22:23:18 +00:00
tracing::info!(self_ref = ?supervisor.self_ref, "Supervisor starting");
supervisor.start_now(t)?;
2022-01-19 13:40:50 +00:00
t.dataflow(move |t| {
let state = t.get(&state_field).clone();
2022-01-19 13:40:50 +00:00
tracing::debug!(name = ?my_name, ?state);
state_cb(t, state)
2022-01-19 13:40:50 +00:00
})?;
self_ref.become_entity(supervisor);
Ok(())
}
fn start_now(&mut self, t: &mut Activation) -> ActorResult {
let boot_cell = Arc::clone(&self.boot_fn);
t.facet(|t: &mut Activation| {
2022-01-26 22:37:43 +00:00
t.on_stop_notify(&self.self_ref);
self.ac_ref = Some(t.spawn_link(
self.child_name.clone(),
move |t| boot_cell.lock().expect("Unpoisoned boot_fn mutex")(t)));
tracing::debug!(self_ref = ?self.self_ref,
supervisee = ?self.ac_ref,
"Supervisee started");
Ok(())
})?;
2022-01-26 22:37:43 +00:00
t.set(&self.state, State::Started);
Ok(())
}
}