Actions as closures rather than data

This commit is contained in:
Tony Garnock-Jones 2021-07-22 09:56:21 +02:00
parent 21a69618cf
commit 4a69d5573f
8 changed files with 146 additions and 124 deletions

View File

@ -11,7 +11,6 @@ use syndicate::actor::*;
use syndicate::dataspace::Dataspace;
use syndicate::schemas::dataspace::Observe;
use syndicate::schemas::dataspace_patterns as p;
use syndicate::schemas::internal_protocol::*;
use syndicate::value::Map;
use syndicate::value::NestedValue;
use syndicate::value::Value;
@ -63,14 +62,15 @@ pub fn bench_pub(c: &mut Criterion) {
let debtor = Debtor::new(syndicate::name!("sender-debtor"));
ac.linked_task(syndicate::name!("sender"), async move {
for _ in 0..iters {
external_event(&ds, &debtor, Event::Message(Box::new(Message {
body: Assertion(says(_Any::new("bench_pub"),
Value::ByteString(vec![]).wrap())),
}))).await?
let ds = Arc::clone(&ds);
external_event(&Arc::clone(&ds), &debtor, Box::new(
move |t| ds.with_entity(
|e| e.message(t, says(_Any::new("bench_pub"),
Value::ByteString(vec![]).wrap())))))?
}
external_event(&shutdown, &debtor, Event::Message(Box::new(Message {
body: Assertion(_Any::new(true)),
}))).await?;
external_event(&Arc::clone(&shutdown), &debtor, Box::new(
move |t| shutdown.with_entity(
|e| e.message(t, _Any::new(true)))))?;
Ok(())
});
ac.start(syndicate::name!("dataspace")).await.unwrap().unwrap();
@ -134,14 +134,18 @@ pub fn bench_pub(c: &mut Criterion) {
let debtor = t.debtor.clone();
t.actor.linked_task(syndicate::name!("sender"), async move {
for _ in 0..iters {
external_event(&ds, &debtor, Event::Message(Box::new(Message {
body: Assertion(says(_Any::new("bench_pub"),
Value::ByteString(vec![]).wrap())),
}))).await?
let ds = Arc::clone(&ds);
external_event(&Arc::clone(&ds), &debtor, Box::new(
move |t| ds.with_entity(
|e| e.message(t, says(_Any::new("bench_pub"),
Value::ByteString(vec![]).wrap())))))?
}
{
let ds = Arc::clone(&ds);
external_event(&Arc::clone(&ds), &debtor, Box::new(
move |t| ds.with_entity(
|e| e.message(t, _Any::new(true)))))?;
}
external_event(&ds, &debtor, Event::Message(Box::new(Message {
body: Assertion(_Any::new(true)),
}))).await?;
Ok(())
});
Ok(())

View File

@ -7,7 +7,6 @@ use syndicate::actor::*;
use syndicate::relay;
use syndicate::schemas::dataspace::Observe;
use syndicate::schemas::dataspace_patterns as p;
use syndicate::schemas::internal_protocol::*;
use syndicate::sturdy;
use syndicate::value::Map;
use syndicate::value::NestedValue;
@ -68,11 +67,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut stats_timer = interval(Duration::from_secs(1));
loop {
stats_timer.tick().await;
external_event(&consumer,
let consumer = Arc::clone(&consumer);
external_event(&Arc::clone(&consumer),
&Debtor::new(syndicate::name!("debtor")),
Event::Message(Box::new(Message {
body: Assertion(_Any::new(true)),
}))).await?;
Box::new(move |t| consumer.with_entity(
|e| e.message(t, _Any::new(true)))))?;
}
});
Ok(None)

View File

@ -8,7 +8,6 @@ use syndicate::actor::*;
use syndicate::relay;
use syndicate::schemas::dataspace::Observe;
use syndicate::schemas::dataspace_patterns as p;
use syndicate::schemas::internal_protocol::*;
use syndicate::sturdy;
use syndicate::value::Map;
use syndicate::value::NestedValue;
@ -190,11 +189,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut stats_timer = interval(Duration::from_secs(1));
loop {
stats_timer.tick().await;
external_event(&consumer,
let consumer = Arc::clone(&consumer);
external_event(&Arc::clone(&consumer),
&Debtor::new(syndicate::name!("debtor")),
Event::Message(Box::new(Message {
body: Assertion(_Any::new(true)),
}))).await?;
Box::new(move |t| consumer.with_entity(
|e| e.message(t, _Any::new(true)))))?;
}
});
@ -205,16 +204,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
t.actor.linked_task(syndicate::name!("boot-ping"), async move {
let padding: _Any = Value::ByteString(vec![0; bytes_padding]).wrap();
for _ in 0..turn_count {
let mut events = vec![];
let mut events: PendingEventQueue = vec![];
let current_rec = simple_record2(send_label,
Value::from(now()).wrap(),
padding.clone());
for _ in 0..action_count {
events.push((ds.clone(), Event::Message(Box::new(Message {
body: Assertion(current_rec.clone()),
}))));
let ds = Arc::clone(&ds);
let current_rec = current_rec.clone();
events.push(Box::new(move |t| ds.with_entity(
|e| e.message(t, current_rec))));
}
external_events(&ds, &debtor, events).await?
external_events(&ds, &debtor, events)?
}
Ok(())
});

View File

@ -1,8 +1,9 @@
use std::sync::Arc;
use structopt::StructOpt;
use syndicate::actor::*;
use syndicate::relay;
use syndicate::schemas::internal_protocol::*;
use syndicate::sturdy;
use syndicate::value::Value;
@ -43,13 +44,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
t.actor.linked_task(syndicate::name!("sender"), async move {
loop {
debtor.ensure_clear_funds().await;
let mut events = Vec::new();
let mut events: PendingEventQueue = Vec::new();
for _ in 0..action_count {
events.push((ds.clone(), Event::Message(Box::new(Message {
body: Assertion(says(Value::from("producer").wrap(), padding.clone())),
}))));
let ds = Arc::clone(&ds);
let padding = padding.clone();
events.push(Box::new(move |t| ds.with_entity(
|e| e.message(t, says(Value::from("producer").wrap(), padding)))));
}
external_events(&ds, &debtor, events).await?;
external_events(&ds, &debtor, events)?;
}
});
Ok(None)

View File

@ -7,7 +7,6 @@ use syndicate::actor::*;
use syndicate::relay;
use syndicate::schemas::dataspace::Observe;
use syndicate::schemas::dataspace_patterns as p;
use syndicate::schemas::internal_protocol::*;
use syndicate::sturdy;
use syndicate::value::Map;
use syndicate::value::NestedValue;
@ -84,11 +83,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut stats_timer = interval(Duration::from_secs(1));
loop {
stats_timer.tick().await;
external_event(&consumer,
let consumer = Arc::clone(&consumer);
external_event(&Arc::clone(&consumer),
&Debtor::new(syndicate::name!("debtor")),
Event::Message(Box::new(Message {
body: Assertion(_Any::new(true)),
}))).await?;
Box::new(move |t| consumer.with_entity(
|e| e.message(t, _Any::new(true)))))?;
}
});
Ok(None)

View File

@ -1,8 +1,9 @@
use std::sync::Arc;
use structopt::StructOpt;
use syndicate::actor::*;
use syndicate::relay;
use syndicate::schemas::internal_protocol::*;
use syndicate::sturdy;
use syndicate::value::Value;
@ -28,18 +29,24 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"Present",
Value::from(std::process::id()).wrap()).wrap();
let handle = syndicate::next_handle();
let assert_e = Event::Assert(Box::new(Assert {
assertion: Assertion(presence),
handle: handle.clone(),
}));
let retract_e = Event::Retract(Box::new(Retract {
handle,
}));
external_event(&ds, &debtor, assert_e.clone()).await?;
let assert_e = || {
let ds = Arc::clone(&ds);
let presence = presence.clone();
let handle = handle.clone();
external_event(&Arc::clone(&ds), &debtor, Box::new(
move |t| ds.with_entity(|e| e.assert(t, presence, handle))))
};
let retract_e = || {
let ds = Arc::clone(&ds);
let handle = handle.clone();
external_event(&Arc::clone(&ds), &debtor, Box::new(
move |t| ds.with_entity(|e| e.retract(t, handle))))
};
assert_e()?;
loop {
debtor.ensure_clear_funds().await;
external_event(&ds, &debtor, retract_e.clone()).await?;
external_event(&ds, &debtor, assert_e.clone()).await?;
retract_e()?;
assert_e()?;
}
});
Ok(None)

View File

@ -3,7 +3,6 @@ pub use futures::future::BoxFuture;
pub use std::future::ready;
use super::ActorId;
use super::schemas::internal_protocol::*;
use super::schemas::sturdy;
use super::error::Error;
use super::error::error;
@ -25,7 +24,6 @@ use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use tokio::select;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver};
// use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
@ -61,19 +59,20 @@ pub trait Entity: Send + std::marker::Sync {
}
enum Destination {
ImmediateSelf(Arc<Ref>),
Remote(Arc<Ref>),
ImmediateSelf(Action),
Remote(Arc<Ref>, Action),
}
type OutboundAssertions = Map<Handle, Destination>;
type PendingEventQueue = Vec<(Arc<Ref>, Event)>;
pub type Action = Box<dyn Send + Sync + FnOnce(&mut Activation) -> ActorResult>;
pub type PendingEventQueue = Vec<Action>;
// This is what other implementations call a "Turn", renamed here to
// avoid conflicts with schemas::internal_protocol::Turn.
pub struct Activation<'activation> {
pub actor: &'activation mut Actor,
pub debtor: Arc<Debtor>,
queues: HashMap<ActorId, PendingEventQueue>,
queues: HashMap<ActorId, (UnboundedSender<SystemMessage>, PendingEventQueue)>,
immediate_self: PendingEventQueue,
}
@ -91,7 +90,6 @@ pub struct LoanedItem<T> {
pub item: T,
}
#[derive(Debug)]
enum SystemMessage {
Release,
Turn(LoanedItem<PendingEventQueue>),
@ -225,9 +223,20 @@ impl<'activation> Activation<'activation> {
pub fn assert<M>(&mut self, r: &Arc<Ref>, a: M) -> Handle where M: Into<_Any> {
let handle = crate::next_handle();
if let Some(assertion) = r.rewrite(a.into()) {
self.queue_for(r).push((Arc::clone(r), Event::Assert(Box::new(
Assert { assertion, handle: handle.clone() }))));
self.actor.outbound_assertions.insert(handle.clone(), Destination::Remote(Arc::clone(r)));
{
let r = Arc::clone(r);
let handle = handle.clone();
self.queue_for(&r).push(Box::new(
move |t| r.with_entity(|e| e.assert(t, assertion, handle))));
}
{
let r = Arc::clone(r);
let handle = handle.clone();
self.actor.outbound_assertions.insert(
handle.clone(),
Destination::Remote(Arc::clone(&r), Box::new(
move |t| r.with_entity(|e| e.retract(t, handle)))));
}
}
handle
}
@ -236,59 +245,73 @@ impl<'activation> Activation<'activation> {
self.immediate_oid(r);
let handle = crate::next_handle();
if let Some(assertion) = r.rewrite(a.into()) {
self.immediate_self.push((r.clone(), Event::Assert(Box::new(
Assert { assertion, handle: handle.clone() }))));
self.actor.outbound_assertions.insert(handle.clone(), Destination::ImmediateSelf(r.clone()));
{
let r = Arc::clone(r);
let handle = handle.clone();
self.immediate_self.push(Box::new(
move |t| r.with_entity(|e| e.assert(t, assertion, handle))));
}
{
let r = Arc::clone(r);
let handle = handle.clone();
self.actor.outbound_assertions.insert(
handle.clone(),
Destination::ImmediateSelf(Box::new(
move |t| r.with_entity(|e| e.retract(t, handle)))));
}
}
handle
}
pub fn retract(&mut self, handle: Handle) {
if let Some(d) = self.actor.outbound_assertions.remove(&handle) {
self.retract_known_ref(d, handle)
self.retract_known_ref(d)
}
}
fn retract_known_ref(&mut self, d: Destination, handle: Handle) {
fn retract_known_ref(&mut self, d: Destination) {
match d {
Destination::Remote(r) =>
self.queue_for(&r).push((r, Event::Retract(Box::new(Retract { handle })))),
Destination::ImmediateSelf(r) =>
self.immediate_self.push((r, Event::Retract(Box::new(Retract { handle })))),
Destination::Remote(r, action) =>
self.queue_for(&r).push(action),
Destination::ImmediateSelf(action) =>
self.immediate_self.push(action),
}
}
pub fn message<M>(&mut self, r: &Arc<Ref>, m: M) where M: Into<_Any> {
if let Some(body) = r.rewrite(m.into()) {
self.queue_for(r).push((Arc::clone(r), Event::Message(Box::new(
Message { body }))))
let r = Arc::clone(r);
self.queue_for(&r).push(Box::new(
move |t| r.with_entity(|e| e.message(t, body))))
}
}
pub fn message_immediate_self<M>(&mut self, r: &Arc<Ref>, m: M) where M: Into<_Any> {
self.immediate_oid(r);
if let Some(body) = r.rewrite(m.into()) {
self.immediate_self.push((r.clone(), Event::Message(Box::new(Message { body }))));
let r = Arc::clone(r);
self.immediate_self.push(Box::new(
move |t| r.with_entity(|e| e.message(t, body))))
}
}
pub fn sync(&mut self, r: &Arc<Ref>, peer: Arc<Ref>) {
self.queue_for(r).push((Arc::clone(r), Event::Sync(Box::new(Sync { peer }))));
let r = Arc::clone(r);
self.queue_for(&r).push(Box::new(
move |t| r.with_entity(|e| e.sync(t, peer))))
}
fn queue_for(&mut self, r: &Arc<Ref>) -> &mut PendingEventQueue {
self.queues.entry(r.addr.mailbox.actor_id).or_default()
&mut self.queues.entry(r.addr.mailbox.actor_id)
.or_insert((r.addr.mailbox.tx.clone(), Vec::new())).1
}
fn deliver(&mut self) {
if !self.immediate_self.is_empty() {
panic!("Unprocessed immediate_self events remain at deliver() time");
}
for (_actor_id, turn) in std::mem::take(&mut self.queues).into_iter() {
if turn.len() == 0 { continue; }
let first_ref = Arc::clone(&turn[0].0);
let target = &first_ref.addr.mailbox;
let _ = target.send(&self.debtor, turn);
for (_actor_id, (tx, turn)) in std::mem::take(&mut self.queues).into_iter() {
let _ = send_actions(&tx, &self.debtor, turn);
}
}
}
@ -357,13 +380,15 @@ impl<T> Drop for LoanedItem<T> {
}
}
impl Mailbox {
#[must_use]
pub fn send(&self, debtor: &Arc<Debtor>, t: PendingEventQueue) -> ActorResult {
let token_count = t.len();
self.tx.send(SystemMessage::Turn(LoanedItem::new(debtor, token_count, t)))
.map_err(|_| error("Target actor not running", _Any::new(false)))
}
#[must_use]
fn send_actions(
tx: &UnboundedSender<SystemMessage>,
debtor: &Arc<Debtor>,
t: PendingEventQueue,
) -> ActorResult {
let token_count = t.len();
tx.send(SystemMessage::Turn(LoanedItem::new(debtor, token_count, t)))
.map_err(|_| error("Target actor not running", _Any::new(false)))
}
impl std::fmt::Debug for Mailbox {
@ -518,8 +543,7 @@ impl Actor {
{
let mut t = Activation::new(&mut self, Debtor::new(crate::name!("shutdown")));
for r in std::mem::take(&mut t.actor.exit_hooks) {
let mut e = r.addr.target.write().expect("unpoisoned");
if let Err(err) = e.exit_hook(&mut t, &result) {
if let Err(err) = r.with_entity(|e| e.exit_hook(&mut t, &result)) {
tracing::error!(err = debug(err),
r = debug(&r),
"error in exit hook");
@ -574,32 +598,12 @@ impl Actor {
Ok(true)
}
SystemMessage::Turn(mut loaned_item) => {
let mut events = std::mem::take(&mut loaned_item.item);
let mut actions = std::mem::take(&mut loaned_item.item);
let mut t = Activation::new(self, Arc::clone(&loaned_item.debtor));
loop {
for (r, event) in events.into_iter() {
let mut e = r.addr.target.write().expect("unpoisoned");
match event {
Event::Assert(b) => {
let Assert { assertion: Assertion(assertion), handle } = *b;
e.assert(&mut t, assertion, handle)?
}
Event::Retract(b) => {
let Retract { handle } = *b;
e.retract(&mut t, handle)?
}
Event::Message(b) => {
let Message { body: Assertion(body) } = *b;
e.message(&mut t, body)?
}
Event::Sync(b) => {
let Sync { peer } = *b;
e.sync(&mut t, peer)?
}
}
}
events = std::mem::take(&mut t.immediate_self);
if events.is_empty() { break; }
for action in actions.into_iter() { action(&mut t)? }
actions = std::mem::take(&mut t.immediate_self);
if actions.is_empty() { break; }
}
Ok(false)
}
@ -662,9 +666,9 @@ impl Drop for Actor {
let to_clear = std::mem::take(&mut self.outbound_assertions);
{
let mut t = Activation::new(self, Debtor::new(crate::name!("drop")));
for (handle, r) in to_clear.into_iter() {
tracing::trace!(h = debug(&handle), "retract on termination");
t.retract_known_ref(r, handle);
for (_handle, r) in to_clear.into_iter() {
tracing::trace!(h = debug(&_handle), "retract on termination");
t.retract_known_ref(r);
}
}
@ -673,16 +677,20 @@ impl Drop for Actor {
}
#[must_use]
pub async fn external_event(r: &Arc<Ref>, debtor: &Arc<Debtor>, event: Event) -> ActorResult {
r.addr.mailbox.send(debtor, vec![(r.clone(), event)])
pub fn external_event(r: &Arc<Ref>, debtor: &Arc<Debtor>, action: Action) -> ActorResult {
send_actions(&r.addr.mailbox.tx, debtor, vec![action])
}
#[must_use]
pub async fn external_events(r: &Arc<Ref>, debtor: &Arc<Debtor>, events: PendingEventQueue) -> ActorResult {
r.addr.mailbox.send(debtor, events)
pub fn external_events(r: &Arc<Ref>, debtor: &Arc<Debtor>, events: PendingEventQueue) -> ActorResult {
send_actions(&r.addr.mailbox.tx, debtor, events)
}
impl Ref {
pub fn with_entity<R, F: FnOnce(&mut dyn Entity) -> R>(&self, f: F) -> R {
f(&mut **self.addr.target.write().expect("unpoisoned"))
}
pub fn attenuate(&self, attenuation: &sturdy::Attenuation) -> Result<Arc<Self>, CaveatError> {
let mut r = Ref {
addr: Arc::clone(&self.addr),
@ -692,14 +700,14 @@ impl Ref {
Ok(Arc::new(r))
}
pub fn rewrite(&self, mut a: _Any) -> Option<Assertion> {
pub fn rewrite(&self, mut a: _Any) -> Option<_Any> {
for c in &self.attenuation {
match c.rewrite(&a) {
Some(v) => a = v,
None => return None,
}
}
Some(Assertion(a))
Some(a)
}
}

View File

@ -455,7 +455,10 @@ pub async fn input_loop(
#[must_use]
async fn s<M: Into<_Any>>(relay: &Arc<Ref>, debtor: &Arc<Debtor>, m: M) -> ActorResult {
debtor.ensure_clear_funds().await;
external_event(relay, debtor, Event::Message(Box::new(Message { body: Assertion(m.into()) }))).await
let m = m.into();
let relay = Arc::clone(relay);
external_event(&Arc::clone(&relay), debtor, Box::new(
move |t| relay.with_entity(|e| e.message(t, m))))
}
let debtor = Debtor::new(crate::name!("input-loop"));