Improve logging
This commit is contained in:
parent
5e3a497c32
commit
6fb1db4f6b
|
@ -133,6 +133,7 @@ dependencies = [
|
|||
"libc",
|
||||
"num-integer",
|
||||
"num-traits",
|
||||
"time",
|
||||
"winapi 0.3.9",
|
||||
]
|
||||
|
||||
|
@ -1096,9 +1097,9 @@ checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
|
|||
|
||||
[[package]]
|
||||
name = "preserves"
|
||||
version = "2.1.1"
|
||||
version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6b43cf73db6e750654d441b642f2b638ed1ac43a3068d401dae04cd4da7b319f"
|
||||
checksum = "c604be1e4ddac999d7c3d81ca9f7912a33a1234f5394fb902315f8cee25645cd"
|
||||
dependencies = [
|
||||
"base64",
|
||||
"dtoa",
|
||||
|
@ -1110,9 +1111,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "preserves-schema"
|
||||
version = "2.1.1"
|
||||
version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b88b030244936c9b68b802a7969700604dba0defe1e6e28c6056ccd84ead948"
|
||||
checksum = "e1e7d339a003cb3e6852e6793b34d34c811f134d0b54b839d6b360f7c692b924"
|
||||
dependencies = [
|
||||
"convert_case",
|
||||
"glob",
|
||||
|
@ -1546,6 +1547,7 @@ dependencies = [
|
|||
name = "syndicate-server"
|
||||
version = "0.12.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"futures",
|
||||
"lazy_static",
|
||||
"notify",
|
||||
|
@ -1614,6 +1616,16 @@ dependencies = [
|
|||
"once_cell",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "time"
|
||||
version = "0.1.43"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"winapi 0.3.9",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tinytemplate"
|
||||
version = "1.2.1"
|
||||
|
|
|
@ -10,14 +10,15 @@ repository = "https://git.syndicate-lang.org/syndicate-lang/syndicate-rs"
|
|||
license = "Apache-2.0"
|
||||
|
||||
[build-dependencies]
|
||||
preserves-schema = "2.1.1"
|
||||
preserves-schema = "2.2.0"
|
||||
syndicate = { path = "../syndicate", version = "^0.12.0"}
|
||||
|
||||
[dependencies]
|
||||
preserves-schema = "2.1.1"
|
||||
preserves-schema = "2.2.0"
|
||||
syndicate = { path = "../syndicate", version = "^0.12.0"}
|
||||
syndicate-macros = { path = "../syndicate-macros", version = "^0.7.0"}
|
||||
|
||||
chrono = "0.4"
|
||||
futures = "0.3"
|
||||
lazy_static = "1.4"
|
||||
notify = "4.0"
|
||||
|
|
|
@ -151,24 +151,36 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
let n_stream: AnyValue = AnyValue::symbol("stream");
|
||||
let e = syndicate::during::entity(())
|
||||
.on_message(move |(), _t, captures: AnyValue| {
|
||||
if let Some(mut d) = captures.value_owned().into_sequence().and_then(
|
||||
|s| s.into_iter().next().and_then(|d| d.value_owned().into_dictionary()))
|
||||
{
|
||||
let pid = d.remove(&n_pid).unwrap_or_else(|| n_unknown.clone());
|
||||
let line = d.remove(&n_line).unwrap_or_else(|| n_unknown.clone());
|
||||
let service = d.remove(&n_service).unwrap_or_else(|| n_unknown.clone());
|
||||
let stream = d.remove(&n_stream).unwrap_or_else(|| n_unknown.clone());
|
||||
if d.is_empty() {
|
||||
tracing::info!(?stream, ?service, ?pid, message = ?line);
|
||||
} else {
|
||||
tracing::info!(?stream, ?service, ?pid, data = ?d, message = ?line);
|
||||
if let Some(captures) = captures.value_owned().into_sequence() {
|
||||
let mut captures = captures.into_iter();
|
||||
let timestamp = captures.next()
|
||||
.and_then(|t| t.value_owned().into_string())
|
||||
.unwrap_or_else(|| "-".to_owned());
|
||||
if let Some(mut d) = captures.next()
|
||||
.and_then(|d| d.value_owned().into_dictionary())
|
||||
{
|
||||
let pid = d.remove(&n_pid).unwrap_or_else(|| n_unknown.clone());
|
||||
let service = d.remove(&n_service).unwrap_or_else(|| n_unknown.clone());
|
||||
let line = d.remove(&n_line).unwrap_or_else(|| n_unknown.clone());
|
||||
let stream = d.remove(&n_stream).unwrap_or_else(|| n_unknown.clone());
|
||||
let message = format!("{} {:?}[{:?}] {:?}: {:?}",
|
||||
timestamp,
|
||||
service,
|
||||
pid,
|
||||
stream,
|
||||
line);
|
||||
if d.is_empty() {
|
||||
tracing::info!(target: "", message = %message);
|
||||
} else {
|
||||
tracing::info!(target: "", data = ?d, message = %message);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.create_cap(t);
|
||||
root_ds.assert(t, language(), &syndicate::schemas::dataspace::Observe {
|
||||
pattern: syndicate_macros::pattern!(<log $>),
|
||||
pattern: syndicate_macros::pattern!(<log $ $>),
|
||||
observer: e,
|
||||
});
|
||||
Ok(())
|
||||
|
|
|
@ -150,6 +150,7 @@ impl DaemonInstance {
|
|||
let delay =
|
||||
std::time::Duration::from_millis(if let None = error_message { 200 } else { 1000 });
|
||||
t.stop_facet_and_continue(t.facet.facet_id, Some(move |t: &mut Activation| {
|
||||
#[derive(Debug)]
|
||||
enum NextStep {
|
||||
SleepAndRestart,
|
||||
SignalSuccessfulCompletion,
|
||||
|
@ -159,17 +160,25 @@ impl DaemonInstance {
|
|||
let next_step = match self.restart_policy {
|
||||
RestartPolicy::Always => SleepAndRestart,
|
||||
RestartPolicy::OnError =>
|
||||
match error_message {
|
||||
match &error_message {
|
||||
None => SignalSuccessfulCompletion,
|
||||
Some(_) => SleepAndRestart,
|
||||
},
|
||||
RestartPolicy::All =>
|
||||
match error_message {
|
||||
match &error_message {
|
||||
None => SignalSuccessfulCompletion,
|
||||
Some(s) => Err(s.as_str())?,
|
||||
Some(s) => {
|
||||
tracing::error!(cmd = ?self.cmd, next_step = %"RestartDaemon", message = %s);
|
||||
Err(s.as_str())?
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
match error_message {
|
||||
Some(m) => tracing::error!(cmd = ?self.cmd, ?next_step, message = %m),
|
||||
None => tracing::info!(cmd = ?self.cmd, ?next_step),
|
||||
}
|
||||
|
||||
match next_step {
|
||||
SleepAndRestart => t.after(delay, |t| self.start(t)),
|
||||
SignalSuccessfulCompletion => {
|
||||
|
@ -213,10 +222,11 @@ impl DaemonInstance {
|
|||
Ok(s) => AnyValue::new(s),
|
||||
Err(_) => AnyValue::bytestring(buf),
|
||||
};
|
||||
let now = AnyValue::new(chrono::Utc::now().to_rfc3339());
|
||||
facet.activate(Account::new(tracing::Span::current()),
|
||||
enclose!((pid, service, kind) |t| {
|
||||
log_ds.message(t, &(), &syndicate_macros::template!(
|
||||
"<log {
|
||||
"<log =now {
|
||||
pid: =pid,
|
||||
service: =service,
|
||||
stream: =kind,
|
||||
|
@ -236,12 +246,12 @@ impl DaemonInstance {
|
|||
let mut child = match self.cmd.spawn() {
|
||||
Ok(child) => child,
|
||||
Err(e) => {
|
||||
tracing::info!(spawn_err = ?e);
|
||||
tracing::debug!(spawn_err = ?e);
|
||||
return self.handle_exit(t, Some(format!("{}", e)));
|
||||
}
|
||||
};
|
||||
let pid = child.id();
|
||||
tracing::info!(?pid, cmd = ?self.cmd, "started");
|
||||
tracing::debug!(?pid, cmd = ?self.cmd, "started");
|
||||
|
||||
let facet = t.facet.clone();
|
||||
|
||||
|
@ -257,7 +267,7 @@ impl DaemonInstance {
|
|||
enclose!((facet) async move {
|
||||
tracing::trace!("waiting for process exit");
|
||||
let status = child.wait().await?;
|
||||
tracing::info!(?status);
|
||||
tracing::debug!(?status);
|
||||
facet.activate(Account::new(syndicate::name!("instance-terminated")), |t| {
|
||||
let m = if status.success() { None } else { Some(format!("{}", status)) };
|
||||
self.handle_exit(t, m)
|
||||
|
|
|
@ -13,11 +13,11 @@ license = "Apache-2.0"
|
|||
vendored-openssl = ["openssl/vendored"]
|
||||
|
||||
[build-dependencies]
|
||||
preserves-schema = "2.1.1"
|
||||
preserves-schema = "2.2.0"
|
||||
|
||||
[dependencies]
|
||||
preserves = "2.1.1"
|
||||
preserves-schema = "2.1.1"
|
||||
preserves = "2.2.0"
|
||||
preserves-schema = "2.2.0"
|
||||
|
||||
tokio = { version = "1.10", features = ["io-util", "macros", "rt", "rt-multi-thread", "time"] }
|
||||
tokio-util = "0.6"
|
||||
|
|
Loading…
Reference in New Issue