From 5e3a497c3277c02e27f9d29e43f6c1c29af77518 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Fri, 1 Oct 2021 22:07:28 +0200 Subject: [PATCH] First stab at service logging --- syndicate-server/src/main.rs | 31 +++++++++ syndicate-server/src/services/daemon.rs | 91 ++++++++++++++++++++----- 2 files changed, 106 insertions(+), 16 deletions(-) diff --git a/syndicate-server/src/main.rs b/syndicate-server/src/main.rs index 8587471..fec73f4 100644 --- a/syndicate-server/src/main.rs +++ b/syndicate-server/src/main.rs @@ -143,6 +143,37 @@ async fn main() -> Result<(), Box> { }); } + t.spawn(tracing::Span::current(), enclose!((root_ds) move |t| { + let n_unknown: AnyValue = AnyValue::symbol("-"); + let n_pid: AnyValue = AnyValue::symbol("pid"); + let n_line: AnyValue = AnyValue::symbol("line"); + let n_service: AnyValue = AnyValue::symbol("service"); + let n_stream: AnyValue = AnyValue::symbol("stream"); + let e = syndicate::during::entity(()) + .on_message(move |(), _t, captures: AnyValue| { + if let Some(mut d) = captures.value_owned().into_sequence().and_then( + |s| s.into_iter().next().and_then(|d| d.value_owned().into_dictionary())) + { + let pid = d.remove(&n_pid).unwrap_or_else(|| n_unknown.clone()); + let line = d.remove(&n_line).unwrap_or_else(|| n_unknown.clone()); + let service = d.remove(&n_service).unwrap_or_else(|| n_unknown.clone()); + let stream = d.remove(&n_stream).unwrap_or_else(|| n_unknown.clone()); + if d.is_empty() { + tracing::info!(?stream, ?service, ?pid, message = ?line); + } else { + tracing::info!(?stream, ?service, ?pid, data = ?d, message = ?line); + } + } + Ok(()) + }) + .create_cap(t); + root_ds.assert(t, language(), &syndicate::schemas::dataspace::Observe { + pattern: syndicate_macros::pattern!(), + observer: e, + }); + Ok(()) + })); + Ok(()) }).await??; diff --git a/syndicate-server/src/services/daemon.rs b/syndicate-server/src/services/daemon.rs index 66d60f4..76e5302 100644 --- a/syndicate-server/src/services/daemon.rs +++ b/syndicate-server/src/services/daemon.rs @@ -5,7 +5,11 @@ use std::sync::Arc; use syndicate::actor::*; use syndicate::enclose; use syndicate::supervise::{Supervisor, SupervisorConfiguration}; +use syndicate::value::NestedValue; +use tokio::io::AsyncRead; +use tokio::io::AsyncBufReadExt; +use tokio::io::BufReader; use tokio::process; use crate::counter; @@ -97,8 +101,8 @@ impl FullProcess { } cmd.stdin(std::process::Stdio::null()); - cmd.stdout(std::process::Stdio::inherit()); - cmd.stderr(std::process::Stdio::inherit()); + cmd.stdout(std::process::Stdio::piped()); + cmd.stderr(std::process::Stdio::piped()); cmd.kill_on_drop(true); Some(cmd) @@ -131,6 +135,8 @@ impl CommandLine { } struct DaemonInstance { + log_ds: Arc, + service: AnyValue, name: tracing::Span, cmd: process::Command, announce_presumed_readiness: bool, @@ -180,6 +186,50 @@ impl DaemonInstance { })) } + fn log( + &self, + t: &mut Activation, + facet: FacetRef, + pid: Option, + r: R, + kind: &str + ) { + let log_ds = self.log_ds.clone(); + let service = self.service.clone(); + let kind = AnyValue::symbol(kind); + let pid = match pid { + Some(n) => AnyValue::new(n), + None => AnyValue::symbol("unknown"), + }; + t.spawn(syndicate::name!(parent: self.name.clone(), "log"), move |t| { + t.linked_task(tracing::Span::current(), async move { + let mut r = BufReader::new(r); + loop { + let mut buf = Vec::new(); + if r.read_until(b'\n', &mut buf).await? == 0 { + return Ok(LinkedTaskTermination::Normal); + } + let buf = match std::str::from_utf8(&buf) { + Ok(s) => AnyValue::new(s), + Err(_) => AnyValue::bytestring(buf), + }; + facet.activate(Account::new(tracing::Span::current()), + enclose!((pid, service, kind) |t| { + log_ds.message(t, &(), &syndicate_macros::template!( + "")); + Ok(()) + }))?; + } + }); + Ok(()) + }); + } + fn start(mut self, t: &mut Activation) -> ActorResult { t.facet(|t| { tracing::trace!(cmd = ?self.cmd, "starting"); @@ -190,23 +240,30 @@ impl DaemonInstance { return self.handle_exit(t, Some(format!("{}", e))); } }; - tracing::info!(pid = ?child.id(), cmd = ?self.cmd, "started"); + let pid = child.id(); + tracing::info!(?pid, cmd = ?self.cmd, "started"); + + let facet = t.facet.clone(); + + if let Some(r) = child.stdout.take() { self.log(t, facet.clone(), pid, r, "stdout"); } + if let Some(r) = child.stderr.take() { self.log(t, facet.clone(), pid, r, "stderr"); } if self.announce_presumed_readiness { counter::adjust(t, &self.unready_configs, -1); } - let facet = t.facet.clone(); - t.linked_task(syndicate::name!(parent: self.name.clone(), "wait"), async move { - tracing::trace!("waiting for process exit"); - let status = child.wait().await?; - tracing::info!(?status); - facet.activate(Account::new(syndicate::name!("instance-terminated")), |t| { - let m = if status.success() { None } else { Some(format!("{}", status)) }; - self.handle_exit(t, m) - })?; - Ok(LinkedTaskTermination::Normal) - }); + t.linked_task( + syndicate::name!(parent: self.name.clone(), "wait"), + enclose!((facet) async move { + tracing::trace!("waiting for process exit"); + let status = child.wait().await?; + tracing::info!(?status); + facet.activate(Account::new(syndicate::name!("instance-terminated")), |t| { + let m = if status.success() { None } else { Some(format!("{}", status)) }; + self.handle_exit(t, m) + })?; + Ok(LinkedTaskTermination::Normal) + })); Ok(()) })?; Ok(()) @@ -216,7 +273,7 @@ impl DaemonInstance { fn run( t: &mut Activation, config_ds: Arc, - _root_ds: Arc, + root_ds: Arc, service: DaemonService, ) -> ActorResult { let spec = language().unparse(&service); @@ -247,7 +304,7 @@ fn run( }))?; enclose!((unready_configs, completed_processes) during!(t, config_ds, language(), , { - enclose!((unready_configs, completed_processes) |t: &mut Activation| { + enclose!((spec, root_ds, unready_configs, completed_processes) |t: &mut Activation| { tracing::debug!(?config, "new config"); counter::adjust(t, &unready_configs, 1); counter::adjust(t, &total_configs, 1); @@ -278,6 +335,8 @@ fn run( }; let daemon_instance = DaemonInstance { + log_ds: root_ds, + service: spec, name: tracing::Span::current(), cmd, announce_presumed_readiness,