diff --git a/syndicate-macros/examples/ring.rs b/syndicate-macros/examples/ring.rs index ee879d1..4b39c4f 100644 --- a/syndicate-macros/examples/ring.rs +++ b/syndicate-macros/examples/ring.rs @@ -5,11 +5,11 @@ use std::sync::Arc; #[derive(Debug)] enum Instruction { SetPeer(Arc>), - HandleMessage(u32), + HandleMessage(u64), } struct Forwarder { - n_rounds: u32, + hop_limit: u64, supervisor: Arc>, peer: Option>>, } @@ -31,7 +31,7 @@ impl Entity for Forwarder { self.peer = Some(r); } Instruction::HandleMessage(n) => { - let target = if n >= self.n_rounds { &self.supervisor } else { self.peer.as_ref().expect("peer") }; + let target = if n >= self.hop_limit { &self.supervisor } else { self.peer.as_ref().expect("peer") }; turn.message(target, Instruction::HandleMessage(n + 1)); } } @@ -40,8 +40,8 @@ impl Entity for Forwarder { } struct Supervisor { - n_actors: u32, - n_rounds: u32, + latency_mode: bool, + total_transfers: u64, remaining_to_receive: u32, start_time: Option, } @@ -58,11 +58,11 @@ impl Entity for Supervisor { if self.remaining_to_receive == 0 { let stop_time = std::time::Instant::now(); let duration = stop_time - self.start_time.unwrap(); - let n_messages: u64 = self.n_actors as u64 * self.n_rounds as u64; - tracing::info!("Stop after {:?}; {:?} messages, so {:?} Hz", + tracing::info!("Stop after {:?}; {:?} messages, so {:?} Hz ({} mode)", duration, - n_messages, - (1000.0 * n_messages as f64) / duration.as_millis() as f64); + self.total_transfers, + (1000.0 * self.total_transfers as f64) / duration.as_millis() as f64, + if self.latency_mode { "latency" } else { "throughput" }); turn.stop_root(); } }, @@ -78,12 +78,24 @@ async fn main() -> ActorResult { let args: Vec = env::args().collect(); let n_actors: u32 = args.get(1).unwrap_or(&"1000000".to_string()).parse()?; let n_rounds: u32 = args.get(2).unwrap_or(&"200".to_string()).parse()?; + let latency_mode: bool = match args.get(3).unwrap_or(&"throughput".to_string()).as_str() { + "latency" => true, + "throughput" => false, + _other => return Err("Invalid throughput/latency mode".into()), + }; tracing::info!("Will run {:?} actors for {:?} rounds", n_actors, n_rounds); + let total_transfers: u64 = n_actors as u64 * n_rounds as u64; + let (hop_limit, injection_count) = if latency_mode { + (total_transfers, 1) + } else { + (n_rounds as u64, n_actors) + }; + let me = t.create(Supervisor { - n_actors, - n_rounds, - remaining_to_receive: n_actors, + latency_mode, + total_transfers, + remaining_to_receive: injection_count, start_time: None, }); @@ -93,7 +105,7 @@ async fn main() -> ActorResult { forwarders.push( t.spawn_for_entity(None, true, Box::new( Forwarder { - n_rounds, + hop_limit, supervisor: me.clone(), peer: forwarders.last().cloned(), })) @@ -103,8 +115,13 @@ async fn main() -> ActorResult { t.later(move |t| { t.message(&me, Instruction::SetPeer(me.clone())); t.later(move |t| { + let mut injected: u32 = 0; for f in forwarders.into_iter() { + if injected >= injection_count { + break; + } t.message(&f, Instruction::HandleMessage(0)); + injected += 1; } Ok(()) });