From aa1755be0fcfb2a5c0ae7953819124b1434bf130 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 21 Jul 2021 23:53:55 +0200 Subject: [PATCH] Avoid needless translation of internal events --- benches/bench_dataspace.rs | 8 ++--- examples/consumer.rs | 9 ++--- examples/pingpong.rs | 15 ++++---- examples/producer.rs | 6 ++-- examples/state-consumer.rs | 9 ++--- examples/state-producer.rs | 6 ++-- src/actor.rs | 72 +++++++++++++++----------------------- src/relay.rs | 2 +- 8 files changed, 57 insertions(+), 70 deletions(-) diff --git a/benches/bench_dataspace.rs b/benches/bench_dataspace.rs index d84a86b..f2379ed 100644 --- a/benches/bench_dataspace.rs +++ b/benches/bench_dataspace.rs @@ -59,12 +59,12 @@ pub fn bench_pub(c: &mut Criterion) { let debtor = Debtor::new(syndicate::name!("sender-debtor")); ac.linked_task(syndicate::name!("sender"), async move { for _ in 0..iters { - ds.external_event(&debtor, Event::Message(Box::new(Message { + external_event(&ds, &debtor, Event::Message(Box::new(Message { body: Assertion(says(_Any::new("bench_pub"), Value::ByteString(vec![]).wrap())), }))).await? } - shutdown.external_event(&debtor, Event::Message(Box::new(Message { + external_event(&shutdown, &debtor, Event::Message(Box::new(Message { body: Assertion(_Any::new(true)), }))).await?; Ok(()) @@ -127,12 +127,12 @@ pub fn bench_pub(c: &mut Criterion) { let debtor = t.debtor.clone(); t.actor.linked_task(syndicate::name!("sender"), async move { for _ in 0..iters { - ds.external_event(&debtor, Event::Message(Box::new(Message { + external_event(&ds, &debtor, Event::Message(Box::new(Message { body: Assertion(says(_Any::new("bench_pub"), Value::ByteString(vec![]).wrap())), }))).await? } - ds.external_event(&debtor, Event::Message(Box::new(Message { + external_event(&ds, &debtor, Event::Message(Box::new(Message { body: Assertion(_Any::new(true)), }))).await?; Ok(()) diff --git a/examples/consumer.rs b/examples/consumer.rs index 5bb4efd..1b43a49 100644 --- a/examples/consumer.rs +++ b/examples/consumer.rs @@ -68,10 +68,11 @@ async fn main() -> Result<(), Box> { let mut stats_timer = interval(Duration::from_secs(1)); loop { stats_timer.tick().await; - consumer.external_event(&Debtor::new(syndicate::name!("debtor")), - Event::Message(Box::new(Message { - body: Assertion(_Any::new(true)), - }))).await?; + external_event(&consumer, + &Debtor::new(syndicate::name!("debtor")), + Event::Message(Box::new(Message { + body: Assertion(_Any::new(true)), + }))).await?; } }); Ok(None) diff --git a/examples/pingpong.rs b/examples/pingpong.rs index 2f95733..2d0f87e 100644 --- a/examples/pingpong.rs +++ b/examples/pingpong.rs @@ -190,10 +190,11 @@ async fn main() -> Result<(), Box> { let mut stats_timer = interval(Duration::from_secs(1)); loop { stats_timer.tick().await; - consumer.external_event(&Debtor::new(syndicate::name!("debtor")), - Event::Message(Box::new(Message { - body: Assertion(_Any::new(true)), - }))).await?; + external_event(&consumer, + &Debtor::new(syndicate::name!("debtor")), + Event::Message(Box::new(Message { + body: Assertion(_Any::new(true)), + }))).await?; } }); @@ -209,11 +210,11 @@ async fn main() -> Result<(), Box> { Value::from(now()).wrap(), padding.clone()); for _ in 0..action_count { - events.push(Event::Message(Box::new(Message { + events.push((ds.clone(), Event::Message(Box::new(Message { body: Assertion(current_rec.clone()), - }))); + })))); } - ds.external_events(&debtor, events).await? + external_events(&ds, &debtor, events).await? } Ok(()) }); diff --git a/examples/producer.rs b/examples/producer.rs index 909f1ac..93ae694 100644 --- a/examples/producer.rs +++ b/examples/producer.rs @@ -45,11 +45,11 @@ async fn main() -> Result<(), Box> { debtor.ensure_clear_funds().await; let mut events = Vec::new(); for _ in 0..action_count { - events.push(Event::Message(Box::new(Message { + events.push((ds.clone(), Event::Message(Box::new(Message { body: Assertion(says(Value::from("producer").wrap(), padding.clone())), - }))); + })))); } - ds.external_events(&debtor, events).await?; + external_events(&ds, &debtor, events).await?; } }); Ok(None) diff --git a/examples/state-consumer.rs b/examples/state-consumer.rs index 6b0793f..6b08322 100644 --- a/examples/state-consumer.rs +++ b/examples/state-consumer.rs @@ -84,10 +84,11 @@ async fn main() -> Result<(), Box> { let mut stats_timer = interval(Duration::from_secs(1)); loop { stats_timer.tick().await; - consumer.external_event(&Debtor::new(syndicate::name!("debtor")), - Event::Message(Box::new(Message { - body: Assertion(_Any::new(true)), - }))).await?; + external_event(&consumer, + &Debtor::new(syndicate::name!("debtor")), + Event::Message(Box::new(Message { + body: Assertion(_Any::new(true)), + }))).await?; } }); Ok(None) diff --git a/examples/state-producer.rs b/examples/state-producer.rs index 248df29..46c8797 100644 --- a/examples/state-producer.rs +++ b/examples/state-producer.rs @@ -35,11 +35,11 @@ async fn main() -> Result<(), Box> { let retract_e = Event::Retract(Box::new(Retract { handle, })); - ds.external_event(&debtor, assert_e.clone()).await?; + external_event(&ds, &debtor, assert_e.clone()).await?; loop { debtor.ensure_clear_funds().await; - ds.external_event(&debtor, retract_e.clone()).await?; - ds.external_event(&debtor, assert_e.clone()).await?; + external_event(&ds, &debtor, retract_e.clone()).await?; + external_event(&ds, &debtor, assert_e.clone()).await?; } }); Ok(None) diff --git a/src/actor.rs b/src/actor.rs index b56857d..8870e50 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -59,7 +59,7 @@ pub trait Entity: Send { } enum Destination { - ImmediateSelf(Oid), + ImmediateSelf(Arc), Remote(Arc), } @@ -72,7 +72,7 @@ pub struct Activation<'activation> { pub actor: &'activation mut Actor, pub debtor: Arc, queues: HashMap, - immediate_self: Vec, + immediate_self: PendingEventQueue, } #[derive(Debug)] @@ -93,7 +93,7 @@ pub struct LoanedItem { enum SystemMessage { Release, ReleaseOid(Oid), - Turn(LoanedItem), + Turn(LoanedItem), Crash(Error), } @@ -181,11 +181,10 @@ impl<'activation> Activation<'activation> { } } - fn immediate_oid(&self, r: &Arc) -> Oid { + fn immediate_oid(&self, r: &Arc) { if r.addr.mailbox.actor_id != self.actor.actor_id { panic!("Cannot use immediate_self to send to remote peers"); } - r.addr.oid.clone() } pub fn assert(&mut self, r: &Arc, a: M) -> Handle where M: Into<_Any> { @@ -199,15 +198,12 @@ impl<'activation> Activation<'activation> { } pub fn assert_immediate_self(&mut self, r: &Arc, a: M) -> Handle where M: Into<_Any> { - let oid = self.immediate_oid(r); + self.immediate_oid(r); let handle = crate::next_handle(); if let Some(assertion) = r.rewrite(a.into()) { - self.immediate_self.push(TurnEvent { - oid: oid.clone(), - event: Event::Assert(Box::new( - Assert { assertion, handle: handle.clone() })), - }); - self.actor.outbound_assertions.insert(handle.clone(), Destination::ImmediateSelf(oid)); + self.immediate_self.push((r.clone(), Event::Assert(Box::new( + Assert { assertion, handle: handle.clone() })))); + self.actor.outbound_assertions.insert(handle.clone(), Destination::ImmediateSelf(r.clone())); } handle } @@ -222,11 +218,8 @@ impl<'activation> Activation<'activation> { match d { Destination::Remote(r) => self.queue_for(&r).push((r, Event::Retract(Box::new(Retract { handle })))), - Destination::ImmediateSelf(oid) => - self.immediate_self.push(TurnEvent { - oid, - event: Event::Retract(Box::new(Retract { handle })), - }), + Destination::ImmediateSelf(r) => + self.immediate_self.push((r, Event::Retract(Box::new(Retract { handle })))), } } @@ -238,11 +231,9 @@ impl<'activation> Activation<'activation> { } pub fn message_immediate_self(&mut self, r: &Arc, m: M) where M: Into<_Any> { + self.immediate_oid(r); if let Some(body) = r.rewrite(m.into()) { - self.immediate_self.push(TurnEvent { - oid: self.immediate_oid(r), - event: Event::Message(Box::new(Message { body })), - }) + self.immediate_self.push((r.clone(), Event::Message(Box::new(Message { body })))); } } @@ -262,11 +253,7 @@ impl<'activation> Activation<'activation> { if turn.len() == 0 { continue; } let first_ref = Arc::clone(&turn[0].0); let target = &first_ref.addr.mailbox; - let mut turn_events = Vec::new(); - for (r, e) in turn.into_iter() { - turn_events.push(TurnEvent { oid: r.addr.oid.clone(), event: e }); - } - let _ = target.send(&self.debtor, Turn(turn_events)); + let _ = target.send(&self.debtor, turn); } } @@ -355,8 +342,8 @@ impl Drop for LoanedItem { impl Mailbox { #[must_use] - pub fn send(&self, debtor: &Arc, t: Turn) -> ActorResult { - let token_count = t.0.len(); + pub fn send(&self, debtor: &Arc, t: PendingEventQueue) -> ActorResult { + let token_count = t.len(); self.tx.send(SystemMessage::Turn(LoanedItem::new(debtor, token_count, t))) .map_err(|_| error("Target actor not running", _Any::new(false))) } @@ -580,11 +567,11 @@ impl Actor { Ok(false) } SystemMessage::Turn(mut loaned_item) => { - let mut events = std::mem::take(&mut loaned_item.item.0); + let mut events = std::mem::take(&mut loaned_item.item); let mut t = Activation::new(self, Arc::clone(&loaned_item.debtor)); loop { - for TurnEvent { oid, event } in events.into_iter() { - t.with_oid(&oid, |_| Ok(()), |t, e| match event { + for (r, event) in events.into_iter() { + t.with_oid(&r.addr.oid, |_| Ok(()), |t, e| match event { Event::Assert(b) => { let Assert { assertion: Assertion(assertion), handle } = *b; e.assert(t, assertion, handle) @@ -677,20 +664,17 @@ impl Drop for Actor { } } +#[must_use] +pub async fn external_event(r: &Arc, debtor: &Arc, event: Event) -> ActorResult { + r.addr.mailbox.send(debtor, vec![(r.clone(), event)]) +} + +#[must_use] +pub async fn external_events(r: &Arc, debtor: &Arc, events: PendingEventQueue) -> ActorResult { + r.addr.mailbox.send(debtor, events) +} + impl Ref { - #[must_use] - pub async fn external_event(&self, debtor: &Arc, event: Event) -> ActorResult { - self.addr.mailbox.send(debtor, Turn(vec![TurnEvent { oid: self.addr.oid.clone(), event }])) - } - - #[must_use] - pub async fn external_events(&self, debtor: &Arc, events: Vec) -> ActorResult { - self.addr.mailbox.send(debtor, Turn(events.into_iter().map(|event| TurnEvent { - oid: self.addr.oid.clone(), - event, - }).collect())) - } - pub fn attenuate(&self, attenuation: &sturdy::Attenuation) -> Result, CaveatError> { let mut r = Ref { addr: Arc::clone(&self.addr), diff --git a/src/relay.rs b/src/relay.rs index 28fd9d7..52dc3d2 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -451,7 +451,7 @@ pub async fn input_loop( #[must_use] async fn s>(relay: &Arc, debtor: &Arc, m: M) -> ActorResult { debtor.ensure_clear_funds().await; - relay.external_event(debtor, Event::Message(Box::new(Message { body: Assertion(m.into()) }))).await + external_event(relay, debtor, Event::Message(Box::new(Message { body: Assertion(m.into()) }))).await } let debtor = Debtor::new(crate::name!("input-loop"));