diff --git a/Cargo.lock b/Cargo.lock index 7cd7046..1099b45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,14 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "aho-corasick" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8716408b8bc624ed7f65d223ddb9ac2d044c0547b6fa4b0d554f3a9540496ada" +dependencies = [ + "memchr", +] + [[package]] name = "ansi_term" version = "0.11.0" @@ -89,6 +98,17 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" +[[package]] +name = "chrono" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80094f509cf8b5ae86a4966a39b3ff66cd7e2a3e594accec3743ff3fabeab5b2" +dependencies = [ + "num-integer", + "num-traits", + "time", +] + [[package]] name = "clap" version = "2.33.1" @@ -399,6 +419,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "matchers" +version = "0.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1" +dependencies = [ + "regex-automata", +] + [[package]] name = "matches" version = "0.1.8" @@ -810,6 +839,34 @@ version = "0.1.56" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" +[[package]] +name = "regex" +version = "1.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6020f034922e3194c711b82a627453881bc4682166cabb07134a10c26ba7692" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", + "thread_local", +] + +[[package]] +name = "regex-automata" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4" +dependencies = [ + "byteorder", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.6.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fe5bd57d1d7414c6b5ed48563a2c855d995ff777729dcd91c369ec7fea395ae" + [[package]] name = "remove_dir_all" version = "0.5.2" @@ -819,6 +876,12 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "ryu" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed3d612bc64430efeb3f7ee6ef26d590dce0c43249217bddc62112540c7941e1" + [[package]] name = "schannel" version = "0.1.19" @@ -881,6 +944,17 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_json" +version = "1.0.53" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "993948e75b189211a9b31a7528f950c6adc21f9720b6438ff80a7fa2f864cea2" +dependencies = [ + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha-1" version = "0.8.2" @@ -893,6 +967,15 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "sharded-slab" +version = "0.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06d5a3f5166fb5b42a5439f2eee8b9de149e235961e3eb21c5808fc3ea17ff3e" +dependencies = [ + "lazy_static", +] + [[package]] name = "slab" version = "0.4.2" @@ -970,6 +1053,9 @@ dependencies = [ "tokio", "tokio-tungstenite", "tokio-util", + "tracing", + "tracing-futures", + "tracing-subscriber", "tungstenite", ] @@ -996,6 +1082,25 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "thread_local" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "time" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" +dependencies = [ + "libc", + "winapi 0.3.8", +] + [[package]] name = "tokio" version = "0.2.21" @@ -1064,6 +1169,88 @@ dependencies = [ "serde", ] +[[package]] +name = "tracing" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7c6b59d116d218cb2d990eb06b77b64043e0268ef7323aae63d8b30ae462923" +dependencies = [ + "cfg-if", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99bbad0de3fd923c9c3232ead88510b783e5a4d16a6154adffa3d53308de984c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0aa83a9a47081cd522c09c81b31aec2c9273424976f922ad61c053b58350b715" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "tracing-futures" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab7bb6f14721aa00656086e9335d363c5c8747bae02ebe32ea2c7dece5689b4c" +dependencies = [ + "pin-project", + "tracing", +] + +[[package]] +name = "tracing-log" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e0f8c7178e13481ff6765bd169b33e8d554c5d2bbede5e32c356194be02b9b9" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-serde" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6ccba2f8f16e0ed268fc765d9b7ff22e965e7185d32f8f1ec8294fe17d86e79" +dependencies = [ + "serde", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d53c40489aa69c9aed21ff483f26886ca8403df33bdc2d2f87c60c1617826d2" +dependencies = [ + "ansi_term", + "chrono", + "lazy_static", + "matchers", + "regex", + "serde", + "serde_json", + "sharded-slab", + "smallvec", + "tracing-core", + "tracing-log", + "tracing-serde", +] + [[package]] name = "tungstenite" version = "0.10.1" diff --git a/Cargo.toml b/Cargo.toml index b0da14d..c29b1a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,3 +26,7 @@ structopt = "0.3.14" tungstenite = "0.10.1" tokio-tungstenite = "0.10.1" + +tracing = "0.1.14" +tracing-subscriber = "0.2.5" +tracing-futures = "0.2.4" diff --git a/src/bin/syndicate-server.rs b/src/bin/syndicate-server.rs index a568ae3..2d013b4 100644 --- a/src/bin/syndicate-server.rs +++ b/src/bin/syndicate-server.rs @@ -5,6 +5,9 @@ use preserves::value; use std::sync::{Mutex, Arc}; use futures::{SinkExt, StreamExt}; +use tracing::{Level, error, info, trace}; +use tracing_futures::Instrument; + use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio_util::codec::Framed; @@ -80,7 +83,6 @@ async fn run_connection(connid: ConnId, p.run(spaces, &config).await? }, _ => { - println!("First byte: {:?}", buf); let (o, i) = Framed::new(stream, packets::Codec::standard()).split(); let mut p = Peer::new(connid, i, o); p.run(spaces, &config).await? @@ -93,45 +95,54 @@ async fn run_connection(connid: ConnId, Ok(()) } +static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1); + async fn run_listener(spaces: Arc>, port: u16, config: config::ServerConfigRef) -> UnitAsyncResult { let mut listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?; - println!("Listening on port {}", port); - let mut id = port as u64 + 100000000000000; loop { let (stream, addr) = listener.accept().await?; - let connid = id; + let id = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let spaces = Arc::clone(&spaces); let config = Arc::clone(&config); - id += 100000; if let Some(n) = config.recv_buffer_size { stream.set_recv_buffer_size(n)?; } if let Some(n) = config.send_buffer_size { stream.set_send_buffer_size(n)?; } tokio::spawn(async move { - println!("Connection {} ({:?}) accepted from port {}", connid, addr, port); - match run_connection(connid, stream, spaces, config).await { - Ok(()) => println!("Connection {} ({:?}) terminated normally", connid, addr), - Err(e) => println!("Connection {} ({:?}) terminated: {}", connid, addr, e), + info!(addr = display(addr), "accepted"); + match run_connection(id, stream, spaces, config).await { + Ok(()) => info!("closed"), + Err(e) => info!(error = display(e), "closed"), } - }); + }.instrument(tracing::info_span!("connection", id))); } } async fn periodic_tasks(spaces: Arc>) -> UnitAsyncResult { - let interval = core::time::Duration::from_secs(5); + let interval = core::time::Duration::from_secs(10); let mut delay = tokio::time::interval(interval); loop { delay.next().await.unwrap(); { let mut spaces = spaces.lock().unwrap(); spaces.cleanup(); - println!("{}", spaces.stats_string(interval)); + spaces.dump_stats(interval); } } } #[tokio::main] async fn main() -> Result<(), Box> { + let filter = tracing_subscriber::filter::EnvFilter::from_default_env() + .add_directive(tracing_subscriber::filter::LevelFilter::INFO.into()); + let subscriber = tracing_subscriber::FmtSubscriber::builder() + .with_ansi(true) + .with_max_level(Level::TRACE) + .with_env_filter(filter) + .finish(); + tracing::subscriber::set_global_default(subscriber) + .expect("Could not set tracing global subscriber"); + let config = Arc::new(config::ServerConfig::from_args()); let spaces = Arc::new(Mutex::new(spaces::Spaces::new())); @@ -144,18 +155,21 @@ async fn main() -> Result<(), Box> { }); } + trace!("startup"); + for port in config.ports.clone() { let spaces = Arc::clone(&spaces); let config = Arc::clone(&config); daemons.push(tokio::spawn(async move { + info!(port, "listening"); match run_listener(spaces, port, config).await { Ok(()) => (), Err(e) => { - eprintln!("Error from listener for port {}: {}", port, e); + error!("{}", e); std::process::exit(2) } } - })); + }.instrument(tracing::info_span!("listener", port)))); } futures::future::join_all(daemons).await; diff --git a/src/dataspace.rs b/src/dataspace.rs index 3d29060..fb59dc1 100644 --- a/src/dataspace.rs +++ b/src/dataspace.rs @@ -133,7 +133,7 @@ impl Dataspace { { let mut outbound_turns: Map> = Map::new(); for a in actions { - // println!("Turn action: {:?}", &a); + tracing::trace!(action = debug(&a), "turn"); match a { packets::Action::Assert(ref epname, ref assertion) => { let ac = self.peers.get_mut(&id).unwrap(); diff --git a/src/peer.rs b/src/peer.rs index 463dff9..2fa78bf 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -73,7 +73,7 @@ where I: Stream + Send, let queue_depth_sample = queue_depth.load(Ordering::Relaxed); if queue_depth_sample > config.overload_threshold { let n = overloaded.unwrap_or(0); - println!("{:?} overloaded({}): {:?}", self.id, n, queue_depth_sample); + tracing::warn!(turns=n, queue_depth=queue_depth_sample, "overloaded"); if n == config.overload_turn_limit { to_send.push(err("Overloaded", value::Value::from(queue_depth_sample as u64).wrap())); @@ -87,7 +87,7 @@ where I: Stream + Send, } } else { if let Some(_) = overloaded { - println!("{:?} recovered: {:?}", self.id, queue_depth_sample); + tracing::info!(queue_depth=queue_depth_sample, "recovered"); } overloaded = None; } @@ -98,7 +98,7 @@ where I: Stream + Send, frame = self.i.next().fuse() => match frame { Some(res) => match res { Ok(p) => { - // println!("{:?}: input {:?}", self.id, &p); + tracing::trace!(packet = debug(&p), "input"); match p { packets::C2S::Turn(actions) => { match self.space.as_ref().unwrap().write().unwrap() @@ -165,10 +165,9 @@ where I: Stream + Send, } for v in to_send { if let packets::S2C::Err(ref msg, ref ctx) = v { - println!("{:?}: connection crashed: {}; context {:?}", self.id, msg, ctx); + tracing::error!(context = debug(ctx), msg = display(msg), "error"); } else { - // println!("{:?}: output {:?}", self.id, &v); - () + tracing::trace!(packet = debug(&v), "output"); } self.o.send(v).await?; } diff --git a/src/spaces.rs b/src/spaces.rs index 0a55a3b..115e291 100644 --- a/src/spaces.rs +++ b/src/spaces.rs @@ -3,6 +3,8 @@ use super::dataspace; use std::sync::Arc; +use tracing::{info, debug}; + use preserves::value::Map; pub struct Spaces { @@ -24,9 +26,8 @@ impl Spaces { } }; - println!("Dataspace {:?} {}", - name, - if is_new { "created" } else { "accessed" }); + debug!(name = debug(name), + action = display(if is_new { "created" } else { "accessed" })); space } @@ -38,23 +39,17 @@ impl Spaces { .collect(); } - pub fn stats_string(&self, delta: core::time::Duration) -> String { - let mut v = vec![]; - v.push(format!("{} dataspace(s)", self.index.len())); + pub fn dump_stats(&self, delta: core::time::Duration) { + info!("{} dataspace(s)", self.index.len()); for (dsname, dsref) in &self.index { let mut ds = dsref.write().unwrap(); - v.push(format!(" {:?}: {} connection(s) {}, {} assertion(s) {}, {} endpoint(s) {}, msgs in {}/sec, out {}/sec", - dsname, - ds.peer_count(), - format!("(+{}/-{})", ds.churn.peers_added, ds.churn.peers_removed), - ds.assertion_count(), - format!("(+{}/-{})", ds.churn.assertions_added, ds.churn.assertions_removed), - ds.endpoint_count(), - format!("(+{}/-{})", ds.churn.endpoints_added, ds.churn.endpoints_removed), - ds.churn.messages_injected as f32 / delta.as_secs() as f32, - ds.churn.messages_delivered as f32 / delta.as_secs() as f32)); + info!(name = debug(dsname), + connections = display(format!("{} (+{}/-{})", ds.peer_count(), ds.churn.peers_added, ds.churn.peers_removed)), + assertions = display(format!("{} (+{}/-{})", ds.assertion_count(), ds.churn.assertions_added, ds.churn.assertions_removed)), + endpoints = display(format!("{} (+{}/-{})", ds.endpoint_count(), ds.churn.endpoints_added, ds.churn.endpoints_removed)), + msg_in_rate = display(ds.churn.messages_injected as f32 / delta.as_secs() as f32), + msg_out_rate = display(ds.churn.messages_delivered as f32 / delta.as_secs() as f32)); ds.churn.reset(); } - v.join("\n") } }