From 111177675446ab45bc6ebfc7a66080a96fe16115 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 26 Jan 2022 22:30:47 +0100 Subject: [PATCH] Eliminate need for awkward boot_fn transmission subprotocol --- syndicate/src/supervise.rs | 73 ++++++++++---------------------------- 1 file changed, 19 insertions(+), 54 deletions(-) diff --git a/syndicate/src/supervise.rs b/syndicate/src/supervise.rs index ca242f5..5138044 100644 --- a/syndicate/src/supervise.rs +++ b/syndicate/src/supervise.rs @@ -5,6 +5,7 @@ use preserves::value::NestedValue; use std::collections::VecDeque; use std::sync::Arc; +use std::sync::Mutex; use std::time::Duration; use tokio::time::Instant; @@ -12,11 +13,11 @@ use tokio::time::Instant; use crate::actor::*; use crate::schemas::service::State; -pub type Boot = Box ActorResult>; +pub type Boot = Arc ActorResult>>>; +#[derive(Debug)] enum Protocol { SuperviseeStarted, // assertion - BootFunction(Boot), // message Retry, // message } @@ -40,22 +41,12 @@ pub struct Supervisor { my_name: Name, child_name: Name, config: SupervisorConfiguration, - boot_fn: Option, + boot_fn: Boot, restarts: VecDeque, state: Arc>, ac_ref: Option, } -impl std::fmt::Debug for Protocol { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { - Protocol::SuperviseeStarted => write!(f, "Protocol::SuperviseeStarted"), - Protocol::BootFunction(_) => write!(f, "Protocol::BootFunction(_)"), - Protocol::Retry => write!(f, "Protocol::Retry"), - } - } -} - impl Default for SupervisorConfiguration { fn default() -> Self { Self { @@ -133,12 +124,8 @@ impl Entity for Supervisor fn message(&mut self, t: &mut Activation, m: Protocol) -> ActorResult { match m { - Protocol::BootFunction(b) => { - self.boot_fn = Some(b); - Ok(()) - } Protocol::Retry => { - self.ensure_started(t) + self.start_now(t) } _ => Ok(()) } @@ -172,13 +159,13 @@ impl Supervisor { my_name: my_name.clone(), child_name: name, config, - boot_fn: Some(Box::new(boot_fn)), + boot_fn: Arc::new(Mutex::new(Box::new(boot_fn))), restarts: VecDeque::new(), state: Arc::clone(&state_field), ac_ref: None, }; tracing::info!(self_ref = ?supervisor.self_ref, "Supervisor starting"); - supervisor.ensure_started(t)?; + supervisor.start_now(t)?; t.dataflow(move |t| { let state = t.get(&state_field).clone(); tracing::debug!(name = ?my_name, ?state); @@ -189,40 +176,18 @@ impl Supervisor { Ok(()) } - fn ensure_started(&mut self, t: &mut Activation) -> ActorResult { - match self.boot_fn.take() { - None => { - t.set(&self.state, State::Failed); - tracing::error!(name = ?self.my_name, - "Cannot restart supervisee, because it panicked at startup") - } - Some(mut boot_fn) => { - let self_ref = Arc::clone(&self.self_ref); - t.facet(|t: &mut Activation| { - t.assert(&self.self_ref, Protocol::SuperviseeStarted); - self.ac_ref = Some(t.spawn_link( - self.child_name.clone(), - move |t| { - match boot_fn(t) { - Ok(()) => { - t.message(&self_ref, Protocol::BootFunction(boot_fn)); - Ok(()) - } - Err(e) => { - t.clear(); - t.message(&self_ref, Protocol::BootFunction(boot_fn)); - t.deliver(); - Err(e) - } - } - })); - tracing::debug!(self_ref = ?self.self_ref, - supervisee = ?self.ac_ref, - "Supervisee started"); - Ok(()) - })?; - } - } + fn start_now(&mut self, t: &mut Activation) -> ActorResult { + let boot_cell = Arc::clone(&self.boot_fn); + t.facet(|t: &mut Activation| { + t.assert(&self.self_ref, Protocol::SuperviseeStarted); + self.ac_ref = Some(t.spawn_link( + self.child_name.clone(), + move |t| boot_cell.lock().expect("Unpoisoned boot_fn mutex")(t))); + tracing::debug!(self_ref = ?self.self_ref, + supervisee = ?self.ac_ref, + "Supervisee started"); + Ok(()) + })?; Ok(()) } }