use preserves_schema::Codec; use std::sync::Arc; use syndicate::actor::*; use syndicate::enclose; use syndicate::supervise::{Supervisor, SupervisorConfiguration}; use tokio::process; use crate::counter; use crate::language::language; use crate::lifecycle; use crate::schemas::external_services::*; use syndicate_macros::during; pub fn on_demand(t: &mut Activation, config_ds: Arc, root_ds: Arc) { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { Ok(during!(t, config_ds, language(), , |t| { Supervisor::start( t, syndicate::name!(parent: None, "daemon", service = ?spec), SupervisorConfiguration::default(), enclose!((config_ds, spec) lifecycle::updater(config_ds, spec)), enclose!((config_ds, root_ds) move |t| enclose!((config_ds, root_ds, spec) run(t, config_ds, root_ds, spec)))) })) }); } impl Process { fn elaborate(self) -> FullProcess { match self { Process::Simple(command_line) => FullProcess { argv: *command_line, env: ProcessEnv::Absent, dir: ProcessDir::Absent, clear_env: ClearEnv::Absent, }, Process::Full(spec) => *spec, } } } impl FullProcess { fn build_command(&self) -> Option { let argv = self.argv.clone().elaborate(); let mut cmd = process::Command::new(argv.program); cmd.args(argv.args); match &self.dir { ProcessDir::Present { dir } => { cmd.current_dir(dir); () }, ProcessDir::Absent => (), ProcessDir::Invalid { dir } => { tracing::error!(?dir, "Invalid working directory"); return None; } } match &self.clear_env { ClearEnv::Present { clear_env: true } => { cmd.env_clear(); () }, ClearEnv::Present { clear_env: false } => (), ClearEnv::Absent => (), ClearEnv::Invalid { clear_env } => { tracing::error!(?clear_env, "Invalid clearEnv setting"); return None; } } match &self.env { ProcessEnv::Present { env } => { for (k, v) in env { if let Some(env_variable) = match k { EnvVariable::String(k) => Some(k), EnvVariable::Symbol(k) => Some(k), EnvVariable::Invalid(env_variable) => { tracing::error!(?env_variable, "Invalid environment variable name"); return None; } } { match v { EnvValue::Set(value) => { cmd.env(env_variable, value); () } EnvValue::Remove => { cmd.env_remove(env_variable); () } EnvValue::Invalid(value) => { tracing::error!(?env_variable, ?value, "Invalid environment variable value"); return None; } } } } } ProcessEnv::Absent => (), ProcessEnv::Invalid { env } => { tracing::error!(?env, "Invalid daemon environment"); return None; } } cmd.stdin(std::process::Stdio::null()); cmd.stdout(std::process::Stdio::inherit()); cmd.stderr(std::process::Stdio::inherit()); cmd.kill_on_drop(true); Some(cmd) } } impl DaemonProcessSpec { fn elaborate(self) -> FullDaemonProcess { match self { DaemonProcessSpec::Simple(command_line) => FullDaemonProcess { process: Process::Simple(command_line).elaborate(), ready_on_start: ReadyOnStart::Absent, restart: RestartField::Absent, }, DaemonProcessSpec::Full(spec) => *spec, } } } impl CommandLine { fn elaborate(self) -> FullCommandLine { match self { CommandLine::Shell(s) => FullCommandLine { program: "sh".to_owned(), args: vec!["-c".to_owned(), s], }, CommandLine::Full(command_line) => *command_line, } } } struct DaemonInstance { name: tracing::Span, cmd: process::Command, announce_presumed_readiness: bool, unready_configs: Arc>, completed_processes: Arc>, restart_policy: RestartPolicy, } impl DaemonInstance { fn handle_exit(self, t: &mut Activation, error_message: Option) -> ActorResult { let delay = std::time::Duration::from_millis(if let None = error_message { 200 } else { 1000 }); t.stop_facet_and_continue(t.facet.facet_id, Some(move |t: &mut Activation| { enum NextStep { SleepAndRestart, SignalSuccessfulCompletion, } use NextStep::*; let next_step = match self.restart_policy { RestartPolicy::Always => SleepAndRestart, RestartPolicy::OnError => match error_message { None => SignalSuccessfulCompletion, Some(_) => SleepAndRestart, }, RestartPolicy::All => match error_message { None => SignalSuccessfulCompletion, Some(s) => Err(s.as_str())?, }, }; match next_step { SleepAndRestart => t.after(delay, |t| self.start(t)), SignalSuccessfulCompletion => { t.facet(|t| { let _ = t.prevent_inert_check(); counter::adjust(t, &self.completed_processes, 1); counter::adjust(t, &self.unready_configs, -1); Ok(()) })?; () } } Ok(()) })) } fn start(mut self, t: &mut Activation) -> ActorResult { t.facet(|t| { tracing::trace!(cmd = ?self.cmd, "starting"); let mut child = match self.cmd.spawn() { Ok(child) => child, Err(e) => { tracing::info!(spawn_err = ?e); return self.handle_exit(t, Some(format!("{}", e))); } }; tracing::info!(pid = ?child.id(), cmd = ?self.cmd, "started"); if self.announce_presumed_readiness { counter::adjust(t, &self.unready_configs, -1); } let facet = t.facet.clone(); t.linked_task(syndicate::name!(parent: self.name.clone(), "wait"), async move { tracing::trace!("waiting for process exit"); let status = child.wait().await?; tracing::info!(?status); facet.activate(Account::new(syndicate::name!("instance-terminated")), |t| { let m = if status.success() { None } else { Some(format!("{}", status)) }; self.handle_exit(t, m) })?; Ok(LinkedTaskTermination::Normal) }); Ok(()) })?; Ok(()) } } fn run( t: &mut Activation, config_ds: Arc, _root_ds: Arc, service: DaemonService, ) -> ActorResult { let spec = language().unparse(&service); let total_configs = t.named_field("total_configs", 0isize); let unready_configs = t.named_field("unready_configs", 1isize); let completed_processes = t.named_field("completed_processes", 0isize); t.dataflow({ let mut handle = None; let ready = lifecycle::ready(&spec); enclose!((config_ds, unready_configs) move |t| { let busy_count = *t.get(&unready_configs); tracing::debug!(?busy_count); config_ds.update(t, &mut handle, language(), if busy_count == 0 { Some(&ready) } else { None }); Ok(()) }) })?; t.dataflow(enclose!((completed_processes, total_configs) move |t| { let total = *t.get(&total_configs); let completed = *t.get(&completed_processes); tracing::debug!(total_configs = ?total, completed_processes = ?completed); if total > 0 && total == completed { t.stop()?; } Ok(()) }))?; enclose!((unready_configs, completed_processes) during!(t, config_ds, language(), , { enclose!((unready_configs, completed_processes) |t: &mut Activation| { tracing::debug!(?config, "new config"); counter::adjust(t, &unready_configs, 1); counter::adjust(t, &total_configs, 1); match language().parse::(&config) { Ok(config) => { tracing::info!(?config); let config = config.elaborate(); let facet = t.facet.clone(); t.linked_task(syndicate::name!("subprocess"), async move { let cmd = config.process.build_command().ok_or("Cannot start daemon process")?; let announce_presumed_readiness = match config.ready_on_start { ReadyOnStart::Present { ready_on_start } => ready_on_start, ReadyOnStart::Absent => true, ReadyOnStart::Invalid { ready_on_start } => { tracing::error!(?ready_on_start, "Invalid readyOnStart value"); Err("Invalid readyOnStart value")? } }; let restart_policy = match config.restart { RestartField::Present { restart } => *restart, RestartField::Absent => RestartPolicy::All, RestartField::Invalid { restart } => { tracing::error!(?restart, "Invalid restart value"); Err("Invalid restart value")? } }; let daemon_instance = DaemonInstance { name: tracing::Span::current(), cmd, announce_presumed_readiness, unready_configs, completed_processes, restart_policy, }; facet.activate(Account::new(syndicate::name!("instance-startup")), |t| { daemon_instance.start(t) })?; Ok(LinkedTaskTermination::KeepFacet) }); Ok(()) } Err(_) => { tracing::error!(?config, "Invalid Process specification"); return Ok(()); } } }) })); tracing::debug!("syncing to ds"); counter::sync_and_adjust(t, &config_ds.underlying, &unready_configs, -1); Ok(()) }