From 14b7aad9c65b6530cecc19e80a0da555dd392989 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Fri, 12 Jun 2020 21:30:15 +0200 Subject: [PATCH] Round-trip and (estimated) one-way latencies in pingpong example --- examples/pingpong.rs | 110 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 89 insertions(+), 21 deletions(-) diff --git a/examples/pingpong.rs b/examples/pingpong.rs index 7c3f1ae..37109b1 100644 --- a/examples/pingpong.rs +++ b/examples/pingpong.rs @@ -1,28 +1,34 @@ -#![recursion_limit = "256"] +#![recursion_limit = "512"] use core::time::Duration; use futures::FutureExt; use futures::SinkExt; use futures::StreamExt; use futures::select; +use std::time::{SystemTime, SystemTimeError}; use structopt::StructOpt; use tokio::net::TcpStream; use tokio::time::interval; use tokio_util::codec::Framed; use syndicate::packets::{ClientCodec, C2S, S2C, Action, Event}; -use syndicate::value::Value; +use syndicate::value::{NestedValue, Value, IOValue}; + +#[derive(Clone, Debug, StructOpt)] +pub struct PingConfig { + #[structopt(short = "t", default_value = "1")] + turn_count: u32, + + #[structopt(short = "a", default_value = "1")] + action_count: u32, + + #[structopt(short = "l", default_value = "0")] + report_latency_every: usize, +} #[derive(Clone, Debug, StructOpt)] pub enum PingPongMode { - Ping { - #[structopt(short = "t", default_value = "1")] - turn_count: u32, - - #[structopt(short = "a", default_value = "1")] - action_count: u32, - }, - + Ping(PingConfig), Pong, } @@ -35,40 +41,81 @@ pub struct Config { dataspace: String, } +fn now() -> Result { + Ok(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?.as_nanos() as u64) +} + +fn report_latencies(rtt_ns_samples: &Vec) { + let n = rtt_ns_samples.len(); + let rtt_0 = rtt_ns_samples[0]; + let rtt_50 = rtt_ns_samples[n * 1 / 2]; + let rtt_90 = rtt_ns_samples[n * 90 / 100]; + let rtt_95 = rtt_ns_samples[n * 95 / 100]; + let rtt_99 = rtt_ns_samples[n * 99 / 100]; + let rtt_99_9 = rtt_ns_samples[n * 999 / 1000]; + let rtt_99_99 = rtt_ns_samples[n * 9999 / 10000]; + let rtt_max = rtt_ns_samples[n - 1]; + println!("rtt: 0% {:05.5}ms, 50% {:05.5}ms, 90% {:05.5}ms, 95% {:05.5}ms, 99% {:05.5}ms, 99.9% {:05.5}ms, 99.99% {:05.5}ms, max {:05.5}ms", + rtt_0 as f64 / 1000000.0, + rtt_50 as f64 / 1000000.0, + rtt_90 as f64 / 1000000.0, + rtt_95 as f64 / 1000000.0, + rtt_99 as f64 / 1000000.0, + rtt_99_9 as f64 / 1000000.0, + rtt_99_99 as f64 / 1000000.0, + rtt_max as f64 / 1000000.0); + println!("msg: 0% {:05.5}ms, 50% {:05.5}ms, 90% {:05.5}ms, 95% {:05.5}ms, 99% {:05.5}ms, 99.9% {:05.5}ms, 99.99% {:05.5}ms, max {:05.5}ms", + rtt_0 as f64 / 2000000.0, + rtt_50 as f64 / 2000000.0, + rtt_90 as f64 / 2000000.0, + rtt_95 as f64 / 2000000.0, + rtt_99 as f64 / 2000000.0, + rtt_99_9 as f64 / 2000000.0, + rtt_99_99 as f64 / 2000000.0, + rtt_max as f64 / 2000000.0); +} + #[tokio::main] async fn main() -> Result<(), Box> { let config = Config::from_args(); - let (send_label, recv_label) = match config.mode { - PingPongMode::Ping { turn_count: _, action_count: _ } => ("Ping", "Pong"), - PingPongMode::Pong => ("Pong", "Ping"), + let (send_label, recv_label, report_latency_every, should_echo) = match config.mode { + PingPongMode::Ping(ref c) => ("Ping", "Pong", c.report_latency_every, false), + PingPongMode::Pong => ("Pong", "Ping", 0, true), }; let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?, ClientCodec::new()); frames.send(C2S::Connect(Value::from(config.dataspace).wrap())).await?; + let discard = Value::simple_record("discard", vec![]).wrap(); + let capture = Value::simple_record("capture", vec![discard]).wrap(); frames.send( C2S::Turn(vec![Action::Assert( Value::from(0).wrap(), Value::simple_record("observe", vec![ - Value::simple_record(recv_label, vec![]).wrap()]).wrap())])) + Value::simple_record(recv_label, vec![capture]).wrap()]).wrap())])) .await?; let mut stats_timer = interval(Duration::from_secs(1)); let mut turn_counter = 0; let mut event_counter = 0; + let mut current_stamp: IOValue = Value::from(0).wrap(); - if let PingPongMode::Ping { turn_count, action_count } = config.mode { - for _ in 0..turn_count { + if let PingPongMode::Ping(ref c) = config.mode { + for _ in 0..c.turn_count { let mut actions = vec![]; - for _ in 0..action_count { + current_stamp = Value::from(now()?).wrap(); + for _ in 0..c.action_count { actions.push(Action::Message( - Value::simple_record(send_label, vec![]).wrap())); + Value::simple_record(send_label, vec![current_stamp.clone()]).wrap())); } frames.send(C2S::Turn(actions)).await?; } } + let mut rtt_ns_samples: Vec = vec![0; report_latency_every]; + let mut rtt_batch_count = 0; + loop { select! { _instant = stats_timer.next().boxed().fuse() => { @@ -84,11 +131,32 @@ async fn main() -> Result<(), Box> { turn_counter = turn_counter + 1; event_counter = event_counter + events.len(); let mut actions = vec![]; + let mut have_sample = false; for e in events { match e { - Event::Msg(_, _) => - actions.push(Action::Message( - Value::simple_record(send_label, vec![]).wrap())), + Event::Msg(_, captures) => { + if should_echo || (report_latency_every == 0) { + actions.push(Action::Message( + Value::simple_record(send_label, vec![captures[0].clone()]).wrap())); + } else { + if !have_sample { + let rtt_ns = now()? - captures[0].value().to_u64()?; + rtt_ns_samples[rtt_batch_count] = rtt_ns; + rtt_batch_count = rtt_batch_count + 1; + + if rtt_batch_count == report_latency_every { + rtt_ns_samples.sort(); + report_latencies(&rtt_ns_samples); + rtt_batch_count = 0; + } + + have_sample = true; + current_stamp = Value::from(now()?).wrap(); + } + actions.push(Action::Message( + Value::simple_record(send_label, vec![current_stamp.clone()]).wrap())); + } + } _ => () }