First stab at service logging

This commit is contained in:
Tony Garnock-Jones 2021-10-01 22:07:28 +02:00
parent ea7e13b0c0
commit 5e3a497c32
2 changed files with 106 additions and 16 deletions

View File

@ -143,6 +143,37 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});
}
t.spawn(tracing::Span::current(), enclose!((root_ds) move |t| {
let n_unknown: AnyValue = AnyValue::symbol("-");
let n_pid: AnyValue = AnyValue::symbol("pid");
let n_line: AnyValue = AnyValue::symbol("line");
let n_service: AnyValue = AnyValue::symbol("service");
let n_stream: AnyValue = AnyValue::symbol("stream");
let e = syndicate::during::entity(())
.on_message(move |(), _t, captures: AnyValue| {
if let Some(mut d) = captures.value_owned().into_sequence().and_then(
|s| s.into_iter().next().and_then(|d| d.value_owned().into_dictionary()))
{
let pid = d.remove(&n_pid).unwrap_or_else(|| n_unknown.clone());
let line = d.remove(&n_line).unwrap_or_else(|| n_unknown.clone());
let service = d.remove(&n_service).unwrap_or_else(|| n_unknown.clone());
let stream = d.remove(&n_stream).unwrap_or_else(|| n_unknown.clone());
if d.is_empty() {
tracing::info!(?stream, ?service, ?pid, message = ?line);
} else {
tracing::info!(?stream, ?service, ?pid, data = ?d, message = ?line);
}
}
Ok(())
})
.create_cap(t);
root_ds.assert(t, language(), &syndicate::schemas::dataspace::Observe {
pattern: syndicate_macros::pattern!(<log $>),
observer: e,
});
Ok(())
}));
Ok(())
}).await??;

View File

@ -5,7 +5,11 @@ use std::sync::Arc;
use syndicate::actor::*;
use syndicate::enclose;
use syndicate::supervise::{Supervisor, SupervisorConfiguration};
use syndicate::value::NestedValue;
use tokio::io::AsyncRead;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::process;
use crate::counter;
@ -97,8 +101,8 @@ impl FullProcess {
}
cmd.stdin(std::process::Stdio::null());
cmd.stdout(std::process::Stdio::inherit());
cmd.stderr(std::process::Stdio::inherit());
cmd.stdout(std::process::Stdio::piped());
cmd.stderr(std::process::Stdio::piped());
cmd.kill_on_drop(true);
Some(cmd)
@ -131,6 +135,8 @@ impl CommandLine {
}
struct DaemonInstance {
log_ds: Arc<Cap>,
service: AnyValue,
name: tracing::Span,
cmd: process::Command,
announce_presumed_readiness: bool,
@ -180,6 +186,50 @@ impl DaemonInstance {
}))
}
fn log<R: 'static + Send + AsyncRead + Unpin>(
&self,
t: &mut Activation,
facet: FacetRef,
pid: Option<u32>,
r: R,
kind: &str
) {
let log_ds = self.log_ds.clone();
let service = self.service.clone();
let kind = AnyValue::symbol(kind);
let pid = match pid {
Some(n) => AnyValue::new(n),
None => AnyValue::symbol("unknown"),
};
t.spawn(syndicate::name!(parent: self.name.clone(), "log"), move |t| {
t.linked_task(tracing::Span::current(), async move {
let mut r = BufReader::new(r);
loop {
let mut buf = Vec::new();
if r.read_until(b'\n', &mut buf).await? == 0 {
return Ok(LinkedTaskTermination::Normal);
}
let buf = match std::str::from_utf8(&buf) {
Ok(s) => AnyValue::new(s),
Err(_) => AnyValue::bytestring(buf),
};
facet.activate(Account::new(tracing::Span::current()),
enclose!((pid, service, kind) |t| {
log_ds.message(t, &(), &syndicate_macros::template!(
"<log {
pid: =pid,
service: =service,
stream: =kind,
line: =buf,
}>"));
Ok(())
}))?;
}
});
Ok(())
});
}
fn start(mut self, t: &mut Activation) -> ActorResult {
t.facet(|t| {
tracing::trace!(cmd = ?self.cmd, "starting");
@ -190,23 +240,30 @@ impl DaemonInstance {
return self.handle_exit(t, Some(format!("{}", e)));
}
};
tracing::info!(pid = ?child.id(), cmd = ?self.cmd, "started");
let pid = child.id();
tracing::info!(?pid, cmd = ?self.cmd, "started");
let facet = t.facet.clone();
if let Some(r) = child.stdout.take() { self.log(t, facet.clone(), pid, r, "stdout"); }
if let Some(r) = child.stderr.take() { self.log(t, facet.clone(), pid, r, "stderr"); }
if self.announce_presumed_readiness {
counter::adjust(t, &self.unready_configs, -1);
}
let facet = t.facet.clone();
t.linked_task(syndicate::name!(parent: self.name.clone(), "wait"), async move {
tracing::trace!("waiting for process exit");
let status = child.wait().await?;
tracing::info!(?status);
facet.activate(Account::new(syndicate::name!("instance-terminated")), |t| {
let m = if status.success() { None } else { Some(format!("{}", status)) };
self.handle_exit(t, m)
})?;
Ok(LinkedTaskTermination::Normal)
});
t.linked_task(
syndicate::name!(parent: self.name.clone(), "wait"),
enclose!((facet) async move {
tracing::trace!("waiting for process exit");
let status = child.wait().await?;
tracing::info!(?status);
facet.activate(Account::new(syndicate::name!("instance-terminated")), |t| {
let m = if status.success() { None } else { Some(format!("{}", status)) };
self.handle_exit(t, m)
})?;
Ok(LinkedTaskTermination::Normal)
}));
Ok(())
})?;
Ok(())
@ -216,7 +273,7 @@ impl DaemonInstance {
fn run(
t: &mut Activation,
config_ds: Arc<Cap>,
_root_ds: Arc<Cap>,
root_ds: Arc<Cap>,
service: DaemonService,
) -> ActorResult {
let spec = language().unparse(&service);
@ -247,7 +304,7 @@ fn run(
}))?;
enclose!((unready_configs, completed_processes) during!(t, config_ds, language(), <daemon #(service.id) $config>, {
enclose!((unready_configs, completed_processes) |t: &mut Activation| {
enclose!((spec, root_ds, unready_configs, completed_processes) |t: &mut Activation| {
tracing::debug!(?config, "new config");
counter::adjust(t, &unready_configs, 1);
counter::adjust(t, &total_configs, 1);
@ -278,6 +335,8 @@ fn run(
};
let daemon_instance = DaemonInstance {
log_ds: root_ds,
service: spec,
name: tracing::Span::current(),
cmd,
announce_presumed_readiness,