From 7b6a2dab76444b6513854dd2d1f186ad56166617 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 6 Oct 2021 22:03:12 +0200 Subject: [PATCH] More interesting config interpreter --- syndicate-server/src/main.rs | 2 +- .../src/services/config_watcher.rs | 407 +++++++++++++++--- syndicate-server/src/services/daemon.rs | 181 ++++---- 3 files changed, 452 insertions(+), 138 deletions(-) diff --git a/syndicate-server/src/main.rs b/syndicate-server/src/main.rs index 0154cbb..cb37b8e 100644 --- a/syndicate-server/src/main.rs +++ b/syndicate-server/src/main.rs @@ -104,7 +104,7 @@ async fn main() -> Result<(), Box> { syndicate::entity(Arc::clone(&root_ds)).on_asserted(gatekeeper::handle_resolve))); dependencies::boot(t, Arc::clone(&server_config_ds)); - services::config_watcher::on_demand(t, Arc::clone(&server_config_ds)); + services::config_watcher::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&root_ds)); services::daemon::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&root_ds)); services::debt_reporter::on_demand(t, Arc::clone(&server_config_ds)); services::milestone::on_demand(t, Arc::clone(&server_config_ds)); diff --git a/syndicate-server/src/services/config_watcher.rs b/syndicate-server/src/services/config_watcher.rs index 964f345..32a5935 100644 --- a/syndicate-server/src/services/config_watcher.rs +++ b/syndicate-server/src/services/config_watcher.rs @@ -13,7 +13,11 @@ use std::thread; use std::time::Duration; use syndicate::actor::*; +use syndicate::error::Error; +use syndicate::during; use syndicate::enclose; +use syndicate::schemas::dataspace; +use syndicate::schemas::dataspace_patterns as P; use syndicate::supervise::{Supervisor, SupervisorConfiguration}; use syndicate::value::BinarySource; use syndicate::value::IOBinarySource; @@ -21,8 +25,11 @@ use syndicate::value::Map; use syndicate::value::NestedValue; use syndicate::value::NoEmbeddedDomainCodec; use syndicate::value::Reader; +use syndicate::value::Record; use syndicate::value::Set; +use syndicate::value::Value; use syndicate::value::ViaCodec; +use syndicate::value::signed_integer::SignedInteger; use crate::language::language; use crate::lifecycle; @@ -30,51 +37,342 @@ use crate::schemas::internal_services; use syndicate_macros::during; -pub fn on_demand(t: &mut Activation, ds: Arc) { +pub fn on_demand(t: &mut Activation, config_ds: Arc, root_ds: Arc) { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { - Ok(during!(t, ds, language(), , |t| { - Supervisor::start( - t, - syndicate::name!(parent: None, "config", spec = ?spec), - SupervisorConfiguration::default(), - enclose!((ds, spec) lifecycle::updater(ds, spec)), - enclose!((ds) move |t| enclose!((ds, spec) run(t, ds, spec)))) - })) + Ok(during!( + t, config_ds, language(), , |t| { + Supervisor::start( + t, + syndicate::name!(parent: None, "config", spec = ?spec), + SupervisorConfiguration::default(), + enclose!((config_ds, spec) lifecycle::updater(config_ds, spec)), + enclose!((config_ds, root_ds) move |t| + enclose!((config_ds, root_ds, spec) run(t, config_ds, root_ds, spec)))) + })) }); } -fn convert_notify_error(e: notify::Error) -> syndicate::error::Error { +fn convert_notify_error(e: notify::Error) -> Error { syndicate::error::error(&format!("Notify error: {:?}", e), AnyValue::new(false)) } -fn assertions_at_existing_file(t: &mut Activation, ds: &Arc, path: &PathBuf) -> io::Result> { - let mut handles = Set::new(); - let fh = fs::File::open(path)?; - let mut src = IOBinarySource::new(fh); - let mut r = src.text::(ViaCodec::new(NoEmbeddedDomainCodec)); - let mut values = Vec::new(); - while let Some(value) = Reader::::next(&mut r, true)? { - values.push(value); - } - for value in values.into_iter() { - if let Some(handle) = ds.assert(t, &(), &value) { - handles.insert(handle); - } - } - Ok(handles) +#[derive(Debug, Clone)] +enum Instruction { + Assert { + target: String, + template: AnyValue, + }, + React { + target: String, + pattern_template: AnyValue, + body: Box, + }, + Sequence { + instructions: Vec, + }, } -fn assertions_at_path(t: &mut Activation, ds: &Arc, path: &PathBuf) -> io::Result> { +#[derive(Debug)] +enum Symbolic { + Reference(String), + Binder(String), + Discard, + Literal(String), +} + +fn analyze(s: &str) -> Symbolic { + if s == "_" { + Symbolic::Discard + } else if s.starts_with("?") { + Symbolic::Binder(s[1..].to_owned()) + } else if s.starts_with("$") { + Symbolic::Reference(s[1..].to_owned()) + } else if s.starts_with("=") { + Symbolic::Literal(s[1..].to_owned()) + } else { + Symbolic::Literal(s.to_owned()) + } +} + +fn bad_instruction(message: String) -> io::Error { + io::Error::new(io::ErrorKind::InvalidData, message) +} + +fn discard() -> P::Pattern { + P::Pattern::DDiscard(Box::new(P::DDiscard)) +} + +#[derive(Debug)] +struct PatternInstantiator<'e> { + env: &'e Env, + binding_names: Vec, +} + +#[derive(Debug, Clone)] +struct Env(Map); + +impl<'e> PatternInstantiator<'e> { + fn instantiate_pattern(&mut self, template: &AnyValue) -> io::Result { + Ok(match template.value() { + Value::Boolean(_) | + Value::Float(_) | + Value::Double(_) | + Value::SignedInteger(_) | + Value::String(_) | + Value::ByteString(_) | + Value::Embedded(_) => + P::Pattern::DLit(Box::new(P::DLit { value: template.clone() })), + + Value::Symbol(s) => match analyze(s) { + Symbolic::Discard => discard(), + Symbolic::Binder(s) => { + self.binding_names.push(s); + P::Pattern::DBind(Box::new(P::DBind { pattern: discard() })) + } + Symbolic::Reference(s) => + P::Pattern::DLit(Box::new(P::DLit { + value: self.env.0.get(&s) + .ok_or_else(|| bad_instruction( + format!("Undefined pattern-template variable: {:?}", template)))? + .clone(), + })), + Symbolic::Literal(s) => + P::Pattern::DLit(Box::new(P::DLit { + value: Value::Symbol(s).wrap(), + })), + }, + + Value::Record(r) => + P::Pattern::DCompound(Box::new(P::DCompound::Rec { + ctor: Box::new(P::CRec { + label: r.label().clone(), + arity: r.fields().len().into(), + }), + members: r.fields().iter().enumerate() + .map(|(i, p)| Ok((i.into(), self.instantiate_pattern(p)?))) + .filter(|e| discard() != e.as_ref().unwrap().1) + .collect::>>()?, + })), + Value::Sequence(v) => + P::Pattern::DCompound(Box::new(P::DCompound::Arr { + ctor: Box::new(P::CArr { + arity: v.len().into(), + }), + members: v.iter().enumerate() + .map(|(i, p)| Ok((i.into(), self.instantiate_pattern(p)?))) + .filter(|e| discard() != e.as_ref().unwrap().1) + .collect::>>()?, + })), + Value::Set(_) => + Err(bad_instruction(format!("Sets not permitted in patterns: {:?}", template)))?, + Value::Dictionary(v) => + P::Pattern::DCompound(Box::new(P::DCompound::Dict { + ctor: Box::new(P::CDict), + members: v.iter() + .map(|(a, b)| Ok((a.clone(), self.instantiate_pattern(b)?))) + .collect::>>()?, + })), + }) + } +} + +impl Env { + fn lookup_target(&self, target: &String) -> io::Result> { + Ok(self.0.get(target) + .ok_or_else(|| bad_instruction(format!("Undefined target variable: {:?}", target)))? + .value().to_embedded()?.clone()) + } + + fn instantiate_value(&self, template: &AnyValue) -> io::Result { + Ok(match template.value() { + Value::Boolean(_) | + Value::Float(_) | + Value::Double(_) | + Value::SignedInteger(_) | + Value::String(_) | + Value::ByteString(_) | + Value::Embedded(_) => + template.clone(), + + Value::Symbol(s) => match analyze(s) { + Symbolic::Binder(_) | Symbolic::Discard => + Err(bad_instruction( + format!("Invalid use of wildcard in template: {:?}", template)))?, + Symbolic::Reference(s) => + self.0.get(&s).ok_or_else(|| bad_instruction( + format!("Undefined template variable: {:?}", template)))?.clone(), + Symbolic::Literal(s) => + Value::Symbol(s).wrap(), + }, + + Value::Record(r) => + Value::Record(Record(r.fields_vec().iter().map(|a| self.instantiate_value(a)) + .collect::, _>>()?)).wrap(), + Value::Sequence(v) => + Value::Sequence(v.iter().map(|a| self.instantiate_value(a)) + .collect::, _>>()?).wrap(), + Value::Set(v) => + Value::Set(v.iter().map(|a| self.instantiate_value(a)) + .collect::, _>>()?).wrap(), + Value::Dictionary(v) => + Value::Dictionary(v.iter().map(|(a,b)| Ok((self.instantiate_value(a)?, + self.instantiate_value(b)?))) + .collect::>>()?).wrap(), + }) + } +} + +impl Instruction { + fn eval(&self, t: &mut Activation, env: &Env) -> io::Result<()> { + match self { + Instruction::Assert { target, template } => { + env.lookup_target(target)?.assert(t, &(), &env.instantiate_value(template)?); + } + Instruction::React { target, pattern_template, body } => { + let mut inst = PatternInstantiator { + env, + binding_names: Vec::new(), + }; + let pattern = inst.instantiate_pattern(pattern_template)?; + let binding_names = inst.binding_names; + let observer = during::entity(env.clone()) + .on_asserted_facet(enclose!( + (binding_names, body) move |env, t, captures: AnyValue| { + if let Some(captures) = captures.value_owned().into_sequence() { + let mut env = env.clone(); + for (k, v) in binding_names.iter().zip(captures) { + env.0.insert(k.clone(), v); + } + body.eval(t, &env)?; + } + Ok(()) + })) + .create_cap(t); + env.lookup_target(target)?.assert(t, language(), &dataspace::Observe { + pattern, + observer, + }); + } + Instruction::Sequence { instructions } => { + for i in instructions { + i.eval(t, env)?; + } + } + } + Ok(()) + } + + fn parse<'t>(target: &str, tokens: &'t [AnyValue]) -> io::Result> { + if tokens.len() == 0 { + return Ok(None); + } + + if tokens[0].value().is_record() || tokens[0].value().is_dictionary() { + return Ok(Some((Instruction::Assert { + target: target.to_owned(), + template: tokens[0].clone(), + }, &tokens[1..]))); + } + + if let Some(tokens) = tokens[0].value().as_sequence() { + return Ok(Some((Instruction::Sequence { + instructions: Instruction::parse_all(target, tokens)?, + }, &tokens[1..]))); + } + + if let Some(s) = tokens[0].value().as_symbol() { + match analyze(s) { + Symbolic::Binder(s) => + if s.len() == 0 { + if tokens.len() == 1 { + Err(bad_instruction(format!("Missing pattern and instruction in react")))?; + } else { + let pattern_template = tokens[1].clone(); + match Instruction::parse(target, &tokens[2..])? { + None => + Err(bad_instruction(format!("Missing instruction in react with pattern {:?}", tokens[1])))?, + Some((body, tokens)) => + return Ok(Some((Instruction::React { + target: target.to_owned(), + pattern_template, + body: Box::new(body), + }, tokens))), + } + } + } else { + Err(bad_instruction(format!("Invalid use of pattern binder in target: {:?}", tokens[0])))?; + }, + Symbolic::Discard => + Err(bad_instruction(format!("Invalid use of discard in target: {:?}", tokens[0])))?, + Symbolic::Reference(s) => + if tokens.len() == 1 { + Err(bad_instruction(format!("Missing instruction after retarget: {:?}", tokens[0])))?; + } else if tokens[1].value().is_symbol() { + Err(bad_instruction(format!("Two retargets in a row: {:?} {:?}", tokens[0], tokens[1])))?; + } else { + return Instruction::parse(&s, &tokens[1..]); + }, + Symbolic::Literal(_) => + /* fall through */ (), + } + } + + Err(bad_instruction(format!("Invalid token: {:?}", tokens[0])))? + } + + fn parse_all(target: &str, mut tokens: &[AnyValue]) -> io::Result> { + let mut instructions = Vec::new(); + while let Some((i, more)) = Instruction::parse(target, tokens)? { + instructions.push(i); + tokens = more; + } + Ok(instructions) + } +} + +fn process_existing_file( + t: &mut Activation, + config_ds: &Arc, + root_ds: &Arc, + path: &PathBuf, +) -> io::Result> { + let tokens: Vec = IOBinarySource::new(fs::File::open(path)?) + .text::(ViaCodec::new(NoEmbeddedDomainCodec)) + .configured(true) + .collect::, _>>()?; + let instructions = Instruction::parse_all("config", &tokens)?; + if instructions.is_empty() { + Ok(None) + } else { + let mut env = Map::new(); + env.insert("config".to_owned(), AnyValue::domain(config_ds.clone())); + env.insert("root".to_owned(), AnyValue::domain(root_ds.clone())); + match t.facet(|t| Ok(Instruction::Sequence { instructions }.eval(t, &Env(env))?)) { + Ok(facet_id) => Ok(Some(facet_id)), + Err(error) => { + tracing::error!(?path, ?error); + Ok(None) + } + } + } +} + +fn process_path( + t: &mut Activation, + config_ds: &Arc, + root_ds: &Arc, + path: &PathBuf, +) -> io::Result> { match fs::metadata(path) { Ok(md) => if md.is_file() { - assertions_at_existing_file(t, ds, path) + process_existing_file(t, config_ds, root_ds, path) } else { - Ok(Set::new()) + Ok(None) } Err(e) => if e.kind() != io::ErrorKind::NotFound { Err(e)? } else { - Ok(Set::new()) + Ok(None) } } } @@ -88,18 +386,19 @@ fn is_hidden(path: &PathBuf) -> bool { fn scan_file( t: &mut Activation, - path_state: &mut Map>, - ds: &Arc, + path_state: &mut Map, + config_ds: &Arc, + root_ds: &Arc, path: &PathBuf, ) -> bool { if is_hidden(path) { return true; } tracing::info!("scan_file: {:?}", path); - match assertions_at_path(t, ds, path) { - Ok(new_handles) => { - if !new_handles.is_empty() { - path_state.insert(path.clone(), new_handles); + match process_path(t, config_ds, root_ds, path) { + Ok(maybe_facet_id) => { + if let Some(facet_id) = maybe_facet_id { + path_state.insert(path.clone(), facet_id); } true }, @@ -112,8 +411,9 @@ fn scan_file( fn initial_scan( t: &mut Activation, - path_state: &mut Map>, - ds: &Arc, + path_state: &mut Map, + config_ds: &Arc, + root_ds: &Arc, path: &PathBuf, ) { if is_hidden(path) { @@ -121,12 +421,12 @@ fn initial_scan( } match fs::metadata(path) { Ok(md) => if md.is_file() { - scan_file(t, path_state, ds, path); + scan_file(t, path_state, config_ds, root_ds, path); } else { match fs::read_dir(path) { Ok(entries) => for er in entries { match er { - Ok(e) => initial_scan(t, path_state, ds, &e.path()), + Ok(e) => initial_scan(t, path_state, config_ds, root_ds, &e.path()), Err(e) => tracing::warn!( "initial_scan: transient during scan of {:?}: {:?}", path, e), } @@ -138,7 +438,12 @@ fn initial_scan( } } -fn run(t: &mut Activation, ds: Arc, spec: internal_services::ConfigWatcher) -> ActorResult { +fn run( + t: &mut Activation, + config_ds: Arc, + root_ds: Arc, + spec: internal_services::ConfigWatcher, +) -> ActorResult { let path = fs::canonicalize(spec.path.clone())?; tracing::info!("watching {:?}", &path); @@ -149,13 +454,13 @@ fn run(t: &mut Activation, ds: Arc, spec: internal_services::ConfigWatcher) let facet = t.facet.clone(); thread::spawn(move || { - let mut path_state: Map> = Map::new(); + let mut path_state: Map = Map::new(); { let root_path = path.clone().into(); facet.activate(Account::new(syndicate::name!("initial_scan")), |t| { - initial_scan(t, &mut path_state, &ds, &root_path); - ds.assert(t, language(), &lifecycle::ready(&spec)); + initial_scan(t, &mut path_state, &config_ds, &root_ds, &root_path); + config_ds.assert(t, language(), &lifecycle::ready(&spec)); Ok(()) }).unwrap(); tracing::trace!("initial_scan complete"); @@ -163,20 +468,20 @@ fn run(t: &mut Activation, ds: Arc, spec: internal_services::ConfigWatcher) let mut rescan = |paths: Vec| { facet.activate(Account::new(syndicate::name!("rescan")), |t| { - let mut to_retract = Set::new(); + let mut to_stop = Vec::new(); for path in paths.into_iter() { - let maybe_handles = path_state.remove(&path); - let new_content_ok = scan_file(t, &mut path_state, &ds, &path); - if let Some(old_handles) = maybe_handles { + let maybe_facet_id = path_state.remove(&path); + let new_content_ok = scan_file(t, &mut path_state, &config_ds, &root_ds, &path); + if let Some(old_facet_id) = maybe_facet_id { if new_content_ok { - to_retract.extend(old_handles.into_iter()); + to_stop.push(old_facet_id); } else { - path_state.insert(path, old_handles); + path_state.insert(path, old_facet_id); } } } - for h in to_retract.into_iter() { - t.retract(h); + for facet_id in to_stop.into_iter() { + t.stop_facet(facet_id)?; } Ok(()) }).unwrap() diff --git a/syndicate-server/src/services/daemon.rs b/syndicate-server/src/services/daemon.rs index e34cbbe..24fcd46 100644 --- a/syndicate-server/src/services/daemon.rs +++ b/syndicate-server/src/services/daemon.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use syndicate::actor::*; use syndicate::enclose; +use syndicate::schemas::service; use syndicate::supervise::{Supervisor, SupervisorConfiguration}; use syndicate::value::NestedValue; @@ -131,6 +132,7 @@ impl CommandLine { } struct DaemonInstance { + config_ds: Arc, log_ds: Arc, service: AnyValue, name: tracing::Span, @@ -257,8 +259,8 @@ impl DaemonInstance { } match self.protocol { - Protocol::TextSyndicate => relay_facet(t, &mut child, true)?, - Protocol::BinarySyndicate => relay_facet(t, &mut child, false)?, + Protocol::TextSyndicate => self.relay_facet(t, &mut child, true)?, + Protocol::BinarySyndicate => self.relay_facet(t, &mut child, false)?, Protocol::None => { if let Some(r) = child.stdout.take() { self.log(t, facet.clone(), pid, r, "stdout"); @@ -286,25 +288,29 @@ impl DaemonInstance { })?; Ok(()) } -} -fn relay_facet(t: &mut Activation, child: &mut process::Child, output_text: bool) -> ActorResult { - use syndicate::relay; - use syndicate::schemas::sturdy; + fn relay_facet(&self, t: &mut Activation, child: &mut process::Child, output_text: bool) -> ActorResult { + use syndicate::relay; + use syndicate::schemas::sturdy; - let to_child = child.stdin.take().expect("pipe to child"); - let from_child = child.stdout.take().expect("pipe from child"); + let to_child = child.stdin.take().expect("pipe to child"); + let from_child = child.stdout.take().expect("pipe from child"); - let i = relay::Input::Bytes(Box::pin(from_child)); - let o = relay::Output::Bytes(Box::pin(to_child)); + let i = relay::Input::Bytes(Box::pin(from_child)); + let o = relay::Output::Bytes(Box::pin(to_child)); - t.facet(|t| { - let cap = relay::TunnelRelay::run(t, i, o, None, Some(sturdy::Oid(0.into())), output_text) - .ok_or("initial capability reference unavailable")?; - tracing::info!(?cap); - Ok(()) - })?; - Ok(()) + t.facet(|t| { + let cap = relay::TunnelRelay::run(t, i, o, None, Some(sturdy::Oid(0.into())), output_text) + .ok_or("initial capability reference unavailable")?; + tracing::info!(?cap); + self.config_ds.assert(t, language(), &service::ServiceObject { + service_name: self.service.clone(), + object: AnyValue::domain(cap), + }); + Ok(()) + })?; + Ok(()) + } } fn run( @@ -340,80 +346,83 @@ fn run( Ok(()) }))?; - enclose!((unready_configs, completed_processes) during!(t, config_ds, language(), , { - enclose!((spec, root_ds, unready_configs, completed_processes) |t: &mut Activation| { - tracing::debug!(?config, "new config"); - counter::adjust(t, &unready_configs, 1); - counter::adjust(t, &total_configs, 1); + enclose!((config_ds, unready_configs, completed_processes) + during!(t, config_ds.clone(), language(), , { + enclose!((spec, config_ds, root_ds, unready_configs, completed_processes) + |t: &mut Activation| { + tracing::debug!(?config, "new config"); + counter::adjust(t, &unready_configs, 1); + counter::adjust(t, &total_configs, 1); - match language().parse::(&config) { - Ok(config) => { - tracing::info!(?config); - let config = config.elaborate(); - let facet = t.facet.clone(); - t.linked_task(syndicate::name!("subprocess"), async move { - let mut cmd = config.process.build_command().ok_or("Cannot start daemon process")?; + match language().parse::(&config) { + Ok(config) => { + tracing::info!(?config); + let config = config.elaborate(); + let facet = t.facet.clone(); + t.linked_task(syndicate::name!("subprocess"), async move { + let mut cmd = config.process.build_command().ok_or("Cannot start daemon process")?; - let announce_presumed_readiness = match config.ready_on_start { - ReadyOnStart::Present { ready_on_start } => ready_on_start, - ReadyOnStart::Absent => true, - ReadyOnStart::Invalid { ready_on_start } => { - tracing::error!(?ready_on_start, "Invalid readyOnStart value"); - Err("Invalid readyOnStart value")? - } - }; - let restart_policy = match config.restart { - RestartField::Present { restart } => *restart, - RestartField::Absent => RestartPolicy::All, - RestartField::Invalid { restart } => { - tracing::error!(?restart, "Invalid restart value"); - Err("Invalid restart value")? - } - }; - let protocol = match config.protocol { - ProtocolField::Present { protocol } => *protocol, - ProtocolField::Absent => Protocol::None, - ProtocolField::Invalid { protocol } => { - tracing::error!(?protocol, "Invalid protocol value"); - Err("Invalid protocol value")? - } - }; + let announce_presumed_readiness = match config.ready_on_start { + ReadyOnStart::Present { ready_on_start } => ready_on_start, + ReadyOnStart::Absent => true, + ReadyOnStart::Invalid { ready_on_start } => { + tracing::error!(?ready_on_start, "Invalid readyOnStart value"); + Err("Invalid readyOnStart value")? + } + }; + let restart_policy = match config.restart { + RestartField::Present { restart } => *restart, + RestartField::Absent => RestartPolicy::All, + RestartField::Invalid { restart } => { + tracing::error!(?restart, "Invalid restart value"); + Err("Invalid restart value")? + } + }; + let protocol = match config.protocol { + ProtocolField::Present { protocol } => *protocol, + ProtocolField::Absent => Protocol::None, + ProtocolField::Invalid { protocol } => { + tracing::error!(?protocol, "Invalid protocol value"); + Err("Invalid protocol value")? + } + }; - cmd.stdin(match &protocol { - Protocol::None => - std::process::Stdio::null(), - Protocol::TextSyndicate | Protocol::BinarySyndicate => - std::process::Stdio::piped(), - }); - cmd.stdout(std::process::Stdio::piped()); - cmd.stderr(std::process::Stdio::piped()); + cmd.stdin(match &protocol { + Protocol::None => + std::process::Stdio::null(), + Protocol::TextSyndicate | Protocol::BinarySyndicate => + std::process::Stdio::piped(), + }); + cmd.stdout(std::process::Stdio::piped()); + cmd.stderr(std::process::Stdio::piped()); - let daemon_instance = DaemonInstance { - log_ds: root_ds, - service: spec, - name: tracing::Span::current(), - cmd, - announce_presumed_readiness, - unready_configs, - completed_processes, - restart_policy, - protocol, - }; + let daemon_instance = DaemonInstance { + config_ds, + log_ds: root_ds, + service: spec, + name: tracing::Span::current(), + cmd, + announce_presumed_readiness, + unready_configs, + completed_processes, + restart_policy, + protocol, + }; - facet.activate(Account::new(syndicate::name!("instance-startup")), |t| { - daemon_instance.start(t) - })?; - Ok(LinkedTaskTermination::KeepFacet) - }); - Ok(()) - } - Err(_) => { - tracing::error!(?config, "Invalid Process specification"); - return Ok(()); - } - } - }) - })); + facet.activate(Account::new(syndicate::name!("instance-startup")), |t| { + daemon_instance.start(t) + })?; + Ok(LinkedTaskTermination::KeepFacet) + }); + Ok(()) + } + Err(_) => { + tracing::error!(?config, "Invalid Process specification"); + return Ok(()); + } + } + }) + })); tracing::debug!("syncing to ds"); counter::sync_and_adjust(t, &config_ds.underlying, &unready_configs, -1);