From 4a69d5573f7f52ee7a27b739eb0b24644aa9c61d Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 22 Jul 2021 09:56:21 +0200 Subject: [PATCH] Actions as closures rather than data --- benches/bench_dataspace.rs | 34 +++++---- examples/consumer.rs | 9 +-- examples/pingpong.rs | 20 ++--- examples/producer.rs | 14 ++-- examples/state-consumer.rs | 9 +-- examples/state-producer.rs | 29 ++++--- src/actor.rs | 150 +++++++++++++++++++------------------ src/relay.rs | 5 +- 8 files changed, 146 insertions(+), 124 deletions(-) diff --git a/benches/bench_dataspace.rs b/benches/bench_dataspace.rs index 9231cdf..d4ea516 100644 --- a/benches/bench_dataspace.rs +++ b/benches/bench_dataspace.rs @@ -11,7 +11,6 @@ use syndicate::actor::*; use syndicate::dataspace::Dataspace; use syndicate::schemas::dataspace::Observe; use syndicate::schemas::dataspace_patterns as p; -use syndicate::schemas::internal_protocol::*; use syndicate::value::Map; use syndicate::value::NestedValue; use syndicate::value::Value; @@ -63,14 +62,15 @@ pub fn bench_pub(c: &mut Criterion) { let debtor = Debtor::new(syndicate::name!("sender-debtor")); ac.linked_task(syndicate::name!("sender"), async move { for _ in 0..iters { - external_event(&ds, &debtor, Event::Message(Box::new(Message { - body: Assertion(says(_Any::new("bench_pub"), - Value::ByteString(vec![]).wrap())), - }))).await? + let ds = Arc::clone(&ds); + external_event(&Arc::clone(&ds), &debtor, Box::new( + move |t| ds.with_entity( + |e| e.message(t, says(_Any::new("bench_pub"), + Value::ByteString(vec![]).wrap())))))? } - external_event(&shutdown, &debtor, Event::Message(Box::new(Message { - body: Assertion(_Any::new(true)), - }))).await?; + external_event(&Arc::clone(&shutdown), &debtor, Box::new( + move |t| shutdown.with_entity( + |e| e.message(t, _Any::new(true)))))?; Ok(()) }); ac.start(syndicate::name!("dataspace")).await.unwrap().unwrap(); @@ -134,14 +134,18 @@ pub fn bench_pub(c: &mut Criterion) { let debtor = t.debtor.clone(); t.actor.linked_task(syndicate::name!("sender"), async move { for _ in 0..iters { - external_event(&ds, &debtor, Event::Message(Box::new(Message { - body: Assertion(says(_Any::new("bench_pub"), - Value::ByteString(vec![]).wrap())), - }))).await? + let ds = Arc::clone(&ds); + external_event(&Arc::clone(&ds), &debtor, Box::new( + move |t| ds.with_entity( + |e| e.message(t, says(_Any::new("bench_pub"), + Value::ByteString(vec![]).wrap())))))? + } + { + let ds = Arc::clone(&ds); + external_event(&Arc::clone(&ds), &debtor, Box::new( + move |t| ds.with_entity( + |e| e.message(t, _Any::new(true)))))?; } - external_event(&ds, &debtor, Event::Message(Box::new(Message { - body: Assertion(_Any::new(true)), - }))).await?; Ok(()) }); Ok(()) diff --git a/examples/consumer.rs b/examples/consumer.rs index 1b43a49..b8bc27e 100644 --- a/examples/consumer.rs +++ b/examples/consumer.rs @@ -7,7 +7,6 @@ use syndicate::actor::*; use syndicate::relay; use syndicate::schemas::dataspace::Observe; use syndicate::schemas::dataspace_patterns as p; -use syndicate::schemas::internal_protocol::*; use syndicate::sturdy; use syndicate::value::Map; use syndicate::value::NestedValue; @@ -68,11 +67,11 @@ async fn main() -> Result<(), Box> { let mut stats_timer = interval(Duration::from_secs(1)); loop { stats_timer.tick().await; - external_event(&consumer, + let consumer = Arc::clone(&consumer); + external_event(&Arc::clone(&consumer), &Debtor::new(syndicate::name!("debtor")), - Event::Message(Box::new(Message { - body: Assertion(_Any::new(true)), - }))).await?; + Box::new(move |t| consumer.with_entity( + |e| e.message(t, _Any::new(true)))))?; } }); Ok(None) diff --git a/examples/pingpong.rs b/examples/pingpong.rs index 2d0f87e..618aefe 100644 --- a/examples/pingpong.rs +++ b/examples/pingpong.rs @@ -8,7 +8,6 @@ use syndicate::actor::*; use syndicate::relay; use syndicate::schemas::dataspace::Observe; use syndicate::schemas::dataspace_patterns as p; -use syndicate::schemas::internal_protocol::*; use syndicate::sturdy; use syndicate::value::Map; use syndicate::value::NestedValue; @@ -190,11 +189,11 @@ async fn main() -> Result<(), Box> { let mut stats_timer = interval(Duration::from_secs(1)); loop { stats_timer.tick().await; - external_event(&consumer, + let consumer = Arc::clone(&consumer); + external_event(&Arc::clone(&consumer), &Debtor::new(syndicate::name!("debtor")), - Event::Message(Box::new(Message { - body: Assertion(_Any::new(true)), - }))).await?; + Box::new(move |t| consumer.with_entity( + |e| e.message(t, _Any::new(true)))))?; } }); @@ -205,16 +204,17 @@ async fn main() -> Result<(), Box> { t.actor.linked_task(syndicate::name!("boot-ping"), async move { let padding: _Any = Value::ByteString(vec![0; bytes_padding]).wrap(); for _ in 0..turn_count { - let mut events = vec![]; + let mut events: PendingEventQueue = vec![]; let current_rec = simple_record2(send_label, Value::from(now()).wrap(), padding.clone()); for _ in 0..action_count { - events.push((ds.clone(), Event::Message(Box::new(Message { - body: Assertion(current_rec.clone()), - })))); + let ds = Arc::clone(&ds); + let current_rec = current_rec.clone(); + events.push(Box::new(move |t| ds.with_entity( + |e| e.message(t, current_rec)))); } - external_events(&ds, &debtor, events).await? + external_events(&ds, &debtor, events)? } Ok(()) }); diff --git a/examples/producer.rs b/examples/producer.rs index 93ae694..524f42b 100644 --- a/examples/producer.rs +++ b/examples/producer.rs @@ -1,8 +1,9 @@ +use std::sync::Arc; + use structopt::StructOpt; use syndicate::actor::*; use syndicate::relay; -use syndicate::schemas::internal_protocol::*; use syndicate::sturdy; use syndicate::value::Value; @@ -43,13 +44,14 @@ async fn main() -> Result<(), Box> { t.actor.linked_task(syndicate::name!("sender"), async move { loop { debtor.ensure_clear_funds().await; - let mut events = Vec::new(); + let mut events: PendingEventQueue = Vec::new(); for _ in 0..action_count { - events.push((ds.clone(), Event::Message(Box::new(Message { - body: Assertion(says(Value::from("producer").wrap(), padding.clone())), - })))); + let ds = Arc::clone(&ds); + let padding = padding.clone(); + events.push(Box::new(move |t| ds.with_entity( + |e| e.message(t, says(Value::from("producer").wrap(), padding))))); } - external_events(&ds, &debtor, events).await?; + external_events(&ds, &debtor, events)?; } }); Ok(None) diff --git a/examples/state-consumer.rs b/examples/state-consumer.rs index 6b08322..5bb77a7 100644 --- a/examples/state-consumer.rs +++ b/examples/state-consumer.rs @@ -7,7 +7,6 @@ use syndicate::actor::*; use syndicate::relay; use syndicate::schemas::dataspace::Observe; use syndicate::schemas::dataspace_patterns as p; -use syndicate::schemas::internal_protocol::*; use syndicate::sturdy; use syndicate::value::Map; use syndicate::value::NestedValue; @@ -84,11 +83,11 @@ async fn main() -> Result<(), Box> { let mut stats_timer = interval(Duration::from_secs(1)); loop { stats_timer.tick().await; - external_event(&consumer, + let consumer = Arc::clone(&consumer); + external_event(&Arc::clone(&consumer), &Debtor::new(syndicate::name!("debtor")), - Event::Message(Box::new(Message { - body: Assertion(_Any::new(true)), - }))).await?; + Box::new(move |t| consumer.with_entity( + |e| e.message(t, _Any::new(true)))))?; } }); Ok(None) diff --git a/examples/state-producer.rs b/examples/state-producer.rs index 46c8797..00abd16 100644 --- a/examples/state-producer.rs +++ b/examples/state-producer.rs @@ -1,8 +1,9 @@ +use std::sync::Arc; + use structopt::StructOpt; use syndicate::actor::*; use syndicate::relay; -use syndicate::schemas::internal_protocol::*; use syndicate::sturdy; use syndicate::value::Value; @@ -28,18 +29,24 @@ async fn main() -> Result<(), Box> { "Present", Value::from(std::process::id()).wrap()).wrap(); let handle = syndicate::next_handle(); - let assert_e = Event::Assert(Box::new(Assert { - assertion: Assertion(presence), - handle: handle.clone(), - })); - let retract_e = Event::Retract(Box::new(Retract { - handle, - })); - external_event(&ds, &debtor, assert_e.clone()).await?; + let assert_e = || { + let ds = Arc::clone(&ds); + let presence = presence.clone(); + let handle = handle.clone(); + external_event(&Arc::clone(&ds), &debtor, Box::new( + move |t| ds.with_entity(|e| e.assert(t, presence, handle)))) + }; + let retract_e = || { + let ds = Arc::clone(&ds); + let handle = handle.clone(); + external_event(&Arc::clone(&ds), &debtor, Box::new( + move |t| ds.with_entity(|e| e.retract(t, handle)))) + }; + assert_e()?; loop { debtor.ensure_clear_funds().await; - external_event(&ds, &debtor, retract_e.clone()).await?; - external_event(&ds, &debtor, assert_e.clone()).await?; + retract_e()?; + assert_e()?; } }); Ok(None) diff --git a/src/actor.rs b/src/actor.rs index 38ca4c7..03e1dfd 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -3,7 +3,6 @@ pub use futures::future::BoxFuture; pub use std::future::ready; use super::ActorId; -use super::schemas::internal_protocol::*; use super::schemas::sturdy; use super::error::Error; use super::error::error; @@ -25,7 +24,6 @@ use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering}; use tokio::select; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver}; -// use tokio::sync::Notify; use tokio_util::sync::CancellationToken; use tracing::Instrument; @@ -61,19 +59,20 @@ pub trait Entity: Send + std::marker::Sync { } enum Destination { - ImmediateSelf(Arc), - Remote(Arc), + ImmediateSelf(Action), + Remote(Arc, Action), } type OutboundAssertions = Map; -type PendingEventQueue = Vec<(Arc, Event)>; +pub type Action = Box ActorResult>; +pub type PendingEventQueue = Vec; // This is what other implementations call a "Turn", renamed here to // avoid conflicts with schemas::internal_protocol::Turn. pub struct Activation<'activation> { pub actor: &'activation mut Actor, pub debtor: Arc, - queues: HashMap, + queues: HashMap, PendingEventQueue)>, immediate_self: PendingEventQueue, } @@ -91,7 +90,6 @@ pub struct LoanedItem { pub item: T, } -#[derive(Debug)] enum SystemMessage { Release, Turn(LoanedItem), @@ -225,9 +223,20 @@ impl<'activation> Activation<'activation> { pub fn assert(&mut self, r: &Arc, a: M) -> Handle where M: Into<_Any> { let handle = crate::next_handle(); if let Some(assertion) = r.rewrite(a.into()) { - self.queue_for(r).push((Arc::clone(r), Event::Assert(Box::new( - Assert { assertion, handle: handle.clone() })))); - self.actor.outbound_assertions.insert(handle.clone(), Destination::Remote(Arc::clone(r))); + { + let r = Arc::clone(r); + let handle = handle.clone(); + self.queue_for(&r).push(Box::new( + move |t| r.with_entity(|e| e.assert(t, assertion, handle)))); + } + { + let r = Arc::clone(r); + let handle = handle.clone(); + self.actor.outbound_assertions.insert( + handle.clone(), + Destination::Remote(Arc::clone(&r), Box::new( + move |t| r.with_entity(|e| e.retract(t, handle))))); + } } handle } @@ -236,59 +245,73 @@ impl<'activation> Activation<'activation> { self.immediate_oid(r); let handle = crate::next_handle(); if let Some(assertion) = r.rewrite(a.into()) { - self.immediate_self.push((r.clone(), Event::Assert(Box::new( - Assert { assertion, handle: handle.clone() })))); - self.actor.outbound_assertions.insert(handle.clone(), Destination::ImmediateSelf(r.clone())); + { + let r = Arc::clone(r); + let handle = handle.clone(); + self.immediate_self.push(Box::new( + move |t| r.with_entity(|e| e.assert(t, assertion, handle)))); + } + { + let r = Arc::clone(r); + let handle = handle.clone(); + self.actor.outbound_assertions.insert( + handle.clone(), + Destination::ImmediateSelf(Box::new( + move |t| r.with_entity(|e| e.retract(t, handle))))); + } } handle } pub fn retract(&mut self, handle: Handle) { if let Some(d) = self.actor.outbound_assertions.remove(&handle) { - self.retract_known_ref(d, handle) + self.retract_known_ref(d) } } - fn retract_known_ref(&mut self, d: Destination, handle: Handle) { + fn retract_known_ref(&mut self, d: Destination) { match d { - Destination::Remote(r) => - self.queue_for(&r).push((r, Event::Retract(Box::new(Retract { handle })))), - Destination::ImmediateSelf(r) => - self.immediate_self.push((r, Event::Retract(Box::new(Retract { handle })))), + Destination::Remote(r, action) => + self.queue_for(&r).push(action), + Destination::ImmediateSelf(action) => + self.immediate_self.push(action), } } pub fn message(&mut self, r: &Arc, m: M) where M: Into<_Any> { if let Some(body) = r.rewrite(m.into()) { - self.queue_for(r).push((Arc::clone(r), Event::Message(Box::new( - Message { body })))) + let r = Arc::clone(r); + self.queue_for(&r).push(Box::new( + move |t| r.with_entity(|e| e.message(t, body)))) } } pub fn message_immediate_self(&mut self, r: &Arc, m: M) where M: Into<_Any> { self.immediate_oid(r); if let Some(body) = r.rewrite(m.into()) { - self.immediate_self.push((r.clone(), Event::Message(Box::new(Message { body })))); + let r = Arc::clone(r); + self.immediate_self.push(Box::new( + move |t| r.with_entity(|e| e.message(t, body)))) } } pub fn sync(&mut self, r: &Arc, peer: Arc) { - self.queue_for(r).push((Arc::clone(r), Event::Sync(Box::new(Sync { peer })))); + let r = Arc::clone(r); + self.queue_for(&r).push(Box::new( + move |t| r.with_entity(|e| e.sync(t, peer)))) } fn queue_for(&mut self, r: &Arc) -> &mut PendingEventQueue { - self.queues.entry(r.addr.mailbox.actor_id).or_default() + &mut self.queues.entry(r.addr.mailbox.actor_id) + .or_insert((r.addr.mailbox.tx.clone(), Vec::new())).1 } fn deliver(&mut self) { if !self.immediate_self.is_empty() { panic!("Unprocessed immediate_self events remain at deliver() time"); } - for (_actor_id, turn) in std::mem::take(&mut self.queues).into_iter() { - if turn.len() == 0 { continue; } - let first_ref = Arc::clone(&turn[0].0); - let target = &first_ref.addr.mailbox; - let _ = target.send(&self.debtor, turn); + for (_actor_id, (tx, turn)) in std::mem::take(&mut self.queues).into_iter() { + let _ = send_actions(&tx, &self.debtor, turn); } } } @@ -357,13 +380,15 @@ impl Drop for LoanedItem { } } -impl Mailbox { - #[must_use] - pub fn send(&self, debtor: &Arc, t: PendingEventQueue) -> ActorResult { - let token_count = t.len(); - self.tx.send(SystemMessage::Turn(LoanedItem::new(debtor, token_count, t))) - .map_err(|_| error("Target actor not running", _Any::new(false))) - } +#[must_use] +fn send_actions( + tx: &UnboundedSender, + debtor: &Arc, + t: PendingEventQueue, +) -> ActorResult { + let token_count = t.len(); + tx.send(SystemMessage::Turn(LoanedItem::new(debtor, token_count, t))) + .map_err(|_| error("Target actor not running", _Any::new(false))) } impl std::fmt::Debug for Mailbox { @@ -518,8 +543,7 @@ impl Actor { { let mut t = Activation::new(&mut self, Debtor::new(crate::name!("shutdown"))); for r in std::mem::take(&mut t.actor.exit_hooks) { - let mut e = r.addr.target.write().expect("unpoisoned"); - if let Err(err) = e.exit_hook(&mut t, &result) { + if let Err(err) = r.with_entity(|e| e.exit_hook(&mut t, &result)) { tracing::error!(err = debug(err), r = debug(&r), "error in exit hook"); @@ -574,32 +598,12 @@ impl Actor { Ok(true) } SystemMessage::Turn(mut loaned_item) => { - let mut events = std::mem::take(&mut loaned_item.item); + let mut actions = std::mem::take(&mut loaned_item.item); let mut t = Activation::new(self, Arc::clone(&loaned_item.debtor)); loop { - for (r, event) in events.into_iter() { - let mut e = r.addr.target.write().expect("unpoisoned"); - match event { - Event::Assert(b) => { - let Assert { assertion: Assertion(assertion), handle } = *b; - e.assert(&mut t, assertion, handle)? - } - Event::Retract(b) => { - let Retract { handle } = *b; - e.retract(&mut t, handle)? - } - Event::Message(b) => { - let Message { body: Assertion(body) } = *b; - e.message(&mut t, body)? - } - Event::Sync(b) => { - let Sync { peer } = *b; - e.sync(&mut t, peer)? - } - } - } - events = std::mem::take(&mut t.immediate_self); - if events.is_empty() { break; } + for action in actions.into_iter() { action(&mut t)? } + actions = std::mem::take(&mut t.immediate_self); + if actions.is_empty() { break; } } Ok(false) } @@ -662,9 +666,9 @@ impl Drop for Actor { let to_clear = std::mem::take(&mut self.outbound_assertions); { let mut t = Activation::new(self, Debtor::new(crate::name!("drop"))); - for (handle, r) in to_clear.into_iter() { - tracing::trace!(h = debug(&handle), "retract on termination"); - t.retract_known_ref(r, handle); + for (_handle, r) in to_clear.into_iter() { + tracing::trace!(h = debug(&_handle), "retract on termination"); + t.retract_known_ref(r); } } @@ -673,16 +677,20 @@ impl Drop for Actor { } #[must_use] -pub async fn external_event(r: &Arc, debtor: &Arc, event: Event) -> ActorResult { - r.addr.mailbox.send(debtor, vec![(r.clone(), event)]) +pub fn external_event(r: &Arc, debtor: &Arc, action: Action) -> ActorResult { + send_actions(&r.addr.mailbox.tx, debtor, vec![action]) } #[must_use] -pub async fn external_events(r: &Arc, debtor: &Arc, events: PendingEventQueue) -> ActorResult { - r.addr.mailbox.send(debtor, events) +pub fn external_events(r: &Arc, debtor: &Arc, events: PendingEventQueue) -> ActorResult { + send_actions(&r.addr.mailbox.tx, debtor, events) } impl Ref { + pub fn with_entity R>(&self, f: F) -> R { + f(&mut **self.addr.target.write().expect("unpoisoned")) + } + pub fn attenuate(&self, attenuation: &sturdy::Attenuation) -> Result, CaveatError> { let mut r = Ref { addr: Arc::clone(&self.addr), @@ -692,14 +700,14 @@ impl Ref { Ok(Arc::new(r)) } - pub fn rewrite(&self, mut a: _Any) -> Option { + pub fn rewrite(&self, mut a: _Any) -> Option<_Any> { for c in &self.attenuation { match c.rewrite(&a) { Some(v) => a = v, None => return None, } } - Some(Assertion(a)) + Some(a) } } diff --git a/src/relay.rs b/src/relay.rs index 79c8eb7..850b4c9 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -455,7 +455,10 @@ pub async fn input_loop( #[must_use] async fn s>(relay: &Arc, debtor: &Arc, m: M) -> ActorResult { debtor.ensure_clear_funds().await; - external_event(relay, debtor, Event::Message(Box::new(Message { body: Assertion(m.into()) }))).await + let m = m.into(); + let relay = Arc::clone(relay); + external_event(&Arc::clone(&relay), debtor, Box::new( + move |t| relay.with_entity(|e| e.message(t, m)))) } let debtor = Debtor::new(crate::name!("input-loop"));