diff --git a/syndicate-server/src/services/daemon.rs b/syndicate-server/src/services/daemon.rs index cc10174..bf02563 100644 --- a/syndicate-server/src/services/daemon.rs +++ b/syndicate-server/src/services/daemon.rs @@ -225,20 +225,20 @@ impl DaemonInstance { fn log( &self, t: &mut Activation, - facet: FacetRef, pid: Option, r: R, kind: &str - ) { - let log_ds = self.log_ds.clone(); - let service = self.service.clone(); - let kind = AnyValue::symbol(kind); - let pid = match pid { - Some(n) => AnyValue::new(n), - None => AnyValue::symbol("unknown"), - }; - let trace_collector = t.trace_collector(); - t.spawn(Some(rec![AnyValue::symbol("log"), kind.clone(), self.service.clone()]), move |t| { + ) -> ActorResult { + t.facet(|t| { + let facet = t.facet.clone(); + let log_ds = self.log_ds.clone(); + let service = self.service.clone(); + let kind = AnyValue::symbol(kind); + let pid = match pid { + Some(n) => AnyValue::new(n), + None => AnyValue::symbol("unknown"), + }; + let trace_collector = t.trace_collector(); t.linked_task(None, async move { let mut r = BufReader::new(r); let cause = trace_collector.as_ref().map( @@ -246,8 +246,9 @@ impl DaemonInstance { let account = Account::new(None, trace_collector); loop { let mut buf = Vec::new(); - if r.read_until(b'\n', &mut buf).await? == 0 { - return Ok(LinkedTaskTermination::Normal); + match r.read_until(b'\n', &mut buf).await { + Ok(0) | Err(_) => break, + Ok(_) => (), } let buf = match std::str::from_utf8(&buf) { Ok(s) => AnyValue::new(s), @@ -258,11 +259,11 @@ impl DaemonInstance { &account, cause.clone(), enclose!((pid, service, kind) |t| { log_ds.message(t, &(), &syndicate_macros::template!( "")); + pid: =pid, + service: =service, + stream: =kind, + line: =buf, + }>")); Ok(()) })) { @@ -272,7 +273,8 @@ impl DaemonInstance { Ok(LinkedTaskTermination::Normal) }); Ok(()) - }); + })?; + Ok(()) } fn start(mut self, t: &mut Activation) -> ActorResult { @@ -291,7 +293,7 @@ impl DaemonInstance { let facet = t.facet.clone(); if let Some(r) = child.stderr.take() { - self.log(t, facet.clone(), pid, r, "stderr"); + self.log(t, pid, r, "stderr")?; } match self.protocol { @@ -299,7 +301,7 @@ impl DaemonInstance { Protocol::BinarySyndicate => self.relay_facet(t, &mut child, false)?, Protocol::None => { if let Some(r) = child.stdout.take() { - self.log(t, facet.clone(), pid, r, "stdout"); + self.log(t, pid, r, "stdout")?; } } }