Latency-mode for syndicate-macros/example/ring.rs

This commit is contained in:
Tony Garnock-Jones 2024-03-03 10:34:18 +01:00
parent b4f355aa0d
commit 4fcb14d63e
1 changed files with 30 additions and 13 deletions

View File

@ -5,11 +5,11 @@ use std::sync::Arc;
#[derive(Debug)] #[derive(Debug)]
enum Instruction { enum Instruction {
SetPeer(Arc<Ref<Instruction>>), SetPeer(Arc<Ref<Instruction>>),
HandleMessage(u32), HandleMessage(u64),
} }
struct Forwarder { struct Forwarder {
n_rounds: u32, hop_limit: u64,
supervisor: Arc<Ref<Instruction>>, supervisor: Arc<Ref<Instruction>>,
peer: Option<Arc<Ref<Instruction>>>, peer: Option<Arc<Ref<Instruction>>>,
} }
@ -31,7 +31,7 @@ impl Entity<Instruction> for Forwarder {
self.peer = Some(r); self.peer = Some(r);
} }
Instruction::HandleMessage(n) => { Instruction::HandleMessage(n) => {
let target = if n >= self.n_rounds { &self.supervisor } else { self.peer.as_ref().expect("peer") }; let target = if n >= self.hop_limit { &self.supervisor } else { self.peer.as_ref().expect("peer") };
turn.message(target, Instruction::HandleMessage(n + 1)); turn.message(target, Instruction::HandleMessage(n + 1));
} }
} }
@ -40,8 +40,8 @@ impl Entity<Instruction> for Forwarder {
} }
struct Supervisor { struct Supervisor {
n_actors: u32, latency_mode: bool,
n_rounds: u32, total_transfers: u64,
remaining_to_receive: u32, remaining_to_receive: u32,
start_time: Option<std::time::Instant>, start_time: Option<std::time::Instant>,
} }
@ -58,11 +58,11 @@ impl Entity<Instruction> for Supervisor {
if self.remaining_to_receive == 0 { if self.remaining_to_receive == 0 {
let stop_time = std::time::Instant::now(); let stop_time = std::time::Instant::now();
let duration = stop_time - self.start_time.unwrap(); let duration = stop_time - self.start_time.unwrap();
let n_messages: u64 = self.n_actors as u64 * self.n_rounds as u64; tracing::info!("Stop after {:?}; {:?} messages, so {:?} Hz ({} mode)",
tracing::info!("Stop after {:?}; {:?} messages, so {:?} Hz",
duration, duration,
n_messages, self.total_transfers,
(1000.0 * n_messages as f64) / duration.as_millis() as f64); (1000.0 * self.total_transfers as f64) / duration.as_millis() as f64,
if self.latency_mode { "latency" } else { "throughput" });
turn.stop_root(); turn.stop_root();
} }
}, },
@ -78,12 +78,24 @@ async fn main() -> ActorResult {
let args: Vec<String> = env::args().collect(); let args: Vec<String> = env::args().collect();
let n_actors: u32 = args.get(1).unwrap_or(&"1000000".to_string()).parse()?; let n_actors: u32 = args.get(1).unwrap_or(&"1000000".to_string()).parse()?;
let n_rounds: u32 = args.get(2).unwrap_or(&"200".to_string()).parse()?; let n_rounds: u32 = args.get(2).unwrap_or(&"200".to_string()).parse()?;
let latency_mode: bool = match args.get(3).unwrap_or(&"throughput".to_string()).as_str() {
"latency" => true,
"throughput" => false,
_other => return Err("Invalid throughput/latency mode".into()),
};
tracing::info!("Will run {:?} actors for {:?} rounds", n_actors, n_rounds); tracing::info!("Will run {:?} actors for {:?} rounds", n_actors, n_rounds);
let total_transfers: u64 = n_actors as u64 * n_rounds as u64;
let (hop_limit, injection_count) = if latency_mode {
(total_transfers, 1)
} else {
(n_rounds as u64, n_actors)
};
let me = t.create(Supervisor { let me = t.create(Supervisor {
n_actors, latency_mode,
n_rounds, total_transfers,
remaining_to_receive: n_actors, remaining_to_receive: injection_count,
start_time: None, start_time: None,
}); });
@ -93,7 +105,7 @@ async fn main() -> ActorResult {
forwarders.push( forwarders.push(
t.spawn_for_entity(None, true, Box::new( t.spawn_for_entity(None, true, Box::new(
Forwarder { Forwarder {
n_rounds, hop_limit,
supervisor: me.clone(), supervisor: me.clone(),
peer: forwarders.last().cloned(), peer: forwarders.last().cloned(),
})) }))
@ -103,8 +115,13 @@ async fn main() -> ActorResult {
t.later(move |t| { t.later(move |t| {
t.message(&me, Instruction::SetPeer(me.clone())); t.message(&me, Instruction::SetPeer(me.clone()));
t.later(move |t| { t.later(move |t| {
let mut injected: u32 = 0;
for f in forwarders.into_iter() { for f in forwarders.into_iter() {
if injected >= injection_count {
break;
}
t.message(&f, Instruction::HandleMessage(0)); t.message(&f, Instruction::HandleMessage(0));
injected += 1;
} }
Ok(()) Ok(())
}); });