lifecycle::terminate_on_service_restart; make debt reporter accept a parameter

This commit is contained in:
Tony Garnock-Jones 2022-01-07 17:18:00 +01:00
parent fce928b5b0
commit 895a2f676c
11 changed files with 63 additions and 29 deletions

View File

@ -2,7 +2,7 @@
ProcessDir´³orµµ±present´³dict·³dir´³named³dir´³atom³String„„„„„µ±invalid´³dict·³dir´³named³dir³any„„„„µ±absent´³dict·„„„„„³
ProcessEnv´³orµµ±present´³dict·³env´³named³env´³dictof´³refµ„³ EnvVariable„´³refµ„³EnvValue„„„„„„µ±invalid´³dict·³env´³named³env³any„„„„µ±absent´³dict·„„„„„³ CommandLine´³orµµ±shell´³atom³String„„µ±full´³refµ„³FullCommandLine„„„„³ EnvVariable´³orµµ±string´³atom³String„„µ±symbol´³atom³Symbol„„µ±invalid³any„„„³ FullProcess´³andµ´³dict·³argv´³named³argv´³refµ„³ CommandLine„„„„´³named³env´³refµ„³
ProcessEnv„„´³named³dir´³refµ„³
ProcessDir„„´³named³clearEnv´³refµ„³ClearEnv„„„„³ ReadyOnStart´³orµµ±present´³dict·³ readyOnStart´³named³ readyOnStart´³atom³Boolean„„„„„µ±invalid´³dict·³ readyOnStart´³named³ readyOnStart³any„„„„µ±absent´³dict·„„„„„³ RestartField´³orµµ±present´³dict·³restart´³named³restart´³refµ„³ RestartPolicy„„„„„µ±invalid´³dict·³restart´³named³restart³any„„„„µ±absent´³dict·„„„„„³ DaemonProcess´³rec´³lit³daemon„´³tupleµ´³named³id³any„´³named³config´³refµ„³DaemonProcessSpec„„„„„³ DaemonService´³rec´³lit³daemon„´³tupleµ´³named³id³any„„„„³ ProtocolField´³orµµ±present´³dict·³protocol´³named³protocol´³refµ„³Protocol„„„„„µ±invalid´³dict·³protocol´³named³protocol³any„„„„µ±absent´³dict·„„„„„³ RestartPolicy´³orµµ±always´³lit³always„„µ±onError´³lit³on-error„„µ±all´³lit³all„„„„³FullCommandLine´³ tuplePrefixµ´³named³program´³atom³String„„„´³named³args´³seqof´³atom³String„„„„³DaemonProcessSpec´³orµµ±simple´³refµ„³ CommandLine„„µ±full´³refµ„³FullDaemonProcess„„„„³FullDaemonProcess´³andµ´³named³process´³refµ„³ FullProcess„„´³named³ readyOnStart´³refµ„³ ReadyOnStart„„´³named³restart´³refµ„³ RestartField„„´³named³protocol´³refµ„³ ProtocolField„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„µ³internalServices„´³schema·³version³ definitions·³ ConfigEnv´³dictof´³atom³Symbol„³any„³ Milestone´³rec´³lit³ milestone„´³tupleµ´³named³name³any„„„„³ DebtReporter´³lit³ debt-reporter„³ ConfigWatcher´³rec´³lit³config-watcher„´³tupleµ´³named³path´³atom³String„„´³named³env´³refµ„³ ConfigEnv„„„„„³TcpRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Tcp„„´³named³
ProcessDir„„´³named³clearEnv´³refµ„³ClearEnv„„„„³ ReadyOnStart´³orµµ±present´³dict·³ readyOnStart´³named³ readyOnStart´³atom³Boolean„„„„„µ±invalid´³dict·³ readyOnStart´³named³ readyOnStart³any„„„„µ±absent´³dict·„„„„„³ RestartField´³orµµ±present´³dict·³restart´³named³restart´³refµ„³ RestartPolicy„„„„„µ±invalid´³dict·³restart´³named³restart³any„„„„µ±absent´³dict·„„„„„³ DaemonProcess´³rec´³lit³daemon„´³tupleµ´³named³id³any„´³named³config´³refµ„³DaemonProcessSpec„„„„„³ DaemonService´³rec´³lit³daemon„´³tupleµ´³named³id³any„„„„³ ProtocolField´³orµµ±present´³dict·³protocol´³named³protocol´³refµ„³Protocol„„„„„µ±invalid´³dict·³protocol´³named³protocol³any„„„„µ±absent´³dict·„„„„„³ RestartPolicy´³orµµ±always´³lit³always„„µ±onError´³lit³on-error„„µ±all´³lit³all„„„„³FullCommandLine´³ tuplePrefixµ´³named³program´³atom³String„„„´³named³args´³seqof´³atom³String„„„„³DaemonProcessSpec´³orµµ±simple´³refµ„³ CommandLine„„µ±full´³refµ„³FullDaemonProcess„„„„³FullDaemonProcess´³andµ´³named³process´³refµ„³ FullProcess„„´³named³ readyOnStart´³refµ„³ ReadyOnStart„„´³named³restart´³refµ„³ RestartField„„´³named³protocol´³refµ„³ ProtocolField„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„µ³internalServices„´³schema·³version³ definitions·³ ConfigEnv´³dictof´³atom³Symbol„³any„³ Milestone´³rec´³lit³ milestone„´³tupleµ´³named³name³any„„„„³ DebtReporter´³rec´³lit³ debt-reporter„´³tupleµ´³named³intervalSeconds´³atom³Double„„„„„³ ConfigWatcher´³rec´³lit³config-watcher„´³tupleµ´³named³path´³atom³String„„´³named³env´³refµ„³ ConfigEnv„„„„„³TcpRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Tcp„„´³named³
gatekeeper´³embedded´³refµ³
gatekeeper„³Resolve„„„„„„³UnixRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Unix„„´³named³
gatekeeper´³embedded´³refµ³

View File

@ -1,7 +1,7 @@
version 1 .
embeddedType EntityRef.Cap .
DebtReporter = =debt-reporter .
DebtReporter = <debt-reporter @intervalSeconds double>.
TcpRelayListener = <relay-listener @addr TransportAddress.Tcp @gatekeeper #!gatekeeper.Resolve> .
UnixRelayListener = <relay-listener @addr TransportAddress.Unix @gatekeeper #!gatekeeper.Resolve> .

View File

@ -28,26 +28,26 @@ pub fn boot(t: &mut Activation, ds: Arc<Cap>) {
fn run(t: &mut Activation, ds: Arc<Cap>, service_name: AnyValue) -> ActorResult {
if !service_name.value().is_simple_record("milestone", Some(1)) {
let core_dep = service::ServiceDependency {
let system_layer_dep = service::ServiceDependency {
depender: service_name.clone(),
dependee: service::ServiceState {
service_name: language().unparse(&internal_services::Milestone {
name: AnyValue::symbol("core"),
name: AnyValue::symbol("system-layer"),
}),
state: service::State::Ready,
},
};
let milestone_monitor = entity(ds.assert(t, language(), &core_dep))
let milestone_monitor = entity(ds.assert(t, language(), &system_layer_dep))
.on_asserted(enclose!((ds) move |handle, t, _captures: AnyValue| {
ds.update::<_, service::ServiceDependency>(t, handle, language(), None);
Ok(Some(Box::new(enclose!((ds, core_dep) move |handle, t| {
ds.update(t, handle, language(), Some(&core_dep));
Ok(Some(Box::new(enclose!((ds, system_layer_dep) move |handle, t| {
ds.update(t, handle, language(), Some(&system_layer_dep));
Ok(())
}))))
}))
.create_cap(t);
ds.assert(t, language(), &Observe {
pattern: syndicate_macros::pattern!{<core-service #(&service_name)>},
pattern: syndicate_macros::pattern!{<system-layer-service #(&service_name)>},
observer: milestone_monitor,
});
}

View File

@ -7,6 +7,8 @@ use syndicate::preserves_schema::support::Unparse;
use crate::language::Language;
use crate::language::language;
use syndicate_macros::on_message;
pub fn updater<'a, N: Clone + Unparse<&'a Language<AnyValue>, AnyValue>>(
ds: Arc<Cap>,
name: N,
@ -35,3 +37,15 @@ pub fn started<'a, N: Unparse<&'a Language<AnyValue>, AnyValue>>(service_name: &
pub fn ready<'a, N: Unparse<&'a Language<AnyValue>, AnyValue>>(service_name: &N) -> ServiceState {
lifecycle(service_name, State::Ready)
}
pub fn terminate_on_service_restart<'a, N: Unparse<&'a Language<AnyValue>, AnyValue>>(
t: &mut Activation,
ds: &Arc<Cap>,
service_name: &N,
) {
on_message!(t, ds, language(), <restart-service #(&service_name.unparse(language()))>, |t: &mut Activation| {
tracing::info!("Terminating to restart");
t.state.shutdown();
Ok(())
});
}

View File

@ -116,7 +116,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
if config.debt_reporter {
server_config_ds.assert(t, language(), &service::RunService {
service_name: language().unparse(&internal_services::DebtReporter),
service_name: language().unparse(&internal_services::DebtReporter {
interval_seconds: (1.0).into(),
}),
});
}

View File

@ -160,6 +160,8 @@ fn run(
config_ds: Arc<Cap>,
spec: internal_services::ConfigWatcher,
) -> ActorResult {
lifecycle::terminate_on_service_restart(t, &config_ds, &spec);
let path = fs::canonicalize(spec.path.clone())?;
let env = script::Env::new(path, spec.env.0.clone());

View File

@ -324,6 +324,8 @@ fn run(
root_ds: Arc<Cap>,
service: DaemonService,
) -> ActorResult {
lifecycle::terminate_on_service_restart(t, &config_ds, &service);
let spec = language().unparse(&service);
let total_configs = t.named_field("total_configs", 0isize);

View File

@ -11,25 +11,21 @@ use syndicate_macros::during;
pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
t.spawn(syndicate::name!("debt_reporter"), move |t| {
Ok(during!(t, ds, language(), <run-service $_spec: DebtReporter>, |t: &mut Activation| {
t.spawn_link(tracing::Span::current(), enclose!((ds) |t| run(t, ds)));
Ok(during!(t, ds, language(), <run-service $spec: DebtReporter>, |t: &mut Activation| {
t.spawn_link(tracing::Span::current(), enclose!((ds) |t| run(t, ds, spec)));
Ok(())
}))
});
}
fn run(t: &mut Activation, ds: Arc<Cap>) -> ActorResult {
ds.assert(t, language(), &lifecycle::started(&DebtReporter));
ds.assert(t, language(), &lifecycle::ready(&DebtReporter));
t.linked_task(syndicate::name!("tick"), async {
let mut timer = tokio::time::interval(core::time::Duration::from_secs(1));
loop {
timer.tick().await;
for (id, (name, debt)) in syndicate::actor::ACCOUNTS.read().iter() {
let _enter = name.enter();
tracing::info!(id, debt = ?debt.load(std::sync::atomic::Ordering::Relaxed));
}
fn run(t: &mut Activation, ds: Arc<Cap>, spec: DebtReporter) -> ActorResult {
ds.assert(t, language(), &lifecycle::started(&spec));
ds.assert(t, language(), &lifecycle::ready(&spec));
t.every(core::time::Duration::from_millis((spec.interval_seconds.0 * 1000.0) as u64), |_t| {
for (id, (name, debt)) in syndicate::actor::ACCOUNTS.read().iter() {
let _enter = name.enter();
tracing::info!(id, debt = ?debt.load(std::sync::atomic::Ordering::Relaxed));
}
});
Ok(())
Ok(())
})
}

View File

@ -1,6 +1,8 @@
use std::sync::Arc;
use syndicate::actor::*;
use syndicate::enclose;
use syndicate::supervise::{Supervisor, SupervisorConfiguration};
use crate::language::language;
use crate::lifecycle;
@ -11,11 +13,25 @@ use syndicate_macros::during;
pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
t.spawn(syndicate::name!("milestone"), move |t| {
Ok(during!(t, ds, language(), <run-service $spec: Milestone>, |t: &mut Activation| {
tracing::info!(milestone = ?spec.name, "entered");
ds.assert(t, language(), &lifecycle::started(&spec));
ds.assert(t, language(), &lifecycle::ready(&spec));
t.on_stop(move |_| { tracing::info!(milestone = ?spec.name, "exited"); Ok(()) });
Ok(())
Supervisor::start(
t,
syndicate::name!(parent: None, "milestone", name = ?spec.name),
SupervisorConfiguration::default(),
|_, _| Ok(()),
enclose!((ds) move |t| enclose!((ds, spec) run(t, ds, spec))))
}))
});
}
fn run(
t: &mut Activation,
ds: Arc<Cap>,
spec: Milestone,
) -> ActorResult {
lifecycle::terminate_on_service_restart(t, &ds, &spec);
tracing::info!(milestone = ?spec.name, "entered");
ds.assert(t, language(), &lifecycle::started(&spec));
ds.assert(t, language(), &lifecycle::ready(&spec));
t.on_stop(move |_| { tracing::info!(milestone = ?spec.name, "exited"); Ok(()) });
Ok(())
}

View File

@ -28,6 +28,7 @@ pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
}
fn run(t: &mut Activation, ds: Arc<Cap>, spec: TcpRelayListener) -> ActorResult {
lifecycle::terminate_on_service_restart(t, &ds, &spec);
let host = spec.addr.host.clone();
let port = u16::try_from(&spec.addr.port).map_err(|_| "Invalid TCP port number")?;
let parent_span = tracing::Span::current();

View File

@ -32,6 +32,7 @@ pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
}
fn run(t: &mut Activation, ds: Arc<Cap>, spec: UnixRelayListener) -> ActorResult {
lifecycle::terminate_on_service_restart(t, &ds, &spec);
let path_str = spec.addr.path.clone();
let parent_span = tracing::Span::current();
let facet = t.facet.clone();