2021-08-30 21:41:51 +00:00
|
|
|
//! Extremely simple single-actor supervision. Vastly simplified compared to the available
|
|
|
|
//! options in [Erlang/OTP](https://erlang.org/doc/man/supervisor.html).
|
|
|
|
|
|
|
|
use std::collections::VecDeque;
|
|
|
|
use std::sync::Arc;
|
|
|
|
use std::time::Duration;
|
|
|
|
|
|
|
|
use tokio::time::Instant;
|
|
|
|
|
|
|
|
use crate::actor::*;
|
2021-09-28 10:53:11 +00:00
|
|
|
use crate::enclose;
|
|
|
|
use crate::schemas::service::State;
|
2021-08-30 21:41:51 +00:00
|
|
|
|
2021-09-28 10:53:11 +00:00
|
|
|
/// Boxed, sendable boot closure for a supervisee: it is called with the
/// supervisee's `&mut Activation` and returns an `ActorResult`.
///
/// It is `FnMut` rather than `FnOnce` because the supervisor calls the same
/// closure again for every restart.
pub type Boot = Box<dyn Send + FnMut(&mut Activation) -> ActorResult>;
|
|
|
|
|
|
|
|
enum Protocol {
|
2021-08-30 21:41:51 +00:00
|
|
|
SuperviseeStarted, // assertion
|
|
|
|
BootFunction(Boot), // message
|
|
|
|
Retry, // message
|
|
|
|
}
|
|
|
|
|
2021-09-01 15:31:01 +00:00
|
|
|
/// Decides which supervisee exits lead to a restart.
///
/// The type is a fieldless enum, so it is `Copy`: it can be compared and
/// passed around by value without explicit `.clone()` calls.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestartPolicy {
    /// Restart the supervisee after every exit, whether normal or abnormal.
    Always,
    /// Restart the supervisee only when it exited with an error; a normal
    /// exit leaves it stopped.
    OnErrorOnly,
}
|
|
|
|
|
|
|
|
#[derive(Debug, Clone)]
|
2021-08-30 21:41:51 +00:00
|
|
|
pub struct SupervisorConfiguration {
|
|
|
|
pub intensity: usize,
|
|
|
|
pub period: Duration,
|
|
|
|
pub pause_time: Duration,
|
|
|
|
pub sleep_time: Duration,
|
2021-09-01 15:31:01 +00:00
|
|
|
pub restart_policy: RestartPolicy,
|
2021-08-30 21:41:51 +00:00
|
|
|
}
|
|
|
|
|
2021-09-28 10:53:11 +00:00
|
|
|
pub struct Supervisor {
|
|
|
|
self_ref: Arc<Ref<Protocol>>,
|
2021-08-30 21:41:51 +00:00
|
|
|
name: tracing::Span,
|
|
|
|
config: SupervisorConfiguration,
|
|
|
|
boot_fn: Option<Boot>,
|
|
|
|
restarts: VecDeque<Instant>,
|
2021-09-28 10:53:11 +00:00
|
|
|
state: Arc<Field<State>>,
|
2021-09-01 15:31:01 +00:00
|
|
|
ac_ref: Option<ActorRef>,
|
2021-08-30 21:41:51 +00:00
|
|
|
}
|
|
|
|
|
2021-09-28 10:53:11 +00:00
|
|
|
impl std::fmt::Debug for Protocol {
|
2021-08-30 21:41:51 +00:00
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
|
|
|
match self {
|
|
|
|
Protocol::SuperviseeStarted => write!(f, "Protocol::SuperviseeStarted"),
|
|
|
|
Protocol::BootFunction(_) => write!(f, "Protocol::BootFunction(_)"),
|
|
|
|
Protocol::Retry => write!(f, "Protocol::Retry"),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for SupervisorConfiguration {
|
|
|
|
fn default() -> Self {
|
|
|
|
Self {
|
|
|
|
intensity: 1,
|
|
|
|
period: Duration::from_secs(5),
|
|
|
|
pause_time: Duration::from_millis(200),
|
|
|
|
sleep_time: Duration::from_secs(10),
|
2022-01-07 16:16:05 +00:00
|
|
|
restart_policy: RestartPolicy::Always,
|
2021-08-30 21:41:51 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-01-07 21:05:12 +00:00
|
|
|
impl SupervisorConfiguration {
|
|
|
|
pub fn on_error_only() -> Self {
|
|
|
|
Self {
|
|
|
|
restart_policy: RestartPolicy::OnErrorOnly,
|
|
|
|
.. Self::default()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-28 10:53:11 +00:00
|
|
|
impl Entity<Protocol> for Supervisor
|
2021-08-30 21:41:51 +00:00
|
|
|
{
|
2021-09-28 10:53:11 +00:00
|
|
|
fn assert(&mut self, t: &mut Activation, m: Protocol, _h: Handle) -> ActorResult {
|
2021-08-30 21:41:51 +00:00
|
|
|
match m {
|
2021-09-28 10:53:11 +00:00
|
|
|
Protocol::SuperviseeStarted => t.set(&self.state, State::Started),
|
2022-01-09 20:01:55 +00:00
|
|
|
_ => Err(format!("Unexpected assertion: {:?}", m).as_str())?,
|
2021-08-30 21:41:51 +00:00
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn retract(&mut self, t: &mut Activation, _h: Handle) -> ActorResult {
|
2021-09-01 15:31:01 +00:00
|
|
|
let _name = self.name.clone();
|
|
|
|
let _entry = _name.enter();
|
|
|
|
let exit_status =
|
|
|
|
self.ac_ref.take().expect("valid supervisee ActorRef")
|
|
|
|
.exit_status()
|
|
|
|
.expect("supervisee to have terminated");
|
|
|
|
tracing::debug!(?exit_status);
|
|
|
|
match exit_status {
|
|
|
|
Ok(()) if self.config.restart_policy == RestartPolicy::OnErrorOnly => {
|
|
|
|
tracing::trace!("Not restarting: normal exit, restart_policy is OnErrorOnly");
|
2021-09-28 10:53:11 +00:00
|
|
|
t.set(&self.state, State::Complete);
|
2021-09-01 15:31:01 +00:00
|
|
|
},
|
|
|
|
_ => {
|
|
|
|
tracing::trace!("Restarting: restart_policy is Always or exit was abnormal");
|
2021-09-28 10:53:11 +00:00
|
|
|
t.set(&self.state,
|
|
|
|
if exit_status.is_ok() { State::Complete } else { State::Failed });
|
2021-09-01 15:31:01 +00:00
|
|
|
let now = Instant::now();
|
|
|
|
let oldest_to_keep = now - self.config.period;
|
|
|
|
self.restarts.push_back(now);
|
|
|
|
while let Some(stamp) = self.restarts.front() {
|
|
|
|
if stamp < &oldest_to_keep {
|
|
|
|
self.restarts.pop_front();
|
|
|
|
} else {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
let self_ref = Arc::clone(&self.self_ref);
|
|
|
|
let wait_time = if self.restarts.len() > self.config.intensity {
|
2022-01-07 16:16:20 +00:00
|
|
|
tracing::warn!(?self.config.sleep_time, "Restart intensity exceeded; sleeping");
|
2021-09-01 15:31:01 +00:00
|
|
|
self.config.sleep_time
|
|
|
|
} else {
|
2022-01-07 16:16:20 +00:00
|
|
|
tracing::trace!(?self.config.pause_time, "pausing");
|
2021-09-01 15:31:01 +00:00
|
|
|
self.config.pause_time
|
|
|
|
};
|
2021-09-28 15:10:36 +00:00
|
|
|
t.after(wait_time, move |t| {
|
2021-09-28 10:53:11 +00:00
|
|
|
tracing::trace!("Sending retry trigger");
|
2021-09-01 15:31:01 +00:00
|
|
|
t.message(&self_ref, Protocol::Retry);
|
|
|
|
Ok(())
|
2021-09-28 15:10:36 +00:00
|
|
|
});
|
2021-09-01 15:31:01 +00:00
|
|
|
},
|
2021-08-30 21:41:51 +00:00
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2021-09-28 10:53:11 +00:00
|
|
|
fn message(&mut self, t: &mut Activation, m: Protocol) -> ActorResult {
|
2021-08-30 21:41:51 +00:00
|
|
|
match m {
|
|
|
|
Protocol::BootFunction(b) => {
|
|
|
|
self.boot_fn = Some(b);
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
Protocol::Retry => {
|
|
|
|
self.ensure_started(t)
|
|
|
|
}
|
|
|
|
_ => Ok(())
|
|
|
|
}
|
|
|
|
}
|
2021-09-01 15:31:01 +00:00
|
|
|
|
|
|
|
fn stop(&mut self, _t: &mut Activation) -> ActorResult {
|
|
|
|
let _entry = self.name.enter();
|
|
|
|
tracing::info!("Supervisor terminating");
|
|
|
|
Ok(())
|
|
|
|
}
|
2021-08-30 21:41:51 +00:00
|
|
|
}
|
|
|
|
|
2021-09-28 10:53:11 +00:00
|
|
|
impl Supervisor {
|
|
|
|
pub fn start<C: 'static + Send + FnMut(&mut Activation, State) -> ActorResult,
|
|
|
|
B: 'static + Send + FnMut(&mut Activation) -> ActorResult>(
|
2021-08-30 21:41:51 +00:00
|
|
|
t: &mut Activation,
|
|
|
|
name: tracing::Span,
|
|
|
|
config: SupervisorConfiguration,
|
2021-09-28 10:53:11 +00:00
|
|
|
mut state_cb: C,
|
|
|
|
boot_fn: B,
|
|
|
|
) -> ActorResult {
|
2021-09-01 15:31:01 +00:00
|
|
|
let _entry = name.enter();
|
2021-08-30 21:41:51 +00:00
|
|
|
tracing::trace!(?config);
|
|
|
|
let self_ref = t.create_inert();
|
2021-09-28 15:10:36 +00:00
|
|
|
let state_field = t.named_field("supervisee_state", State::Started);
|
2021-08-30 21:41:51 +00:00
|
|
|
let mut supervisor = Supervisor {
|
|
|
|
self_ref: Arc::clone(&self_ref),
|
2021-09-01 15:31:01 +00:00
|
|
|
name: name.clone(),
|
2021-08-30 21:41:51 +00:00
|
|
|
config,
|
2021-09-28 10:53:11 +00:00
|
|
|
boot_fn: Some(Box::new(boot_fn)),
|
2021-08-30 21:41:51 +00:00
|
|
|
restarts: VecDeque::new(),
|
2021-09-28 10:53:11 +00:00
|
|
|
state: Arc::clone(&state_field),
|
2021-09-01 15:31:01 +00:00
|
|
|
ac_ref: None,
|
2021-08-30 21:41:51 +00:00
|
|
|
};
|
2021-09-28 10:53:11 +00:00
|
|
|
supervisor.ensure_started(t)?;
|
|
|
|
t.dataflow(enclose!((name) move |t| {
|
|
|
|
let state = t.get(&state_field).clone();
|
|
|
|
{
|
|
|
|
let _entry = name.enter();
|
|
|
|
tracing::debug!(?state);
|
|
|
|
}
|
|
|
|
state_cb(t, state)
|
|
|
|
}))?;
|
2021-08-30 21:41:51 +00:00
|
|
|
self_ref.become_entity(supervisor);
|
2021-09-24 14:14:55 +00:00
|
|
|
t.on_stop_notify(&self_ref);
|
2021-09-28 10:53:11 +00:00
|
|
|
Ok(())
|
2021-08-30 21:41:51 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fn ensure_started(&mut self, t: &mut Activation) -> ActorResult {
|
|
|
|
match self.boot_fn.take() {
|
2021-09-01 15:31:01 +00:00
|
|
|
None => {
|
|
|
|
let _entry = self.name.enter();
|
2021-09-28 10:53:11 +00:00
|
|
|
t.set(&self.state, State::Failed);
|
2021-09-01 15:31:01 +00:00
|
|
|
tracing::error!("Cannot restart supervisee, because it panicked at startup")
|
|
|
|
}
|
2021-08-30 21:41:51 +00:00
|
|
|
Some(mut boot_fn) => {
|
|
|
|
let self_ref = Arc::clone(&self.self_ref);
|
|
|
|
t.facet(|t: &mut Activation| {
|
|
|
|
t.assert(&self.self_ref, Protocol::SuperviseeStarted);
|
2021-09-01 15:31:01 +00:00
|
|
|
self.ac_ref = Some(t.spawn_link(
|
|
|
|
crate::name!(parent: &self.name, "supervisee"),
|
|
|
|
move |t| {
|
|
|
|
match boot_fn(t) {
|
|
|
|
Ok(()) => {
|
|
|
|
t.message(&self_ref, Protocol::BootFunction(boot_fn));
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
t.clear();
|
|
|
|
t.message(&self_ref, Protocol::BootFunction(boot_fn));
|
|
|
|
t.deliver();
|
|
|
|
Err(e)
|
|
|
|
}
|
2021-08-30 21:41:51 +00:00
|
|
|
}
|
2021-09-01 15:31:01 +00:00
|
|
|
}));
|
2021-08-30 21:41:51 +00:00
|
|
|
Ok(())
|
|
|
|
})?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|