From 6fb1db4f6b5e3580de36708d125c0a2bc03b9154 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 4 Oct 2021 14:40:39 +0200 Subject: [PATCH] Improve logging --- Cargo.lock | 20 +++++++++++--- syndicate-server/Cargo.toml | 5 ++-- syndicate-server/src/main.rs | 36 ++++++++++++++++--------- syndicate-server/src/services/daemon.rs | 24 ++++++++++++----- syndicate/Cargo.toml | 6 ++--- 5 files changed, 63 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3673434..8532d62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -133,6 +133,7 @@ dependencies = [ "libc", "num-integer", "num-traits", + "time", "winapi 0.3.9", ] @@ -1096,9 +1097,9 @@ checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" [[package]] name = "preserves" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b43cf73db6e750654d441b642f2b638ed1ac43a3068d401dae04cd4da7b319f" +checksum = "c604be1e4ddac999d7c3d81ca9f7912a33a1234f5394fb902315f8cee25645cd" dependencies = [ "base64", "dtoa", @@ -1110,9 +1111,9 @@ dependencies = [ [[package]] name = "preserves-schema" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b88b030244936c9b68b802a7969700604dba0defe1e6e28c6056ccd84ead948" +checksum = "e1e7d339a003cb3e6852e6793b34d34c811f134d0b54b839d6b360f7c692b924" dependencies = [ "convert_case", "glob", @@ -1546,6 +1547,7 @@ dependencies = [ name = "syndicate-server" version = "0.12.0" dependencies = [ + "chrono", "futures", "lazy_static", "notify", @@ -1614,6 +1616,16 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" +dependencies = [ + "libc", + "winapi 0.3.9", +] + [[package]] name = "tinytemplate" version = "1.2.1" diff --git a/syndicate-server/Cargo.toml b/syndicate-server/Cargo.toml index 586790e..b7c7694 100644 --- a/syndicate-server/Cargo.toml +++ b/syndicate-server/Cargo.toml @@ -10,14 +10,15 @@ repository = "https://git.syndicate-lang.org/syndicate-lang/syndicate-rs" license = "Apache-2.0" [build-dependencies] -preserves-schema = "2.1.1" +preserves-schema = "2.2.0" syndicate = { path = "../syndicate", version = "^0.12.0"} [dependencies] -preserves-schema = "2.1.1" +preserves-schema = "2.2.0" syndicate = { path = "../syndicate", version = "^0.12.0"} syndicate-macros = { path = "../syndicate-macros", version = "^0.7.0"} +chrono = "0.4" futures = "0.3" lazy_static = "1.4" notify = "4.0" diff --git a/syndicate-server/src/main.rs b/syndicate-server/src/main.rs index fec73f4..34d2116 100644 --- a/syndicate-server/src/main.rs +++ b/syndicate-server/src/main.rs @@ -151,24 +151,36 @@ async fn main() -> Result<(), Box> { let n_stream: AnyValue = AnyValue::symbol("stream"); let e = syndicate::during::entity(()) .on_message(move |(), _t, captures: AnyValue| { - if let Some(mut d) = captures.value_owned().into_sequence().and_then( - |s| s.into_iter().next().and_then(|d| d.value_owned().into_dictionary())) - { - let pid = d.remove(&n_pid).unwrap_or_else(|| n_unknown.clone()); - let line = d.remove(&n_line).unwrap_or_else(|| n_unknown.clone()); - let service = d.remove(&n_service).unwrap_or_else(|| n_unknown.clone()); - let stream = d.remove(&n_stream).unwrap_or_else(|| n_unknown.clone()); - if d.is_empty() { - tracing::info!(?stream, ?service, ?pid, message = ?line); - } else { - tracing::info!(?stream, ?service, ?pid, data = ?d, message = ?line); + if let Some(captures) = captures.value_owned().into_sequence() { + let mut captures = captures.into_iter(); + let timestamp = captures.next() + .and_then(|t| t.value_owned().into_string()) + .unwrap_or_else(|| "-".to_owned()); + if let Some(mut d) = captures.next() + .and_then(|d| d.value_owned().into_dictionary()) + { + let pid = d.remove(&n_pid).unwrap_or_else(|| n_unknown.clone()); + let service = d.remove(&n_service).unwrap_or_else(|| n_unknown.clone()); + let line = d.remove(&n_line).unwrap_or_else(|| n_unknown.clone()); + let stream = d.remove(&n_stream).unwrap_or_else(|| n_unknown.clone()); + let message = format!("{} {:?}[{:?}] {:?}: {:?}", + timestamp, + service, + pid, + stream, + line); + if d.is_empty() { + tracing::info!(target: "", message = %message); + } else { + tracing::info!(target: "", data = ?d, message = %message); + } } } Ok(()) }) .create_cap(t); root_ds.assert(t, language(), &syndicate::schemas::dataspace::Observe { - pattern: syndicate_macros::pattern!(), + pattern: syndicate_macros::pattern!(), observer: e, }); Ok(()) diff --git a/syndicate-server/src/services/daemon.rs b/syndicate-server/src/services/daemon.rs index 76e5302..448ca0e 100644 --- a/syndicate-server/src/services/daemon.rs +++ b/syndicate-server/src/services/daemon.rs @@ -150,6 +150,7 @@ impl DaemonInstance { let delay = std::time::Duration::from_millis(if let None = error_message { 200 } else { 1000 }); t.stop_facet_and_continue(t.facet.facet_id, Some(move |t: &mut Activation| { + #[derive(Debug)] enum NextStep { SleepAndRestart, SignalSuccessfulCompletion, @@ -159,17 +160,25 @@ impl DaemonInstance { let next_step = match self.restart_policy { RestartPolicy::Always => SleepAndRestart, RestartPolicy::OnError => - match error_message { + match &error_message { None => SignalSuccessfulCompletion, Some(_) => SleepAndRestart, }, RestartPolicy::All => - match error_message { + match &error_message { None => SignalSuccessfulCompletion, - Some(s) => Err(s.as_str())?, + Some(s) => { + tracing::error!(cmd = ?self.cmd, next_step = %"RestartDaemon", message = %s); + Err(s.as_str())? + } }, }; + match error_message { + Some(m) => tracing::error!(cmd = ?self.cmd, ?next_step, message = %m), + None => tracing::info!(cmd = ?self.cmd, ?next_step), + } + match next_step { SleepAndRestart => t.after(delay, |t| self.start(t)), SignalSuccessfulCompletion => { @@ -213,10 +222,11 @@ impl DaemonInstance { Ok(s) => AnyValue::new(s), Err(_) => AnyValue::bytestring(buf), }; + let now = AnyValue::new(chrono::Utc::now().to_rfc3339()); facet.activate(Account::new(tracing::Span::current()), enclose!((pid, service, kind) |t| { log_ds.message(t, &(), &syndicate_macros::template!( - " child, Err(e) => { - tracing::info!(spawn_err = ?e); + tracing::debug!(spawn_err = ?e); return self.handle_exit(t, Some(format!("{}", e))); } }; let pid = child.id(); - tracing::info!(?pid, cmd = ?self.cmd, "started"); + tracing::debug!(?pid, cmd = ?self.cmd, "started"); let facet = t.facet.clone(); @@ -257,7 +267,7 @@ impl DaemonInstance { enclose!((facet) async move { tracing::trace!("waiting for process exit"); let status = child.wait().await?; - tracing::info!(?status); + tracing::debug!(?status); facet.activate(Account::new(syndicate::name!("instance-terminated")), |t| { let m = if status.success() { None } else { Some(format!("{}", status)) }; self.handle_exit(t, m) diff --git a/syndicate/Cargo.toml b/syndicate/Cargo.toml index 91085d1..3c15c23 100644 --- a/syndicate/Cargo.toml +++ b/syndicate/Cargo.toml @@ -13,11 +13,11 @@ license = "Apache-2.0" vendored-openssl = ["openssl/vendored"] [build-dependencies] -preserves-schema = "2.1.1" +preserves-schema = "2.2.0" [dependencies] -preserves = "2.1.1" -preserves-schema = "2.1.1" +preserves = "2.2.0" +preserves-schema = "2.2.0" tokio = { version = "1.10", features = ["io-util", "macros", "rt", "rt-multi-thread", "time"] } tokio-util = "0.6"