Simplify and repair stdout/stderr logging in daemons

This commit is contained in:
Tony Garnock-Jones 2022-01-26 23:37:04 +01:00
parent 1111776754
commit 7e4654c8f7
1 changed files with 23 additions and 21 deletions

View File

@ -225,20 +225,20 @@ impl DaemonInstance {
fn log<R: 'static + Send + AsyncRead + Unpin>( fn log<R: 'static + Send + AsyncRead + Unpin>(
&self, &self,
t: &mut Activation, t: &mut Activation,
facet: FacetRef,
pid: Option<u32>, pid: Option<u32>,
r: R, r: R,
kind: &str kind: &str
) { ) -> ActorResult {
let log_ds = self.log_ds.clone(); t.facet(|t| {
let service = self.service.clone(); let facet = t.facet.clone();
let kind = AnyValue::symbol(kind); let log_ds = self.log_ds.clone();
let pid = match pid { let service = self.service.clone();
Some(n) => AnyValue::new(n), let kind = AnyValue::symbol(kind);
None => AnyValue::symbol("unknown"), let pid = match pid {
}; Some(n) => AnyValue::new(n),
let trace_collector = t.trace_collector(); None => AnyValue::symbol("unknown"),
t.spawn(Some(rec![AnyValue::symbol("log"), kind.clone(), self.service.clone()]), move |t| { };
let trace_collector = t.trace_collector();
t.linked_task(None, async move { t.linked_task(None, async move {
let mut r = BufReader::new(r); let mut r = BufReader::new(r);
let cause = trace_collector.as_ref().map( let cause = trace_collector.as_ref().map(
@ -246,8 +246,9 @@ impl DaemonInstance {
let account = Account::new(None, trace_collector); let account = Account::new(None, trace_collector);
loop { loop {
let mut buf = Vec::new(); let mut buf = Vec::new();
if r.read_until(b'\n', &mut buf).await? == 0 { match r.read_until(b'\n', &mut buf).await {
return Ok(LinkedTaskTermination::Normal); Ok(0) | Err(_) => break,
Ok(_) => (),
} }
let buf = match std::str::from_utf8(&buf) { let buf = match std::str::from_utf8(&buf) {
Ok(s) => AnyValue::new(s), Ok(s) => AnyValue::new(s),
@ -258,11 +259,11 @@ impl DaemonInstance {
&account, cause.clone(), enclose!((pid, service, kind) |t| { &account, cause.clone(), enclose!((pid, service, kind) |t| {
log_ds.message(t, &(), &syndicate_macros::template!( log_ds.message(t, &(), &syndicate_macros::template!(
"<log =now { "<log =now {
pid: =pid, pid: =pid,
service: =service, service: =service,
stream: =kind, stream: =kind,
line: =buf, line: =buf,
}>")); }>"));
Ok(()) Ok(())
})) }))
{ {
@ -272,7 +273,8 @@ impl DaemonInstance {
Ok(LinkedTaskTermination::Normal) Ok(LinkedTaskTermination::Normal)
}); });
Ok(()) Ok(())
}); })?;
Ok(())
} }
fn start(mut self, t: &mut Activation) -> ActorResult { fn start(mut self, t: &mut Activation) -> ActorResult {
@ -291,7 +293,7 @@ impl DaemonInstance {
let facet = t.facet.clone(); let facet = t.facet.clone();
if let Some(r) = child.stderr.take() { if let Some(r) = child.stderr.take() {
self.log(t, facet.clone(), pid, r, "stderr"); self.log(t, pid, r, "stderr")?;
} }
match self.protocol { match self.protocol {
@ -299,7 +301,7 @@ impl DaemonInstance {
Protocol::BinarySyndicate => self.relay_facet(t, &mut child, false)?, Protocol::BinarySyndicate => self.relay_facet(t, &mut child, false)?,
Protocol::None => { Protocol::None => {
if let Some(r) = child.stdout.take() { if let Some(r) = child.stdout.take() {
self.log(t, facet.clone(), pid, r, "stdout"); self.log(t, pid, r, "stdout")?;
} }
} }
} }