From 41cf85f8658734d30d47604bdec301de70a45a7e Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sun, 3 Mar 2024 10:34:25 +0100 Subject: [PATCH] tokio-ring.rs --- syndicate-macros/examples/tokio-ring.rs | 175 ++++++++++++++++++++++++ 1 file changed, 175 insertions(+) create mode 100644 syndicate-macros/examples/tokio-ring.rs diff --git a/syndicate-macros/examples/tokio-ring.rs b/syndicate-macros/examples/tokio-ring.rs new file mode 100644 index 0000000..480eedb --- /dev/null +++ b/syndicate-macros/examples/tokio-ring.rs @@ -0,0 +1,175 @@ +use std::env; +use std::sync::Arc; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; + +use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; + +type Ref = UnboundedSender; + +#[derive(Debug)] +enum Instruction { + SetPeer(Arc>), + HandleMessage(u64), +} + +struct Forwarder { + hop_limit: u64, + supervisor: Arc>, + peer: Option>>, +} + +impl Drop for Forwarder { + fn drop(&mut self) { + let r = self.peer.take(); + let _ = tokio::spawn(async move { + drop(r); + }); + } +} + +enum Action { Continue, Stop } + +trait Actor { + fn message(&mut self, message: T) -> Action; +} + +fn send(ch: &Arc>, message: T) -> () { + match ch.send(message) { + Ok(()) => (), + Err(v) => panic!("Aiee! Could not send {:?}", v), + } +} + +fn spawn + std::marker::Send + 'static>(rt: Option>, mut ac: R) -> Arc> { + let (tx, mut rx) = unbounded_channel(); + if let Some(ref c) = rt { + c.fetch_add(1, Ordering::SeqCst); + } + tokio::spawn(async move { + loop { + match rx.recv().await { + None => break, + Some(message) => { + match ac.message(message) { + Action::Continue => continue, + Action::Stop => break, + } + } + } + } + if let Some(c) = rt { + c.fetch_sub(1, Ordering::SeqCst); + } + }); + Arc::new(tx) +} + +impl Actor for Forwarder { + fn message(&mut self, message: Instruction) -> Action { + match message { + Instruction::SetPeer(r) => { + tracing::info!("Setting peer {:?}", r); + self.peer = Some(r); + } + Instruction::HandleMessage(n) => { + let target = if n >= self.hop_limit { &self.supervisor } else { self.peer.as_ref().expect("peer") }; + send(target, Instruction::HandleMessage(n + 1)); + } + } + Action::Continue + } +} + +struct Supervisor { + latency_mode: bool, + total_transfers: u64, + remaining_to_receive: u32, + start_time: Option, +} + +impl Actor for Supervisor { + fn message(&mut self, message: Instruction) -> Action { + match message { + Instruction::SetPeer(_) => { + tracing::info!("Start"); + self.start_time = Some(std::time::Instant::now()); + }, + Instruction::HandleMessage(_n) => { + self.remaining_to_receive -= 1; + if self.remaining_to_receive == 0 { + let stop_time = std::time::Instant::now(); + let duration = stop_time - self.start_time.unwrap(); + tracing::info!("Stop after {:?}; {:?} messages, so {:?} Hz ({} mode)", + duration, + self.total_transfers, + (1000.0 * self.total_transfers as f64) / duration.as_millis() as f64, + if self.latency_mode { "latency" } else { "throughput" }); + return Action::Stop; + } + }, + } + Action::Continue + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + syndicate::convenient_logging()?; + + let args: Vec = env::args().collect(); + let n_actors: u32 = args.get(1).unwrap_or(&"1000000".to_string()).parse()?; + let n_rounds: u32 = args.get(2).unwrap_or(&"200".to_string()).parse()?; + let latency_mode: bool = match args.get(3).unwrap_or(&"throughput".to_string()).as_str() { + "latency" => true, + "throughput" => false, + _other => return Err("Invalid throughput/latency mode".into()), + }; + tracing::info!("Will run {:?} actors for {:?} rounds", n_actors, n_rounds); + + let count = Arc::new(AtomicU64::new(0)); + + let total_transfers: u64 = n_actors as u64 * n_rounds as u64; + let (hop_limit, injection_count) = if latency_mode { + (total_transfers, 1) + } else { + (n_rounds as u64, n_actors) + }; + + let me = spawn(Some(count.clone()), Supervisor { + latency_mode, + total_transfers, + remaining_to_receive: injection_count, + start_time: None, + }); + + let mut forwarders: Vec>> = Vec::new(); + for _i in 0 .. n_actors { + if _i % 10000 == 0 { tracing::info!("Actor {:?}", _i); } + forwarders.push(spawn(None, Forwarder { + hop_limit, + supervisor: me.clone(), + peer: forwarders.last().cloned(), + })); + } + send(&forwarders[0], Instruction::SetPeer(forwarders.last().expect("an entity").clone())); + send(&me, Instruction::SetPeer(me.clone())); + + let mut injected: u32 = 0; + for f in forwarders.into_iter() { + if injected >= injection_count { + break; + } + send(&f, Instruction::HandleMessage(0)); + injected += 1; + } + + loop { + if count.load(Ordering::SeqCst) == 0 { + break; + } + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + + Ok(()) +}