From 013e99af70a2272be1474400fa2e5b98434bf5e6 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Tue, 28 Sep 2021 12:53:11 +0200 Subject: [PATCH] Greatly improve service lifecycle handling --- syndicate-server/examples/consumer.rs | 3 +- syndicate-server/examples/pingpong.rs | 5 +- syndicate-server/examples/producer.rs | 3 +- syndicate-server/examples/state-consumer.rs | 3 +- syndicate-server/examples/state-producer.rs | 3 +- syndicate-server/protocols/schema-bundle.bin | 4 +- .../protocols/schemas/externalServices.prs | 24 +- syndicate-server/src/counter.rs | 21 ++ syndicate-server/src/dependencies.rs | 61 ++--- syndicate-server/src/lifecycle.rs | 37 +++ syndicate-server/src/main.rs | 2 + .../src/services/config_watcher.rs | 27 +- syndicate-server/src/services/daemon.rs | 257 ++++++++++++------ .../src/services/debt_reporter.rs | 6 +- syndicate-server/src/services/milestone.rs | 8 +- .../src/services/tcp_relay_listener.rs | 55 ++-- .../src/services/unix_relay_listener.rs | 42 +-- syndicate/benches/bench_dataspace.rs | 4 +- syndicate/src/actor.rs | 62 +++-- syndicate/src/relay.rs | 12 +- syndicate/src/supervise.rs | 74 ++--- 21 files changed, 451 insertions(+), 262 deletions(-) create mode 100644 syndicate-server/src/counter.rs create mode 100644 syndicate-server/src/lifecycle.rs diff --git a/syndicate-server/examples/consumer.rs b/syndicate-server/examples/consumer.rs index 8ca6c19..ee458f8 100644 --- a/syndicate-server/examples/consumer.rs +++ b/syndicate-server/examples/consumer.rs @@ -63,7 +63,8 @@ async fn main() -> Result<(), Box> { Ok(None) }); Ok(()) - }) + })?; + Ok(LinkedTaskTermination::KeepFacet) })) }).await??; Ok(()) diff --git a/syndicate-server/examples/pingpong.rs b/syndicate-server/examples/pingpong.rs index e8a0441..0a5d131 100644 --- a/syndicate-server/examples/pingpong.rs +++ b/syndicate-server/examples/pingpong.rs @@ -210,14 +210,15 @@ async fn main() -> Result<(), Box> { } external_events(&ds.underlying.mailbox, &account, events)? } - Ok(()) + Ok(LinkedTaskTermination::KeepFacet) }); } Ok(None) }); Ok(()) - }) + })?; + Ok(LinkedTaskTermination::KeepFacet) })) }).await??; Ok(()) diff --git a/syndicate-server/examples/producer.rs b/syndicate-server/examples/producer.rs index 659e387..65a1286 100644 --- a/syndicate-server/examples/producer.rs +++ b/syndicate-server/examples/producer.rs @@ -60,7 +60,8 @@ async fn main() -> Result<(), Box> { Ok(None) }); Ok(()) - }) + })?; + Ok(LinkedTaskTermination::KeepFacet) })) }).await??; Ok(()) diff --git a/syndicate-server/examples/state-consumer.rs b/syndicate-server/examples/state-consumer.rs index 9dbfaa6..362c77b 100644 --- a/syndicate-server/examples/state-consumer.rs +++ b/syndicate-server/examples/state-consumer.rs @@ -84,7 +84,8 @@ async fn main() -> Result<(), Box> { Ok(None) }); Ok(()) - }) + })?; + Ok(LinkedTaskTermination::KeepFacet) })) }).await??; Ok(()) diff --git a/syndicate-server/examples/state-producer.rs b/syndicate-server/examples/state-producer.rs index e03a05f..7537cac 100644 --- a/syndicate-server/examples/state-producer.rs +++ b/syndicate-server/examples/state-producer.rs @@ -56,7 +56,8 @@ async fn main() -> Result<(), Box> { Ok(None) }); Ok(()) - }) + })?; + Ok(LinkedTaskTermination::KeepFacet) })) }).await??; Ok(()) diff --git a/syndicate-server/protocols/schema-bundle.bin b/syndicate-server/protocols/schema-bundle.bin index 008deab..b0f0c1b 100644 --- a/syndicate-server/protocols/schema-bundle.bin +++ b/syndicate-server/protocols/schema-bundle.bin @@ -1,3 +1,3 @@ ´³bundle·µ³externalServices„´³schema·³version‘³ definitions·³Service´³refµ„³ DaemonService„³ClearEnv´³orµµ±present´³dict·³clearEnv´³named³clearEnv´³atom³Boolean„„„„„µ±invalid´³dict·³clearEnv´³named³clearEnv³any„„„„µ±absent´³dict·„„„„„³DaemonId³any³EnvValue´³orµµ±set´³atom³String„„µ±remove´³lit€„„µ±invalid³any„„„³ DaemonDir´³orµµ±present´³dict·³dir´³named³dir´³atom³String„„„„„µ±invalid´³dict·³dir´³named³dir³any„„„„µ±absent´³dict·„„„„„³ DaemonEnv´³orµµ±present´³dict·³env´³named³env´³dictof´³refµ„³ EnvVariable„´³refµ„³EnvValue„„„„„„µ±invalid´³dict·³env´³named³env³any„„„„µ±absent´³dict·„„„„„³ -DaemonSpec´³orµµ±simple´³refµ„³ CommandLine„„µ±full´³refµ„³FullDaemonSpec„„„„³ CommandLine´³orµµ±shell´³atom³String„„µ±full´³refµ„³FullCommandLine„„„„³ EnvVariable´³orµµ±string´³atom³String„„µ±symbol´³atom³Symbol„„µ±invalid³any„„„³ DaemonProcess´³rec´³lit³daemon„´³tupleµ´³named³id´³refµ„³DaemonId„„´³named³config´³refµ„³ -DaemonSpec„„„„„³ DaemonService´³rec´³lit³daemon„´³tupleµ´³named³id´³refµ„³DaemonId„„„„„³FullDaemonSpec´³andµ´³dict·³argv´³named³argv´³refµ„³ CommandLine„„„„´³named³env´³refµ„³ DaemonEnv„„´³named³dir´³refµ„³ DaemonDir„„´³named³clearEnv´³refµ„³ClearEnv„„„„³FullCommandLine´³ tuplePrefixµ´³named³program´³atom³String„„„´³named³args´³seqof´³atom³String„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„µ³internalServices„´³schema·³version‘³ definitions·³ Milestone´³rec´³lit³ milestone„´³tupleµ´³named³name³any„„„„³ DebtReporter´³lit³ debt-reporter„³ ConfigWatcher´³rec´³lit³config-watcher„´³tupleµ´³named³path´³atom³String„„„„„³TcpRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Tcp„„„„„³UnixRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Unix„„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„„„ \ No newline at end of file +DaemonSpec´³orµµ±simple´³refµ„³ CommandLine„„µ±full´³refµ„³FullDaemonSpec„„„„³ CommandLine´³orµµ±shell´³atom³String„„µ±full´³refµ„³FullCommandLine„„„„³ EnvVariable´³orµµ±string´³atom³String„„µ±symbol´³atom³Symbol„„µ±invalid³any„„„³ ReadyOnStart´³orµµ±present´³dict·³ readyOnStart´³named³ readyOnStart´³atom³Boolean„„„„„µ±invalid´³dict·³ readyOnStart´³named³ readyOnStart³any„„„„µ±absent´³dict·„„„„„³ RestartField´³orµµ±present´³dict·³restart´³named³restart´³refµ„³ RestartPolicy„„„„„µ±invalid´³dict·³restart´³named³restart³any„„„„µ±absent´³dict·„„„„„³ DaemonProcess´³rec´³lit³daemon„´³tupleµ´³named³id´³refµ„³DaemonId„„´³named³config´³refµ„³ +DaemonSpec„„„„„³ DaemonService´³rec´³lit³daemon„´³tupleµ´³named³id´³refµ„³DaemonId„„„„„³ RestartPolicy´³orµµ±always´³lit³always„„µ±onError´³lit³on-error„„µ±all´³lit³all„„„„³FullDaemonSpec´³andµ´³dict·³argv´³named³argv´³refµ„³ CommandLine„„„„´³named³env´³refµ„³ DaemonEnv„„´³named³dir´³refµ„³ DaemonDir„„´³named³clearEnv´³refµ„³ClearEnv„„´³named³ readyOnStart´³refµ„³ ReadyOnStart„„´³named³restart´³refµ„³ RestartField„„„„³FullCommandLine´³ tuplePrefixµ´³named³program´³atom³String„„„´³named³args´³seqof´³atom³String„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„µ³internalServices„´³schema·³version‘³ definitions·³ Milestone´³rec´³lit³ milestone„´³tupleµ´³named³name³any„„„„³ DebtReporter´³lit³ debt-reporter„³ ConfigWatcher´³rec´³lit³config-watcher„´³tupleµ´³named³path´³atom³String„„„„„³TcpRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Tcp„„„„„³UnixRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Unix„„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„„„ \ No newline at end of file diff --git a/syndicate-server/protocols/schemas/externalServices.prs b/syndicate-server/protocols/schemas/externalServices.prs index 4c21cf7..4108d49 100644 --- a/syndicate-server/protocols/schemas/externalServices.prs +++ b/syndicate-server/protocols/schemas/externalServices.prs @@ -8,13 +8,35 @@ DaemonService = . DaemonProcess = . DaemonId = any . DaemonSpec = @simple CommandLine / @full FullDaemonSpec . -FullDaemonSpec = { argv: CommandLine } & @env DaemonEnv & @dir DaemonDir & @clearEnv ClearEnv . +FullDaemonSpec = + & { argv: CommandLine } + & @env DaemonEnv + & @dir DaemonDir + & @clearEnv ClearEnv + & @readyOnStart ReadyOnStart + & @restart RestartField +. DaemonEnv = @present { env: { EnvVariable: EnvValue ...:... } } / @invalid { env: any } / @absent {} . DaemonDir = @present { dir: string } / @invalid { dir: any } / @absent {} . ClearEnv = @present { clearEnv: bool } / @invalid { clearEnv: any } / @absent {} . +ReadyOnStart = @present { readyOnStart: bool } / @invalid { readyOnStart: any } / @absent {} . +RestartField = @present { restart: RestartPolicy } / @invalid { restart: any } / @absent {} . CommandLine = @shell string / @full FullCommandLine . FullCommandLine = [@program string, @args string ...] . EnvVariable = @string string / @symbol symbol / @invalid any . EnvValue = @set string / @remove #f / @invalid any . + +RestartPolicy = +/ ; Whether the process terminates normally or abnormally, restart it + ; without affecting any peer processes within the service. + =always +/ ; If the process terminates normally, leave everything alone; if it + ; terminates abnormally, restart it without affecting peers. + @onError =on-error +/ ; If the process terminates normally, leave everything alone; if it + ; terminates abnormally, restart the whole daemon (all processes + ; within the daemon). + =all +. diff --git a/syndicate-server/src/counter.rs b/syndicate-server/src/counter.rs new file mode 100644 index 0000000..385e1e8 --- /dev/null +++ b/syndicate-server/src/counter.rs @@ -0,0 +1,21 @@ +use std::sync::Arc; + +use syndicate::actor::*; + +pub fn adjust(t: &mut Activation, f: &Arc>, delta: isize) { + let f = f.clone(); + *t.get_mut(&f) += delta; + t.on_stop(move |t| { + *t.get_mut(&f) -= delta; + Ok(()) + }); +} + +pub fn sync_and_adjust(t: &mut Activation, r: &Arc>, f: &Arc>, delta: isize) { + let f = f.clone(); + let sync_handler = t.create(move |t: &mut Activation| { + *t.get_mut(&f) += delta; + Ok(()) + }); + t.sync(r, sync_handler) +} diff --git a/syndicate-server/src/dependencies.rs b/syndicate-server/src/dependencies.rs index 9d2c7d6..662282b 100644 --- a/syndicate-server/src/dependencies.rs +++ b/syndicate-server/src/dependencies.rs @@ -9,6 +9,7 @@ use syndicate::schemas::dataspace::Observe; use syndicate::schemas::service; use syndicate::value::NestedValue; +use crate::counter; use crate::language::language; use crate::schemas::internal_services; @@ -16,17 +17,6 @@ use syndicate_macros::during; pub fn boot(t: &mut Activation, ds: Arc) { t.spawn(syndicate::name!("tracker", module = module_path!()), move |t| { - enclose!((ds) during!(t, ds, language(), , |t: &mut Activation| { - ds.assert(t, language(), &service::ServiceDependency { - depender: s, - dependee: service::Dependee::ServiceRunning(Box::new(service::ServiceRunning { - service_name: language().unparse(&internal_services::Milestone { - name: m, - }), - })), - }); - Ok(()) - })); Ok(during!(t, ds, language(), , |t: &mut Activation| { tracing::info!(?spec, "tracking dependencies"); t.spawn_link(syndicate::name!(parent: None, "tracker", spec = ?spec), @@ -37,18 +27,15 @@ pub fn boot(t: &mut Activation, ds: Arc) { } fn run(t: &mut Activation, ds: Arc, service_name: AnyValue) -> ActorResult { - ds.assert(t, language(), &service::ServiceStarted { - service_name: service_name.clone(), - }); - if !service_name.value().is_simple_record("milestone", Some(1)) { let core_dep = service::ServiceDependency { depender: service_name.clone(), - dependee: service::Dependee::ServiceRunning(Box::new(service::ServiceRunning { + dependee: service::ServiceState { service_name: language().unparse(&internal_services::Milestone { name: AnyValue::symbol("core"), }), - })), + state: service::State::Ready, + }, }; let milestone_monitor = entity(ds.assert(t, language(), &core_dep)) .on_asserted(enclose!((ds) move |handle, t, _captures: AnyValue| { @@ -60,12 +47,12 @@ fn run(t: &mut Activation, ds: Arc, service_name: AnyValue) -> ActorResult })) .create_cap(t); ds.assert(t, language(), &Observe { - pattern: syndicate_macros::pattern!{}, + pattern: syndicate_macros::pattern!{}, observer: milestone_monitor, }); } - let obstacle_count = t.field(1u64); + let obstacle_count = t.field(1isize); t.dataflow(enclose!((obstacle_count) move |t| { tracing::trace!(obstacle_count = ?t.get(&obstacle_count)); Ok(()) @@ -89,33 +76,27 @@ fn run(t: &mut Activation, ds: Arc, service_name: AnyValue) -> ActorResult enclose!((ds, obstacle_count) during!( t, ds, language(), , enclose!((ds, obstacle_count) move |t: &mut Activation| { - if let Ok(dependee) = language().parse::(&dependee) { + if let Ok(dependee) = language().parse::(&dependee) { tracing::trace!(on = ?dependee, "new dependency"); - *t.get_mut(&obstacle_count) += 1; - t.on_stop(enclose!((obstacle_count) move |t| { *t.get_mut(&obstacle_count) -= 1; Ok(()) })); ds.assert(t, language(), &service::RequireService { - service_name: match &dependee { - service::Dependee::ServiceStarted(b) => b.service_name.clone(), - service::Dependee::ServiceRunning(b) => b.service_name.clone(), - }, + service_name: dependee.service_name, }); - let d = dependee.clone(); - during!(t, ds, language(), #(language().unparse(&d)), - enclose!((obstacle_count, dependee) move |t: &mut Activation| { - tracing::trace!(on = ?dependee, "dependency satisfied"); - *t.get_mut(&obstacle_count) -= 1; - t.on_stop(move |t| { *t.get_mut(&obstacle_count) += 1; Ok(()) }); - Ok(()) - })); + } else { + tracing::warn!(on = ?dependee, "cannot deduce dependee service name"); } + + counter::adjust(t, &obstacle_count, 1); + + let d = dependee.clone(); + during!(t, ds, language(), #(d), enclose!( + (obstacle_count, dependee) move |t: &mut Activation| { + tracing::trace!(on = ?dependee, "dependency satisfied"); + counter::adjust(t, &obstacle_count, -1); + Ok(()) + })); Ok(()) }))); - let initial_sync_handler = t.create(enclose!((obstacle_count) move |t: &mut Activation| { - *t.get_mut(&obstacle_count) -= 1; - Ok(()) - })); - t.sync(&ds.underlying, initial_sync_handler); - + counter::sync_and_adjust(t, &ds.underlying, &obstacle_count, -1); Ok(()) } diff --git a/syndicate-server/src/lifecycle.rs b/syndicate-server/src/lifecycle.rs new file mode 100644 index 0000000..818a3fa --- /dev/null +++ b/syndicate-server/src/lifecycle.rs @@ -0,0 +1,37 @@ +use std::sync::Arc; + +use syndicate::actor::*; +use syndicate::schemas::service::*; +use syndicate::preserves_schema::support::Unparse; + +use crate::language::Language; +use crate::language::language; + +pub fn updater<'a, N: Clone + Unparse<&'a Language, AnyValue>>( + ds: Arc, + name: N, +) -> impl FnMut(&mut Activation, State) -> ActorResult { + let mut handle = None; + move |t, state| { + ds.update(t, &mut handle, language(), Some(&lifecycle(&name, state))); + Ok(()) + } +} + +pub fn lifecycle<'a, N: Unparse<&'a Language, AnyValue>>( + service_name: &N, + state: State, +) -> ServiceState { + ServiceState { + service_name: service_name.unparse(language()), + state, + } +} + +pub fn started<'a, N: Unparse<&'a Language, AnyValue>>(service_name: &N) -> ServiceState { + lifecycle(service_name, State::Started) +} + +pub fn ready<'a, N: Unparse<&'a Language, AnyValue>>(service_name: &N) -> ServiceState { + lifecycle(service_name, State::Ready) +} diff --git a/syndicate-server/src/main.rs b/syndicate-server/src/main.rs index 10acf80..8587471 100644 --- a/syndicate-server/src/main.rs +++ b/syndicate-server/src/main.rs @@ -14,9 +14,11 @@ use syndicate::schemas::transport_address; use syndicate::value::NestedValue; +mod counter; mod dependencies; mod gatekeeper; mod language; +mod lifecycle; mod protocol; mod services; diff --git a/syndicate-server/src/services/config_watcher.rs b/syndicate-server/src/services/config_watcher.rs index c19c1bd..3433418 100644 --- a/syndicate-server/src/services/config_watcher.rs +++ b/syndicate-server/src/services/config_watcher.rs @@ -3,8 +3,6 @@ use notify::Watcher; use notify::RecursiveMode; use notify::watcher; -use preserves_schema::Codec; - use std::fs; use std::future; use std::io; @@ -16,6 +14,7 @@ use std::time::Duration; use syndicate::actor::*; use syndicate::enclose; +use syndicate::supervise::{Supervisor, SupervisorConfiguration}; use syndicate::value::BinarySource; use syndicate::value::IOBinarySource; use syndicate::value::Map; @@ -26,18 +25,21 @@ use syndicate::value::Set; use syndicate::value::ViaCodec; use crate::language::language; +use crate::lifecycle; use crate::schemas::internal_services; use syndicate_macros::during; pub fn on_demand(t: &mut Activation, ds: Arc) { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { - Ok(during!(t, ds, language(), , - |t: &mut Activation| { - t.spawn_link(syndicate::name!(parent: None, "config", spec = ?spec), - enclose!((ds) |t| run(t, ds, spec))); - Ok(()) - })) + Ok(during!(t, ds, language(), , |t| { + Supervisor::start( + t, + syndicate::name!(parent: None, "config", spec = ?spec), + SupervisorConfiguration::default(), + enclose!((ds, spec) lifecycle::updater(ds, spec)), + enclose!((ds) move |t| enclose!((ds, spec) run(t, ds, spec)))) + })) }); } @@ -137,11 +139,7 @@ fn initial_scan( } fn run(t: &mut Activation, ds: Arc, spec: internal_services::ConfigWatcher) -> ActorResult { - { - let spec = language().unparse(&spec); - ds.assert(t, &(), &syndicate_macros::template!("")); - } - let path = fs::canonicalize(spec.path)?; + let path = fs::canonicalize(spec.path.clone())?; tracing::info!("watching {:?}", &path); let (tx, rx) = channel(); @@ -157,6 +155,7 @@ fn run(t: &mut Activation, ds: Arc, spec: internal_services::ConfigWatcher) let root_path = path.clone().into(); facet.activate(Account::new(syndicate::name!("initial_scan")), |t| { initial_scan(t, &mut path_state, &ds, &root_path); + ds.assert(t, language(), &lifecycle::ready(&spec)); Ok(()) }).unwrap(); tracing::trace!("initial_scan complete"); @@ -213,7 +212,7 @@ fn run(t: &mut Activation, ds: Arc, spec: internal_services::ConfigWatcher) t.linked_task(syndicate::name!("cancel-wait"), async move { future::pending::<()>().await; drop(watcher); - Ok(()) + Ok(LinkedTaskTermination::KeepFacet) }); Ok(()) diff --git a/syndicate-server/src/services/daemon.rs b/syndicate-server/src/services/daemon.rs index 36da3e0..cc8b85d 100644 --- a/syndicate-server/src/services/daemon.rs +++ b/syndicate-server/src/services/daemon.rs @@ -4,11 +4,14 @@ use std::sync::Arc; use syndicate::actor::*; use syndicate::enclose; +use syndicate::error::Error; use syndicate::supervise::{Supervisor, SupervisorConfiguration}; use tokio::process; +use crate::counter; use crate::language::language; +use crate::lifecycle; use crate::schemas::external_services::*; use syndicate_macros::during; @@ -16,17 +19,18 @@ use syndicate_macros::during; pub fn on_demand(t: &mut Activation, config_ds: Arc, root_ds: Arc) { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { Ok(during!(t, config_ds, language(), , |t| { - Ok(Supervisor::start( + Supervisor::start( t, syndicate::name!(parent: None, "daemon", service = ?spec), SupervisorConfiguration::default(), + enclose!((config_ds, spec) lifecycle::updater(config_ds, spec)), enclose!((config_ds, root_ds) move |t| - enclose!((config_ds, root_ds, spec) run(t, config_ds, root_ds, spec))))) + enclose!((config_ds, root_ds, spec) run(t, config_ds, root_ds, spec)))) })) }); } -fn cannot_start() -> ActorResult { +fn cannot_start() -> Result { Err("Cannot start daemon process")? } @@ -38,6 +42,8 @@ impl DaemonSpec { env: DaemonEnv::Absent, dir: DaemonDir::Absent, clear_env: ClearEnv::Absent, + ready_on_start: ReadyOnStart::Absent, + restart: RestartField::Absent, }, DaemonSpec::Full(spec) => *spec, } @@ -56,6 +62,79 @@ impl CommandLine { } } +struct DaemonProcessInstance { + name: tracing::Span, + facet: FacetRef, + cmd: process::Command, + announce_presumed_readiness: bool, + unready_configs: Arc>, + restart_policy: RestartPolicy, +} + +impl DaemonProcessInstance { + async fn handle_exit(self, error_message: Option) -> Result { + let delay_ms = if let None = error_message { 200 } else { 1000 }; + let sleep_after_exit = || tokio::time::sleep(std::time::Duration::from_millis(delay_ms)); + Ok(match self.restart_policy { + RestartPolicy::Always => { + sleep_after_exit().await; + self.start()?; + LinkedTaskTermination::Normal + } + RestartPolicy::OnError => { + if let None = error_message { + LinkedTaskTermination::KeepFacet + } else { + sleep_after_exit().await; + self.start()?; + LinkedTaskTermination::Normal + } + } + RestartPolicy::All => { + match error_message { + None => LinkedTaskTermination::KeepFacet, + Some(s) => Err(s.as_str())?, + } + } + }) + } + + fn start(mut self) -> ActorResult { + tracing::trace!("DaemonProcessInstance start (outer)"); + self.facet.clone().activate( + Account::new(syndicate::name!(parent: self.name.clone(), "instance")), |t| { + tracing::trace!("DaemonProcessInstance start (inner)"); + t.facet(|t| { + tracing::trace!(cmd = ?self.cmd, "starting"); + let mut child = match self.cmd.spawn() { + Ok(child) => child, + Err(e) => { + tracing::info!(spawn_err = ?e); + t.linked_task(syndicate::name!(parent: self.name.clone(), "fail"), + self.handle_exit(Some(format!("{}", e)))); + return Ok(()); + } + }; + tracing::info!(pid = ?child.id(), cmd = ?self.cmd, "started"); + + if self.announce_presumed_readiness { + counter::adjust(t, &self.unready_configs, -1); + } + + t.linked_task(syndicate::name!(parent: self.name.clone(), "wait"), async move { + tracing::trace!("waiting for process exit"); + let status = child.wait().await?; + tracing::info!(?status); + self.handle_exit( + if status.success() { None } else { Some(format!("{}", status)) }).await + }); + Ok(()) + })?; + Ok(()) + }) + } +} + fn run( t: &mut Activation, config_ds: Arc, @@ -64,92 +143,118 @@ fn run( ) -> ActorResult { let spec = language().unparse(&service); + let unready_configs = t.field(1isize); + t.dataflow({ + let mut handle = None; + let ready = lifecycle::ready(&spec); + enclose!((config_ds, unready_configs) move |t| { + let busy_count = *t.get(&unready_configs); + tracing::trace!(?busy_count); + config_ds.update(t, &mut handle, language(), if busy_count == 0 { Some(&ready) } else { None }); + Ok(()) + }) + })?; + Ok(during!(t, config_ds, language(), , { - let spec = spec.clone(); - let config_ds = Arc::clone(&config_ds); - |t: &mut Activation| match language().parse::(&config) { - Ok(config) => { - tracing::info!(?config); - let config = config.elaborate(); - let facet = t.facet.clone(); - t.linked_task(syndicate::name!("subprocess"), async move { - let argv = config.argv.elaborate(); - let mut cmd = process::Command::new(argv.program); - cmd.args(argv.args); - match config.dir { - DaemonDir::Present { dir } => { cmd.current_dir(dir); () }, - DaemonDir::Absent => (), - DaemonDir::Invalid { dir } => { - tracing::error!(?dir, "Invalid working directory"); - return cannot_start(); + let unready_configs = unready_configs.clone(); + |t: &mut Activation| { + counter::adjust(t, &unready_configs, 1); + + match language().parse::(&config) { + Ok(config) => { + tracing::info!(?config); + let config = config.elaborate(); + let facet = t.facet.clone(); + t.linked_task(syndicate::name!("subprocess"), async move { + let argv = config.argv.elaborate(); + let mut cmd = process::Command::new(argv.program); + cmd.args(argv.args); + match config.dir { + DaemonDir::Present { dir } => { cmd.current_dir(dir); () }, + DaemonDir::Absent => (), + DaemonDir::Invalid { dir } => { + tracing::error!(?dir, "Invalid working directory"); + return cannot_start(); + } } - } - match config.clear_env { - ClearEnv::Present { clear_env: true } => { cmd.env_clear(); () }, - ClearEnv::Present { clear_env: false } => (), - ClearEnv::Absent => (), - ClearEnv::Invalid { clear_env } => { - tracing::error!(?clear_env, "Invalid clearEnv setting"); - return cannot_start(); + match config.clear_env { + ClearEnv::Present { clear_env: true } => { cmd.env_clear(); () }, + ClearEnv::Present { clear_env: false } => (), + ClearEnv::Absent => (), + ClearEnv::Invalid { clear_env } => { + tracing::error!(?clear_env, "Invalid clearEnv setting"); + return cannot_start(); + } } - } - match config.env { - DaemonEnv::Present { env } => { - for (k, v) in env { - if let Some(env_variable) = match k { - EnvVariable::String(k) => Some(k), - EnvVariable::Symbol(k) => Some(k), - EnvVariable::Invalid(env_variable) => { - tracing::error!(?env_variable, - "Invalid environment variable name"); - return cannot_start(); - } - } { - match v { - EnvValue::Set(value) => { cmd.env(env_variable, value); () } - EnvValue::Remove => { cmd.env_remove(env_variable); () } - EnvValue::Invalid(value) => { - tracing::error!(?env_variable, ?value, - "Invalid environment variable value"); + match config.env { + DaemonEnv::Present { env } => { + for (k, v) in env { + if let Some(env_variable) = match k { + EnvVariable::String(k) => Some(k), + EnvVariable::Symbol(k) => Some(k), + EnvVariable::Invalid(env_variable) => { + tracing::error!(?env_variable, + "Invalid environment variable name"); return cannot_start(); } + } { + match v { + EnvValue::Set(value) => { cmd.env(env_variable, value); () } + EnvValue::Remove => { cmd.env_remove(env_variable); () } + EnvValue::Invalid(value) => { + tracing::error!(?env_variable, ?value, + "Invalid environment variable value"); + return cannot_start(); + } + } } } } + DaemonEnv::Absent => (), + DaemonEnv::Invalid { env } => { + tracing::error!(?env, "Invalid daemon environment"); + return cannot_start(); + } } - DaemonEnv::Absent => (), - DaemonEnv::Invalid { env } => { - tracing::error!(?env, "Invalid daemon environment"); - return cannot_start(); - } - } + let announce_presumed_readiness = match config.ready_on_start { + ReadyOnStart::Present { ready_on_start } => ready_on_start, + ReadyOnStart::Absent => true, + ReadyOnStart::Invalid { ready_on_start } => { + tracing::error!(?ready_on_start, "Invalid readyOnStart value"); + return cannot_start(); + } + }; + let restart_policy = match config.restart { + RestartField::Present { restart } => *restart, + RestartField::Absent => RestartPolicy::All, + RestartField::Invalid { restart } => { + tracing::error!(?restart, "Invalid restart value"); + return cannot_start(); + } + }; - cmd.stdin(std::process::Stdio::null()); - cmd.stdout(std::process::Stdio::inherit()); - cmd.stderr(std::process::Stdio::inherit()); - cmd.kill_on_drop(true); + cmd.stdin(std::process::Stdio::null()); + cmd.stdout(std::process::Stdio::inherit()); + cmd.stderr(std::process::Stdio::inherit()); + cmd.kill_on_drop(true); - tracing::info!(?cmd); - let mut child = cmd.spawn()?; - tracing::info!(pid = ?child.id()); - - facet.activate( - Account::new(syndicate::name!("announce-service-running")), - |t| { - config_ds.assert(t, &(), &syndicate_macros::template!("")); - Ok(()) - })?; - - let status = child.wait().await; - tracing::info!(?status); + (DaemonProcessInstance { + name: tracing::Span::current(), + facet, + cmd, + announce_presumed_readiness, + unready_configs, + restart_policy, + }).start()?; + Ok(LinkedTaskTermination::KeepFacet) + }); Ok(()) - }); - Ok(()) - } - Err(_) => { - tracing::error!(?config, "Invalid DaemonSpec"); - return cannot_start(); + } + Err(_) => { + tracing::error!(?config, "Invalid DaemonSpec"); + return Ok(()); + } } } })) diff --git a/syndicate-server/src/services/debt_reporter.rs b/syndicate-server/src/services/debt_reporter.rs index e0991ea..735995f 100644 --- a/syndicate-server/src/services/debt_reporter.rs +++ b/syndicate-server/src/services/debt_reporter.rs @@ -2,9 +2,9 @@ use std::sync::Arc; use syndicate::actor::*; use syndicate::enclose; -use syndicate::preserves_schema::Codec; use crate::language::language; +use crate::lifecycle; use crate::schemas::internal_services::DebtReporter; use syndicate_macros::during; @@ -19,8 +19,8 @@ pub fn on_demand(t: &mut Activation, ds: Arc) { } fn run(t: &mut Activation, ds: Arc) -> ActorResult { - let spec = language().unparse(&DebtReporter); - ds.assert(t, &(), &syndicate_macros::template!("")); + ds.assert(t, language(), &lifecycle::started(&DebtReporter)); + ds.assert(t, language(), &lifecycle::ready(&DebtReporter)); t.linked_task(syndicate::name!("tick"), async { let mut timer = tokio::time::interval(core::time::Duration::from_secs(1)); loop { diff --git a/syndicate-server/src/services/milestone.rs b/syndicate-server/src/services/milestone.rs index 317accd..df77c8d 100644 --- a/syndicate-server/src/services/milestone.rs +++ b/syndicate-server/src/services/milestone.rs @@ -1,10 +1,9 @@ use std::sync::Arc; use syndicate::actor::*; -use syndicate::preserves_schema::Codec; -use syndicate::schemas::service; use crate::language::language; +use crate::lifecycle; use crate::schemas::internal_services::Milestone; use syndicate_macros::during; @@ -12,9 +11,8 @@ use syndicate_macros::during; pub fn on_demand(t: &mut Activation, ds: Arc) { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { Ok(during!(t, ds, language(), , |t: &mut Activation| { - ds.assert(t, language(), &service::ServiceRunning { - service_name: language().unparse(&spec), - }); + ds.assert(t, language(), &lifecycle::started(&spec)); + ds.assert(t, language(), &lifecycle::ready(&spec)); Ok(()) })) }); diff --git a/syndicate-server/src/services/tcp_relay_listener.rs b/syndicate-server/src/services/tcp_relay_listener.rs index fc659a0..4013c45 100644 --- a/syndicate-server/src/services/tcp_relay_listener.rs +++ b/syndicate-server/src/services/tcp_relay_listener.rs @@ -1,5 +1,3 @@ -use preserves_schema::Codec; - use std::convert::TryFrom; use std::sync::Arc; @@ -10,49 +8,50 @@ use syndicate::supervise::{Supervisor, SupervisorConfiguration}; use tokio::net::TcpListener; use crate::language::language; +use crate::lifecycle; use crate::protocol::detect_protocol; -use crate::schemas::internal_services; +use crate::schemas::internal_services::TcpRelayListener; use syndicate_macros::during; pub fn on_demand(t: &mut Activation, ds: Arc, gateway: Arc) { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { - Ok(during!(t, ds, language(), , - |t| { - Supervisor::start( - t, - syndicate::name!(parent: None, "relay", addr = ?spec), - SupervisorConfiguration::default(), - enclose!((ds, gateway) move |t| - enclose!((ds, gateway, spec) run(t, ds, gateway, spec)))); - Ok(()) - })) + Ok(during!(t, ds, language(), , |t| { + Supervisor::start( + t, + syndicate::name!(parent: None, "relay", addr = ?spec), + SupervisorConfiguration::default(), + enclose!((ds, spec) lifecycle::updater(ds, spec)), + enclose!((ds, gateway) move |t| + enclose!((ds, gateway, spec) run(t, ds, gateway, spec)))) + })) }); } -fn run( - t: &'_ mut Activation, - ds: Arc, - gateway: Arc, - spec: internal_services::TcpRelayListener, -) -> ActorResult { +fn run(t: &mut Activation, ds: Arc, gateway: Arc, spec: TcpRelayListener) -> ActorResult { let host = spec.addr.host.clone(); let port = u16::try_from(&spec.addr.port).map_err(|_| "Invalid TCP port number")?; - { - let spec = language().unparse(&spec); - ds.assert(t, &(), &syndicate_macros::template!("")); - } let parent_span = tracing::Span::current(); + let facet = t.facet.clone(); t.linked_task(syndicate::name!("listener"), async move { let listen_addr = format!("{}:{}", host, port); let listener = TcpListener::bind(listen_addr).await?; - tracing::info!("listening"); + facet.activate(Account::new(syndicate::name!("readiness")), |t| { + tracing::info!("listening"); + ds.assert(t, language(), &lifecycle::ready(&spec)); + Ok(()) + })?; loop { let (stream, addr) = listener.accept().await?; - Actor::new().boot(syndicate::name!(parent: parent_span.clone(), "conn"), - enclose!((gateway) move |t| Ok(t.linked_task( - tracing::Span::current(), - detect_protocol(t.facet.clone(), stream, gateway, addr))))); + Actor::new().boot( + syndicate::name!(parent: parent_span.clone(), "conn"), + enclose!((gateway) move |t| Ok(t.linked_task(tracing::Span::current(), { + let facet = t.facet.clone(); + async move { + detect_protocol(facet, stream, gateway, addr).await?; + Ok(LinkedTaskTermination::KeepFacet) + } + })))); } }); Ok(()) diff --git a/syndicate-server/src/services/unix_relay_listener.rs b/syndicate-server/src/services/unix_relay_listener.rs index 4462177..c188a33 100644 --- a/syndicate-server/src/services/unix_relay_listener.rs +++ b/syndicate-server/src/services/unix_relay_listener.rs @@ -1,5 +1,3 @@ -use preserves_schema::Codec; - use std::io; use std::path::PathBuf; use std::sync::Arc; @@ -8,42 +6,43 @@ use syndicate::actor::*; use syndicate::enclose; use syndicate::error::Error; use syndicate::relay; +use syndicate::supervise::{Supervisor, SupervisorConfiguration}; use tokio::net::UnixListener; use tokio::net::UnixStream; use crate::language::language; +use crate::lifecycle; use crate::protocol::run_connection; -use crate::schemas::internal_services; +use crate::schemas::internal_services::UnixRelayListener; use syndicate_macros::during; pub fn on_demand(t: &mut Activation, ds: Arc, gateway: Arc) { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { - Ok(during!(t, ds, language(), , - |t: &mut Activation| { - t.spawn_link(syndicate::name!(parent: None, "relay", addr = ?spec), - enclose!((ds, gateway) |t| run(t, ds, gateway, spec))); - Ok(()) - })) + Ok(during!(t, ds, language(), , |t| { + Supervisor::start( + t, + syndicate::name!(parent: None, "relay", addr = ?spec), + SupervisorConfiguration::default(), + enclose!((ds, spec) lifecycle::updater(ds, spec)), + enclose!((ds, gateway) move |t| + enclose!((ds, gateway, spec) run(t, ds, gateway, spec)))) + })) }); } -fn run( - t: &'_ mut Activation, - ds: Arc, - gateway: Arc, - spec: internal_services::UnixRelayListener, -) -> ActorResult { +fn run(t: &mut Activation, ds: Arc, gateway: Arc, spec: UnixRelayListener) -> ActorResult { let path_str = spec.addr.path.clone(); - { - let spec = language().unparse(&spec); - ds.assert(t, &(), &syndicate_macros::template!("")); - } let parent_span = tracing::Span::current(); + let facet = t.facet.clone(); t.linked_task(syndicate::name!("listener"), async move { let listener = bind_unix_listener(&PathBuf::from(path_str)).await?; - tracing::info!("listening"); + facet.activate(Account::new(syndicate::name!("readiness")), |t| { + tracing::info!("listening"); + ds.assert(t, language(), &lifecycle::ready(&spec)); + Ok(()) + })?; loop { let (stream, _addr) = listener.accept().await?; let peer = stream.peer_cred()?; @@ -61,7 +60,8 @@ fn run( run_connection(facet, relay::Input::Bytes(Box::pin(i)), relay::Output::Bytes(Box::pin(o)), - gateway) + gateway)?; + Ok(LinkedTaskTermination::KeepFacet) } })))); } diff --git a/syndicate/benches/bench_dataspace.rs b/syndicate/benches/bench_dataspace.rs index 32d9124..b08b580 100644 --- a/syndicate/benches/bench_dataspace.rs +++ b/syndicate/benches/bench_dataspace.rs @@ -71,7 +71,7 @@ pub fn bench_pub(c: &mut Criterion) { enclose!((shutdown) move |t| t.with_entity( &shutdown, |t, e| e.message(t, AnyValue::new(true))))))?; - Ok(()) + Ok(LinkedTaskTermination::KeepFacet) }); Ok(()) }).await.unwrap().unwrap(); @@ -167,7 +167,7 @@ pub fn bench_pub(c: &mut Criterion) { &ds.underlying, |t, e| e.message(t, AnyValue::new(true)))))?; } - Ok(()) + Ok(LinkedTaskTermination::KeepFacet) }); Ok(()) }); diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index 44097fb..a7e5332 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -453,6 +453,16 @@ pub enum RunDisposition { Terminate(ActorResult), } +/// [Linked tasks][Activation::linked_task] terminate yielding values of this type. +pub enum LinkedTaskTermination { + /// Causes the task's associated [Facet] to be [stop][Activation::stop]ped when the task + /// returns. + Normal, + /// Causes no action to be taken regarding the task's associated [Facet] at the time the + /// task returns. + KeepFacet, +} + //--------------------------------------------------------------------------- const BUMP_AMOUNT: u8 = 10; @@ -874,9 +884,9 @@ impl<'activation> Activation<'activation> { /// Start a new [linked task][crate::actor#linked-tasks] attached to the active facet. The /// task will execute the future "`boot`" to completion unless it is cancelled first (by - /// e.g. termination of the owning facet or crashing of the owning actor). Uses `name` for - /// log messages emitted by the task. - pub fn linked_task>( + /// e.g. termination of the owning facet or crashing of the owning actor). Stops the active + /// facet when the linked task completes. Uses `name` for log messages emitted by the task. + pub fn linked_task>>( &mut self, name: tracing::Span, boot: F, @@ -894,21 +904,18 @@ impl<'activation> Activation<'activation> { let result = select! { _ = token.cancelled() => { tracing::trace!(task_id, "linked task cancelled"); - Ok(()) + LinkedTaskTermination::Normal } - result = boot => { - match &result { - Ok(()) => { - tracing::trace!(task_id, "linked task normal stop"); - () - } - Err(e) => { - tracing::error!(task_id, "linked task error: {}", e); - let _ = mailbox.tx.send(SystemMessage::Crash(e.clone())); - () - } + result = boot => match result { + Ok(t) => { + tracing::trace!(task_id, "linked task normal stop"); + t + } + Err(e) => { + tracing::error!(task_id, "linked task error: {}", e); + let _ = mailbox.tx.send(SystemMessage::Crash(e.clone())); + Err(e)? } - result } }; let _ = facet.activate( @@ -917,9 +924,12 @@ impl<'activation> Activation<'activation> { tracing::trace!(task_id, "cancellation token removed"); f.linked_tasks.remove(&task_id); } + if let LinkedTaskTermination::Normal = result { + t.stop(); + } Ok(()) }); - result + Ok::<(), Error>(()) }.instrument(name)); } f.linked_tasks.insert(task_id, token); @@ -935,12 +945,13 @@ impl<'activation> Activation<'activation> { /// Executes the given action at the given instant (so long as the active facet still /// exists at that time). pub fn at>(&mut self, instant: I, a: Action) { - let facet = self.facet.clone(); let account = Arc::clone(self.account()); let instant = instant.into(); + let facet = self.facet.clone(); self.linked_task(crate::name!("Activation::at"), async move { tokio::time::sleep_until(instant.into()).await; - facet.activate(account, a) + facet.activate(account, a)?; + Ok(LinkedTaskTermination::KeepFacet) }); } @@ -995,6 +1006,7 @@ impl<'activation> Activation<'activation> { let f = Facet::new(Some(self.facet.facet_id)); let facet_id = f.facet_id; self.state.facet_nodes.insert(facet_id, f); + tracing::trace!(?facet_id, facet_count = ?self.state.facet_nodes.len()); self.state.facet_children.entry(self.facet.facet_id).or_default().insert(facet_id); self.with_facet(true /* TODO: tiny optimisation: "false" would be safe here */, facet_id, move |t| { boot(t)?; @@ -1027,9 +1039,10 @@ impl<'activation> Activation<'activation> { } /// Arranges for the [`Facet`] named by `facet_id` to be stopped cleanly when `self` - /// commits. If `continuation` is supplied, the facet to be stopped hasn't been stopped - /// yet, none of the shutdown handlers yields an error, and the facet's parent facet is - /// alive, executes `continuation` in the parent facet's context. + /// commits. If `continuation` is supplied, the facet to be stopped hasn't been stopped at + /// the time of the `stop_facet` call, none of the shutdown handlers yields an error, and + /// the facet's parent facet is alive, executes `continuation` in the parent facet's + /// context. pub fn stop_facet(&mut self, facet_id: FacetId, continuation: Option) { let maybe_parent_id = self.active_facet().and_then(|f| f.parent_facet_id); self.enqueue_for_myself_at_commit(Box::new(move |t| { @@ -1861,6 +1874,11 @@ impl Cap { t.message(&self.underlying, m) } } + + /// Synchronizes with the reference underlying the cap. + pub fn sync(&self, t: &mut Activation, peer: Arc>) { + t.sync(&self.underlying, peer) + } } impl std::fmt::Debug for Cap { diff --git a/syndicate/src/relay.rs b/syndicate/src/relay.rs index fa887f6..9e04989 100644 --- a/syndicate/src/relay.rs +++ b/syndicate/src/relay.rs @@ -600,14 +600,14 @@ async fn input_loop( facet: FacetRef, i: Input, relay: TunnelRelayRef, -) -> ActorResult { +) -> Result { let account = Account::new(crate::name!("input-loop")); match i { Input::Packets(mut src) => { loop { account.ensure_clear_funds().await; match src.next().await { - None => return facet.activate(Arc::clone(&account), |t| Ok(t.state.shutdown())), + None => return Ok(LinkedTaskTermination::Normal), Some(bs) => facet.activate(Arc::clone(&account), |t| { let mut g = relay.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); @@ -626,13 +626,13 @@ async fn input_loop( Ok(n) => n, Err(e) => if e.kind() == io::ErrorKind::ConnectionReset { - return facet.activate(Arc::clone(&account), |t| Ok(t.state.shutdown())); + return Ok(LinkedTaskTermination::Normal); } else { return Err(e)?; }, }; match n { - 0 => return facet.activate(Arc::clone(&account), |t| Ok(t.state.shutdown())), + 0 => return Ok(LinkedTaskTermination::Normal), _ => facet.activate(Arc::clone(&account), |t| { let mut g = relay.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); @@ -647,11 +647,11 @@ async fn input_loop( async fn output_loop( mut o: Output, mut output_rx: UnboundedReceiver>>, -) -> ActorResult { +) -> Result { loop { match output_rx.recv().await { None => - return Ok(()), + return Ok(LinkedTaskTermination::KeepFacet), Some(mut loaned_item) => { match &mut o { Output::Packets(sink) => sink.send(std::mem::take(&mut loaned_item.item)).await?, diff --git a/syndicate/src/supervise.rs b/syndicate/src/supervise.rs index 815dea9..fe2976b 100644 --- a/syndicate/src/supervise.rs +++ b/syndicate/src/supervise.rs @@ -8,8 +8,12 @@ use std::time::Duration; use tokio::time::Instant; use crate::actor::*; +use crate::enclose; +use crate::schemas::service::State; -enum Protocol ActorResult> { +pub type Boot = Box ActorResult>; + +enum Protocol { SuperviseeStarted, // assertion BootFunction(Boot), // message Retry, // message @@ -30,25 +34,17 @@ pub struct SupervisorConfiguration { pub restart_policy: RestartPolicy, } -pub struct Supervisor ActorResult> { - self_ref: Arc>>, +pub struct Supervisor { + self_ref: Arc>, name: tracing::Span, config: SupervisorConfiguration, boot_fn: Option, restarts: VecDeque, - supervisee: Supervisee, + state: Arc>, ac_ref: Option, } -#[derive(Debug, PartialEq, Eq)] -enum Supervisee { - NotRunning, - Booting, - Running, - Recovering, -} - -impl ActorResult> std::fmt::Debug for Protocol { +impl std::fmt::Debug for Protocol { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { Protocol::SuperviseeStarted => write!(f, "Protocol::SuperviseeStarted"), @@ -65,17 +61,16 @@ impl Default for SupervisorConfiguration { period: Duration::from_secs(5), pause_time: Duration::from_millis(200), sleep_time: Duration::from_secs(10), - restart_policy: RestartPolicy::Always, + restart_policy: RestartPolicy::OnErrorOnly, } } } -impl ActorResult> - Entity> for Supervisor +impl Entity for Supervisor { - fn assert(&mut self, _t: &mut Activation, m: Protocol, _h: Handle) -> ActorResult { + fn assert(&mut self, t: &mut Activation, m: Protocol, _h: Handle) -> ActorResult { match m { - Protocol::SuperviseeStarted => self.enter_state(Supervisee::Booting), + Protocol::SuperviseeStarted => t.set(&self.state, State::Started), _ => (), } Ok(()) @@ -92,11 +87,12 @@ impl ActorResult> match exit_status { Ok(()) if self.config.restart_policy == RestartPolicy::OnErrorOnly => { tracing::trace!("Not restarting: normal exit, restart_policy is OnErrorOnly"); - self.enter_state(Supervisee::NotRunning); + t.set(&self.state, State::Complete); }, _ => { tracing::trace!("Restarting: restart_policy is Always or exit was abnormal"); - self.enter_state(Supervisee::Recovering); + t.set(&self.state, + if exit_status.is_ok() { State::Complete } else { State::Failed }); let now = Instant::now(); let oldest_to_keep = now - self.config.period; self.restarts.push_back(now); @@ -114,6 +110,7 @@ impl ActorResult> self.config.pause_time }; t.after(wait_time, Box::new(move |t| { + tracing::trace!("Sending retry trigger"); t.message(&self_ref, Protocol::Retry); Ok(()) })); @@ -122,15 +119,13 @@ impl ActorResult> Ok(()) } - fn message(&mut self, t: &mut Activation, m: Protocol) -> ActorResult { + fn message(&mut self, t: &mut Activation, m: Protocol) -> ActorResult { match m { Protocol::BootFunction(b) => { - self.enter_state(Supervisee::Running); self.boot_fn = Some(b); Ok(()) } Protocol::Retry => { - self.enter_state(Supervisee::NotRunning); self.ensure_started(t) } _ => Ok(()) @@ -144,40 +139,47 @@ impl ActorResult> } } -impl ActorResult> Supervisor { - pub fn start( +impl Supervisor { + pub fn start ActorResult, + B: 'static + Send + FnMut(&mut Activation) -> ActorResult>( t: &mut Activation, name: tracing::Span, config: SupervisorConfiguration, - boot_fn: Boot, - ) { + mut state_cb: C, + boot_fn: B, + ) -> ActorResult { let _entry = name.enter(); tracing::trace!(?config); let self_ref = t.create_inert(); + let state_field = t.field(State::Started); let mut supervisor = Supervisor { self_ref: Arc::clone(&self_ref), name: name.clone(), config, - boot_fn: Some(boot_fn), + boot_fn: Some(Box::new(boot_fn)), restarts: VecDeque::new(), - supervisee: Supervisee::NotRunning, + state: Arc::clone(&state_field), ac_ref: None, }; - supervisor.ensure_started(t).unwrap(); + supervisor.ensure_started(t)?; + t.dataflow(enclose!((name) move |t| { + let state = t.get(&state_field).clone(); + { + let _entry = name.enter(); + tracing::debug!(?state); + } + state_cb(t, state) + }))?; self_ref.become_entity(supervisor); t.on_stop_notify(&self_ref); - } - - fn enter_state(&mut self, supervisee: Supervisee) { - let _entry = self.name.enter(); - tracing::info!("{:?} --> {:?}", self.supervisee, supervisee); - self.supervisee = supervisee; + Ok(()) } fn ensure_started(&mut self, t: &mut Activation) -> ActorResult { match self.boot_fn.take() { None => { let _entry = self.name.enter(); + t.set(&self.state, State::Failed); tracing::error!("Cannot restart supervisee, because it panicked at startup") } Some(mut boot_fn) => {