diff --git a/syndicate-macros/examples/ring.rs b/syndicate-macros/examples/ring.rs new file mode 100644 index 0000000..ee879d1 --- /dev/null +++ b/syndicate-macros/examples/ring.rs @@ -0,0 +1,116 @@ +use syndicate::actor::*; +use std::env; +use std::sync::Arc; + +#[derive(Debug)] +enum Instruction { + SetPeer(Arc>), + HandleMessage(u32), +} + +struct Forwarder { + n_rounds: u32, + supervisor: Arc>, + peer: Option>>, +} + +impl Drop for Forwarder { + fn drop(&mut self) { + let r = self.peer.take(); + let _ = tokio::spawn(async move { + drop(r); + }); + } +} + +impl Entity for Forwarder { + fn message(&mut self, turn: &mut Activation, message: Instruction) -> ActorResult { + match message { + Instruction::SetPeer(r) => { + tracing::info!("Setting peer {:?}", r); + self.peer = Some(r); + } + Instruction::HandleMessage(n) => { + let target = if n >= self.n_rounds { &self.supervisor } else { self.peer.as_ref().expect("peer") }; + turn.message(target, Instruction::HandleMessage(n + 1)); + } + } + Ok(()) + } +} + +struct Supervisor { + n_actors: u32, + n_rounds: u32, + remaining_to_receive: u32, + start_time: Option, +} + +impl Entity for Supervisor { + fn message(&mut self, turn: &mut Activation, message: Instruction) -> ActorResult { + match message { + Instruction::SetPeer(_) => { + tracing::info!("Start"); + self.start_time = Some(std::time::Instant::now()); + }, + Instruction::HandleMessage(_n) => { + self.remaining_to_receive -= 1; + if self.remaining_to_receive == 0 { + let stop_time = std::time::Instant::now(); + let duration = stop_time - self.start_time.unwrap(); + let n_messages: u64 = self.n_actors as u64 * self.n_rounds as u64; + tracing::info!("Stop after {:?}; {:?} messages, so {:?} Hz", + duration, + n_messages, + (1000.0 * n_messages as f64) / duration.as_millis() as f64); + turn.stop_root(); + } + }, + } + Ok(()) + } +} + +#[tokio::main] +async fn main() -> ActorResult { + syndicate::convenient_logging()?; + Actor::top(None, |t| { + let args: Vec = env::args().collect(); + let n_actors: u32 = args.get(1).unwrap_or(&"1000000".to_string()).parse()?; + let n_rounds: u32 = args.get(2).unwrap_or(&"200".to_string()).parse()?; + tracing::info!("Will run {:?} actors for {:?} rounds", n_actors, n_rounds); + + let me = t.create(Supervisor { + n_actors, + n_rounds, + remaining_to_receive: n_actors, + start_time: None, + }); + + let mut forwarders: Vec>> = Vec::new(); + for _i in 0 .. n_actors { + if _i % 10000 == 0 { tracing::info!("Actor {:?}", _i); } + forwarders.push( + t.spawn_for_entity(None, true, Box::new( + Forwarder { + n_rounds, + supervisor: me.clone(), + peer: forwarders.last().cloned(), + })) + .0.expect("an entity")); + } + t.message(&forwarders[0], Instruction::SetPeer(forwarders.last().expect("an entity").clone())); + t.later(move |t| { + t.message(&me, Instruction::SetPeer(me.clone())); + t.later(move |t| { + for f in forwarders.into_iter() { + t.message(&f, Instruction::HandleMessage(0)); + } + Ok(()) + }); + Ok(()) + }); + Ok(()) + }).await??; + Ok(()) +} diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index ef7d240..19c807f 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -955,6 +955,13 @@ impl<'activation> Activation<'activation> { Box::new(move |t| t.with_entity(&r, |t, e| e.sync(t, peer))))) } + pub fn later ActorResult>(&mut self, action: F) { + // TODO: properly describe this in traces + self.pending.queue_for_mailbox(&self.state.mailbox()).push(( + None, + Box::new(action))); + } + /// Registers the entity `r` in the list of stop actions for the active facet. If the facet /// terminates cleanly, `r`'s [`stop`][Entity::stop] will be called in the context of the /// facet's parent. @@ -1212,6 +1219,30 @@ impl<'activation> Activation<'activation> { self.commit_actions.push(Box::new(action)); } + /// Schedule the creation of a new actor wrapping an entity. + pub fn spawn_for_entity( + &mut self, + name: Name, + link: bool, + target: Box>, + ) -> (Option>>, ActorRef) { + let ac_ref = self._spawn(name, |t| { + let _ = t.prevent_inert_check(); + Ok(()) + }, link); + let mut g = ac_ref.state.lock(); + let r = match &mut *g { + ActorState::Terminated { .. } => None, + ActorState::Running(state) => Some(Arc::new(Ref { + mailbox: state.mailbox(), + facet_id: state.root, + target: Mutex::new(Some(target)), + })), + }; + drop(g); + (r, ac_ref) + } + /// Schedule the creation of a new actor when the Activation commits. pub fn spawn ActorResult>( &mut self,