2021-08-30 21:41:51 +00:00
|
|
|
//! Extremely simple single-actor supervision. Vastly simplified compared to the available
//! options in [Erlang/OTP](https://erlang.org/doc/man/supervisor.html).
|
|
|
|
|
2022-01-19 13:40:50 +00:00
|
|
|
use preserves::value::NestedValue;
|
|
|
|
|
2021-08-30 21:41:51 +00:00
|
|
|
use std::collections::VecDeque;
|
|
|
|
use std::sync::Arc;
|
2022-01-26 21:30:47 +00:00
|
|
|
use std::sync::Mutex;
|
2021-08-30 21:41:51 +00:00
|
|
|
use std::time::Duration;
|
|
|
|
|
|
|
|
use tokio::time::Instant;
|
|
|
|
|
|
|
|
use crate::actor::*;
|
2021-09-28 10:53:11 +00:00
|
|
|
use crate::schemas::service::State;
|
2021-08-30 21:41:51 +00:00
|
|
|
|
2022-01-26 21:30:47 +00:00
|
|
|
/// Shared, mutex-guarded boot closure for the supervisee.
///
/// The closure receives the new actor's `Activation` and returns an `ActorResult`. It is
/// `FnMut` and kept behind `Arc<Mutex<..>>` so that the same closure can be handed to every
/// freshly spawned supervisee each time the supervisor (re)starts it.
pub type Boot = Arc<Mutex<Box<dyn Send + FnMut(&mut Activation) -> ActorResult>>>;
|
2021-09-28 10:53:11 +00:00
|
|
|
|
2021-09-01 15:31:01 +00:00
|
|
|
/// Decides which supervisee exits lead to a restart.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestartPolicy {
    /// Restart the supervisee after every exit, whether it succeeded or failed.
    Always,
    /// Restart the supervisee only after an abnormal exit; a normal exit is final.
    OnErrorOnly,
}
|
|
|
|
|
|
|
|
#[derive(Debug, Clone)]
|
2021-08-30 21:41:51 +00:00
|
|
|
pub struct SupervisorConfiguration {
|
|
|
|
pub intensity: usize,
|
|
|
|
pub period: Duration,
|
|
|
|
pub pause_time: Duration,
|
|
|
|
pub sleep_time: Duration,
|
2021-09-01 15:31:01 +00:00
|
|
|
pub restart_policy: RestartPolicy,
|
2021-08-30 21:41:51 +00:00
|
|
|
}
|
|
|
|
|
2022-01-26 22:37:43 +00:00
|
|
|
/// Message the supervisor sends to itself to trigger a (re)start of the supervisee.
#[derive(Debug)]
struct StartNow;
|
|
|
|
|
2021-09-28 10:53:11 +00:00
|
|
|
pub struct Supervisor {
|
2022-01-26 22:37:43 +00:00
|
|
|
self_ref: Arc<Ref<StartNow>>,
|
2022-01-19 13:40:50 +00:00
|
|
|
child_name: Name,
|
2021-08-30 21:41:51 +00:00
|
|
|
config: SupervisorConfiguration,
|
2022-01-26 21:30:47 +00:00
|
|
|
boot_fn: Boot,
|
2021-08-30 21:41:51 +00:00
|
|
|
restarts: VecDeque<Instant>,
|
2021-09-28 10:53:11 +00:00
|
|
|
state: Arc<Field<State>>,
|
2021-09-01 15:31:01 +00:00
|
|
|
ac_ref: Option<ActorRef>,
|
2021-08-30 21:41:51 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for SupervisorConfiguration {
|
|
|
|
fn default() -> Self {
|
|
|
|
Self {
|
|
|
|
intensity: 1,
|
|
|
|
period: Duration::from_secs(5),
|
|
|
|
pause_time: Duration::from_millis(200),
|
|
|
|
sleep_time: Duration::from_secs(10),
|
2022-01-07 16:16:05 +00:00
|
|
|
restart_policy: RestartPolicy::Always,
|
2021-08-30 21:41:51 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-01-07 21:05:12 +00:00
|
|
|
impl SupervisorConfiguration {
|
|
|
|
pub fn on_error_only() -> Self {
|
|
|
|
Self {
|
|
|
|
restart_policy: RestartPolicy::OnErrorOnly,
|
|
|
|
.. Self::default()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-01-26 22:37:43 +00:00
|
|
|
impl Entity<StartNow> for Supervisor
|
2021-08-30 21:41:51 +00:00
|
|
|
{
|
2022-01-26 22:37:43 +00:00
|
|
|
fn message(&mut self, t: &mut Activation, _m: StartNow) -> ActorResult {
|
|
|
|
self.start_now(t)
|
2021-08-30 21:41:51 +00:00
|
|
|
}
|
|
|
|
|
2022-01-26 22:37:43 +00:00
|
|
|
fn stop(&mut self, t: &mut Activation) -> ActorResult {
|
2022-01-19 13:40:50 +00:00
|
|
|
let _entry = tracing::info_span!("supervisor", name = ?self.child_name).entered();
|
MAJOR REFACTORING OF CORE ASSERTION-TRACKING STRUCTURES. Little impact on API. Read on for details.
2022-02-01 15:22:30 Two problems.
- If a stop action panics (in `_terminate_facet`), the Facet is dropped before its outbound
handles are removed. With the code as it stands, this leaks assertions (!!).
- The logic for removing an outbound handle seems to be running in the wrong facet context???
(See `f.outbound_handles.remove(&handle)` in the cleanup actions
- I think I need to remove the for_myself mechanism
- and add some callbacks to run only on successful commit
2022-02-02 12:12:33 This is hard.
Here's the current implementation:
- assert
- inserts into outbound_handles of active facet
- adds cleanup action describing how to do the retraction
- enqueues the assert action, which
- calls e.assert()
- retract
- looks up & removes the cleanup action, which
- enqueues the retract action, which
- removes from outbound_handles of the WRONG facet in the WRONG actor
- calls e.retract()
- _terminate_facet
- uses outbound_handles to retract the facet's assertions
- doesn't directly touch cleanup actions, relying on retract to do that
- if one of a facet's stop actions panics, will drop the facet, leaking its assertions
- actually, even if a stop action yields `Err`, it will drop the facet and leak assertions
- yikes
- facet drop
- panics if outbound_handles is nonempty
- actor cleanup
- relies on facet tree to find assertions to retract
Revised plan:
- ✓ revise Activation/PendingEvents structures
- rename `cleanup_actions` to `outbound_assertions`
- remove `for_myself` queues and `final_actions`
- add `pre_commit_actions`, `rollback_actions` and `commit_actions`
- ✓ assert
- as before
- but on rollback, removes from `outbound_handles` (if the facet still exists) and
`outbound_assertions` (always)
- marks the new assertion as "established" on commit
- ✓ retract
- lookup in `outbound_assertions` by handle, using presence as indication it hasn't been
scheduled in this turn
- on rollback, put it back in `outbound_assertions` ONLY IF IT IS MARKED ESTABLISHED -
otherwise it is a retraction of an `assert` that has *also* been rolled back in this turn
- on commit, remove it from `outbound_handles`
- enqueue the retract action, which just calls e.retract()
- ✓ _terminate_facet
- revised quite a bit now we rely on `RunningActor::cleanup` to use `outbound_assertions`
rather than the facet tree.
- still drops Facets on panic, but this is now mostly harmless (reorders retractions a bit)
- handles `Err` from a stop action more gracefully
- slightly cleverer tracking of what needs doing based on a `TerminationDirection`
- now ONLY applies to ORDERLY cleanup of the facet tree. Disorderly cleanup ignores the
facet tree and just retracts the assertions willy-nilly.
- ✓ facet drop
- warn if outbound_handles is nonempty, but don't do anything about it
- ✓ actor cleanup
- doesn't use the facet tree at all.
- cleanly shutting down is done elsewhere
- uses the remaining entries in `outbound_assertions` (previously `cleanup_actions`) to
deal with retractions for dropped facets as well as any other facets that haven't been
cleanly shut down
- ✓ activate
- now has a panic_guard::PanicGuard RAII for conveying a crash to an actor in case the
activation is happening from a linked task or another thread (this wasn't the case in the
examples that provoked this work, though)
- simplified
- explicit commit/rollback decision
- ✓ Actor::run
- no longer uses the same path for crash-termination and success-termination
- instead, for success-termination, takes a turn that calls Activation::stop_root
- this cleans up the facet tree using _terminate_facet
- when the turn ends, it notices that the root facet is gone and shuts down the actor
- so in principle there will be nothing for actor cleanup to do
2022-02-04 13:52:34 This took days. :-(
2022-02-04 12:59:37 +00:00
|
|
|
match self.ac_ref.take().expect("valid supervisee ActorRef").exit_status() {
|
|
|
|
None =>
|
|
|
|
tracing::debug!("Supervisor shut down; supervisee will exit soon"),
|
|
|
|
Some(Ok(())) if self.config.restart_policy == RestartPolicy::OnErrorOnly => {
|
2021-09-01 15:31:01 +00:00
|
|
|
tracing::trace!("Not restarting: normal exit, restart_policy is OnErrorOnly");
|
2021-09-28 10:53:11 +00:00
|
|
|
t.set(&self.state, State::Complete);
|
2021-09-01 15:31:01 +00:00
|
|
|
},
|
MAJOR REFACTORING OF CORE ASSERTION-TRACKING STRUCTURES. Little impact on API. Read on for details.
2022-02-01 15:22:30 Two problems.
- If a stop action panics (in `_terminate_facet`), the Facet is dropped before its outbound
handles are removed. With the code as it stands, this leaks assertions (!!).
- The logic for removing an outbound handle seems to be running in the wrong facet context???
(See `f.outbound_handles.remove(&handle)` in the cleanup actions
- I think I need to remove the for_myself mechanism
- and add some callbacks to run only on successful commit
2022-02-02 12:12:33 This is hard.
Here's the current implementation:
- assert
- inserts into outbound_handles of active facet
- adds cleanup action describing how to do the retraction
- enqueues the assert action, which
- calls e.assert()
- retract
- looks up & removes the cleanup action, which
- enqueues the retract action, which
- removes from outbound_handles of the WRONG facet in the WRONG actor
- calls e.retract()
- _terminate_facet
- uses outbound_handles to retract the facet's assertions
- doesn't directly touch cleanup actions, relying on retract to do that
- if one of a facet's stop actions panics, will drop the facet, leaking its assertions
- actually, even if a stop action yields `Err`, it will drop the facet and leak assertions
- yikes
- facet drop
- panics if outbound_handles is nonempty
- actor cleanup
- relies on facet tree to find assertions to retract
Revised plan:
- ✓ revise Activation/PendingEvents structures
- rename `cleanup_actions` to `outbound_assertions`
- remove `for_myself` queues and `final_actions`
- add `pre_commit_actions`, `rollback_actions` and `commit_actions`
- ✓ assert
- as before
- but on rollback, removes from `outbound_handles` (if the facet still exists) and
`outbound_assertions` (always)
- marks the new assertion as "established" on commit
- ✓ retract
- lookup in `outbound_assertions` by handle, using presence as indication it hasn't been
scheduled in this turn
- on rollback, put it back in `outbound_assertions` ONLY IF IT IS MARKED ESTABLISHED -
otherwise it is a retraction of an `assert` that has *also* been rolled back in this turn
- on commit, remove it from `outbound_handles`
- enqueue the retract action, which just calls e.retract()
- ✓ _terminate_facet
- revised quite a bit now we rely on `RunningActor::cleanup` to use `outbound_assertions`
rather than the facet tree.
- still drops Facets on panic, but this is now mostly harmless (reorders retractions a bit)
- handles `Err` from a stop action more gracefully
- slightly cleverer tracking of what needs doing based on a `TerminationDirection`
- now ONLY applies to ORDERLY cleanup of the facet tree. Disorderly cleanup ignores the
facet tree and just retracts the assertions willy-nilly.
- ✓ facet drop
- warn if outbound_handles is nonempty, but don't do anything about it
- ✓ actor cleanup
- doesn't use the facet tree at all.
- cleanly shutting down is done elsewhere
- uses the remaining entries in `outbound_assertions` (previously `cleanup_actions`) to
deal with retractions for dropped facets as well as any other facets that haven't been
cleanly shut down
- ✓ activate
- now has a panic_guard::PanicGuard RAII for conveying a crash to an actor in case the
activation is happening from a linked task or another thread (this wasn't the case in the
examples that provoked this work, though)
- simplified
- explicit commit/rollback decision
- ✓ Actor::run
- no longer uses the same path for crash-termination and success-termination
- instead, for success-termination, takes a turn that calls Activation::stop_root
- this cleans up the facet tree using _terminate_facet
- when the turn ends, it notices that the root facet is gone and shuts down the actor
- so in principle there will be nothing for actor cleanup to do
2022-02-04 13:52:34 This took days. :-(
2022-02-04 12:59:37 +00:00
|
|
|
Some(exit_status) => {
|
|
|
|
tracing::debug!(?exit_status);
|
2021-09-01 15:31:01 +00:00
|
|
|
tracing::trace!("Restarting: restart_policy is Always or exit was abnormal");
|
2021-09-28 10:53:11 +00:00
|
|
|
t.set(&self.state,
|
|
|
|
if exit_status.is_ok() { State::Complete } else { State::Failed });
|
2021-09-01 15:31:01 +00:00
|
|
|
let now = Instant::now();
|
|
|
|
let oldest_to_keep = now - self.config.period;
|
|
|
|
self.restarts.push_back(now);
|
|
|
|
while let Some(stamp) = self.restarts.front() {
|
|
|
|
if stamp < &oldest_to_keep {
|
|
|
|
self.restarts.pop_front();
|
|
|
|
} else {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
let self_ref = Arc::clone(&self.self_ref);
|
|
|
|
let wait_time = if self.restarts.len() > self.config.intensity {
|
2022-01-07 16:16:20 +00:00
|
|
|
tracing::warn!(?self.config.sleep_time, "Restart intensity exceeded; sleeping");
|
2021-09-01 15:31:01 +00:00
|
|
|
self.config.sleep_time
|
|
|
|
} else {
|
2022-01-07 16:16:20 +00:00
|
|
|
tracing::trace!(?self.config.pause_time, "pausing");
|
2021-09-01 15:31:01 +00:00
|
|
|
self.config.pause_time
|
|
|
|
};
|
2021-09-28 15:10:36 +00:00
|
|
|
t.after(wait_time, move |t| {
|
2021-09-28 10:53:11 +00:00
|
|
|
tracing::trace!("Sending retry trigger");
|
2022-01-26 22:37:43 +00:00
|
|
|
t.message(&self_ref, StartNow);
|
2021-09-01 15:31:01 +00:00
|
|
|
Ok(())
|
2021-09-28 15:10:36 +00:00
|
|
|
});
|
2021-09-01 15:31:01 +00:00
|
|
|
},
|
2021-08-30 21:41:51 +00:00
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-28 10:53:11 +00:00
|
|
|
impl Supervisor {
|
|
|
|
pub fn start<C: 'static + Send + FnMut(&mut Activation, State) -> ActorResult,
|
|
|
|
B: 'static + Send + FnMut(&mut Activation) -> ActorResult>(
|
2021-08-30 21:41:51 +00:00
|
|
|
t: &mut Activation,
|
2022-01-19 13:40:50 +00:00
|
|
|
name: Name,
|
2021-08-30 21:41:51 +00:00
|
|
|
config: SupervisorConfiguration,
|
2021-09-28 10:53:11 +00:00
|
|
|
mut state_cb: C,
|
|
|
|
boot_fn: B,
|
|
|
|
) -> ActorResult {
|
2022-01-19 13:40:50 +00:00
|
|
|
let _entry = tracing::info_span!("supervisor", ?name).entered();
|
2021-08-30 21:41:51 +00:00
|
|
|
tracing::trace!(?config);
|
|
|
|
let self_ref = t.create_inert();
|
2021-09-28 15:10:36 +00:00
|
|
|
let state_field = t.named_field("supervisee_state", State::Started);
|
2022-01-19 13:40:50 +00:00
|
|
|
let my_name = name.as_ref().map(
|
|
|
|
|n| preserves::rec![AnyValue::symbol("supervisor"), n.clone()]);
|
2021-08-30 21:41:51 +00:00
|
|
|
let mut supervisor = Supervisor {
|
|
|
|
self_ref: Arc::clone(&self_ref),
|
2022-01-19 13:40:50 +00:00
|
|
|
child_name: name,
|
2021-08-30 21:41:51 +00:00
|
|
|
config,
|
2022-01-26 21:30:47 +00:00
|
|
|
boot_fn: Arc::new(Mutex::new(Box::new(boot_fn))),
|
2021-08-30 21:41:51 +00:00
|
|
|
restarts: VecDeque::new(),
|
2021-09-28 10:53:11 +00:00
|
|
|
state: Arc::clone(&state_field),
|
2021-09-01 15:31:01 +00:00
|
|
|
ac_ref: None,
|
2021-08-30 21:41:51 +00:00
|
|
|
};
|
2022-01-15 22:23:18 +00:00
|
|
|
tracing::info!(self_ref = ?supervisor.self_ref, "Supervisor starting");
|
2022-01-26 21:30:47 +00:00
|
|
|
supervisor.start_now(t)?;
|
2022-01-19 13:40:50 +00:00
|
|
|
t.dataflow(move |t| {
|
2021-09-28 10:53:11 +00:00
|
|
|
let state = t.get(&state_field).clone();
|
2022-01-19 13:40:50 +00:00
|
|
|
tracing::debug!(name = ?my_name, ?state);
|
2021-09-28 10:53:11 +00:00
|
|
|
state_cb(t, state)
|
2022-01-19 13:40:50 +00:00
|
|
|
})?;
|
2021-08-30 21:41:51 +00:00
|
|
|
self_ref.become_entity(supervisor);
|
2021-09-28 10:53:11 +00:00
|
|
|
Ok(())
|
2021-08-30 21:41:51 +00:00
|
|
|
}
|
|
|
|
|
2022-01-26 21:30:47 +00:00
|
|
|
fn start_now(&mut self, t: &mut Activation) -> ActorResult {
|
|
|
|
let boot_cell = Arc::clone(&self.boot_fn);
|
|
|
|
t.facet(|t: &mut Activation| {
|
2022-01-26 22:37:43 +00:00
|
|
|
t.on_stop_notify(&self.self_ref);
|
2022-01-26 21:30:47 +00:00
|
|
|
self.ac_ref = Some(t.spawn_link(
|
|
|
|
self.child_name.clone(),
|
|
|
|
move |t| boot_cell.lock().expect("Unpoisoned boot_fn mutex")(t)));
|
|
|
|
tracing::debug!(self_ref = ?self.self_ref,
|
|
|
|
supervisee = ?self.ac_ref,
|
|
|
|
"Supervisee started");
|
|
|
|
Ok(())
|
|
|
|
})?;
|
2022-01-26 22:37:43 +00:00
|
|
|
t.set(&self.state, State::Started);
|
2021-08-30 21:41:51 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|