use std::env; use std::sync::Arc; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; type Ref = UnboundedSender>; #[derive(Debug)] enum Instruction { SetPeer(Arc>), HandleMessage(u64), } struct Forwarder { hop_limit: u64, supervisor: Arc>, peer: Option>>, } impl Drop for Forwarder { fn drop(&mut self) { let r = self.peer.take(); let _ = tokio::spawn(async move { drop(r); }); } } enum Action { Continue, Stop } trait Actor { fn message(&mut self, message: T) -> Action; } fn send(ch: &Arc>, message: T) -> () { match ch.send(Box::new(message)) { Ok(()) => (), Err(v) => panic!("Aiee! Could not send {:?}", v), } } fn spawn + std::marker::Send + 'static>(rt: Option>, mut ac: R) -> Arc> { let (tx, mut rx) = unbounded_channel::>(); if let Some(ref c) = rt { c.fetch_add(1, Ordering::SeqCst); } tokio::spawn(async move { loop { match rx.recv().await { None => break, Some(message) => { match ac.message(*message) { Action::Continue => continue, Action::Stop => break, } } } } if let Some(c) = rt { c.fetch_sub(1, Ordering::SeqCst); } }); Arc::new(tx) } impl Actor for Forwarder { fn message(&mut self, message: Instruction) -> Action { match message { Instruction::SetPeer(r) => { tracing::info!("Setting peer {:?}", r); self.peer = Some(r); } Instruction::HandleMessage(n) => { let target = if n >= self.hop_limit { &self.supervisor } else { self.peer.as_ref().expect("peer") }; send(target, Instruction::HandleMessage(n + 1)); } } Action::Continue } } struct Supervisor { latency_mode: bool, total_transfers: u64, remaining_to_receive: u32, start_time: Option, } impl Actor for Supervisor { fn message(&mut self, message: Instruction) -> Action { match message { Instruction::SetPeer(_) => { tracing::info!("Start"); self.start_time = Some(std::time::Instant::now()); }, Instruction::HandleMessage(_n) => { self.remaining_to_receive -= 1; if self.remaining_to_receive == 0 { let stop_time = std::time::Instant::now(); let duration = stop_time - self.start_time.unwrap(); tracing::info!("Stop after {:?}; {:?} messages, so {:?} Hz ({} mode)", duration, self.total_transfers, (1000.0 * self.total_transfers as f64) / duration.as_millis() as f64, if self.latency_mode { "latency" } else { "throughput" }); return Action::Stop; } }, } Action::Continue } } #[tokio::main] async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; let args: Vec = env::args().collect(); let n_actors: u32 = args.get(1).unwrap_or(&"1000000".to_string()).parse()?; let n_rounds: u32 = args.get(2).unwrap_or(&"200".to_string()).parse()?; let latency_mode: bool = match args.get(3).unwrap_or(&"throughput".to_string()).as_str() { "latency" => true, "throughput" => false, _other => return Err("Invalid throughput/latency mode".into()), }; tracing::info!("Will run {:?} actors for {:?} rounds", n_actors, n_rounds); let count = Arc::new(AtomicU64::new(0)); let total_transfers: u64 = n_actors as u64 * n_rounds as u64; let (hop_limit, injection_count) = if latency_mode { (total_transfers, 1) } else { (n_rounds as u64, n_actors) }; let me = spawn(Some(count.clone()), Supervisor { latency_mode, total_transfers, remaining_to_receive: injection_count, start_time: None, }); let mut forwarders: Vec>> = Vec::new(); for _i in 0 .. n_actors { if _i % 10000 == 0 { tracing::info!("Actor {:?}", _i); } forwarders.push(spawn(None, Forwarder { hop_limit, supervisor: me.clone(), peer: forwarders.last().cloned(), })); } send(&forwarders[0], Instruction::SetPeer(forwarders.last().expect("an entity").clone())); send(&me, Instruction::SetPeer(me.clone())); let mut injected: u32 = 0; for f in forwarders.into_iter() { if injected >= injection_count { break; } send(&f, Instruction::HandleMessage(0)); injected += 1; } loop { if count.load(Ordering::SeqCst) == 0 { break; } tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; } Ok(()) }