diff --git a/Cargo.lock b/Cargo.lock index a2df21b..2822fa2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -68,6 +68,24 @@ dependencies = [ "byte-tools", ] +[[package]] +name = "bstr" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31accafdb70df7871592c058eca3985b71104e15ac32f64706022c58867da931" +dependencies = [ + "lazy_static", + "memchr", + "regex-automata", + "serde", +] + +[[package]] +name = "bumpalo" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e8c087f005730276d1096a652e92a8bacee2e2472bcc9715a74d2bec38b5820" + [[package]] name = "byte-tools" version = "0.3.1" @@ -86,6 +104,15 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" +[[package]] +name = "cast" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b9434b9a5aa1450faa3f9cb14ea0e8c53bb5d2b3c1bfd1ab4fc03e9f33fbfb0" +dependencies = [ + "rustc_version", +] + [[package]] name = "cc" version = "1.0.54" @@ -140,6 +167,111 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3a71ab494c0b5b860bdc8407ae08978052417070c2ced38573a9157ad75b8ac" +[[package]] +name = "criterion" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63f696897c88b57f4ffe3c69d8e1a0613c7d0e6c4833363c8560fbde9c47b966" +dependencies = [ + "atty", + "cast", + "clap", + "criterion-plot", + "csv", + "itertools", + "lazy_static", + "num-traits", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddeaf7989f00f2e1d871a26a110f3ed713632feac17f65f03ca938c542618b60" +dependencies = [ + "cast", + "itertools", +] + +[[package]] +name = "crossbeam-deque" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "maybe-uninit", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "lazy_static", + "maybe-uninit", + "memoffset", + "scopeguard", +] + +[[package]] +name = "crossbeam-queue" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "maybe-uninit", +] + +[[package]] +name = "crossbeam-utils" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +dependencies = [ + "autocfg", + "cfg-if", + "lazy_static", +] + +[[package]] +name = "csv" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00affe7f6ab566df61b4be3ce8cf16bc2576bca0963ceb0955e45d514bf9a279" +dependencies = [ + "bstr", + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90" +dependencies = [ + "memchr", +] + [[package]] name = "derivative" version = "2.1.1" @@ -160,6 +292,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "either" +version = "1.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb1f6b1ce1c140482ea30ddd3335fc0024ac7ee112895426e0a629a6c20adfe3" + [[package]] name = "fake-simd" version = "0.1.2" @@ -382,12 +520,30 @@ dependencies = [ "libc", ] +[[package]] +name = "itertools" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "284f18f85651fe11e8a991b2adb42cb078325c996ed026d994719efcfca1d54b" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8b7a7c0c47db5545ed3fef7468ee7bb5b74691498139e4b3f6a20685dc6dd8e" +[[package]] +name = "js-sys" +version = "0.3.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce10c23ad2ea25ceca0093bd3192229da4c5b3c0f2de499c1ecac0d98d452177" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "kernel32-sys" version = "0.2.2" @@ -434,12 +590,27 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" +[[package]] +name = "maybe-uninit" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" + [[package]] name = "memchr" version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400" +[[package]] +name = "memoffset" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4fc2c02a7e374099d4ee95a193111f72d2110197fe200272371758f6c3643d8" +dependencies = [ + "autocfg", +] + [[package]] name = "mio" version = "0.6.22" @@ -626,6 +797,12 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" +[[package]] +name = "oorandom" +version = "11.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a170cebd8021a008ea92e4db85a72f80b35df514ec664b296fdcbb654eac0b2c" + [[package]] name = "opaque-debug" version = "0.2.3" @@ -719,6 +896,18 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05da548ad6865900e60eaba7f589cc0783590a92e940c26953ff81ddbab2d677" +[[package]] +name = "plotters" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d1685fbe7beba33de0330629da9d955ac75bd54f33d7b79f9a895590124f6bb" +dependencies = [ + "js-sys", + "num-traits", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "ppv-lite86" version = "0.2.8" @@ -844,6 +1033,31 @@ dependencies = [ "rand_core", ] +[[package]] +name = "rayon" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62f02856753d04e03e26929f820d0a0a337ebe71f849801eea335d464b349080" +dependencies = [ + "autocfg", + "crossbeam-deque", + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e92e15d89083484e11353891f1af602cc661426deb9564c298b270c726973280" +dependencies = [ + "crossbeam-deque", + "crossbeam-queue", + "crossbeam-utils", + "lazy_static", + "num_cpus", +] + [[package]] name = "redox_syscall" version = "0.1.56" @@ -887,12 +1101,30 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "rustc_version" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" +dependencies = [ + "semver", +] + [[package]] name = "ryu" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed3d612bc64430efeb3f7ee6ef26d590dce0c43249217bddc62112540c7941e1" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.19" @@ -903,6 +1135,12 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + [[package]] name = "security-framework" version = "0.4.4" @@ -926,6 +1164,21 @@ dependencies = [ "libc", ] +[[package]] +name = "semver" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" +dependencies = [ + "semver-parser", +] + +[[package]] +name = "semver-parser" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" + [[package]] name = "serde" version = "1.0.110" @@ -1056,6 +1309,7 @@ name = "syndicate-rs" version = "0.1.0" dependencies = [ "bytes", + "criterion", "futures", "openssl", "preserves", @@ -1113,6 +1367,16 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "tinytemplate" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d3dc76004a03cec1c5932bca4cdc2e39aaa798e3f82363dd94f9adf6098c12f" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tokio" version = "0.2.21" @@ -1360,12 +1624,87 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" +[[package]] +name = "walkdir" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "777182bc735b6424e1a57516d35ed72cb8019d85c8c9bf536dccb3445c1a2f7d" +dependencies = [ + "same-file", + "winapi 0.3.8", + "winapi-util", +] + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" +[[package]] +name = "wasm-bindgen" +version = "0.2.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c2dc4aa152834bc334f506c1a06b866416a8b6697d5c9f75b9a689c8486def0" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ded84f06e0ed21499f6184df0e0cb3494727b0c5da89534e0fcc55c51d812101" +dependencies = [ + "bumpalo", + "lazy_static", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "838e423688dac18d73e31edce74ddfac468e37b1506ad163ffaf0a46f703ffe3" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3156052d8ec77142051a533cdd686cba889537b213f948cd1d20869926e68e92" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9ba19973a58daf4db6f352eda73dc0e289493cd29fb2632eb172085b6521acd" + +[[package]] +name = "web-sys" +version = "0.3.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b72fe77fd39e4bd3eaa4412fd299a0be6b3dfe9d2597e2f1c20beb968f41d17" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "winapi" version = "0.2.8" @@ -1394,6 +1733,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi 0.3.8", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 315b345..1fdbfc9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,3 +40,10 @@ tracing-futures = "0.2.4" # Only used for vendored-openssl, which in turn is being used for cross-builds openssl = { version = "0.10", optional = true } + +[dev-dependencies] +criterion = "0.3" + +[[bench]] +name = "bench_dataspace" +harness = false diff --git a/benches/bench_dataspace.rs b/benches/bench_dataspace.rs new file mode 100644 index 0000000..bfd4e3c --- /dev/null +++ b/benches/bench_dataspace.rs @@ -0,0 +1,184 @@ +use criterion::{criterion_group, criterion_main, Criterion}; +use futures::Sink; +use std::mem::drop; +use std::pin::Pin; +use std::sync::{Arc, Mutex, atomic::{AtomicU64, Ordering}}; +use std::task::{Context, Poll}; +use std::thread; +use std::time::Instant; +use structopt::StructOpt; +use syndicate::peer::Peer; +use syndicate::{config, spaces, packets, value::{Value, IOValue}}; +use tokio::runtime::Runtime; +use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; +use tracing::Level; + +struct SinkTx { + tx: Option>, +} + +impl SinkTx { + fn new(tx: UnboundedSender) -> Self { + SinkTx { tx: Some(tx) } + } +} + +impl Sink for SinkTx { + type Error = packets::Error; + + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn start_send(self: Pin<&mut Self>, v: T) -> Result<(), packets::Error> { + self.tx.as_ref().unwrap().send(v).map_err(|e| packets::Error::Message(e.to_string())) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + (&mut self).tx = None; + Poll::Ready(Ok(())) + } +} + +#[inline] +fn says(who: IOValue, what: IOValue) -> IOValue { + let mut r = Value::simple_record("Says", 2); + r.fields_vec_mut().push(who); + r.fields_vec_mut().push(what); + r.finish().wrap() +} + +pub fn bench_pub(c: &mut Criterion) { + let filter = tracing_subscriber::filter::EnvFilter::from_default_env() + .add_directive(tracing_subscriber::filter::LevelFilter::INFO.into()); + let subscriber = tracing_subscriber::FmtSubscriber::builder() + .with_ansi(true) + .with_max_level(Level::TRACE) + .with_env_filter(filter) + .finish(); + tracing::subscriber::set_global_default(subscriber) + .expect("Could not set tracing global subscriber"); + + c.bench_function("publication alone", |b| { + b.iter_custom(|iters| { + let no_args: Vec = vec![]; + let config = Arc::new(config::ServerConfig::from_iter(no_args.iter())); + let spaces = Arc::new(Mutex::new(spaces::Spaces::new())); + + let (c2s_tx, c2s_rx) = unbounded_channel(); + let (s2c_tx, _s2c_rx) = unbounded_channel(); + let runtime_handle = thread::spawn(move || { + let mut rt = Runtime::new().unwrap(); + rt.block_on(async { + Peer::new(0, c2s_rx, SinkTx::new(s2c_tx)).run(spaces, &config).await.unwrap(); + }) + }); + + c2s_tx.send(Ok(packets::C2S::Connect(Value::from("bench_pub").wrap()))).unwrap(); + + let turn = packets::C2S::Turn(vec![ + packets::Action::Message(says(Value::from("bench_pub").wrap(), + Value::ByteString(vec![]).wrap()))]); + + let start = Instant::now(); + for _ in 0..iters { + c2s_tx.send(Ok(turn.clone())).unwrap(); + } + drop(c2s_tx); + runtime_handle.join().unwrap(); + start.elapsed() + }) + }); + + c.bench_function("publication and subscription", |b| { + b.iter_custom(|iters| { + let no_args: Vec = vec![]; + let config = Arc::new(config::ServerConfig::from_iter(no_args.iter())); + let spaces = Arc::new(Mutex::new(spaces::Spaces::new())); + + let turn_count = Arc::new(AtomicU64::new(0)); + + let (c2s_tx, c2s_rx) = unbounded_channel(); + let c2s_tx = Arc::new(c2s_tx); + + { + let c2s_tx = c2s_tx.clone(); + + c2s_tx.send(Ok(packets::C2S::Connect(Value::from("bench_pub").wrap()))).unwrap(); + + let discard: IOValue = Value::simple_record0("discard").wrap(); + let capture: IOValue = Value::simple_record1("capture", discard).wrap(); + c2s_tx.send(Ok(packets::C2S::Turn(vec![ + packets::Action::Assert(Value::from(0).wrap(), + Value::simple_record1( + "observe", + says(Value::from("bench_pub").wrap(), + capture)).wrap())]))).unwrap(); + + // tracing::info!("Sending {} messages", iters); + let turn = packets::C2S::Turn(vec![ + packets::Action::Message(says(Value::from("bench_pub").wrap(), + Value::ByteString(vec![]).wrap()))]); + for _ in 0..iters { + c2s_tx.send(Ok(turn.clone())).unwrap(); + } + + c2s_tx.send(Ok(packets::C2S::Turn(vec![ + packets::Action::Clear(Value::from(0).wrap())]))).unwrap(); + } + + let start = Instant::now(); + let runtime_handle = { + let turn_count = turn_count.clone(); + let mut c2s_tx = Some(c2s_tx.clone()); + thread::spawn(move || { + let mut rt = Runtime::new().unwrap(); + rt.block_on(async move { + let (s2c_tx, mut s2c_rx) = unbounded_channel(); + let consumer_handle = tokio::spawn(async move { + while let Some(p) = s2c_rx.recv().await { + // tracing::info!("Consumer got {:?}", &p); + match p { + packets::S2C::Ping() => (), + packets::S2C::Turn(actions) => { + for a in actions { + match a { + packets::Event::Msg(_, _) => { + turn_count.fetch_add(1, Ordering::Relaxed); + }, + packets::Event::End(_) => { + c2s_tx.take(); + } + _ => panic!("Unexpected action: {:?}", a), + } + } + }, + _ => panic!("Unexpected packet: {:?}", p), + } + } + // tracing::info!("Consumer terminating"); + }); + Peer::new(0, c2s_rx, SinkTx::new(s2c_tx)).run(spaces, &config).await.unwrap(); + consumer_handle.await.unwrap(); + }) + }) + }; + drop(c2s_tx); + runtime_handle.join().unwrap(); + let elapsed = start.elapsed(); + + let actual_turns = turn_count.load(Ordering::SeqCst); + if actual_turns != iters { + panic!("Expected {}, got {} messages", iters, actual_turns); + } + elapsed + }) + }); +} + +criterion_group!(publish, bench_pub); +criterion_main!(publish);