Further simplify supervision protocols

This commit is contained in:
Tony Garnock-Jones 2022-01-26 23:37:43 +01:00
parent 70c442ad47
commit 4ea07cdd6b
1 changed files with 11 additions and 36 deletions

View File

@ -15,12 +15,6 @@ use crate::schemas::service::State;
pub type Boot = Arc<Mutex<Box<dyn Send + FnMut(&mut Activation) -> ActorResult>>>;
#[derive(Debug)]
enum Protocol {
SuperviseeStarted, // assertion
Retry, // message
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RestartPolicy {
Always,
@ -36,9 +30,11 @@ pub struct SupervisorConfiguration {
pub restart_policy: RestartPolicy,
}
#[derive(Debug)]
struct StartNow;
pub struct Supervisor {
self_ref: Arc<Ref<Protocol>>,
my_name: Name,
self_ref: Arc<Ref<StartNow>>,
child_name: Name,
config: SupervisorConfiguration,
boot_fn: Boot,
@ -68,17 +64,13 @@ impl SupervisorConfiguration {
}
}
impl Entity<Protocol> for Supervisor
impl Entity<StartNow> for Supervisor
{
fn assert(&mut self, t: &mut Activation, m: Protocol, _h: Handle) -> ActorResult {
match m {
Protocol::SuperviseeStarted => t.set(&self.state, State::Started),
_ => Err(format!("Unexpected assertion: {:?}", m).as_str())?,
}
Ok(())
fn message(&mut self, t: &mut Activation, _m: StartNow) -> ActorResult {
self.start_now(t)
}
fn retract(&mut self, t: &mut Activation, _h: Handle) -> ActorResult {
fn stop(&mut self, t: &mut Activation) -> ActorResult {
let _entry = tracing::info_span!("supervisor", name = ?self.child_name).entered();
let exit_status =
self.ac_ref.take().expect("valid supervisee ActorRef")
@ -114,29 +106,13 @@ impl Entity<Protocol> for Supervisor
};
t.after(wait_time, move |t| {
tracing::trace!("Sending retry trigger");
t.message(&self_ref, Protocol::Retry);
t.message(&self_ref, StartNow);
Ok(())
});
},
}
Ok(())
}
fn message(&mut self, t: &mut Activation, m: Protocol) -> ActorResult {
match m {
Protocol::Retry => {
self.start_now(t)
}
_ => Ok(())
}
}
fn stop(&mut self, _t: &mut Activation) -> ActorResult {
tracing::info!(name = ?self.my_name,
self_ref = ?self.self_ref,
"Supervisor terminating");
Ok(())
}
}
impl Supervisor {
@ -156,7 +132,6 @@ impl Supervisor {
|n| preserves::rec![AnyValue::symbol("supervisor"), n.clone()]);
let mut supervisor = Supervisor {
self_ref: Arc::clone(&self_ref),
my_name: my_name.clone(),
child_name: name,
config,
boot_fn: Arc::new(Mutex::new(Box::new(boot_fn))),
@ -172,14 +147,13 @@ impl Supervisor {
state_cb(t, state)
})?;
self_ref.become_entity(supervisor);
t.on_stop_notify(&self_ref);
Ok(())
}
fn start_now(&mut self, t: &mut Activation) -> ActorResult {
let boot_cell = Arc::clone(&self.boot_fn);
t.facet(|t: &mut Activation| {
t.assert(&self.self_ref, Protocol::SuperviseeStarted);
t.on_stop_notify(&self.self_ref);
self.ac_ref = Some(t.spawn_link(
self.child_name.clone(),
move |t| boot_cell.lock().expect("Unpoisoned boot_fn mutex")(t)));
@ -188,6 +162,7 @@ impl Supervisor {
"Supervisee started");
Ok(())
})?;
t.set(&self.state, State::Started);
Ok(())
}
}