diff --git a/Cargo.lock b/Cargo.lock index 4959813..7465677 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1098,9 +1098,9 @@ checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" [[package]] name = "preserves" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c604be1e4ddac999d7c3d81ca9f7912a33a1234f5394fb902315f8cee25645cd" +checksum = "87e17ecd8e57af358d013c7f9cc30078a86bbba63eac64e73e1c9b9651304eb8" dependencies = [ "base64", "dtoa", @@ -1112,9 +1112,9 @@ dependencies = [ [[package]] name = "preserves-schema" -version = "2.3.1" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1ae4749701948512784501bcc48bfee33e8511714fac9c3acfe7fabb1d7fbdf" +checksum = "758fe4013a89071fafdc71ba79bd919f68ec9baadaa77db78b4a41c21cf8c606" dependencies = [ "convert_case", "glob", @@ -1318,9 +1318,9 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "security-framework" -version = "2.4.2" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "525bc1abfda2e1998d152c45cf13e696f76d0a4972310b22fac1658b05df7c87" +checksum = "d09d3c15d814eda1d6a836f2f2b56a6abc1446c8a34351cb3180d3db92ffe4ce" dependencies = [ "bitflags", "core-foundation", @@ -1331,9 +1331,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.4.2" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9dd14d83160b528b7bfd66439110573efcfbe281b17fc2ca9f39f550d619c7e" +checksum = "e90dd10c41c6bfc633da6e0c659bd25d31e0791e5974ac42970267d59eba87f7" dependencies = [ "core-foundation-sys", "libc", @@ -1386,9 +1386,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.74" +version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee2bb9cd061c5865d345bb02ca49fcef1391741b672b54a0bf7b679badec3142" +checksum = "c059c05b48c5c0067d4b4b2b4f0732dd65feb52daf7e0ea09cd87e7dadc1af79" dependencies = [ "itoa 1.0.1", "ryu", @@ -1459,9 +1459,9 @@ checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" [[package]] name = "structopt" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40b9788f4202aa75c240ecc9c15c65185e6a39ccdeb0fd5d008b98825464c87c" +checksum = "0c6b5c64445ba8094a6ab0c3cd2ad323e07171012d9c98b0b15651daf1787a10" dependencies = [ "clap", "lazy_static", @@ -1890,9 +1890,9 @@ dependencies = [ [[package]] name = "wasi" -version = "0.10.2+wasi-snapshot-preview1" +version = "0.10.3+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +checksum = "46a2e384a3f170b0c7543787a91411175b71afd56ba4d3a0ae5678d4e2243c0e" [[package]] name = "wasm-bindgen" diff --git a/syndicate-macros/examples/box-and-client.rs b/syndicate-macros/examples/box-and-client.rs index c1c3124..fcbb4cc 100644 --- a/syndicate-macros/examples/box-and-client.rs +++ b/syndicate-macros/examples/box-and-client.rs @@ -8,11 +8,11 @@ use syndicate::value::NestedValue; #[tokio::main] async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; - Actor::new(None).boot(tracing::Span::current(), |t| { + Actor::top(None, |t| { let ds = Cap::new(&t.create(Dataspace::new(None))); let _ = t.prevent_inert_check(); - t.spawn(syndicate::name!("box"), enclose!((ds) move |t| { + t.spawn(Some(AnyValue::symbol("box")), enclose!((ds) move |t| { let current_value = t.named_field("current_value", 0u64); t.dataflow({ @@ -49,7 +49,7 @@ async fn main() -> Result<(), Box> { Ok(()) })); - t.spawn(syndicate::name!("client"), enclose!((ds) move |t| { + t.spawn(Some(AnyValue::symbol("client")), enclose!((ds) move |t| { let box_state_handler = syndicate::entity(0u32) .on_asserted(enclose!((ds) move |count, t, captures: AnyValue| { *count = *count + 1; diff --git a/syndicate-server/examples/consumer.rs b/syndicate-server/examples/consumer.rs index cf20836..1509a1b 100644 --- a/syndicate-server/examples/consumer.rs +++ b/syndicate-server/examples/consumer.rs @@ -26,7 +26,7 @@ async fn main() -> Result<(), Box> { let config = Config::from_args(); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - Actor::new(None).boot(syndicate::name!("consumer"), |t| { + Actor::top(None, |t| { relay::connect_stream(t, i, o, false, sturdyref, (), |_state, t, ds| { let consumer = syndicate::entity(0) .on_message(|message_count, _t, m: AnyValue| { @@ -44,13 +44,14 @@ async fn main() -> Result<(), Box> { observer: Arc::clone(&consumer), }); - t.linked_task(syndicate::name!("tick"), async move { + t.linked_task(Some(AnyValue::symbol("tick")), async move { let mut stats_timer = interval(Duration::from_secs(1)); loop { stats_timer.tick().await; let consumer = Arc::clone(&consumer); external_event(&Arc::clone(&consumer.underlying.mailbox), - &Account::new(syndicate::name!("account")), + None, + &Account::new(None, None), Box::new(move |t| t.with_entity( &consumer.underlying, |t, e| e.message(t, AnyValue::new(true)))))?; diff --git a/syndicate-server/examples/pingpong.rs b/syndicate-server/examples/pingpong.rs index 3a39801..0798c35 100644 --- a/syndicate-server/examples/pingpong.rs +++ b/syndicate-server/examples/pingpong.rs @@ -93,7 +93,7 @@ async fn main() -> Result<(), Box> { let config = Config::from_args(); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - Actor::new(None).boot(syndicate::name!("pingpong"), |t| { + Actor::top(None, |t| { relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| { let (send_label, recv_label, report_latency_every, should_echo, bytes_padding) = @@ -172,13 +172,14 @@ async fn main() -> Result<(), Box> { observer: Arc::clone(&consumer), }); - t.linked_task(syndicate::name!("tick"), async move { + t.linked_task(Some(AnyValue::symbol("tick")), async move { let mut stats_timer = interval(Duration::from_secs(1)); loop { stats_timer.tick().await; let consumer = Arc::clone(&consumer); external_event(&Arc::clone(&consumer.underlying.mailbox), - &Account::new(syndicate::name!("account")), + None, + &Account::new(None, None), Box::new(move |t| t.with_entity( &consumer.underlying, |t, e| e.message(t, AnyValue::new(true)))))?; @@ -189,7 +190,7 @@ async fn main() -> Result<(), Box> { let turn_count = c.turn_count; let action_count = c.action_count; let account = Arc::clone(t.account()); - t.linked_task(syndicate::name!("boot-ping"), async move { + t.linked_task(Some(AnyValue::symbol("boot-ping")), async move { let padding = AnyValue::bytestring(vec![0; bytes_padding]); for _ in 0..turn_count { let mut events: PendingEventQueue = vec![]; @@ -203,7 +204,7 @@ async fn main() -> Result<(), Box> { &ds.underlying, |t, e| e.message(t, current_rec)))); } - external_events(&ds.underlying.mailbox, &account, events)? + external_events(&ds.underlying.mailbox, None, &account, events)? } Ok(LinkedTaskTermination::KeepFacet) }); diff --git a/syndicate-server/examples/producer.rs b/syndicate-server/examples/producer.rs index 72e9575..d41032d 100644 --- a/syndicate-server/examples/producer.rs +++ b/syndicate-server/examples/producer.rs @@ -2,9 +2,10 @@ use structopt::StructOpt; use syndicate::actor::*; use syndicate::enclose; +use syndicate::preserves::rec; use syndicate::relay; use syndicate::sturdy; -use syndicate::value::Value; +use syndicate::value::NestedValue; use tokio::net::TcpStream; @@ -20,35 +21,30 @@ pub struct Config { dataspace: String, } -#[inline] -fn says(who: AnyValue, what: AnyValue) -> AnyValue { - let mut r = Value::simple_record("Says", 2); - r.fields_vec_mut().push(who); - r.fields_vec_mut().push(what); - r.finish().wrap() -} - #[tokio::main] async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; let config = Config::from_args(); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - Actor::new(None).boot(syndicate::name!("producer"), |t| { + Actor::top(None, |t| { relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| { - let padding: AnyValue = Value::ByteString(vec![0; config.bytes_padding]).wrap(); + let padding = AnyValue::new(&vec![0u8; config.bytes_padding][..]); let action_count = config.action_count; - let account = Account::new(syndicate::name!("account")); - t.linked_task(syndicate::name!("sender"), async move { + let account = Account::new(None, None); + t.linked_task(Some(AnyValue::symbol("sender")), async move { loop { account.ensure_clear_funds().await; let mut events: PendingEventQueue = Vec::new(); for _ in 0..action_count { events.push(Box::new(enclose!((ds, padding) move |t| t.with_entity( &ds.underlying, |t, e| e.message( - t, says(Value::from("producer").wrap(), padding)))))); + t, + rec![AnyValue::symbol("Says"), + AnyValue::new("producer"), + padding]))))); } - external_events(&ds.underlying.mailbox, &account, events)?; + external_events(&ds.underlying.mailbox, None, &account, events)?; } }); Ok(None) diff --git a/syndicate-server/examples/state-consumer.rs b/syndicate-server/examples/state-consumer.rs index 8ebd8bf..d5fe8dd 100644 --- a/syndicate-server/examples/state-consumer.rs +++ b/syndicate-server/examples/state-consumer.rs @@ -26,7 +26,7 @@ async fn main() -> Result<(), Box> { let config = Config::from_args(); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - Actor::new(None).boot(syndicate::name!("state-consumer"), |t| { + Actor::top(None, |t| { relay::connect_stream(t, i, o, false, sturdyref, (), |_state, t, ds| { let consumer = { #[derive(Default)] @@ -65,13 +65,14 @@ async fn main() -> Result<(), Box> { observer: Arc::clone(&consumer), }); - t.linked_task(syndicate::name!("tick"), async move { + t.linked_task(Some(AnyValue::symbol("tick")), async move { let mut stats_timer = interval(Duration::from_secs(1)); loop { stats_timer.tick().await; let consumer = Arc::clone(&consumer); external_event(&Arc::clone(&consumer.underlying.mailbox), - &Account::new(syndicate::name!("account")), + None, + &Account::new(None, None), Box::new(move |t| t.with_entity( &consumer.underlying, |t, e| e.message(t, AnyValue::new(true)))))?; diff --git a/syndicate-server/examples/state-producer.rs b/syndicate-server/examples/state-producer.rs index 1f86d9a..48cdc5e 100644 --- a/syndicate-server/examples/state-producer.rs +++ b/syndicate-server/examples/state-producer.rs @@ -4,9 +4,10 @@ use structopt::StructOpt; use syndicate::actor::*; use syndicate::enclose; +use syndicate::preserves::rec; use syndicate::relay; use syndicate::sturdy; -use syndicate::value::Value; +use syndicate::value::NestedValue; use tokio::net::TcpStream; @@ -22,23 +23,21 @@ async fn main() -> Result<(), Box> { let config = Config::from_args(); let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - Actor::new(None).boot(syndicate::name!("state-producer"), |t| { + Actor::top(None, |t| { relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| { - let account = Account::new(syndicate::name!("account")); - t.linked_task(syndicate::name!("sender"), async move { - let presence: AnyValue = Value::simple_record1( - "Present", - Value::from(std::process::id()).wrap()).wrap(); + let account = Account::new(None, None); + t.linked_task(Some(AnyValue::symbol("sender")), async move { + let presence = rec![AnyValue::symbol("Present"), AnyValue::new(std::process::id())]; let handle = syndicate::actor::next_handle(); let assert_e = || { external_event( - &Arc::clone(&ds.underlying.mailbox), &account, Box::new(enclose!( + &Arc::clone(&ds.underlying.mailbox), None, &account, Box::new(enclose!( (ds, presence, handle) move |t| t.with_entity( &ds.underlying, |t, e| e.assert(t, presence, handle))))) }; let retract_e = || { external_event( - &Arc::clone(&ds.underlying.mailbox), &account, Box::new(enclose!( + &Arc::clone(&ds.underlying.mailbox), None, &account, Box::new(enclose!( (ds, handle) move |t| t.with_entity( &ds.underlying, |t, e| e.retract(t, handle))))) }; diff --git a/syndicate-server/src/dependencies.rs b/syndicate-server/src/dependencies.rs index f1ba67e..d8d2ddd 100644 --- a/syndicate-server/src/dependencies.rs +++ b/syndicate-server/src/dependencies.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use syndicate::actor::*; use syndicate::during::entity; use syndicate::enclose; +use syndicate::preserves::rec; use syndicate::schemas::dataspace::Observe; use syndicate::schemas::service; use syndicate::value::NestedValue; @@ -16,10 +17,10 @@ use crate::schemas::internal_services; use syndicate_macros::during; pub fn boot(t: &mut Activation, ds: Arc) { - t.spawn(syndicate::name!("dependencies"), move |t| { + t.spawn(Some(AnyValue::symbol("dependencies_listener")), move |t| { Ok(during!(t, ds, language(), , |t: &mut Activation| { tracing::debug!(?spec, "tracking dependencies"); - t.spawn_link(syndicate::name!(parent: None, "dependencies", spec = ?spec), + t.spawn_link(Some(rec![AnyValue::symbol("dependencies"), language().unparse(&spec)]), enclose!((ds) |t| run(t, ds, spec))); Ok(()) })) diff --git a/syndicate-server/src/main.rs b/syndicate-server/src/main.rs index e5d8c10..ac68d87 100644 --- a/syndicate-server/src/main.rs +++ b/syndicate-server/src/main.rs @@ -1,5 +1,6 @@ use preserves_schema::Codec; +use std::io; use std::path::PathBuf; use std::sync::Arc; @@ -11,6 +12,7 @@ use syndicate::enclose; use syndicate::relay; use syndicate::schemas::service; use syndicate::schemas::transport_address; +use syndicate::trace; use syndicate::value::Map; use syndicate::value::NestedValue; @@ -50,6 +52,9 @@ struct ServerConfig { #[structopt(long)] no_banner: bool, + + #[structopt(short = "t", long)] + trace_file: Option, } #[tokio::main] @@ -83,13 +88,18 @@ async fn main() -> Result<(), Box> { tracing::trace!("startup"); - Actor::new(None).boot(tracing::Span::current(), move |t| { - let server_config_ds = Cap::new(&t.create(Dataspace::new(Some(syndicate::name!("config"))))); - let log_ds = Cap::new(&t.create(Dataspace::new(Some(syndicate::name!("log"))))); + let trace_collector = config.trace_file.clone().map( + |p| Ok::(trace::TraceCollector::ascii( + io::BufWriter::new(std::fs::File::create(p)?)))) + .transpose()?; + + Actor::top(trace_collector, move |t| { + let server_config_ds = Cap::new(&t.create(Dataspace::new(Some(AnyValue::symbol("config"))))); + let log_ds = Cap::new(&t.create(Dataspace::new(Some(AnyValue::symbol("log"))))); if config.inferior { tracing::info!("inferior server instance"); - t.spawn(syndicate::name!("parent"), enclose!((server_config_ds) move |t| { + t.spawn(Some(AnyValue::symbol("parent")), enclose!((server_config_ds) move |t| { protocol::run_io_relay(t, relay::Input::Bytes(Box::pin(tokio::io::stdin())), relay::Output::Bytes(Box::pin(tokio::io::stdout())), @@ -154,7 +164,7 @@ async fn main() -> Result<(), Box> { }); } - t.spawn(tracing::Span::current(), enclose!((log_ds) move |t| { + t.spawn(Some(AnyValue::symbol("logger")), enclose!((log_ds) move |t| { let n_unknown: AnyValue = AnyValue::symbol("-"); let n_pid: AnyValue = AnyValue::symbol("pid"); let n_line: AnyValue = AnyValue::symbol("line"); diff --git a/syndicate-server/src/protocol.rs b/syndicate-server/src/protocol.rs index 3b439cc..5187d96 100644 --- a/syndicate-server/src/protocol.rs +++ b/syndicate-server/src/protocol.rs @@ -9,6 +9,7 @@ use syndicate::actor::*; use syndicate::error::Error; use syndicate::error::error; use syndicate::relay; +use syndicate::trace; use syndicate::value::NestedValue; use tokio::net::TcpStream; @@ -37,16 +38,19 @@ pub fn run_io_relay( } pub fn run_connection( + trace_collector: Option, facet: FacetRef, i: relay::Input, o: relay::Output, initial_ref: Arc, ) { - facet.activate(Account::new(syndicate::name!("start-session")), - |t| run_io_relay(t, i, o, initial_ref)); + let cause = trace_collector.as_ref().map(|_| trace::TurnCause::external("start-session")); + let account = Account::new(Some(AnyValue::symbol("start-session")), trace_collector); + facet.activate(&account, cause, |t| run_io_relay(t, i, o, initial_ref)); } pub async fn detect_protocol( + trace_collector: Option, facet: FacetRef, stream: TcpStream, gateway: Arc, @@ -76,7 +80,7 @@ pub async fn detect_protocol( _ => unreachable!() } }; - run_connection(facet, i, o, gateway); + run_connection(trace_collector, facet, i, o, gateway); Ok(()) } diff --git a/syndicate-server/src/services/config_watcher.rs b/syndicate-server/src/services/config_watcher.rs index 21e144d..3ce7d16 100644 --- a/syndicate-server/src/services/config_watcher.rs +++ b/syndicate-server/src/services/config_watcher.rs @@ -3,6 +3,8 @@ use notify::Watcher; use notify::RecursiveMode; use notify::watcher; +use syndicate::preserves::rec; + use std::fs; use std::future; use std::io; @@ -16,6 +18,7 @@ use syndicate::actor::*; use syndicate::error::Error; use syndicate::enclose; use syndicate::supervise::{Supervisor, SupervisorConfiguration}; +use syndicate::trace; use syndicate::value::BinarySource; use syndicate::value::BytesBinarySource; use syndicate::value::Map; @@ -32,11 +35,11 @@ use crate::script; use syndicate_macros::during; pub fn on_demand(t: &mut Activation, config_ds: Arc) { - t.spawn(syndicate::name!("config_watcher"), move |t| { + t.spawn(Some(AnyValue::symbol("config_watcher")), move |t| { Ok(during!(t, config_ds, language(), , |t| { Supervisor::start( t, - syndicate::name!(parent: None, "config", path = ?spec.path), + Some(rec![AnyValue::symbol("config"), AnyValue::new(spec.path.clone())]), SupervisorConfiguration::default(), enclose!((config_ds, spec) lifecycle::updater(config_ds, spec)), enclose!((config_ds) move |t| enclose!((config_ds, spec) run(t, config_ds, spec)))) @@ -175,26 +178,32 @@ fn run( watcher.watch(&env.path, RecursiveMode::Recursive).map_err(convert_notify_error)?; let facet = t.facet.clone(); + let trace_collector = t.trace_collector(); let span = tracing::Span::current(); thread::spawn(move || { let _entry = span.enter(); let mut path_state: Map = Map::new(); - if !facet.activate( - Account::new(syndicate::name!("initial_scan")), - |t| { - initial_scan(t, &mut path_state, &config_ds, env.clone()); - config_ds.assert(t, language(), &lifecycle::ready(&spec)); - Ok(()) - }) { - return; + let cause = trace_collector.as_ref().map(|_| trace::TurnCause::external("initial_scan")); + let account = Account::new(Some(AnyValue::symbol("initial_scan")), trace_collector.clone()); + if !facet.activate( + &account, cause, |t| { + initial_scan(t, &mut path_state, &config_ds, env.clone()); + config_ds.assert(t, language(), &lifecycle::ready(&spec)); + Ok(()) + }) + { + return; + } } tracing::trace!("initial_scan complete"); let mut rescan = |paths: Vec| { - facet.activate(Account::new(syndicate::name!("rescan")), |t| { + let cause = trace_collector.as_ref().map(|_| trace::TurnCause::external("rescan")); + let account = Account::new(Some(AnyValue::symbol("rescan")), trace_collector.clone()); + facet.activate(&account, cause, |t| { let mut to_stop = Vec::new(); for path in paths.into_iter() { let maybe_facet_id = path_state.remove(&path); @@ -236,15 +245,19 @@ fn run( if !keep_running { break; } } - facet.activate(Account::new(syndicate::name!("termination")), |t| { - tracing::trace!("linked thread terminating associated facet"); - Ok(t.stop()) - }); + { + let cause = trace_collector.as_ref().map(|_| trace::TurnCause::external("termination")); + let account = Account::new(Some(AnyValue::symbol("termination")), trace_collector); + facet.activate(&account, cause, |t| { + tracing::trace!("linked thread terminating associated facet"); + Ok(t.stop()) + }); + } tracing::trace!("linked thread done"); }); - t.linked_task(syndicate::name!("cancel-wait"), async move { + t.linked_task(Some(AnyValue::symbol("cancel-wait")), async move { future::pending::<()>().await; drop(watcher); Ok(LinkedTaskTermination::KeepFacet) diff --git a/syndicate-server/src/services/daemon.rs b/syndicate-server/src/services/daemon.rs index dca5ffb..cc10174 100644 --- a/syndicate-server/src/services/daemon.rs +++ b/syndicate-server/src/services/daemon.rs @@ -4,8 +4,10 @@ use std::sync::Arc; use syndicate::actor::*; use syndicate::enclose; +use syndicate::preserves::rec; use syndicate::schemas::service; use syndicate::supervise::{Supervisor, SupervisorConfiguration}; +use syndicate::trace; use syndicate::value::NestedValue; use tokio::io::AsyncRead; @@ -21,7 +23,7 @@ use crate::schemas::external_services::*; use syndicate_macros::during; pub fn on_demand(t: &mut Activation, config_ds: Arc, root_ds: Arc) { - t.spawn(syndicate::name!("daemon"), move |t| { + t.spawn(Some(AnyValue::symbol("daemon_listener")), move |t| { Ok(during!(t, config_ds, language(), , enclose!((config_ds, root_ds) move |t: &mut Activation| { supervise_daemon(t, config_ds, root_ds, spec) @@ -46,7 +48,7 @@ fn supervise_daemon( })); Supervisor::start( t, - syndicate::name!(parent: None, "daemon", id = ?spec.id), + Some(language().unparse(&spec)), SupervisorConfiguration::on_error_only(), enclose!((config_ds, spec) lifecycle::updater(config_ds, spec)), enclose!((config_ds, root_ds) move |t| @@ -162,7 +164,6 @@ struct DaemonInstance { config_ds: Arc, log_ds: Arc, service: AnyValue, - name: tracing::Span, cmd: process::Command, announce_presumed_readiness: bool, unready_configs: Arc>, @@ -236,9 +237,13 @@ impl DaemonInstance { Some(n) => AnyValue::new(n), None => AnyValue::symbol("unknown"), }; - t.spawn(syndicate::name!(parent: self.name.clone(), "log"), move |t| { - t.linked_task(tracing::Span::current(), async move { + let trace_collector = t.trace_collector(); + t.spawn(Some(rec![AnyValue::symbol("log"), kind.clone(), self.service.clone()]), move |t| { + t.linked_task(None, async move { let mut r = BufReader::new(r); + let cause = trace_collector.as_ref().map( + |_| trace::TurnCause::external(kind.value().as_symbol().unwrap())); + let account = Account::new(None, trace_collector); loop { let mut buf = Vec::new(); if r.read_until(b'\n', &mut buf).await? == 0 { @@ -250,8 +255,7 @@ impl DaemonInstance { }; let now = AnyValue::new(chrono::Utc::now().to_rfc3339()); if !facet.activate( - Account::new(tracing::Span::current()), - enclose!((pid, service, kind) |t| { + &account, cause.clone(), enclose!((pid, service, kind) |t| { log_ds.message(t, &(), &syndicate_macros::template!( ", { - enclose!((spec, config_ds, root_ds, unready_configs, completed_processes) + enclose!((spec, config_ds, root_ds, unready_configs, completed_processes, trace_collector) |t: &mut Activation| { tracing::debug!(?config, "new config"); counter::adjust(t, &unready_configs, 1); @@ -391,7 +400,7 @@ fn run( tracing::info!(?config); let config = config.elaborate(); let facet = t.facet.clone(); - t.linked_task(syndicate::name!("subprocess"), async move { + t.linked_task(Some(AnyValue::symbol("subprocess")), async move { let mut cmd = config.process.build_command().ok_or("Cannot start daemon process")?; let announce_presumed_readiness = match config.ready_on_start { @@ -432,7 +441,6 @@ fn run( config_ds, log_ds: root_ds, service: spec, - name: tracing::Span::current(), cmd, announce_presumed_readiness, unready_configs, @@ -441,7 +449,10 @@ fn run( protocol, }; - facet.activate(Account::new(syndicate::name!("instance-startup")), |t| { + let cause = trace_collector.as_ref().map( + |_| trace::TurnCause::external("instance-startup")); + let account = Account::new(Some(AnyValue::symbol("instance-startup")), trace_collector); + facet.activate(&account, cause, |t| { daemon_instance.start(t) }); Ok(LinkedTaskTermination::KeepFacet) diff --git a/syndicate-server/src/services/debt_reporter.rs b/syndicate-server/src/services/debt_reporter.rs index cdd9a9c..8836f31 100644 --- a/syndicate-server/src/services/debt_reporter.rs +++ b/syndicate-server/src/services/debt_reporter.rs @@ -1,7 +1,11 @@ +use preserves_schema::Codec; + use std::sync::Arc; use syndicate::actor::*; use syndicate::enclose; +use syndicate::preserves::rec; +use syndicate::preserves::value::NestedValue; use crate::language::language; use crate::lifecycle; @@ -10,9 +14,10 @@ use crate::schemas::internal_services::DebtReporter; use syndicate_macros::during; pub fn on_demand(t: &mut Activation, ds: Arc) { - t.spawn(syndicate::name!("debt_reporter"), move |t| { + t.spawn(Some(AnyValue::symbol("debt_reporter_listener")), move |t| { Ok(during!(t, ds, language(), , |t: &mut Activation| { - t.spawn_link(tracing::Span::current(), enclose!((ds) |t| run(t, ds, spec))); + t.spawn_link(Some(rec![AnyValue::symbol("debt_reporter"), language().unparse(&spec)]), + enclose!((ds) |t| run(t, ds, spec))); Ok(()) })) }); @@ -23,8 +28,7 @@ fn run(t: &mut Activation, ds: Arc, spec: DebtReporter) -> ActorResult { ds.assert(t, language(), &lifecycle::ready(&spec)); t.every(core::time::Duration::from_millis((spec.interval_seconds.0 * 1000.0) as u64), |_t| { for (id, (name, debt)) in syndicate::actor::ACCOUNTS.read().iter() { - let _enter = name.enter(); - tracing::info!(id, debt = ?debt.load(std::sync::atomic::Ordering::Relaxed)); + tracing::info!(id, ?name, debt = ?debt.load(std::sync::atomic::Ordering::Relaxed)); } Ok(()) }) diff --git a/syndicate-server/src/services/milestone.rs b/syndicate-server/src/services/milestone.rs index 91eed9f..1f2198b 100644 --- a/syndicate-server/src/services/milestone.rs +++ b/syndicate-server/src/services/milestone.rs @@ -1,7 +1,10 @@ +use preserves_schema::Codec; + use std::sync::Arc; use syndicate::actor::*; use syndicate::enclose; +use syndicate::preserves::value::NestedValue; use syndicate::supervise::{Supervisor, SupervisorConfiguration}; use crate::language::language; @@ -11,11 +14,11 @@ use crate::schemas::internal_services::Milestone; use syndicate_macros::during; pub fn on_demand(t: &mut Activation, ds: Arc) { - t.spawn(syndicate::name!("milestone"), move |t| { + t.spawn(Some(AnyValue::symbol("milestone_listener")), move |t| { Ok(during!(t, ds, language(), , |t: &mut Activation| { Supervisor::start( t, - syndicate::name!(parent: None, "milestone", name = ?spec.name), + Some(language().unparse(&spec)), SupervisorConfiguration::default(), |_, _| Ok(()), enclose!((ds) move |t| enclose!((ds, spec) run(t, ds, spec)))) diff --git a/syndicate-server/src/services/tcp_relay_listener.rs b/syndicate-server/src/services/tcp_relay_listener.rs index f1bad04..04375f2 100644 --- a/syndicate-server/src/services/tcp_relay_listener.rs +++ b/syndicate-server/src/services/tcp_relay_listener.rs @@ -1,9 +1,14 @@ +use preserves_schema::Codec; + use std::convert::TryFrom; use std::sync::Arc; use syndicate::actor::*; use syndicate::enclose; +use syndicate::preserves::rec; +use syndicate::preserves::value::NestedValue; use syndicate::supervise::{Supervisor, SupervisorConfiguration}; +use syndicate::trace; use tokio::net::TcpListener; @@ -15,11 +20,11 @@ use crate::schemas::internal_services::TcpRelayListener; use syndicate_macros::during; pub fn on_demand(t: &mut Activation, ds: Arc) { - t.spawn(syndicate::name!("tcp_relay_listener"), move |t| { + t.spawn(Some(AnyValue::symbol("tcp_relay_listener")), move |t| { Ok(during!(t, ds, language(), , |t| { Supervisor::start( t, - syndicate::name!(parent: None, "relay", addr = ?spec), + Some(rec![AnyValue::symbol("relay"), language().unparse(&spec)]), SupervisorConfiguration::default(), enclose!((ds, spec) lifecycle::updater(ds, spec)), enclose!((ds) move |t| enclose!((ds, spec) run(t, ds, spec)))) @@ -31,40 +36,45 @@ fn run(t: &mut Activation, ds: Arc, spec: TcpRelayListener) -> ActorResult lifecycle::terminate_on_service_restart(t, &ds, &spec); let host = spec.addr.host.clone(); let port = u16::try_from(&spec.addr.port).map_err(|_| "Invalid TCP port number")?; - let parent_span = tracing::Span::current(); let facet = t.facet.clone(); - t.linked_task(syndicate::name!("listener"), async move { + let trace_collector = t.trace_collector(); + t.linked_task(Some(AnyValue::symbol("listener")), async move { let listen_addr = format!("{}:{}", host, port); let listener = TcpListener::bind(listen_addr).await?; - if !facet.activate( - Account::new(syndicate::name!("readiness")), |t| { - tracing::info!("listening"); - ds.assert(t, language(), &lifecycle::ready(&spec)); - Ok(()) - }) { - return Ok(LinkedTaskTermination::Normal); + let cause = trace_collector.as_ref().map(|_| trace::TurnCause::external("readiness")); + let account = Account::new(Some(AnyValue::symbol("readiness")), trace_collector.clone()); + if !facet.activate( + &account, cause, |t| { + tracing::info!("listening"); + ds.assert(t, language(), &lifecycle::ready(&spec)); + Ok(()) + }) + { + return Ok(LinkedTaskTermination::Normal); + } } loop { let (stream, addr) = listener.accept().await?; let gatekeeper = spec.gatekeeper.clone(); - let name = syndicate::name!(parent: parent_span.clone(), "conn"); + let name = Some(rec![AnyValue::symbol("tcp"), AnyValue::new(format!("{}", &addr))]); + let cause = trace_collector.as_ref().map(|_| trace::TurnCause::external("connect")); + let account = Account::new(name.clone(), trace_collector.clone()); if !facet.activate( - Account::new(name.clone()), - move |t| { + &account, cause, enclose!((trace_collector) move |t| { t.spawn(name, move |t| { - Ok(t.linked_task(tracing::Span::current(), { + Ok(t.linked_task(None, { let facet = t.facet.clone(); async move { - detect_protocol(facet, stream, gatekeeper, addr).await?; + detect_protocol(trace_collector, facet, stream, gatekeeper, addr).await?; Ok(LinkedTaskTermination::KeepFacet) } })) }); Ok(()) - }) + })) { return Ok(LinkedTaskTermination::Normal); } diff --git a/syndicate-server/src/services/unix_relay_listener.rs b/syndicate-server/src/services/unix_relay_listener.rs index 4efc1f2..a36391f 100644 --- a/syndicate-server/src/services/unix_relay_listener.rs +++ b/syndicate-server/src/services/unix_relay_listener.rs @@ -1,3 +1,5 @@ +use preserves_schema::Codec; + use std::io; use std::path::PathBuf; use std::sync::Arc; @@ -5,8 +7,11 @@ use std::sync::Arc; use syndicate::actor::*; use syndicate::enclose; use syndicate::error::Error; +use syndicate::preserves::rec; +use syndicate::preserves::value::NestedValue; use syndicate::relay; use syndicate::supervise::{Supervisor, SupervisorConfiguration}; +use syndicate::trace; use tokio::net::UnixListener; use tokio::net::UnixStream; @@ -19,11 +24,11 @@ use crate::schemas::internal_services::UnixRelayListener; use syndicate_macros::during; pub fn on_demand(t: &mut Activation, ds: Arc) { - t.spawn(syndicate::name!("unix_relay_listener"), move |t| { + t.spawn(Some(AnyValue::symbol("unix_relay_listener")), move |t| { Ok(during!(t, ds, language(), , |t| { Supervisor::start( t, - syndicate::name!(parent: None, "relay", addr = ?spec), + Some(rec![AnyValue::symbol("relay"), language().unparse(&spec)]), SupervisorConfiguration::default(), enclose!((ds, spec) lifecycle::updater(ds, spec)), enclose!((ds) move |t| enclose!((ds, spec) run(t, ds, spec)))) @@ -34,38 +39,44 @@ pub fn on_demand(t: &mut Activation, ds: Arc) { fn run(t: &mut Activation, ds: Arc, spec: UnixRelayListener) -> ActorResult { lifecycle::terminate_on_service_restart(t, &ds, &spec); let path_str = spec.addr.path.clone(); - let parent_span = tracing::Span::current(); let facet = t.facet.clone(); - t.linked_task(syndicate::name!("listener"), async move { + let trace_collector = t.trace_collector(); + t.linked_task(Some(AnyValue::symbol("listener")), async move { let listener = bind_unix_listener(&PathBuf::from(path_str)).await?; - if !facet.activate( - Account::new(syndicate::name!("readiness")), |t| { - tracing::info!("listening"); - ds.assert(t, language(), &lifecycle::ready(&spec)); - Ok(()) - }) { - return Ok(LinkedTaskTermination::Normal); + let cause = trace_collector.as_ref().map(|_| trace::TurnCause::external("readiness")); + let account = Account::new(Some(AnyValue::symbol("readiness")), trace_collector.clone()); + if !facet.activate( + &account, cause, |t| { + tracing::info!("listening"); + ds.assert(t, language(), &lifecycle::ready(&spec)); + Ok(()) + }) + { + return Ok(LinkedTaskTermination::Normal); + } } loop { let (stream, _addr) = listener.accept().await?; let peer = stream.peer_cred()?; let gatekeeper = spec.gatekeeper.clone(); - let name = syndicate::name!(parent: parent_span.clone(), "conn", - pid = ?peer.pid().unwrap_or(-1), - uid = peer.uid()); + let name = Some(rec![AnyValue::symbol("unix"), + AnyValue::new(peer.pid().unwrap_or(-1)), + AnyValue::new(peer.uid())]); + let cause = trace_collector.as_ref().map(|_| trace::TurnCause::external("connect")); + let account = Account::new(name.clone(), trace_collector.clone()); if !facet.activate( - Account::new(name.clone()), - move |t| { + &account, cause, enclose!((trace_collector) move |t| { t.spawn(name, |t| { - Ok(t.linked_task(tracing::Span::current(), { + Ok(t.linked_task(None, { let facet = t.facet.clone(); async move { tracing::info!(protocol = %"unix"); let (i, o) = stream.into_split(); - run_connection(facet, + run_connection(trace_collector, + facet, relay::Input::Bytes(Box::pin(i)), relay::Output::Bytes(Box::pin(o)), gatekeeper); @@ -74,7 +85,7 @@ fn run(t: &mut Activation, ds: Arc, spec: UnixRelayListener) -> ActorResult })) }); Ok(()) - }) + })) { return Ok(LinkedTaskTermination::Normal); } diff --git a/syndicate/benches/bench_dataspace.rs b/syndicate/benches/bench_dataspace.rs index ce59a4f..5095069 100644 --- a/syndicate/benches/bench_dataspace.rs +++ b/syndicate/benches/bench_dataspace.rs @@ -52,19 +52,19 @@ pub fn bench_pub(c: &mut Criterion) { b.iter_custom(|iters| { let start = Instant::now(); rt.block_on(async move { - Actor::new(None).boot(syndicate::name!("dataspace"), move |t| { + Actor::top(None, move |t| { let ds = t.create(Dataspace::new(None)); let shutdown = t.create(ShutdownEntity); - let account = Account::new(syndicate::name!("sender-account")); - t.linked_task(syndicate::name!("sender"), async move { + let account = Account::new(None, None); + t.linked_task(Some(AnyValue::symbol("sender")), async move { for _ in 0..iters { - external_event(&ds.mailbox, &account, Box::new( + external_event(&ds.mailbox, None, &account, Box::new( enclose!((ds) move |t| t.with_entity( &ds, |t, e| e.message(t, says(AnyValue::new("bench_pub"), Value::ByteString(vec![]).wrap()))))))? } - external_event(&shutdown.mailbox, &account, Box::new( + external_event(&shutdown.mailbox, None, &account, Box::new( enclose!((shutdown) move |t| t.with_entity( &shutdown, |t, e| e.message(t, AnyValue::new(true))))))?; @@ -83,7 +83,7 @@ pub fn bench_pub(c: &mut Criterion) { rt.block_on(async move { let turn_count = Arc::new(AtomicU64::new(0)); - Actor::new(None).boot(syndicate::name!("dataspace"), { + Actor::top(None, { let iters = iters.clone(); let turn_count = Arc::clone(&turn_count); @@ -103,7 +103,7 @@ pub fn bench_pub(c: &mut Criterion) { observer: shutdown, }); - t.spawn(syndicate::name!("consumer"), move |t| { + t.spawn(Some(AnyValue::symbol("consumer")), move |t| { struct Receiver(Arc); impl Entity for Receiver { fn message(&mut self, _t: &mut Activation, _m: AnyValue) -> ActorResult { @@ -139,10 +139,11 @@ pub fn bench_pub(c: &mut Criterion) { }); let account = Arc::clone(t.account()); - t.linked_task(syndicate::name!("sender"), async move { + t.linked_task(Some(AnyValue::symbol("sender")), async move { for _i in 0..iters { let ds = Arc::clone(&ds); - external_event(&Arc::clone(&ds.underlying.mailbox), &account, Box::new( + external_event( + &Arc::clone(&ds.underlying.mailbox), None, &account, Box::new( move |t| t.with_entity( &ds.underlying, |t, e| e.message(t, says(AnyValue::new("bench_pub"), @@ -150,7 +151,8 @@ pub fn bench_pub(c: &mut Criterion) { } { let ds = Arc::clone(&ds); - external_event(&Arc::clone(&ds.underlying.mailbox), &account, Box::new( + external_event( + &Arc::clone(&ds.underlying.mailbox), None, &account, Box::new( move |t| t.with_entity( &ds.underlying, |t, e| e.message(t, AnyValue::new(true)))))?; diff --git a/syndicate/benches/ring.rs b/syndicate/benches/ring.rs index 68bbc7e..018d1e7 100644 --- a/syndicate/benches/ring.rs +++ b/syndicate/benches/ring.rs @@ -7,6 +7,8 @@ use std::time::Duration; use std::time::Instant; use syndicate::actor::*; +use syndicate::preserves::rec; +use syndicate::value::NestedValue; use tokio::runtime::Runtime; @@ -88,14 +90,16 @@ pub fn bench_ring(c: &mut Criterion) { self.i += 1; let spawner_ref = Arc::clone(&self.self_ref); ACTORS_CREATED.fetch_add(1, Ordering::Relaxed); - t.spawn(syndicate::name!("forwarder", ?i), move |t| { - let _ = t.prevent_inert_check(); - let f = t.create(Forwarder { - next, + t.spawn( + Some(rec![AnyValue::symbol("forwarder"), AnyValue::new(i)]), + move |t| { + let _ = t.prevent_inert_check(); + let f = t.create(Forwarder { + next, + }); + t.message(&spawner_ref, f); + Ok(()) }); - t.message(&spawner_ref, f); - Ok(()) - }); } else { let mut c_state = Counter { start: Instant::now(), @@ -118,7 +122,7 @@ pub fn bench_ring(c: &mut Criterion) { } ACTORS_CREATED.fetch_add(1, Ordering::Relaxed); - Actor::new(None).boot(syndicate::name!("counter"), move |t| { + Actor::top(None, move |t| { let _ = t.prevent_inert_check(); let mut s = Spawner { self_ref: t.create_inert(), diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index 3e179d6..a7e5dc5 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -5,12 +5,16 @@ include_str!("../doc/linked-tasks.md"), )] +use crate::enclose; + use super::dataflow::Graph; use super::error::Error; use super::error::error; use super::rewrite::CaveatError; use super::rewrite::CheckedCaveat; +use super::schemas::protocol; use super::schemas::sturdy; +use super::trace; use parking_lot::Mutex; use parking_lot::RwLock; @@ -42,7 +46,7 @@ use tokio::sync::Notify; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver}; use tokio_util::sync::CancellationToken; -use tracing::Instrument; +// use tracing::Instrument; /// The type of messages and assertions that can be exchanged among /// distributed objects, including via [dataspace][crate::dataspace]. @@ -56,6 +60,9 @@ use tracing::Instrument; /// that language. pub type AnyValue = ArcValue>; +/// The type of the optional names attached to actors, tasks, and [`Account`]s. +pub type Name = Option; + /// The type of process-unique actor IDs. pub type ActorId = u64; @@ -211,9 +218,12 @@ pub trait Entity: Send { pub struct InertEntity; impl Entity for InertEntity {} +type ActionDescriber = Box trace::ActionDescription>; +type TracedAction = (Option, Action); + enum CleanupAction { - ForMyself(Action), - ForAnother(Arc, Action), + ForMyself(TracedAction), + ForAnother(Arc, TracedAction), } type CleanupActions = Map; @@ -250,6 +260,9 @@ pub struct Activation<'activation> { } struct EventBuffer { + pub source_actor_id: ActorId, + pub desc: Option, + pub trace_collector: Option, pub account: Arc, queues: HashMap, PendingEventQueue)>, for_myself: PendingEventQueue, @@ -264,6 +277,7 @@ pub struct Account { id: u64, debt: Arc, notify: Notify, + trace_collector: Option, } /// A `LoanedItem` is a `T` with an associated `cost` recorded @@ -285,7 +299,7 @@ pub struct LoanedItem { enum SystemMessage { Release, ReleaseField(FieldId), - Turn(LoanedItem), + Turn(Option, LoanedItem), Crash(Error), } @@ -302,6 +316,7 @@ pub struct Mailbox { /// to the actor's private state. pub struct Actor { rx: UnboundedReceiver, + trace_collector: Option, ac_ref: ActorRef, } @@ -349,6 +364,9 @@ pub struct RunningActor { root: FacetId, } +/// The type of process-unique task IDs. +pub type TaskId = u64; + /// Handle to a shared, mutable field (i.e. a *dataflow variable*) within a [`RunningActor`]. /// /// Use [`Activation::field`] to create fields, and use [`Activation::get`], @@ -389,7 +407,7 @@ pub struct Facet { pub parent_facet_id: Option, outbound_handles: Set, stop_actions: Vec, - linked_tasks: Map, + linked_tasks: Map, inert_check_preventers: Arc, } @@ -495,6 +513,8 @@ static NEXT_FIELD_ID: AtomicU64 = AtomicU64::new(6); static NEXT_BLOCK_ID: AtomicU64 = AtomicU64::new(7); +static NEXT_ACTIVATION_ID: AtomicU64 = AtomicU64::new(9); + preserves_schema::support::lazy_static! { #[doc(hidden)] pub static ref SYNDICATE_CREDIT: i64 = { @@ -506,10 +526,10 @@ preserves_schema::support::lazy_static! { }; #[doc(hidden)] - pub static ref ACCOUNTS: RwLock)>> = Default::default(); + pub static ref ACCOUNTS: RwLock)>> = Default::default(); #[doc(hidden)] - pub static ref ACTORS: RwLock> = Default::default(); + pub static ref ACTORS: RwLock> = Default::default(); } impl TryFrom<&AnyValue> for Synced { @@ -560,12 +580,13 @@ impl FacetRef { /// Bills any activity to `account`. pub fn activate( &self, - account: Arc, + account: &Arc, + cause: Option, f: F, ) -> bool where F: FnOnce(&mut Activation) -> ActorResult, { - self.activate_exit(account, |t| f(t).into()) + self.activate_exit(account, cause, |t| f(t).into()) } /// Executes `f` in a new "[turn][Activation]" for `actor`. If `f` returns @@ -578,7 +599,8 @@ impl FacetRef { /// Bills any activity to `account`. pub fn activate_exit( &self, - account: Arc, + account: &Arc, + cause: Option, f: F, ) -> bool where F: FnOnce(&mut Activation) -> RunDisposition, @@ -588,8 +610,13 @@ impl FacetRef { ActorState::Terminated { .. } => false, ActorState::Running(state) => { - tracing::trace!(actor_id=?self.actor.actor_id, "activate"); - let mut activation = Activation::make(self, account, state); + // let _entry = tracing::info_span!(parent: None, "actor", actor_id = ?self.actor.actor_id).entered(); + let mut activation = + Activation::make(self, + Arc::clone(account), + cause.clone(), + account.trace_collector.clone(), + state); let f_result = f(&mut activation); let is_alive = match activation.restore_invariants(f_result) { RunDisposition::Continue => true, @@ -599,12 +626,13 @@ impl FacetRef { } drop(activation); let exit_status = Arc::new(exit_status); - state.cleanup(&self.actor, &exit_status); + state.cleanup(&self.actor, + &exit_status, + account.trace_collector.clone()); *g = ActorState::Terminated { exit_status }; false } }; - tracing::trace!(actor_id=?self.actor.actor_id, "deactivate"); is_alive } } @@ -615,16 +643,34 @@ impl<'activation> Activation<'activation> { fn make( facet: &FacetRef, account: Arc, + cause: Option, + trace_collector: Option, state: &'activation mut RunningActor, ) -> Self { Activation { facet: facet.clone(), state, active_block: None, - pending: EventBuffer::new(account), + pending: EventBuffer::new( + facet.actor.actor_id, + account, + cause.map(|c| trace::TurnDescription::new( + NEXT_ACTIVATION_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed), + c)), + trace_collector), } } + pub fn trace_collector(&self) -> Option { + self.pending.trace_collector.clone() + } + + /// Constructs a new [`Account`] with the given `name`, inheriting + /// its `trace_collector` from the current [`Activation`]'s cause. + pub fn create_account(&self, name: Name) -> Arc { + Account::new(name, self.trace_collector()) + } + fn immediate_oid(&self, r: &Arc>) { if r.mailbox.actor_id != self.facet.actor.actor_id { panic!("Cannot use for_myself to send to remote peers"); @@ -639,6 +685,7 @@ impl<'activation> Activation<'activation> { tracing::trace!(check_existence, facet_id, "is_alive"); let old_facet_id = self.facet.facet_id; self.facet.facet_id = facet_id; + // let _entry = tracing::info_span!("facet", ?facet_id).entered(); let result = f(self); self.facet.facet_id = old_facet_id; result @@ -662,8 +709,11 @@ impl<'activation> Activation<'activation> { /// Retrieves the chain of facet IDs, in order, from the currently-active [`Facet`] up to /// and including the root facet of the active actor. Useful for debugging. pub fn facet_ids(&mut self) -> Vec { + self.facet_ids_for(self.facet.facet_id) + } + + fn facet_ids_for(&mut self, mut id: FacetId) -> Vec { let mut ids = Vec::new(); - let mut id = self.facet.facet_id; loop { ids.push(id); match self.state.get_facet(id) { @@ -677,6 +727,14 @@ impl<'activation> Activation<'activation> { ids } + #[inline(always)] + fn trace trace::ActionDescription>(&mut self, f: F) { + if self.pending.desc.is_some() { + let a = f(self); + self.pending.desc.as_mut().unwrap().record(a); + } + } + /// Core API: assert `a` at recipient `r`. /// /// Returns the [`Handle`] for the new assertion. @@ -686,14 +744,22 @@ impl<'activation> Activation<'activation> { tracing::trace!(?r, ?handle, ?a, "assert"); f.outbound_handles.insert(handle); drop(f); - self.state.insert_retract_cleanup_action(&r, handle); + self.state.insert_retract_cleanup_action(&r, handle, self.pending.desc.as_ref().map( + enclose!((r) |_| Box::new(move || trace::ActionDescription::Retract { + target: Box::new(r.as_ref().into()), + handle: Box::new(protocol::Handle(handle.into())), + }) as ActionDescriber))); { let r = Arc::clone(r); - self.pending.queue_for(&r).push(Box::new( - move |t| t.with_entity(&r, |t, e| { - tracing::trace!(?handle, ?a, "asserted"); - e.assert(t, a, handle) - }))); + self.pending.trace(|| trace::ActionDescription::Assert { + target: Box::new(r.as_ref().into()), + assertion: Box::new((&a).into()), + handle: Box::new(protocol::Handle(handle.into())), + }); + self.pending.queue_for(&r).push(Box::new(move |t| t.with_entity(&r, |t, e| { + tracing::trace!(?handle, ?a, "asserted"); + e.assert(t, a, handle) + }))); } } handle @@ -724,34 +790,49 @@ impl<'activation> Activation<'activation> { let r = Arc::clone(r); self.state.cleanup_actions.insert( handle, - CleanupAction::ForMyself(Box::new( - move |t| t.with_entity(&r, |t, e| { + CleanupAction::ForMyself(( + self.pending.desc.as_ref().map(enclose!((r, handle) move |_| Box::new( + move || trace::ActionDescription::Retract { + target: Box::new(r.as_ref().into()), + handle: Box::new(protocol::Handle(handle.into())), + }) as ActionDescriber)), + Box::new(move |t| t.with_entity(&r, |t, e| { tracing::trace!(?handle, "retracted"); if let Some(f) = t.active_facet() { f.outbound_handles.remove(&handle); } e.retract(t, handle) - })))); + }))))); } { let r = Arc::clone(r); - self.pending.for_myself.push(Box::new( - move |t| t.with_entity(&r, |t, e| { - tracing::trace!(?handle, ?a, "asserted"); - e.assert(t, a, handle) - }))); + self.pending.trace(|| trace::ActionDescription::Assert { + target: Box::new(r.as_ref().into()), + assertion: Box::new((&a).into()), + handle: Box::new(protocol::Handle(handle.into())), + }); + self.pending.for_myself.push(Box::new(move |t| t.with_entity(&r, |t, e| { + tracing::trace!(?handle, ?a, "asserted"); + e.assert(t, a, handle) + }))); } } handle } - fn half_link(&mut self, t_other: &mut Activation) { + fn half_link(&mut self, t_other: &mut Activation) -> Handle { + let other_actor_id = t_other.state.actor_id; let entity_ref = t_other.create::(StopOnRetract); let handle = next_handle(); tracing::trace!(?handle, ?entity_ref, "half_link"); - self.state.insert_retract_cleanup_action(&entity_ref, handle); + self.state.insert_retract_cleanup_action(&entity_ref, handle, self.pending.desc.as_ref().map( + move |_| Box::new(move || trace::ActionDescription::BreakLink { + peer: Box::new(trace::ActorId(AnyValue::new(other_actor_id))), + handle: Box::new(protocol::Handle(handle.into())), + }) as ActionDescriber)); self.active_facet().unwrap().outbound_handles.insert(handle); t_other.with_entity(&entity_ref, |t, e| e.assert(t, AnyValue::new(true), handle)).unwrap(); + handle } /// Core API: retract a previously-established assertion. @@ -782,11 +863,14 @@ impl<'activation> Activation<'activation> { pub fn message(&mut self, r: &Arc>, m: M) { tracing::trace!(?r, ?m, "message"); let r = Arc::clone(r); - self.pending.queue_for(&r).push(Box::new( - move |t| t.with_entity(&r, |t, e| { - tracing::trace!(?m, "delivered"); - e.message(t, m) - }))) + self.pending.trace(|| trace::ActionDescription::Message { + target: Box::new(r.as_ref().into()), + body: Box::new((&m).into()), + }); + self.pending.queue_for(&r).push(Box::new(move |t| t.with_entity(&r, |t, e| { + tracing::trace!(?m, "delivered"); + e.message(t, m) + }))) } /// Core API: send message `m` to recipient `r`, which must be a @@ -799,9 +883,13 @@ impl<'activation> Activation<'activation> { /// # Panics /// /// Panics if `r` is not part of the active actor. - pub fn message_for_myself(&mut self, r: &Arc>, m: M) { + pub fn message_for_myself(&mut self, r: &Arc>, m: M) { self.immediate_oid(r); let r = Arc::clone(r); + self.pending.trace(|| trace::ActionDescription::Message { + target: Box::new(r.as_ref().into()), + body: Box::new((&m).into()), + }); self.pending.for_myself.push(Box::new( move |t| t.with_entity(&r, |t, e| e.message(t, m)))) } @@ -813,6 +901,9 @@ impl<'activation> Activation<'activation> { /// the synchronisation request. pub fn sync(&mut self, r: &Arc>, peer: Arc>) { let r = Arc::clone(r); + self.pending.trace(|| trace::ActionDescription::Sync { + target: Box::new(r.as_ref().into()), + }); self.pending.queue_for(&r).push(Box::new( move |t| t.with_entity(&r, |t, e| e.sync(t, peer)))) } @@ -903,28 +994,34 @@ impl<'activation> Activation<'activation> { /// facet when the linked task completes. Uses `name` for log messages emitted by the task. pub fn linked_task>>( &mut self, - name: tracing::Span, + name: Name, boot: F, ) { let mailbox = self.state.mailbox(); let facet = self.facet.clone(); - if let Some(f) = self.active_facet() { - let token = CancellationToken::new(); + let trace_collector = self.trace_collector(); + if self.active_facet().is_some() { let task_id = NEXT_TASK_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed); - name.record("task_id", &task_id); + self.pending.trace(|| trace::ActionDescription::LinkedTaskStart { + task_name: Box::new(name.into()), + id: Box::new(trace::TaskId(AnyValue::new(task_id))), + }); + + let f = self.active_facet().unwrap(); + let token = CancellationToken::new(); { let token = token.clone(); tokio::spawn(async move { tracing::trace!(task_id, "linked task start"); - let result = select! { + let (result, reason) = select! { _ = token.cancelled() => { tracing::trace!(task_id, "linked task cancelled"); - LinkedTaskTermination::Normal + (LinkedTaskTermination::Normal, trace::LinkedTaskReleaseReason::Cancelled) } result = boot => match result { Ok(t) => { tracing::trace!(task_id, "linked task normal stop"); - t + (t, trace::LinkedTaskReleaseReason::Normal) } Err(e) => { tracing::error!(task_id, "linked task error: {}", e); @@ -933,18 +1030,28 @@ impl<'activation> Activation<'activation> { } } }; - facet.activate(Account::new(crate::name!("release_linked_task")), |t| { - if let Some(f) = t.active_facet() { - tracing::trace!(task_id, "cancellation token removed"); - f.linked_tasks.remove(&task_id); - } - if let LinkedTaskTermination::Normal = result { - t.stop(); - } - Ok(()) - }); + let release_account = + Account::new(Some(AnyValue::symbol("linked-task-release")), trace_collector); + facet.activate( + &release_account, + release_account.trace_collector.as_ref().map( + |_| trace::TurnCause::LinkedTaskRelease { + id: Box::new(trace::TaskId(AnyValue::new(task_id))), + reason: Box::new(reason), + }), + |t| { + if let Some(f) = t.active_facet() { + tracing::trace!(task_id, "cancellation token removed"); + f.linked_tasks.remove(&task_id); + } + if let LinkedTaskTermination::Normal = result { + t.stop(); + } + Ok(()) + }); Ok::<(), Error>(()) - }.instrument(name)); + }); + // }.instrument(tracing::info_span!("task", ?task_id).or_current())); } f.linked_tasks.insert(task_id, token); } @@ -957,7 +1064,18 @@ impl<'activation> Activation<'activation> { duration: time::Duration, a: F, ) { - self.at(time::Instant::now() + duration, a) + let account = Arc::clone(self.account()); + let desc = self.pending.desc.as_ref().map(|d| trace::TurnCause::Delay { + causing_turn: Box::new(d.id.clone()), + amount: duration.as_secs_f64().into(), + }); + let instant = time::Instant::now() + duration; + let facet = self.facet.clone(); + self.linked_task(Some(AnyValue::symbol("delay")), async move { + tokio::time::sleep_until(instant.into()).await; + facet.activate(&account, desc, a); + Ok(LinkedTaskTermination::KeepFacet) + }); } /// Executes the given action immediately, and then every time another multiple of the @@ -969,13 +1087,14 @@ impl<'activation> Activation<'activation> { ) -> ActorResult { let account = Arc::clone(self.account()); let facet = self.facet.clone(); - let span = tracing::Span::current().clone(); - self.linked_task(crate::name!(parent: None, "Activation::every"), async move { + let desc = trace::TurnCause::PeriodicActivation { period: duration.as_secs_f64().into() }; + self.linked_task(Some(AnyValue::symbol("periodic-activation")), async move { let mut timer = tokio::time::interval(duration); loop { timer.tick().await; - let _entry = span.enter(); - if !facet.activate(Arc::clone(&account), |t| a(t)) { break; } + if !facet.activate(&account, Some(desc.clone()), |t| a(t)) { + break; + } } Ok(LinkedTaskTermination::Normal) }); @@ -989,28 +1108,25 @@ impl<'activation> Activation<'activation> { instant: I, a: F, ) { - let account = Arc::clone(self.account()); - let instant = instant.into(); - let facet = self.facet.clone(); - let span = tracing::Span::current().clone(); - self.linked_task(crate::name!(parent: None, "Activation::at"), async move { - tokio::time::sleep_until(instant.into()).await; - let _entry = span.enter(); - facet.activate(account, a); - Ok(LinkedTaskTermination::KeepFacet) - }); + let delay = instant.into().checked_duration_since(tokio::time::Instant::now()) + .unwrap_or(time::Duration::ZERO); + self.after(delay, a) } /// Schedule the creation of a new actor when the Activation commits. pub fn spawn ActorResult>( &mut self, - name: tracing::Span, + name: Name, boot: F, ) -> ActorRef { - let ac = Actor::new(Some(self.state.actor_id)); + let ac = Actor::new(Some(self.state.actor_id), self.trace_collector()); let ac_ref = ac.ac_ref.clone(); - self.pending.for_myself.push(Box::new(move |_| { - ac.boot(name, boot); + self.pending.trace(|| trace::ActionDescription::Spawn { + link: false, + id: Box::new(trace::ActorId(AnyValue::new(ac_ref.actor_id))), + }); + self.pending.for_myself.push(Box::new(move |t| { + ac.boot(name, Arc::clone(t.account()), Some(trace::TurnCause::ActorBoot), boot); Ok(()) })); ac_ref @@ -1023,15 +1139,22 @@ impl<'activation> Activation<'activation> { /// stopped. pub fn spawn_link ActorResult>( &mut self, - name: tracing::Span, + name: Name, boot: F, ) -> ActorRef { - let ac = Actor::new(Some(self.state.actor_id)); + let ac = Actor::new(Some(self.state.actor_id), self.trace_collector()); let ac_ref = ac.ac_ref.clone(); let facet_id = self.facet.facet_id; + self.pending.trace(|| trace::ActionDescription::Spawn { + link: true, + id: Box::new(trace::ActorId(AnyValue::new(ac_ref.actor_id))), + }); self.pending.for_myself.push(Box::new(move |t| { t.with_facet(true, facet_id, move |t| { - ac.link(t)?.boot(name, boot); + ac.link(t)?.boot(name, + Arc::clone(t.account()), + Some(trace::TurnCause::ActorBoot), + boot); Ok(()) }) })); @@ -1048,6 +1171,9 @@ impl<'activation> Activation<'activation> { let f = Facet::new(Some(self.facet.facet_id)); let facet_id = f.facet_id; self.state.facet_nodes.insert(facet_id, f); + self.trace(|t| trace::ActionDescription::FacetStart { + path: t.facet_ids_for(facet_id).iter().map(|i| trace::FacetId(AnyValue::new(u64::from(*i)))).collect(), + }); tracing::trace!(parent_id = ?self.facet.facet_id, ?facet_id, actor_facet_count = ?self.state.facet_nodes.len()); @@ -1093,7 +1219,7 @@ impl<'activation> Activation<'activation> { if let Some(k) = continuation { self.on_facet_stop(facet_id, k); } - self._terminate_facet(facet_id, true) + self._terminate_facet(facet_id, true, trace::FacetStopReason::ExplicitAction) } /// Arranges for the [`Facet`] named by `facet_id` to be stopped cleanly when `self` @@ -1127,7 +1253,8 @@ impl<'activation> Activation<'activation> { tracing::trace!("Checking inertness of facet {} from facet {}", facet_id, t.facet.facet_id); if t.state.facet_exists_and_is_inert(facet_id) { tracing::trace!(" - facet {} is inert, stopping it", facet_id); - t.stop_facet(facet_id); + t._terminate_facet(facet_id, true, trace::FacetStopReason::Inert) + .expect("Non-failing _terminate_facet in stop_if_inert"); } else { tracing::trace!(" - facet {} is not inert", facet_id); } @@ -1135,8 +1262,17 @@ impl<'activation> Activation<'activation> { })) } - fn _terminate_facet(&mut self, facet_id: FacetId, alive: bool) -> ActorResult { + fn _terminate_facet( + &mut self, + facet_id: FacetId, + alive: bool, + reason: trace::FacetStopReason, + ) -> ActorResult { if let Some(mut f) = self.state.facet_nodes.remove(&facet_id) { + self.trace(|t| trace::ActionDescription::FacetStop { + path: t.facet_ids_for(facet_id).iter().map(|i| trace::FacetId(AnyValue::new(u64::from(*i)))).collect(), + reason: Box::new(reason), + }); tracing::trace!(actor_facet_count = ?self.state.facet_nodes.len(), "{} termination of {:?}", if alive { "living" } else { "post-exit" }, @@ -1147,7 +1283,7 @@ impl<'activation> Activation<'activation> { self.with_facet(false, facet_id, |t| { if let Some(children) = t.state.facet_children.remove(&facet_id) { for child_id in children.into_iter() { - t._terminate_facet(child_id, alive)?; + t._terminate_facet(child_id, alive, trace::FacetStopReason::ParentStopping)?; } } if alive { @@ -1164,7 +1300,7 @@ impl<'activation> Activation<'activation> { if let Some(p) = parent_facet_id { if t.state.facet_exists_and_is_inert(p) { tracing::trace!("terminating parent {:?} of facet {:?}", p, facet_id); - t._terminate_facet(p, true)?; + t._terminate_facet(p, true, trace::FacetStopReason::Inert)?; } else { tracing::trace!("not terminating parent {:?} of facet {:?}", p, facet_id); } @@ -1329,8 +1465,16 @@ impl<'activation> Activation<'activation> { } impl EventBuffer { - fn new(account: Arc) -> Self { + fn new( + source_actor_id: ActorId, + account: Arc, + desc: Option, + trace_collector: Option, + ) -> Self { EventBuffer { + source_actor_id, + desc, + trace_collector, account, queues: HashMap::new(), for_myself: Vec::new(), @@ -1339,10 +1483,18 @@ impl EventBuffer { fn execute_cleanup_action(&mut self, d: CleanupAction) { match d { - CleanupAction::ForAnother(mailbox, action) => - self.queue_for_mailbox(&mailbox).push(action), - CleanupAction::ForMyself(action) => - self.for_myself.push(action), + CleanupAction::ForAnother(mailbox, (tracer, action)) => { + if let Some(f) = tracer { + self.trace(f); + } + self.queue_for_mailbox(&mailbox).push(action); + } + CleanupAction::ForMyself((tracer, action)) => { + if let Some(f) = tracer { + self.trace(f); + } + self.for_myself.push(action); + } } } @@ -1365,6 +1517,11 @@ impl EventBuffer { if !self.for_myself.is_empty() { panic!("Unprocessed for_myself events remain at deliver() time"); } + if let Some(d) = &mut self.desc { + if let Some(c) = &self.trace_collector { + c.record(self.source_actor_id, trace::ActorActivation::Turn(Box::new(d.take()))); + } + } for (_actor_id, (tx, turn)) in std::mem::take(&mut self.queues).into_iter() { // Deliberately ignore send errors here: they indicate that the recipient is no // longer alive. But we don't care about that case, since we have to be robust to @@ -1372,7 +1529,19 @@ impl EventBuffer { // problems it caused was a relay output_loop that received EPIPE causing the relay // to crash, just as it was receiving thousands of messages a second, leading to // many, many log reports of failed send_actions from the following line.) - let _ = send_actions(&tx, &self.account, turn); + let _ = send_actions(&tx, + self.desc.as_ref().map(|d| trace::TurnCause::Turn { + id: Box::new(d.id.clone()), + }), + &self.account, + turn); + } + } + + #[inline(always)] + fn trace trace::ActionDescription>(&mut self, f: F) { + if let Some(d) = &mut self.desc { + d.record(f()); } } } @@ -1386,14 +1555,15 @@ impl Drop for EventBuffer { impl Account { /// Construct a new `Account`, storing `name` within it for /// debugging use. - pub fn new(name: tracing::Span) -> Arc { - let id = NEXT_ACCOUNT_ID.fetch_add(1, Ordering::Relaxed); + pub fn new(name: Name, trace_collector: Option) -> Arc { + let id = NEXT_ACCOUNT_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed); let debt = Arc::new(AtomicI64::new(0)); ACCOUNTS.write().insert(id, (name, Arc::clone(&debt))); Arc::new(Account { id, debt, notify: Notify::new(), + trace_collector, }) } @@ -1454,11 +1624,12 @@ impl Drop for LoanedItem { #[must_use] fn send_actions( tx: &UnboundedSender, + caused_by: Option, account: &Arc, t: PendingEventQueue, ) -> ActorResult { let token_count = t.len(); - tx.send(SystemMessage::Turn(LoanedItem::new(account, token_count, t))) + tx.send(SystemMessage::Turn(caused_by, LoanedItem::new(account, token_count, t))) .map_err(|_| error("Target actor not running", AnyValue::new(false))) } @@ -1497,17 +1668,29 @@ impl Drop for Mailbox { fn drop(&mut self) { tracing::debug!("Last reference to mailbox of actor id {:?} was dropped", self.actor_id); let _ = self.tx.send(SystemMessage::Release); - () } } impl Actor { + /// Create and start a new "top-level" actor: an actor not + /// causally related to another. This is the usual way to start a + /// Syndicate program. + pub fn top ActorResult>( + trace_collector: Option, + boot: F, + ) -> ActorHandle { + let ac = Actor::new(None, trace_collector.clone()); + let cause = trace_collector.as_ref().map(|_| trace::TurnCause::ActorBoot); + let account = Account::new(None, trace_collector); + ac.boot(None, account, cause, boot) + } + /// Create a new actor. It still needs to be [`boot`ed][Self::boot]. - pub fn new(parent_actor_id: Option) -> Self { + pub fn new(_parent_actor_id: Option, trace_collector: Option) -> Self { let (tx, rx) = unbounded_channel(); let actor_id = next_actor_id(); let root = Facet::new(None); - tracing::debug!(?actor_id, ?parent_actor_id, root_facet_id = ?root.facet_id, "Actor::new"); + tracing::debug!(?actor_id, ?_parent_actor_id, root_facet_id = ?root.facet_id, "Actor::new"); let mut st = RunningActor { actor_id, tx, @@ -1523,20 +1706,32 @@ impl Actor { }; st.facet_nodes.insert(root.facet_id, root); let ac_ref = ActorRef { actor_id, state: Arc::new(Mutex::new(ActorState::Running(st))) }; - Actor { rx, ac_ref } + Actor { rx, trace_collector, ac_ref } } fn link(self, t_parent: &mut Activation) -> Result { if t_parent.active_facet().is_none() { panic!("No active facet when calling spawn_link"); } + let account = Arc::clone(t_parent.account()); + let mut h_to_child = None; + let mut h_to_parent = None; let is_alive = self.ac_ref.root_facet_ref().activate( - Account::new(crate::name!("link")), |t_child| { - t_parent.half_link(t_child); - t_child.half_link(t_parent); + &account, + None, + |t_child| { + h_to_child = Some(t_parent.half_link(t_child)); + h_to_parent = Some(t_child.half_link(t_parent)); Ok(()) }); if is_alive { + let parent_actor = t_parent.state.actor_id; + t_parent.trace(|_| trace::ActionDescription::Link { + parent_actor: Box::new(trace::ActorId(AnyValue::new(parent_actor))), + parent_to_child: Box::new(protocol::Handle(h_to_child.unwrap().into())), + child_actor: Box::new(trace::ActorId(AnyValue::new(self.ac_ref.actor_id))), + child_to_parent: Box::new(protocol::Handle(h_to_parent.unwrap().into())), + }); Ok(self) } else { Err(error("spawn_link'd actor terminated before link could happen", AnyValue::new(false))) @@ -1549,38 +1744,58 @@ impl Actor { /// and then the mainloop is entered. pub fn boot ActorResult>( mut self, - name: tracing::Span, + name: Name, + boot_account: Arc, + boot_cause: Option, boot: F, ) -> ActorHandle { - ACTORS.write().insert(self.ac_ref.actor_id, (name.clone(), self.ac_ref.clone())); - name.record("actor_id", &self.ac_ref.actor_id); + let actor_id = self.ac_ref.actor_id; + ACTORS.write().insert(actor_id, (name.clone(), self.ac_ref.clone())); + let trace_collector = boot_account.trace_collector.clone(); + if let Some(c) = &trace_collector { + c.record(actor_id, trace::ActorActivation::Start { + actor_name: Box::new(name.into()), + }); + } tokio::spawn(async move { tracing::trace!("start"); - self.run(|t| { + self.run(boot_account, boot_cause, move |t| { t.facet(boot)?; Ok(()) }).await; let result = self.ac_ref.exit_status().expect("terminated"); + if let Some(c) = trace_collector { + c.record(actor_id, trace::ActorActivation::Stop { + status: Box::new(match &result { + Ok(()) => trace::ExitStatus::Ok, + Err(e) => trace::ExitStatus::Error(Box::new(e.clone())), + }), + }); + } match &result { Ok(()) => tracing::trace!("normal stop"), Err(e) => tracing::error!("error stop: {}", e), } result - }.instrument(name)) + }) + // }.instrument(tracing::info_span!(parent: None, "actor", ?actor_id).or_current())) } async fn run ActorResult>( &mut self, + boot_account: Arc, + boot_cause: Option, boot: F, ) -> () { let root_facet_ref = self.ac_ref.root_facet_ref(); let terminate = |result: ActorResult| { - root_facet_ref.activate_exit(Account::new(crate::name!("shutdown")), + root_facet_ref.activate_exit(&Account::new(None, None), + None, |_| RunDisposition::Terminate(result)); }; - if !root_facet_ref.activate(Account::new(crate::name!("boot")), boot) { + if !root_facet_ref.activate(&boot_account, boot_cause, boot) { return; } @@ -1602,15 +1817,17 @@ impl Actor { ra.fields.remove(&field_id); }) } - SystemMessage::Turn(mut loaned_item) => { + SystemMessage::Turn(cause, mut loaned_item) => { tracing::trace!(actor_id = ?self.ac_ref.actor_id, "SystemMessage::Turn"); let actions = std::mem::take(&mut loaned_item.item); - let is_alive = root_facet_ref.activate( - Arc::clone(&loaned_item.account), |t| { + if !root_facet_ref.activate( + &loaned_item.account, cause, |t| { for action in actions.into_iter() { action(t)? } Ok(()) - }); - if !is_alive { return; } + }) + { + return; + } } SystemMessage::Crash(e) => { tracing::trace!(actor_id = ?self.ac_ref.actor_id, @@ -1760,25 +1977,32 @@ impl RunningActor { &mut self, r: &Arc>, handle: Handle, + describer: Option, ) { let r = Arc::clone(r); self.cleanup_actions.insert( handle, - CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new( - move |t| t.with_entity(&r, |t, e| { + CleanupAction::ForAnother(Arc::clone(&r.mailbox), ( + describer, + Box::new(move |t| t.with_entity(&r, |t, e| { tracing::trace!(?handle, "retracted"); if let Some(f) = t.active_facet() { f.outbound_handles.remove(&handle); } e.retract(t, handle) - })))); + }))))); } - fn cleanup(&mut self, ac_ref: &ActorRef, exit_status: &Arc) { - let mut t = Activation::make(&ac_ref.facet_ref(self.root), - Account::new(crate::name!("cleanup")), - self); - if let Err(err) = t._terminate_facet(t.state.root, exit_status.is_ok()) { + fn cleanup( + &mut self, + ac_ref: &ActorRef, + exit_status: &Arc, + trace_collector: Option, + ) { + let cause = trace_collector.as_ref().map(|_| trace::TurnCause::ActorCleanup); + let account = Account::new(Some(AnyValue::symbol("cleanup")), trace_collector.clone()); + let mut t = Activation::make(&ac_ref.facet_ref(self.root), account, cause, trace_collector, self); + if let Err(err) = t._terminate_facet(t.state.root, exit_status.is_ok(), trace::FacetStopReason::ActorStopping) { // This can only occur as the result of an internal error in this file's code. tracing::error!(?err, "unexpected error from terminate_facet"); panic!("Unexpected error result from terminate_facet"); @@ -1787,8 +2011,8 @@ impl RunningActor { // TODO: We don't want that: we want (? do we?) exit hooks to run before linked_tasks are cancelled. // TODO: Example: send an error message in an exit_hook that is processed and delivered by a linked_task. for action in std::mem::take(&mut t.state.exit_hooks) { - if let Err(err) = action(&mut t, &exit_status) { - tracing::error!(?err, "error in exit hook"); + if let Err(_err) = action(&mut t, &exit_status) { + tracing::error!(?_err, "error in exit hook"); } } } @@ -1817,19 +2041,17 @@ impl Drop for Field { impl Drop for Actor { fn drop(&mut self) { self.rx.close(); - let _name = ACTORS.write().remove(&self.ac_ref.actor_id) - .map_or_else(|| crate::name!(parent: None, "DROPPED", actor_id=?self.ac_ref.actor_id), - |(span, _ac_ref)| span); - let _scope = _name.enter(); + ACTORS.write().remove(&self.ac_ref.actor_id); + // let _scope = tracing::info_span!(parent: None, "actor", actor_id = ?self.ac_ref.actor_id).entered(); let mut g = self.ac_ref.state.lock(); if let ActorState::Running(ref mut state) = *g { - tracing::warn!(actor_id = ?self.ac_ref.actor_id, "Force-terminated by Actor::drop"); + tracing::warn!("Force-terminated by Actor::drop"); let exit_status = Arc::new(Err(error("Force-terminated by Actor::drop", AnyValue::new(false)))); - state.cleanup(&self.ac_ref, &exit_status); + state.cleanup(&self.ac_ref, &exit_status, self.trace_collector.clone()); *g = ActorState::Terminated { exit_status }; } - tracing::debug!(actor_id = ?self.ac_ref.actor_id, "Actor::drop"); + tracing::debug!("Actor::drop"); } } @@ -1851,16 +2073,26 @@ impl Drop for Facet { /// /// Primarily for use by [linked tasks][Activation::linked_task]. #[must_use] -pub fn external_event(mailbox: &Arc, account: &Arc, action: Action) -> ActorResult { - send_actions(&mailbox.tx, account, vec![action]) +pub fn external_event( + mailbox: &Arc, + cause: Option, + account: &Arc, + action: Action, +) -> ActorResult { + send_actions(&mailbox.tx, cause, account, vec![action]) } /// Directly injects `actions` into `mailbox`, billing subsequent activity against `account`. /// /// Primarily for use by [linked tasks][Activation::linked_task]. #[must_use] -pub fn external_events(mailbox: &Arc, account: &Arc, actions: PendingEventQueue) -> ActorResult { - send_actions(&mailbox.tx, account, actions) +pub fn external_events( + mailbox: &Arc, + cause: Option, + account: &Arc, + actions: PendingEventQueue, +) -> ActorResult { + send_actions(&mailbox.tx, cause, account, actions) } impl Ref { @@ -1880,6 +2112,7 @@ impl Ref { fn internal_with_entity) -> R>(&self, f: F) -> R { let mut g = self.target.lock(); + // let _entry = tracing::info_span!("entity", r = ?self).entered(); f(g.as_mut().expect("initialized").as_mut()) } } @@ -1890,6 +2123,10 @@ impl Ref { pub fn oid(&self) -> usize { std::ptr::addr_of!(*self) as usize } + + pub fn debug_str(&self) -> String { + format!("{}/{}:{:016x}", self.mailbox.actor_id, self.facet_id, self.oid()) + } } impl PartialEq for Ref { @@ -1920,7 +2157,7 @@ impl Ord for Ref { impl std::fmt::Debug for Ref { fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { - write!(f, "⌜{}/{}:{:016x}⌝", self.mailbox.actor_id, self.facet_id, self.oid()) + write!(f, "⌜{}⌝", self.debug_str()) } } @@ -2007,19 +2244,23 @@ impl Cap { pub fn sync(&self, t: &mut Activation, peer: Arc>) { t.sync(&self.underlying, peer) } + + pub fn debug_str(&self) -> String { + if self.attenuation.is_empty() { + self.underlying.debug_str() + } else { + format!("{}/{}:{:016x}\\{:?}", + self.underlying.mailbox.actor_id, + self.underlying.facet_id, + self.underlying.oid(), + self.attenuation) + } + } } impl std::fmt::Debug for Cap { fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { - if self.attenuation.is_empty() { - self.underlying.fmt(f) - } else { - write!(f, "⌜{}/{}:{:016x}\\{:?}⌝", - self.underlying.mailbox.actor_id, - self.underlying.facet_id, - self.underlying.oid(), - self.attenuation) - } + write!(f, "⌜{}⌝", self.debug_str()) } } @@ -2099,8 +2340,7 @@ pub async fn wait_for_all_actors_to_stop(wait_time: time::Duration) { if remaining.len() > 0 { tracing::warn!("Some actors remain after {:?}:", wait_time); for (name, actor) in remaining.into_values() { - let _entry = name.enter(); - tracing::warn!(?actor, "still running, requesting shutdown"); + tracing::warn!(?name, ?actor.actor_id, "actor still running, requesting shutdown"); let g = actor.state.lock(); if let ActorState::Running(state) = &*g { state.shutdown(); @@ -2111,8 +2351,7 @@ pub async fn wait_for_all_actors_to_stop(wait_time: time::Duration) { if remaining.len() > 0 { tracing::error!("Some actors failed to stop after being explicitly shut down:"); for (name, actor) in remaining.into_values() { - let _entry = name.enter(); - tracing::error!(?actor, "failed to stop"); + tracing::error!(?name, ?actor.actor_id, "actor failed to stop"); } } else { tracing::debug!("All remaining actors have stopped."); @@ -2122,23 +2361,6 @@ pub async fn wait_for_all_actors_to_stop(wait_time: time::Duration) { } } -/// A convenient Syndicate-enhanced variation on -/// [`tracing::info_span`]. -/// -/// Includes fields `actor_id`, `task_id` and `oid`, so that they show -/// up in those circumstances where they happen to be defined as part -/// of the operation of the [`crate::actor`] module. -#[macro_export] -macro_rules! name { - () => {tracing::info_span!(actor_id = tracing::field::Empty, - task_id = tracing::field::Empty, - oid = tracing::field::Empty)}; - ($($item:tt)*) => {tracing::info_span!($($item)*, - actor_id = tracing::field::Empty, - task_id = tracing::field::Empty, - oid = tracing::field::Empty)} -} - /// A convenient way of cloning a bunch of state shared among [entities][Entity], actions, /// linked tasks, etc. /// diff --git a/syndicate/src/dataspace.rs b/syndicate/src/dataspace.rs index ae65ef4..46dddec 100644 --- a/syndicate/src/dataspace.rs +++ b/syndicate/src/dataspace.rs @@ -20,7 +20,7 @@ use preserves_schema::Codec; /// A Dataspace object (entity). #[derive(Debug)] pub struct Dataspace { - pub name: tracing::Span, + pub name: Name, /// Index over assertions placed in the dataspace; used to /// efficiently route assertion changes and messages to observers. pub index: skeleton::Index, @@ -31,10 +31,9 @@ pub struct Dataspace { impl Dataspace { /// Construct a new, empty dataspace. - pub fn new(name: Option) -> Self { + pub fn new(name: Name) -> Self { Self { - name: name.map_or_else(|| crate::name!("anonymous_dataspace"), - |n| crate::name!(parent: &n, "dataspace")), + name: name, index: skeleton::Index::new(), handle_map: Map::new(), } @@ -62,10 +61,8 @@ impl Dataspace { impl Entity<_Any> for Dataspace { fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult { - let _guard = self.name.enter(); - let is_new = self.index.insert(t, &a); - tracing::trace!(assertion = ?a, handle = ?h, ?is_new, "assert"); + tracing::trace!(dataspace = ?self.name, assertion = ?a, handle = ?h, ?is_new, "assert"); if is_new { if let Ok(o) = language().parse::(&a) { @@ -78,13 +75,11 @@ impl Entity<_Any> for Dataspace { } fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult { - let _guard = self.name.enter(); - match self.handle_map.remove(&h) { - None => tracing::warn!(handle = ?h, "retract of unknown handle"), + None => tracing::warn!(dataspace = ?self.name, handle = ?h, "retract of unknown handle"), Some(a) => { let is_last = self.index.remove(t, &a); - tracing::trace!(assertion = ?a, handle = ?h, ?is_last, "retract"); + tracing::trace!(dataspace = ?self.name, assertion = ?a, handle = ?h, ?is_last, "retract"); if is_last { if let Ok(o) = language().parse::(&a) { @@ -97,9 +92,7 @@ impl Entity<_Any> for Dataspace { } fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult { - let _guard = self.name.enter(); - tracing::trace!(body = ?m, "message"); - + tracing::trace!(dataspace = ?self.name, body = ?m, "message"); self.index.send(t, &m); Ok(()) } diff --git a/syndicate/src/lib.rs b/syndicate/src/lib.rs index 6dc83df..4ee4565 100644 --- a/syndicate/src/lib.rs +++ b/syndicate/src/lib.rs @@ -1,4 +1,5 @@ #![doc = include_str!("../README.md")] +#![feature(min_specialization)] #[doc(inline)] pub use preserves::value; @@ -29,13 +30,33 @@ pub mod schemas { pub mod skeleton; pub mod sturdy; -pub mod tracer; +pub mod trace; #[doc(inline)] pub use during::entity; -#[doc(inline)] -pub use tracer::convenient_logging; +/// Sets up [`tracing`] logging in a reasonable way. +/// +/// Useful at the top of `main` functions. +pub fn convenient_logging() -> Result<(), Box> { + let filter = match std::env::var(tracing_subscriber::filter::EnvFilter::DEFAULT_ENV) { + Err(std::env::VarError::NotPresent) => + tracing_subscriber::filter::EnvFilter::default() + .add_directive(tracing_subscriber::filter::LevelFilter::INFO.into()), + _ => + tracing_subscriber::filter::EnvFilter::try_from_default_env()?, + }; + let subscriber = tracing_subscriber::fmt() + .with_ansi(true) + .with_thread_ids(true) + .with_max_level(tracing::Level::TRACE) + .with_env_filter(filter) + .with_writer(std::io::stderr) + .finish(); + tracing::subscriber::set_global_default(subscriber) + .expect("Could not set tracing global subscriber"); + Ok(()) +} preserves_schema::define_language!(language(): Language { syndicate: schemas::Language, diff --git a/syndicate/src/relay.rs b/syndicate/src/relay.rs index 0354394..5af9df6 100644 --- a/syndicate/src/relay.rs +++ b/syndicate/src/relay.rs @@ -9,6 +9,7 @@ use crate::error::error; use crate::schemas::gatekeeper; use crate::schemas::protocol as P; use crate::schemas::sturdy; +use crate::trace; use futures::Sink; use futures::SinkExt; @@ -248,8 +249,10 @@ impl TunnelRelay { |io| Arc::clone(&tr.membranes.import_oid(t, &tr_ref, io).inc_ref().obj)); dump_membranes!(tr.membranes); *tr_ref.lock() = Some(tr); - t.linked_task(crate::name!("writer"), output_loop(o, output_rx)); - t.linked_task(crate::name!("reader"), input_loop(t.facet.clone(), i, tr_ref)); + t.linked_task(Some(AnyValue::symbol("writer")), + output_loop(o, output_rx)); + t.linked_task(Some(AnyValue::symbol("reader")), + input_loop(t.trace_collector(), t.facet.clone(), i, tr_ref)); t.state.add_exit_hook(&self_entity); result } @@ -619,11 +622,13 @@ impl DomainEncode> for Membranes { } async fn input_loop( + trace_collector: Option, facet: FacetRef, i: Input, relay: TunnelRelayRef, ) -> Result { - let account = Account::new(crate::name!("input-loop")); + let account = Account::new(Some(AnyValue::symbol("input-loop")), trace_collector); + let cause = trace::TurnCause::external("input-loop"); match i { Input::Packets(mut src) => { loop { @@ -631,12 +636,15 @@ async fn input_loop( match src.next().await { None => break, Some(bs) => { - let is_alive = facet.activate(Arc::clone(&account), |t| { - let mut g = relay.lock(); - let tr = g.as_mut().expect("initialized"); - tr.handle_inbound_datagram(t, &bs?) - }); - if !is_alive { break; } + if !facet.activate( + &account, Some(cause.clone()), |t| { + let mut g = relay.lock(); + let tr = g.as_mut().expect("initialized"); + tr.handle_inbound_datagram(t, &bs?) + }) + { + break; + } } } } @@ -659,12 +667,15 @@ async fn input_loop( match n { 0 => break, _ => { - let is_alive = facet.activate(Arc::clone(&account), |t| { - let mut g = relay.lock(); - let tr = g.as_mut().expect("initialized"); - tr.handle_inbound_stream(t, &mut buf) - }); - if !is_alive { break; } + if !facet.activate( + &account, Some(cause.clone()), |t| { + let mut g = relay.lock(); + let tr = g.as_mut().expect("initialized"); + tr.handle_inbound_stream(t, &mut buf) + }) + { + break; + } } } } diff --git a/syndicate/src/supervise.rs b/syndicate/src/supervise.rs index afe384f..ca242f5 100644 --- a/syndicate/src/supervise.rs +++ b/syndicate/src/supervise.rs @@ -1,6 +1,8 @@ //! Extremely simple single-actor supervision. Vastly simplified compared to the available //! options in [Erlang/OTP](https://erlang.org/doc/man/supervisor.html). +use preserves::value::NestedValue; + use std::collections::VecDeque; use std::sync::Arc; use std::time::Duration; @@ -8,7 +10,6 @@ use std::time::Duration; use tokio::time::Instant; use crate::actor::*; -use crate::enclose; use crate::schemas::service::State; pub type Boot = Box ActorResult>; @@ -36,7 +37,8 @@ pub struct SupervisorConfiguration { pub struct Supervisor { self_ref: Arc>, - name: tracing::Span, + my_name: Name, + child_name: Name, config: SupervisorConfiguration, boot_fn: Option, restarts: VecDeque, @@ -86,8 +88,7 @@ impl Entity for Supervisor } fn retract(&mut self, t: &mut Activation, _h: Handle) -> ActorResult { - let _name = self.name.clone(); - let _entry = _name.enter(); + let _entry = tracing::info_span!("supervisor", name = ?self.child_name).entered(); let exit_status = self.ac_ref.take().expect("valid supervisee ActorRef") .exit_status() @@ -144,8 +145,9 @@ impl Entity for Supervisor } fn stop(&mut self, _t: &mut Activation) -> ActorResult { - let _entry = self.name.enter(); - tracing::info!(self_ref = ?self.self_ref, "Supervisor terminating"); + tracing::info!(name = ?self.my_name, + self_ref = ?self.self_ref, + "Supervisor terminating"); Ok(()) } } @@ -154,18 +156,21 @@ impl Supervisor { pub fn start ActorResult, B: 'static + Send + FnMut(&mut Activation) -> ActorResult>( t: &mut Activation, - name: tracing::Span, + name: Name, config: SupervisorConfiguration, mut state_cb: C, boot_fn: B, ) -> ActorResult { - let _entry = name.enter(); + let _entry = tracing::info_span!("supervisor", ?name).entered(); tracing::trace!(?config); let self_ref = t.create_inert(); let state_field = t.named_field("supervisee_state", State::Started); + let my_name = name.as_ref().map( + |n| preserves::rec![AnyValue::symbol("supervisor"), n.clone()]); let mut supervisor = Supervisor { self_ref: Arc::clone(&self_ref), - name: name.clone(), + my_name: my_name.clone(), + child_name: name, config, boot_fn: Some(Box::new(boot_fn)), restarts: VecDeque::new(), @@ -174,14 +179,11 @@ impl Supervisor { }; tracing::info!(self_ref = ?supervisor.self_ref, "Supervisor starting"); supervisor.ensure_started(t)?; - t.dataflow(enclose!((name) move |t| { + t.dataflow(move |t| { let state = t.get(&state_field).clone(); - { - let _entry = name.enter(); - tracing::debug!(?state); - } + tracing::debug!(name = ?my_name, ?state); state_cb(t, state) - }))?; + })?; self_ref.become_entity(supervisor); t.on_stop_notify(&self_ref); Ok(()) @@ -190,16 +192,16 @@ impl Supervisor { fn ensure_started(&mut self, t: &mut Activation) -> ActorResult { match self.boot_fn.take() { None => { - let _entry = self.name.enter(); t.set(&self.state, State::Failed); - tracing::error!("Cannot restart supervisee, because it panicked at startup") + tracing::error!(name = ?self.my_name, + "Cannot restart supervisee, because it panicked at startup") } Some(mut boot_fn) => { let self_ref = Arc::clone(&self.self_ref); t.facet(|t: &mut Activation| { t.assert(&self.self_ref, Protocol::SuperviseeStarted); self.ac_ref = Some(t.spawn_link( - crate::name!(parent: &self.name, "supervisee"), + self.child_name.clone(), move |t| { match boot_fn(t) { Ok(()) => { diff --git a/syndicate/src/trace.rs b/syndicate/src/trace.rs new file mode 100644 index 0000000..534299d --- /dev/null +++ b/syndicate/src/trace.rs @@ -0,0 +1,187 @@ +//! Records *describing* actions committed at the end of a turn and +//! events triggering the start of a turn. These are not the actions +//! or events themselves: they are reflective information on the +//! action of the system, enough to reconstruct interesting +//! projections of system activity. + +pub use super::schemas::trace::*; + +use preserves::value::NestedValue; +use preserves::value::Writer; +use preserves_schema::Codec; + +use super::actor::{self, AnyValue, Ref, Cap}; +use super::language; + +use std::sync::Arc; +use std::time::SystemTime; + +use tokio::select; +use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; + +#[derive(Debug, Clone)] +pub struct TraceCollector { + pub tx: UnboundedSender, +} + +impl From<&Ref> for Target { + fn from(v: &Ref) -> Target { + Target { + actor: ActorId(AnyValue::new(v.mailbox.actor_id)), + facet: FacetId(AnyValue::new(u64::from(v.facet_id))), + oid: Oid(AnyValue::new(v.oid())), + } + } +} + +impl From<&M> for AssertionDescription { + default fn from(v: &M) -> Self { + Self::Opaque { description: AnyValue::new(format!("{:?}", v)) } + } +} + +impl From<&AnyValue> for AssertionDescription { + fn from(v: &AnyValue) -> Self { + Self::Value { value: v.clone() } + } +} + +impl TraceCollector { + pub fn record(&self, id: actor::ActorId, a: ActorActivation) { + let _ = self.tx.send(TraceEntry { + timestamp: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) + .expect("Time after Unix epoch").as_secs_f64().into(), + actor: ActorId(AnyValue::new(id)), + item: a, + }); + } +} + +impl TurnDescription { + pub fn new(activation_id: u64, cause: TurnCause) -> Self { + Self { + id: TurnId(AnyValue::new(activation_id)), + cause, + actions: Vec::new(), + } + } + + pub fn record(&mut self, a: ActionDescription) { + self.actions.push(a) + } + + pub fn take(&mut self) -> Self { + Self { + id: self.id.clone(), + cause: self.cause.clone(), + actions: std::mem::take(&mut self.actions), + } + } +} + +impl TurnCause { + pub fn external(description: &str) -> Self { + Self::External { description: AnyValue::new(description) } + } +} + +struct CapEncoder; + +impl preserves::value::DomainEncode> for CapEncoder { + fn encode_embedded( + &mut self, + w: &mut W, + d: &Arc, + ) -> std::io::Result<()> { + w.write_string(&d.debug_str()) + + // use preserves::value::writer::CompoundWriter; + // use preserves::value::boundary as B; + // let mut c = w.start_record(Some(3))?; + + // let mut b = B::start(B::Item::RecordLabel); + // c.boundary(&b)?; + // c.write_symbol("cap")?; + + // b.shift(Some(B::Item::RecordField)); + // c.boundary(&b)?; + // c.write_u64(d.underlying.mailbox.actor_id)?; + + // b.shift(Some(B::Item::RecordField)); + // c.boundary(&b)?; + // c.write_u64(u64::from(d.underlying.facet_id))?; + + // b.shift(Some(B::Item::RecordField)); + // c.boundary(&b)?; + // c.write_u64(d.underlying.oid() as u64)?; + + // // TODO: write attenuation? + + // b.shift(None); + // c.boundary(&b)?; + // w.end_record(c) + } +} + +pub enum CollectorEvent { + Event(TraceEntry), + PeriodicFlush, +} + +impl TraceCollector { + pub fn new(mut f: F) -> TraceCollector { + let (tx, mut rx) = unbounded_channel::(); + tokio::spawn(async move { + let mut timer = tokio::time::interval(std::time::Duration::from_secs(1)); + loop { + select! { + maybe_entry = rx.recv() => { + match maybe_entry { + None => break, + Some(entry) => { + tracing::trace!(?entry); + f(CollectorEvent::Event(entry)); + } + } + }, + _ = timer.tick() => f(CollectorEvent::PeriodicFlush), + } + } + }); + TraceCollector { tx } + } + + pub fn ascii(w: W) -> TraceCollector { + let mut writer = preserves::value::TextWriter::new(w); + Self::new(move |event| match event { + CollectorEvent::Event(entry) => { + writer.write(&mut CapEncoder, &language().unparse(&entry)) + .expect("failed to write TraceCollector entry"); + writer.borrow_write().write_all(b"\n") + .expect("failed to write TraceCollector newline"); + }, + CollectorEvent::PeriodicFlush => + writer.flush().expect("failed to flush TraceCollector output"), + }) + } + + pub fn packed(w: W) -> TraceCollector { + let mut writer = preserves::value::PackedWriter::new(w); + Self::new(move |event| match event { + CollectorEvent::Event(entry) => + writer.write(&mut CapEncoder, &language().unparse(&entry)) + .expect("failed to write TraceCollector entry"), + CollectorEvent::PeriodicFlush => + writer.flush().expect("failed to flush TraceCollector output"), + }) + } +} + +impl From for Name { + fn from(v: actor::Name) -> Name { + match v { + None => Name::Anonymous, + Some(n) => Name::Named { name: n.clone() }, + } + } +} diff --git a/syndicate/src/tracer.rs b/syndicate/src/tracer.rs deleted file mode 100644 index 25e3a9b..0000000 --- a/syndicate/src/tracer.rs +++ /dev/null @@ -1,66 +0,0 @@ -use crate::actor::*; - -use std::fmt::Debug; -use std::io; -use std::sync::Arc; - -struct Tracer(tracing::Span); - -fn set_name_oid(t: &mut Tracer, r: &Arc>) { - t.0.record("oid", &tracing::field::display(&r.oid())); -} - -pub fn tracer(t: &mut Activation, name: tracing::Span) -> Arc> { - let mut e = Tracer(name); - let r = t.create_inert(); - set_name_oid(&mut e, &r); - r.become_entity(e); - r -} - -impl Entity for Tracer { - fn assert(&mut self, _t: &mut Activation, a: M, h: Handle) -> ActorResult { - let _guard = self.0.enter(); - tracing::trace!(?a, ?h, "assert"); - Ok(()) - } - fn retract(&mut self, _t: &mut Activation, h: Handle) -> ActorResult { - let _guard = self.0.enter(); - tracing::trace!(?h, "retract"); - Ok(()) - } - fn message(&mut self, _t: &mut Activation, m: M) -> ActorResult { - let _guard = self.0.enter(); - tracing::trace!(?m, "message"); - Ok(()) - } - fn sync(&mut self, t: &mut Activation, peer: Arc>) -> ActorResult { - let _guard = self.0.enter(); - tracing::trace!(?peer, "sync"); - t.message(&peer, Synced); - Ok(()) - } -} - -/// Sets up [`tracing`] logging in a reasonable way. -/// -/// Useful at the top of `main` functions. -pub fn convenient_logging() -> Result<(), Box> { - let filter = match std::env::var(tracing_subscriber::filter::EnvFilter::DEFAULT_ENV) { - Err(std::env::VarError::NotPresent) => - tracing_subscriber::filter::EnvFilter::default() - .add_directive(tracing_subscriber::filter::LevelFilter::INFO.into()), - _ => - tracing_subscriber::filter::EnvFilter::try_from_default_env()?, - }; - let subscriber = tracing_subscriber::fmt() - .with_ansi(true) - .with_thread_ids(true) - .with_max_level(tracing::Level::TRACE) - .with_env_filter(filter) - .with_writer(io::stderr) - .finish(); - tracing::subscriber::set_global_default(subscriber) - .expect("Could not set tracing global subscriber"); - Ok(()) -}