Avoid needless translation of internal events

This commit is contained in:
Tony Garnock-Jones 2021-07-21 23:53:55 +02:00
parent 052da62572
commit aa1755be0f
8 changed files with 57 additions and 70 deletions

View File

@ -59,12 +59,12 @@ pub fn bench_pub(c: &mut Criterion) {
let debtor = Debtor::new(syndicate::name!("sender-debtor"));
ac.linked_task(syndicate::name!("sender"), async move {
for _ in 0..iters {
ds.external_event(&debtor, Event::Message(Box::new(Message {
external_event(&ds, &debtor, Event::Message(Box::new(Message {
body: Assertion(says(_Any::new("bench_pub"),
Value::ByteString(vec![]).wrap())),
}))).await?
}
shutdown.external_event(&debtor, Event::Message(Box::new(Message {
external_event(&shutdown, &debtor, Event::Message(Box::new(Message {
body: Assertion(_Any::new(true)),
}))).await?;
Ok(())
@ -127,12 +127,12 @@ pub fn bench_pub(c: &mut Criterion) {
let debtor = t.debtor.clone();
t.actor.linked_task(syndicate::name!("sender"), async move {
for _ in 0..iters {
ds.external_event(&debtor, Event::Message(Box::new(Message {
external_event(&ds, &debtor, Event::Message(Box::new(Message {
body: Assertion(says(_Any::new("bench_pub"),
Value::ByteString(vec![]).wrap())),
}))).await?
}
ds.external_event(&debtor, Event::Message(Box::new(Message {
external_event(&ds, &debtor, Event::Message(Box::new(Message {
body: Assertion(_Any::new(true)),
}))).await?;
Ok(())

View File

@ -68,10 +68,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut stats_timer = interval(Duration::from_secs(1));
loop {
stats_timer.tick().await;
consumer.external_event(&Debtor::new(syndicate::name!("debtor")),
Event::Message(Box::new(Message {
body: Assertion(_Any::new(true)),
}))).await?;
external_event(&consumer,
&Debtor::new(syndicate::name!("debtor")),
Event::Message(Box::new(Message {
body: Assertion(_Any::new(true)),
}))).await?;
}
});
Ok(None)

View File

@ -190,10 +190,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut stats_timer = interval(Duration::from_secs(1));
loop {
stats_timer.tick().await;
consumer.external_event(&Debtor::new(syndicate::name!("debtor")),
Event::Message(Box::new(Message {
body: Assertion(_Any::new(true)),
}))).await?;
external_event(&consumer,
&Debtor::new(syndicate::name!("debtor")),
Event::Message(Box::new(Message {
body: Assertion(_Any::new(true)),
}))).await?;
}
});
@ -209,11 +210,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Value::from(now()).wrap(),
padding.clone());
for _ in 0..action_count {
events.push(Event::Message(Box::new(Message {
events.push((ds.clone(), Event::Message(Box::new(Message {
body: Assertion(current_rec.clone()),
})));
}))));
}
ds.external_events(&debtor, events).await?
external_events(&ds, &debtor, events).await?
}
Ok(())
});

View File

@ -45,11 +45,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
debtor.ensure_clear_funds().await;
let mut events = Vec::new();
for _ in 0..action_count {
events.push(Event::Message(Box::new(Message {
events.push((ds.clone(), Event::Message(Box::new(Message {
body: Assertion(says(Value::from("producer").wrap(), padding.clone())),
})));
}))));
}
ds.external_events(&debtor, events).await?;
external_events(&ds, &debtor, events).await?;
}
});
Ok(None)

View File

@ -84,10 +84,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut stats_timer = interval(Duration::from_secs(1));
loop {
stats_timer.tick().await;
consumer.external_event(&Debtor::new(syndicate::name!("debtor")),
Event::Message(Box::new(Message {
body: Assertion(_Any::new(true)),
}))).await?;
external_event(&consumer,
&Debtor::new(syndicate::name!("debtor")),
Event::Message(Box::new(Message {
body: Assertion(_Any::new(true)),
}))).await?;
}
});
Ok(None)

View File

@ -35,11 +35,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let retract_e = Event::Retract(Box::new(Retract {
handle,
}));
ds.external_event(&debtor, assert_e.clone()).await?;
external_event(&ds, &debtor, assert_e.clone()).await?;
loop {
debtor.ensure_clear_funds().await;
ds.external_event(&debtor, retract_e.clone()).await?;
ds.external_event(&debtor, assert_e.clone()).await?;
external_event(&ds, &debtor, retract_e.clone()).await?;
external_event(&ds, &debtor, assert_e.clone()).await?;
}
});
Ok(None)

View File

@ -59,7 +59,7 @@ pub trait Entity: Send {
}
enum Destination {
ImmediateSelf(Oid),
ImmediateSelf(Arc<Ref>),
Remote(Arc<Ref>),
}
@ -72,7 +72,7 @@ pub struct Activation<'activation> {
pub actor: &'activation mut Actor,
pub debtor: Arc<Debtor>,
queues: HashMap<ActorId, PendingEventQueue>,
immediate_self: Vec<TurnEvent>,
immediate_self: PendingEventQueue,
}
#[derive(Debug)]
@ -93,7 +93,7 @@ pub struct LoanedItem<T> {
enum SystemMessage {
Release,
ReleaseOid(Oid),
Turn(LoanedItem<Turn>),
Turn(LoanedItem<PendingEventQueue>),
Crash(Error),
}
@ -181,11 +181,10 @@ impl<'activation> Activation<'activation> {
}
}
fn immediate_oid(&self, r: &Arc<Ref>) -> Oid {
fn immediate_oid(&self, r: &Arc<Ref>) {
if r.addr.mailbox.actor_id != self.actor.actor_id {
panic!("Cannot use immediate_self to send to remote peers");
}
r.addr.oid.clone()
}
pub fn assert<M>(&mut self, r: &Arc<Ref>, a: M) -> Handle where M: Into<_Any> {
@ -199,15 +198,12 @@ impl<'activation> Activation<'activation> {
}
pub fn assert_immediate_self<M>(&mut self, r: &Arc<Ref>, a: M) -> Handle where M: Into<_Any> {
let oid = self.immediate_oid(r);
self.immediate_oid(r);
let handle = crate::next_handle();
if let Some(assertion) = r.rewrite(a.into()) {
self.immediate_self.push(TurnEvent {
oid: oid.clone(),
event: Event::Assert(Box::new(
Assert { assertion, handle: handle.clone() })),
});
self.actor.outbound_assertions.insert(handle.clone(), Destination::ImmediateSelf(oid));
self.immediate_self.push((r.clone(), Event::Assert(Box::new(
Assert { assertion, handle: handle.clone() }))));
self.actor.outbound_assertions.insert(handle.clone(), Destination::ImmediateSelf(r.clone()));
}
handle
}
@ -222,11 +218,8 @@ impl<'activation> Activation<'activation> {
match d {
Destination::Remote(r) =>
self.queue_for(&r).push((r, Event::Retract(Box::new(Retract { handle })))),
Destination::ImmediateSelf(oid) =>
self.immediate_self.push(TurnEvent {
oid,
event: Event::Retract(Box::new(Retract { handle })),
}),
Destination::ImmediateSelf(r) =>
self.immediate_self.push((r, Event::Retract(Box::new(Retract { handle })))),
}
}
@ -238,11 +231,9 @@ impl<'activation> Activation<'activation> {
}
pub fn message_immediate_self<M>(&mut self, r: &Arc<Ref>, m: M) where M: Into<_Any> {
self.immediate_oid(r);
if let Some(body) = r.rewrite(m.into()) {
self.immediate_self.push(TurnEvent {
oid: self.immediate_oid(r),
event: Event::Message(Box::new(Message { body })),
})
self.immediate_self.push((r.clone(), Event::Message(Box::new(Message { body }))));
}
}
@ -262,11 +253,7 @@ impl<'activation> Activation<'activation> {
if turn.len() == 0 { continue; }
let first_ref = Arc::clone(&turn[0].0);
let target = &first_ref.addr.mailbox;
let mut turn_events = Vec::new();
for (r, e) in turn.into_iter() {
turn_events.push(TurnEvent { oid: r.addr.oid.clone(), event: e });
}
let _ = target.send(&self.debtor, Turn(turn_events));
let _ = target.send(&self.debtor, turn);
}
}
@ -355,8 +342,8 @@ impl<T> Drop for LoanedItem<T> {
impl Mailbox {
#[must_use]
pub fn send(&self, debtor: &Arc<Debtor>, t: Turn) -> ActorResult {
let token_count = t.0.len();
pub fn send(&self, debtor: &Arc<Debtor>, t: PendingEventQueue) -> ActorResult {
let token_count = t.len();
self.tx.send(SystemMessage::Turn(LoanedItem::new(debtor, token_count, t)))
.map_err(|_| error("Target actor not running", _Any::new(false)))
}
@ -580,11 +567,11 @@ impl Actor {
Ok(false)
}
SystemMessage::Turn(mut loaned_item) => {
let mut events = std::mem::take(&mut loaned_item.item.0);
let mut events = std::mem::take(&mut loaned_item.item);
let mut t = Activation::new(self, Arc::clone(&loaned_item.debtor));
loop {
for TurnEvent { oid, event } in events.into_iter() {
t.with_oid(&oid, |_| Ok(()), |t, e| match event {
for (r, event) in events.into_iter() {
t.with_oid(&r.addr.oid, |_| Ok(()), |t, e| match event {
Event::Assert(b) => {
let Assert { assertion: Assertion(assertion), handle } = *b;
e.assert(t, assertion, handle)
@ -677,20 +664,17 @@ impl Drop for Actor {
}
}
#[must_use]
pub async fn external_event(r: &Arc<Ref>, debtor: &Arc<Debtor>, event: Event) -> ActorResult {
r.addr.mailbox.send(debtor, vec![(r.clone(), event)])
}
#[must_use]
pub async fn external_events(r: &Arc<Ref>, debtor: &Arc<Debtor>, events: PendingEventQueue) -> ActorResult {
r.addr.mailbox.send(debtor, events)
}
impl Ref {
#[must_use]
pub async fn external_event(&self, debtor: &Arc<Debtor>, event: Event) -> ActorResult {
self.addr.mailbox.send(debtor, Turn(vec![TurnEvent { oid: self.addr.oid.clone(), event }]))
}
#[must_use]
pub async fn external_events(&self, debtor: &Arc<Debtor>, events: Vec<Event>) -> ActorResult {
self.addr.mailbox.send(debtor, Turn(events.into_iter().map(|event| TurnEvent {
oid: self.addr.oid.clone(),
event,
}).collect()))
}
pub fn attenuate(&self, attenuation: &sturdy::Attenuation) -> Result<Arc<Self>, CaveatError> {
let mut r = Ref {
addr: Arc::clone(&self.addr),

View File

@ -451,7 +451,7 @@ pub async fn input_loop(
#[must_use]
async fn s<M: Into<_Any>>(relay: &Arc<Ref>, debtor: &Arc<Debtor>, m: M) -> ActorResult {
debtor.ensure_clear_funds().await;
relay.external_event(debtor, Event::Message(Box::new(Message { body: Assertion(m.into()) }))).await
external_event(relay, debtor, Event::Message(Box::new(Message { body: Assertion(m.into()) }))).await
}
let debtor = Debtor::new(crate::name!("input-loop"));