diff --git a/syndicate-server/src/dependencies.rs b/syndicate-server/src/dependencies.rs new file mode 100644 index 0000000..9d2c7d6 --- /dev/null +++ b/syndicate-server/src/dependencies.rs @@ -0,0 +1,121 @@ +use preserves_schema::Codec; + +use std::sync::Arc; + +use syndicate::actor::*; +use syndicate::during::entity; +use syndicate::enclose; +use syndicate::schemas::dataspace::Observe; +use syndicate::schemas::service; +use syndicate::value::NestedValue; + +use crate::language::language; +use crate::schemas::internal_services; + +use syndicate_macros::during; + +pub fn boot(t: &mut Activation, ds: Arc) { + t.spawn(syndicate::name!("tracker", module = module_path!()), move |t| { + enclose!((ds) during!(t, ds, language(), , |t: &mut Activation| { + ds.assert(t, language(), &service::ServiceDependency { + depender: s, + dependee: service::Dependee::ServiceRunning(Box::new(service::ServiceRunning { + service_name: language().unparse(&internal_services::Milestone { + name: m, + }), + })), + }); + Ok(()) + })); + Ok(during!(t, ds, language(), , |t: &mut Activation| { + tracing::info!(?spec, "tracking dependencies"); + t.spawn_link(syndicate::name!(parent: None, "tracker", spec = ?spec), + enclose!((ds) |t| run(t, ds, spec))); + Ok(()) + })) + }); +} + +fn run(t: &mut Activation, ds: Arc, service_name: AnyValue) -> ActorResult { + ds.assert(t, language(), &service::ServiceStarted { + service_name: service_name.clone(), + }); + + if !service_name.value().is_simple_record("milestone", Some(1)) { + let core_dep = service::ServiceDependency { + depender: service_name.clone(), + dependee: service::Dependee::ServiceRunning(Box::new(service::ServiceRunning { + service_name: language().unparse(&internal_services::Milestone { + name: AnyValue::symbol("core"), + }), + })), + }; + let milestone_monitor = entity(ds.assert(t, language(), &core_dep)) + .on_asserted(enclose!((ds) move |handle, t, _captures: AnyValue| { + ds.update::<_, service::ServiceDependency>(t, handle, language(), None); + Ok(Some(Box::new(enclose!((ds, core_dep) move |handle, t| { + ds.update(t, handle, language(), Some(&core_dep)); + Ok(()) + })))) + })) + .create_cap(t); + ds.assert(t, language(), &Observe { + pattern: syndicate_macros::pattern!{}, + observer: milestone_monitor, + }); + } + + let obstacle_count = t.field(1u64); + t.dataflow(enclose!((obstacle_count) move |t| { + tracing::trace!(obstacle_count = ?t.get(&obstacle_count)); + Ok(()) + }))?; + + t.dataflow({ + let mut handle = None; + enclose!((ds, obstacle_count, service_name) move |t| { + let obstacle_count = *t.get(&obstacle_count); + if obstacle_count == 0 { + ds.update(t, &mut handle, language(), Some(&service::RunService { + service_name: service_name.clone(), + })); + } else { + ds.update::<_, service::RunService>(t, &mut handle, language(), None); + } + Ok(()) + }) + })?; + + enclose!((ds, obstacle_count) during!( + t, ds, language(), , + enclose!((ds, obstacle_count) move |t: &mut Activation| { + if let Ok(dependee) = language().parse::(&dependee) { + tracing::trace!(on = ?dependee, "new dependency"); + *t.get_mut(&obstacle_count) += 1; + t.on_stop(enclose!((obstacle_count) move |t| { *t.get_mut(&obstacle_count) -= 1; Ok(()) })); + ds.assert(t, language(), &service::RequireService { + service_name: match &dependee { + service::Dependee::ServiceStarted(b) => b.service_name.clone(), + service::Dependee::ServiceRunning(b) => b.service_name.clone(), + }, + }); + let d = dependee.clone(); + during!(t, ds, language(), #(language().unparse(&d)), + enclose!((obstacle_count, dependee) move |t: &mut Activation| { + tracing::trace!(on = ?dependee, "dependency satisfied"); + *t.get_mut(&obstacle_count) -= 1; + t.on_stop(move |t| { *t.get_mut(&obstacle_count) += 1; Ok(()) }); + Ok(()) + })); + } + Ok(()) + }))); + + let initial_sync_handler = t.create(enclose!((obstacle_count) move |t: &mut Activation| { + *t.get_mut(&obstacle_count) -= 1; + Ok(()) + })); + t.sync(&ds.underlying, initial_sync_handler); + + Ok(()) +} diff --git a/syndicate-server/src/main.rs b/syndicate-server/src/main.rs index 7c72591..10acf80 100644 --- a/syndicate-server/src/main.rs +++ b/syndicate-server/src/main.rs @@ -14,6 +14,7 @@ use syndicate::schemas::transport_address; use syndicate::value::NestedValue; +mod dependencies; mod gatekeeper; mod language; mod protocol; @@ -97,20 +98,22 @@ async fn main() -> Result<(), Box> { let gateway = Cap::guard(Arc::clone(&language().syndicate), t.create( syndicate::entity(Arc::clone(&root_ds)).on_asserted(gatekeeper::handle_resolve))); - services::debt_reporter::on_demand(t, Arc::clone(&server_config_ds)); - services::tcp_relay_listener::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&gateway)); - services::unix_relay_listener::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&gateway)); + dependencies::boot(t, Arc::clone(&server_config_ds)); services::config_watcher::on_demand(t, Arc::clone(&server_config_ds)); services::daemon::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&root_ds)); + services::debt_reporter::on_demand(t, Arc::clone(&server_config_ds)); + services::milestone::on_demand(t, Arc::clone(&server_config_ds)); + services::tcp_relay_listener::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&gateway)); + services::unix_relay_listener::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&gateway)); if config.debt_reporter { - server_config_ds.assert(t, language(), &service::RequireService { + server_config_ds.assert(t, language(), &service::RunService { service_name: language().unparse(&internal_services::DebtReporter), }); } for port in config.ports.clone() { - server_config_ds.assert(t, language(), &service::RequireService { + server_config_ds.assert(t, language(), &service::RunService { service_name: language().unparse(&internal_services::TcpRelayListener { addr: transport_address::Tcp { host: "0.0.0.0".to_owned(), @@ -121,7 +124,7 @@ async fn main() -> Result<(), Box> { } for path in config.sockets.clone() { - server_config_ds.assert(t, language(), &service::RequireService { + server_config_ds.assert(t, language(), &service::RunService { service_name: language().unparse(&internal_services::UnixRelayListener { addr: transport_address::Unix { path: path.to_str().expect("representable UnixListener path").to_owned(), @@ -131,7 +134,7 @@ async fn main() -> Result<(), Box> { } for path in config.config.clone() { - server_config_ds.assert(t, language(), &service::RequireService { + server_config_ds.assert(t, language(), &service::RunService { service_name: language().unparse(&internal_services::ConfigWatcher { path: path.to_str().expect("representable ConfigWatcher path").to_owned(), }), diff --git a/syndicate-server/src/services/config_watcher.rs b/syndicate-server/src/services/config_watcher.rs index f020bd6..c19c1bd 100644 --- a/syndicate-server/src/services/config_watcher.rs +++ b/syndicate-server/src/services/config_watcher.rs @@ -32,7 +32,7 @@ use syndicate_macros::during; pub fn on_demand(t: &mut Activation, ds: Arc) { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { - Ok(during!(t, ds, language(), , + Ok(during!(t, ds, language(), , |t: &mut Activation| { t.spawn_link(syndicate::name!(parent: None, "config", spec = ?spec), enclose!((ds) |t| run(t, ds, spec))); diff --git a/syndicate-server/src/services/daemon.rs b/syndicate-server/src/services/daemon.rs index 62b5966..f7eb064 100644 --- a/syndicate-server/src/services/daemon.rs +++ b/syndicate-server/src/services/daemon.rs @@ -15,7 +15,7 @@ use syndicate_macros::during; pub fn on_demand(t: &mut Activation, config_ds: Arc, root_ds: Arc) { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { - Ok(during!(t, config_ds, language(), , |t| { + Ok(during!(t, config_ds, language(), , |t| { Ok(Supervisor::start( t, syndicate::name!(parent: None, "daemon", service = ?spec), @@ -139,7 +139,8 @@ fn run( Ok(()) })?; - tracing::info!(status = ?child.wait().await); + let status = child.wait().await; + tracing::info!(?status); Ok(()) }); diff --git a/syndicate-server/src/services/debt_reporter.rs b/syndicate-server/src/services/debt_reporter.rs index f5934c6..e0991ea 100644 --- a/syndicate-server/src/services/debt_reporter.rs +++ b/syndicate-server/src/services/debt_reporter.rs @@ -11,7 +11,7 @@ use syndicate_macros::during; pub fn on_demand(t: &mut Activation, ds: Arc) { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { - Ok(during!(t, ds, language(), , |t: &mut Activation| { + Ok(during!(t, ds, language(), , |t: &mut Activation| { t.spawn_link(tracing::Span::current(), enclose!((ds) |t| run(t, ds))); Ok(()) })) diff --git a/syndicate-server/src/services/milestone.rs b/syndicate-server/src/services/milestone.rs new file mode 100644 index 0000000..317accd --- /dev/null +++ b/syndicate-server/src/services/milestone.rs @@ -0,0 +1,21 @@ +use std::sync::Arc; + +use syndicate::actor::*; +use syndicate::preserves_schema::Codec; +use syndicate::schemas::service; + +use crate::language::language; +use crate::schemas::internal_services::Milestone; + +use syndicate_macros::during; + +pub fn on_demand(t: &mut Activation, ds: Arc) { + t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { + Ok(during!(t, ds, language(), , |t: &mut Activation| { + ds.assert(t, language(), &service::ServiceRunning { + service_name: language().unparse(&spec), + }); + Ok(()) + })) + }); +} diff --git a/syndicate-server/src/services/mod.rs b/syndicate-server/src/services/mod.rs index a134df5..e7b0304 100644 --- a/syndicate-server/src/services/mod.rs +++ b/syndicate-server/src/services/mod.rs @@ -1,5 +1,6 @@ pub mod config_watcher; pub mod daemon; pub mod debt_reporter; +pub mod milestone; pub mod tcp_relay_listener; pub mod unix_relay_listener; diff --git a/syndicate-server/src/services/tcp_relay_listener.rs b/syndicate-server/src/services/tcp_relay_listener.rs index 9e13423..fc659a0 100644 --- a/syndicate-server/src/services/tcp_relay_listener.rs +++ b/syndicate-server/src/services/tcp_relay_listener.rs @@ -17,7 +17,7 @@ use syndicate_macros::during; pub fn on_demand(t: &mut Activation, ds: Arc, gateway: Arc) { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { - Ok(during!(t, ds, language(), , + Ok(during!(t, ds, language(), , |t| { Supervisor::start( t, diff --git a/syndicate-server/src/services/unix_relay_listener.rs b/syndicate-server/src/services/unix_relay_listener.rs index 7ff27ac..4462177 100644 --- a/syndicate-server/src/services/unix_relay_listener.rs +++ b/syndicate-server/src/services/unix_relay_listener.rs @@ -20,7 +20,7 @@ use syndicate_macros::during; pub fn on_demand(t: &mut Activation, ds: Arc, gateway: Arc) { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { - Ok(during!(t, ds, language(), , + Ok(during!(t, ds, language(), , |t: &mut Activation| { t.spawn_link(syndicate::name!(parent: None, "relay", addr = ?spec), enclose!((ds, gateway) |t| run(t, ds, gateway, spec)));