Round-trip and (estimated) one-way latencies in pingpong example

This commit is contained in:
Tony Garnock-Jones 2020-06-12 21:30:15 +02:00
parent a525ce6f4a
commit 14b7aad9c6
1 changed files with 89 additions and 21 deletions

View File

@ -1,28 +1,34 @@
#![recursion_limit = "256"] #![recursion_limit = "512"]
use core::time::Duration; use core::time::Duration;
use futures::FutureExt; use futures::FutureExt;
use futures::SinkExt; use futures::SinkExt;
use futures::StreamExt; use futures::StreamExt;
use futures::select; use futures::select;
use std::time::{SystemTime, SystemTimeError};
use structopt::StructOpt; use structopt::StructOpt;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::time::interval; use tokio::time::interval;
use tokio_util::codec::Framed; use tokio_util::codec::Framed;
use syndicate::packets::{ClientCodec, C2S, S2C, Action, Event}; use syndicate::packets::{ClientCodec, C2S, S2C, Action, Event};
use syndicate::value::Value; use syndicate::value::{NestedValue, Value, IOValue};
#[derive(Clone, Debug, StructOpt)]
pub struct PingConfig {
#[structopt(short = "t", default_value = "1")]
turn_count: u32,
#[structopt(short = "a", default_value = "1")]
action_count: u32,
#[structopt(short = "l", default_value = "0")]
report_latency_every: usize,
}
#[derive(Clone, Debug, StructOpt)] #[derive(Clone, Debug, StructOpt)]
pub enum PingPongMode { pub enum PingPongMode {
Ping { Ping(PingConfig),
#[structopt(short = "t", default_value = "1")]
turn_count: u32,
#[structopt(short = "a", default_value = "1")]
action_count: u32,
},
Pong, Pong,
} }
@ -35,40 +41,81 @@ pub struct Config {
dataspace: String, dataspace: String,
} }
fn now() -> Result<u64, SystemTimeError> {
Ok(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?.as_nanos() as u64)
}
fn report_latencies(rtt_ns_samples: &Vec<u64>) {
let n = rtt_ns_samples.len();
let rtt_0 = rtt_ns_samples[0];
let rtt_50 = rtt_ns_samples[n * 1 / 2];
let rtt_90 = rtt_ns_samples[n * 90 / 100];
let rtt_95 = rtt_ns_samples[n * 95 / 100];
let rtt_99 = rtt_ns_samples[n * 99 / 100];
let rtt_99_9 = rtt_ns_samples[n * 999 / 1000];
let rtt_99_99 = rtt_ns_samples[n * 9999 / 10000];
let rtt_max = rtt_ns_samples[n - 1];
println!("rtt: 0% {:05.5}ms, 50% {:05.5}ms, 90% {:05.5}ms, 95% {:05.5}ms, 99% {:05.5}ms, 99.9% {:05.5}ms, 99.99% {:05.5}ms, max {:05.5}ms",
rtt_0 as f64 / 1000000.0,
rtt_50 as f64 / 1000000.0,
rtt_90 as f64 / 1000000.0,
rtt_95 as f64 / 1000000.0,
rtt_99 as f64 / 1000000.0,
rtt_99_9 as f64 / 1000000.0,
rtt_99_99 as f64 / 1000000.0,
rtt_max as f64 / 1000000.0);
println!("msg: 0% {:05.5}ms, 50% {:05.5}ms, 90% {:05.5}ms, 95% {:05.5}ms, 99% {:05.5}ms, 99.9% {:05.5}ms, 99.99% {:05.5}ms, max {:05.5}ms",
rtt_0 as f64 / 2000000.0,
rtt_50 as f64 / 2000000.0,
rtt_90 as f64 / 2000000.0,
rtt_95 as f64 / 2000000.0,
rtt_99 as f64 / 2000000.0,
rtt_99_9 as f64 / 2000000.0,
rtt_99_99 as f64 / 2000000.0,
rtt_max as f64 / 2000000.0);
}
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = Config::from_args(); let config = Config::from_args();
let (send_label, recv_label) = match config.mode { let (send_label, recv_label, report_latency_every, should_echo) = match config.mode {
PingPongMode::Ping { turn_count: _, action_count: _ } => ("Ping", "Pong"), PingPongMode::Ping(ref c) => ("Ping", "Pong", c.report_latency_every, false),
PingPongMode::Pong => ("Pong", "Ping"), PingPongMode::Pong => ("Pong", "Ping", 0, true),
}; };
let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?, ClientCodec::new()); let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?, ClientCodec::new());
frames.send(C2S::Connect(Value::from(config.dataspace).wrap())).await?; frames.send(C2S::Connect(Value::from(config.dataspace).wrap())).await?;
let discard = Value::simple_record("discard", vec![]).wrap();
let capture = Value::simple_record("capture", vec![discard]).wrap();
frames.send( frames.send(
C2S::Turn(vec![Action::Assert( C2S::Turn(vec![Action::Assert(
Value::from(0).wrap(), Value::from(0).wrap(),
Value::simple_record("observe", vec![ Value::simple_record("observe", vec![
Value::simple_record(recv_label, vec![]).wrap()]).wrap())])) Value::simple_record(recv_label, vec![capture]).wrap()]).wrap())]))
.await?; .await?;
let mut stats_timer = interval(Duration::from_secs(1)); let mut stats_timer = interval(Duration::from_secs(1));
let mut turn_counter = 0; let mut turn_counter = 0;
let mut event_counter = 0; let mut event_counter = 0;
let mut current_stamp: IOValue = Value::from(0).wrap();
if let PingPongMode::Ping { turn_count, action_count } = config.mode { if let PingPongMode::Ping(ref c) = config.mode {
for _ in 0..turn_count { for _ in 0..c.turn_count {
let mut actions = vec![]; let mut actions = vec![];
for _ in 0..action_count { current_stamp = Value::from(now()?).wrap();
for _ in 0..c.action_count {
actions.push(Action::Message( actions.push(Action::Message(
Value::simple_record(send_label, vec![]).wrap())); Value::simple_record(send_label, vec![current_stamp.clone()]).wrap()));
} }
frames.send(C2S::Turn(actions)).await?; frames.send(C2S::Turn(actions)).await?;
} }
} }
let mut rtt_ns_samples: Vec<u64> = vec![0; report_latency_every];
let mut rtt_batch_count = 0;
loop { loop {
select! { select! {
_instant = stats_timer.next().boxed().fuse() => { _instant = stats_timer.next().boxed().fuse() => {
@ -84,11 +131,32 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
turn_counter = turn_counter + 1; turn_counter = turn_counter + 1;
event_counter = event_counter + events.len(); event_counter = event_counter + events.len();
let mut actions = vec![]; let mut actions = vec![];
let mut have_sample = false;
for e in events { for e in events {
match e { match e {
Event::Msg(_, _) => Event::Msg(_, captures) => {
actions.push(Action::Message( if should_echo || (report_latency_every == 0) {
Value::simple_record(send_label, vec![]).wrap())), actions.push(Action::Message(
Value::simple_record(send_label, vec![captures[0].clone()]).wrap()));
} else {
if !have_sample {
let rtt_ns = now()? - captures[0].value().to_u64()?;
rtt_ns_samples[rtt_batch_count] = rtt_ns;
rtt_batch_count = rtt_batch_count + 1;
if rtt_batch_count == report_latency_every {
rtt_ns_samples.sort();
report_latencies(&rtt_ns_samples);
rtt_batch_count = 0;
}
have_sample = true;
current_stamp = Value::from(now()?).wrap();
}
actions.push(Action::Message(
Value::simple_record(send_label, vec![current_stamp.clone()]).wrap()));
}
}
_ => _ =>
() ()
} }