From d8c3e37d17d42fffad7b92724ff6c7c0d5e8d140 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 30 Aug 2021 23:41:51 +0200 Subject: [PATCH] Supervision; delayed actions; better tracing (incl `M: Debug`); linked task release --- .../src/services/config_watcher.rs | 2 - .../src/services/tcp_relay_listener.rs | 10 +- syndicate/Cargo.toml | 2 +- syndicate/src/actor.rs | 78 ++++++-- syndicate/src/lib.rs | 1 + syndicate/src/supervise.rs | 180 ++++++++++++++++++ 6 files changed, 255 insertions(+), 18 deletions(-) create mode 100644 syndicate/src/supervise.rs diff --git a/syndicate-server/src/services/config_watcher.rs b/syndicate-server/src/services/config_watcher.rs index ca902c8..8d31e18 100644 --- a/syndicate-server/src/services/config_watcher.rs +++ b/syndicate-server/src/services/config_watcher.rs @@ -64,7 +64,6 @@ fn assertions_at_existing_file(t: &mut Activation, ds: &Arc, path: &PathBuf } for value in values.into_iter() { if let Some(handle) = ds.assert(t, value.clone()) { - tracing::debug!("asserted {:?} -> {:?}", value, handle); handles.insert(handle); } } @@ -190,7 +189,6 @@ fn run(t: &mut Activation, ds: Arc, captures: AnyValue) -> ActorResult { } } for h in to_retract.into_iter() { - tracing::debug!("retract {:?}", h); t.retract(h); } Ok(()) diff --git a/syndicate-server/src/services/tcp_relay_listener.rs b/syndicate-server/src/services/tcp_relay_listener.rs index fb7b0dd..d3898f2 100644 --- a/syndicate-server/src/services/tcp_relay_listener.rs +++ b/syndicate-server/src/services/tcp_relay_listener.rs @@ -5,6 +5,7 @@ use syndicate::actor::*; use syndicate::convert::*; use syndicate::during::entity; use syndicate::schemas::dataspace::Observe; +use syndicate::supervise::{Supervisor, SupervisorConfiguration}; use syndicate::value::NestedValue; use tokio::net::TcpListener; @@ -17,11 +18,14 @@ pub fn on_demand(t: &mut Activation, ds: Arc, gateway: Arc) { let monitor = entity(()) .on_asserted_facet({ let ds = Arc::clone(&ds); - move |_, t, captures| { + move |_, t, captures: AnyValue| { let ds = Arc::clone(&ds); let gateway = Arc::clone(&gateway); - t.spawn_link(syndicate::name!(parent: None, "relay", addr = ?captures), - |t| run(t, ds, gateway, captures)); + Supervisor::start( + t, + syndicate::name!(parent: None, "relay", addr = ?captures), + SupervisorConfiguration::default(), + move |t| run(t, Arc::clone(&ds), Arc::clone(&gateway), captures.clone())); Ok(()) } }) diff --git a/syndicate/Cargo.toml b/syndicate/Cargo.toml index 4312d58..6e4c0e0 100644 --- a/syndicate/Cargo.toml +++ b/syndicate/Cargo.toml @@ -19,7 +19,7 @@ preserves-schema = "0.8.0" preserves = "0.20.0" preserves-schema = "0.8.0" -tokio = { version = "1.10.0", features = ["io-util", "macros", "rt", "rt-multi-thread"] } +tokio = { version = "1.10.0", features = ["io-util", "macros", "rt", "rt-multi-thread", "time"] } tokio-util = "0.6.7" bytes = "1.0.1" diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index 9a0bb71..09da2c3 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -29,6 +29,7 @@ use std::sync::Mutex; use std::sync::RwLock; use std::sync::Weak; use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; +use std::time; use tokio::select; use tokio::sync::Notify; @@ -69,8 +70,12 @@ pub type ActorResult = Result<(), Error>; /// the actor's mainloop task. pub type ActorHandle = tokio::task::JoinHandle; +/// The type of the "disarm" function returned from [`Activation::prevent_inert_check`]. +pub type DisarmFn = Box; + /// A small protocol for indicating successful synchronisation with /// some peer; see [Entity::sync]. +#[derive(Debug)] pub struct Synced; /// The core metaprotocol implemented by every object. @@ -594,15 +599,19 @@ impl<'activation> Activation<'activation> { /// Core API: assert `a` at recipient `r`. /// /// Returns the [`Handle`] for the new assertion. - pub fn assert(&mut self, r: &Arc>, a: M) -> Handle { + pub fn assert(&mut self, r: &Arc>, a: M) -> Handle { let handle = next_handle(); if let Some(f) = self.active_facet() { + tracing::trace!(?r, ?handle, ?a, "assert"); f.insert_retract_cleanup_action(&r, handle); drop(f); { let r = Arc::clone(r); self.pending.queue_for(&r).push(Box::new( - move |t| t.with_entity(&r, |t, e| e.assert(t, a, handle)))); + move |t| t.with_entity(&r, |t, e| { + tracing::trace!(?handle, ?a, "asserted"); + e.assert(t, a, handle) + }))); } } handle @@ -622,22 +631,29 @@ impl<'activation> Activation<'activation> { /// # Panics /// /// Panics if `r` is not part of the active actor. - pub fn assert_for_myself(&mut self, r: &Arc>, a: M) -> Handle { + pub fn assert_for_myself(&mut self, r: &Arc>, a: M) -> Handle { self.immediate_oid(r); let handle = next_handle(); if let Some(f) = self.active_facet() { + tracing::trace!(?r, ?handle, ?a, "assert_for_myself"); { let r = Arc::clone(r); f.cleanup_actions.insert( handle, CleanupAction::ForMyself(Box::new( - move |t| t.with_entity(&r, |t, e| e.retract(t, handle))))); + move |t| t.with_entity(&r, |t, e| { + tracing::trace!(?handle, "retracted"); + e.retract(t, handle) + })))); } drop(f); { let r = Arc::clone(r); self.pending.for_myself.push(Box::new( - move |t| t.with_entity(&r, |t, e| e.assert(t, a, handle)))); + move |t| t.with_entity(&r, |t, e| { + tracing::trace!(?handle, ?a, "asserted"); + e.assert(t, a, handle) + }))); } } handle @@ -646,6 +662,7 @@ impl<'activation> Activation<'activation> { fn half_link(&mut self, t_other: &mut Activation) { let entity_ref = t_other.create::(StopOnRetract); let handle = next_handle(); + tracing::trace!(?handle, ?entity_ref, "half_link"); self.active_facet().unwrap().insert_retract_cleanup_action(&entity_ref, handle); t_other.with_entity(&entity_ref, |t, e| e.assert(t, AnyValue::new(true), handle)).unwrap(); } @@ -660,10 +677,14 @@ impl<'activation> Activation<'activation> { } /// Core API: send message `m` to recipient `r`. - pub fn message(&mut self, r: &Arc>, m: M) { + pub fn message(&mut self, r: &Arc>, m: M) { + tracing::trace!(?r, ?m, "message"); let r = Arc::clone(r); self.pending.queue_for(&r).push(Box::new( - move |t| t.with_entity(&r, |t, e| e.message(t, m)))) + move |t| t.with_entity(&r, |t, e| { + tracing::trace!(?m, "delivered"); + e.message(t, m) + }))) } /// Core API: send message `m` to recipient `r`, which must be a @@ -766,6 +787,7 @@ impl<'activation> Activation<'activation> { boot: F, ) { let mailbox = self.state.mailbox(); + let facet = self.facet.clone(); if let Some(f) = self.active_facet() { let token = CancellationToken::new(); let task_id = NEXT_TASK_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed); @@ -774,7 +796,7 @@ impl<'activation> Activation<'activation> { let token = token.clone(); tokio::spawn(async move { tracing::trace!(task_id, "linked task start"); - select! { + let result = select! { _ = token.cancelled() => { tracing::trace!(task_id, "linked task cancelled"); Ok(()) @@ -793,13 +815,40 @@ impl<'activation> Activation<'activation> { } result } - } + }; + let _ = facet.activate( + Account::new(crate::name!("release_linked_task")), |t| { + if let Some(f) = t.active_facet() { + tracing::trace!(task_id, "cancellation token removed"); + f.linked_tasks.remove(&task_id); + } + Ok(()) + }); + result }.instrument(name)); } f.linked_tasks.insert(task_id, token); } } + /// Executes the given action after the given duration has elapsed (so long as the active + /// facet still exists at that time). + pub fn after(&mut self, duration: time::Duration, a: Action) { + self.at(time::Instant::now() + duration, a) + } + + /// Executes the given action at the given instant (so long as the active facet still + /// exists at that time). + pub fn at>(&mut self, instant: I, a: Action) { + let facet = self.facet.clone(); + let account = Arc::clone(self.account()); + let instant = instant.into(); + self.linked_task(crate::name!("Activation::at"), async move { + tokio::time::sleep_until(instant.into()).await; + facet.activate(account, a) + }); + } + fn enqueue_for_myself_at_commit(&mut self, action: Action) { let mailbox = self.state.mailbox(); self.pending.queue_for_mailbox(&mailbox).push(action); @@ -868,7 +917,7 @@ impl<'activation> Activation<'activation> { /// its life, the Dataspace actor will have no outbound assertions, no child facets, and no /// linked tasks, so the only way to prevent it from being prematurely garbage collected is /// to use `prevent_inert_check` in its boot function. - pub fn prevent_inert_check(&mut self) -> Box { + pub fn prevent_inert_check(&mut self) -> DisarmFn { if let Some(f) = self.active_facet() { Box::new(f.prevent_inert_check()) } else { @@ -986,6 +1035,7 @@ impl EventBuffer { } fn deliver(&mut self) { + tracing::trace!("EventBuffer::deliver"); if !self.for_myself.is_empty() { panic!("Unprocessed for_myself events remain at deliver() time"); } @@ -1223,7 +1273,8 @@ impl Actor { if r.is_err() { return; } } SystemMessage::Crash(e) => { - tracing::trace!(actor_id = ?self.ac_ref.actor_id, "SystemMessage::Crash({:?})", &e); + tracing::trace!(actor_id = ?self.ac_ref.actor_id, + "SystemMessage::Crash({:?})", &e); return terminate(Err(e)); } } @@ -1264,7 +1315,10 @@ impl Facet { self.cleanup_actions.insert( handle, CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new( - move |t| t.with_entity(&r, |t, e| e.retract(t, handle))))); + move |t| t.with_entity(&r, |t, e| { + tracing::trace!(?handle, "retracted"); + e.retract(t, handle) + })))); } } diff --git a/syndicate/src/lib.rs b/syndicate/src/lib.rs index 9410e77..b261e28 100644 --- a/syndicate/src/lib.rs +++ b/syndicate/src/lib.rs @@ -16,6 +16,7 @@ pub mod error; pub mod pattern; pub mod relay; pub mod rewrite; +pub mod supervise; pub mod schemas { //! Auto-generated codecs for [Syndicate protocol diff --git a/syndicate/src/supervise.rs b/syndicate/src/supervise.rs new file mode 100644 index 0000000..2b1d0f6 --- /dev/null +++ b/syndicate/src/supervise.rs @@ -0,0 +1,180 @@ +//! Extremely simple single-actor supervision. Vastly simplified compared to the available +//! options in [Erlang/OTP](https://erlang.org/doc/man/supervisor.html). + +use std::collections::VecDeque; +use std::sync::Arc; +use std::time::Duration; + +use tokio::time::Instant; + +use crate::actor::*; + +enum Protocol ActorResult> { + SuperviseeStarted, // assertion + BootFunction(Boot), // message + Retry, // message +} + +#[derive(Debug)] +pub struct SupervisorConfiguration { + pub intensity: usize, + pub period: Duration, + pub pause_time: Duration, + pub sleep_time: Duration, +} + +pub struct Supervisor ActorResult> { + self_ref: Arc>>, + name: tracing::Span, + config: SupervisorConfiguration, + boot_fn: Option, + restarts: VecDeque, + supervisee: Supervisee, +} + +#[derive(Debug, PartialEq, Eq)] +enum Supervisee { + NotRunning, + Booting, + Running, + Recovering, +} + +impl ActorResult> std::fmt::Debug for Protocol { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Protocol::SuperviseeStarted => write!(f, "Protocol::SuperviseeStarted"), + Protocol::BootFunction(_) => write!(f, "Protocol::BootFunction(_)"), + Protocol::Retry => write!(f, "Protocol::Retry"), + } + } +} + +impl Default for SupervisorConfiguration { + fn default() -> Self { + Self { + intensity: 1, + period: Duration::from_secs(5), + pause_time: Duration::from_millis(200), + sleep_time: Duration::from_secs(10), + } + } +} + +impl ActorResult> + Entity> for Supervisor +{ + fn assert(&mut self, _t: &mut Activation, m: Protocol, _h: Handle) -> ActorResult { + match m { + Protocol::SuperviseeStarted => self.enter_state(Supervisee::Booting), + _ => (), + } + Ok(()) + } + + fn retract(&mut self, t: &mut Activation, _h: Handle) -> ActorResult { + self.enter_state(Supervisee::Recovering); + let now = Instant::now(); + let oldest_to_keep = now - self.config.period; + self.restarts.push_back(now); + while let Some(stamp) = self.restarts.front() { + if stamp < &oldest_to_keep { + self.restarts.pop_front(); + } else { + break; + } + } + let self_ref = Arc::clone(&self.self_ref); + let wait_time = if self.restarts.len() > self.config.intensity { + self.config.sleep_time + } else { + self.config.pause_time + }; + t.after(wait_time, Box::new(move |t| { + t.message(&self_ref, Protocol::Retry); + Ok(()) + })); + Ok(()) + } + + fn message(&mut self, t: &mut Activation, m: Protocol) -> ActorResult { + match m { + Protocol::BootFunction(b) => { + self.enter_state(Supervisee::Running); + self.boot_fn = Some(b); + Ok(()) + } + Protocol::Retry => { + self.enter_state(Supervisee::NotRunning); + self.ensure_started(t) + } + _ => Ok(()) + } + } +} + +impl ActorResult> Supervisor { + pub fn start( + t: &mut Activation, + name: tracing::Span, + config: SupervisorConfiguration, + boot_fn: Boot, + ) { + tracing::trace!(?config); + let self_ref = t.create_inert(); + let mut supervisor = Supervisor { + self_ref: Arc::clone(&self_ref), + name, + config, + boot_fn: Some(boot_fn), + restarts: VecDeque::new(), + supervisee: Supervisee::NotRunning, + }; + + // In cases where we are the only Entity in our Facet, and our + // supervisee terminates, we will often be "inert" until we + // can restart it. So we prevent_inert_check to signal to the + // system that there's something going on for that moment of + // time between the supervisee terminating and our responding + // to it. + let _ = t.prevent_inert_check(); + + supervisor.ensure_started(t).unwrap(); + self_ref.become_entity(supervisor); + } + + fn enter_state(&mut self, supervisee: Supervisee) { + let _entry = self.name.enter(); + tracing::info!("{:?} --> {:?}", self.supervisee, supervisee); + self.supervisee = supervisee; + } + + fn ensure_started(&mut self, t: &mut Activation) -> ActorResult { + match self.boot_fn.take() { + None => + tracing::error!("Cannot restart supervisee, because it panicked at startup"), + Some(mut boot_fn) => { + let self_ref = Arc::clone(&self.self_ref); + t.facet(|t: &mut Activation| { + t.assert(&self.self_ref, Protocol::SuperviseeStarted); + t.spawn_link(crate::name!(parent: &self.name, "supervisee"), move |t| { + match boot_fn(t) { + Ok(()) => { + t.message(&self_ref, Protocol::BootFunction(boot_fn)); + Ok(()) + } + Err(e) => { + t.clear(); + t.message(&self_ref, Protocol::BootFunction(boot_fn)); + t.deliver(); + Err(e) + } + } + }); + Ok(()) + })?; + } + } + Ok(()) + } +}