2020-06-17 11:42:33 +00:00
|
|
|
use criterion::{criterion_group, criterion_main, Criterion};
|
|
|
|
|
2021-07-15 07:13:31 +00:00
|
|
|
use std::iter::FromIterator;
|
|
|
|
use std::sync::Arc;
|
|
|
|
use std::sync::atomic::AtomicU64;
|
|
|
|
use std::sync::atomic::Ordering;
|
|
|
|
use std::time::Instant;
|
2020-06-17 11:42:33 +00:00
|
|
|
|
2021-07-15 07:13:31 +00:00
|
|
|
use syndicate::actor::*;
|
|
|
|
use syndicate::dataspace::Dataspace;
|
|
|
|
use syndicate::schemas::dataspace::Observe;
|
|
|
|
use syndicate::schemas::dataspace_patterns as p;
|
|
|
|
use syndicate::value::Map;
|
|
|
|
use syndicate::value::NestedValue;
|
|
|
|
use syndicate::value::Value;
|
2020-06-17 11:42:33 +00:00
|
|
|
|
2021-07-15 07:13:31 +00:00
|
|
|
use tokio::runtime::Runtime;
|
2020-06-17 11:42:33 +00:00
|
|
|
|
2021-07-15 07:13:31 +00:00
|
|
|
use tracing::Level;
|
2020-06-17 11:42:33 +00:00
|
|
|
|
|
|
|
#[inline]
|
2021-07-15 07:13:31 +00:00
|
|
|
fn says(who: _Any, what: _Any) -> _Any {
|
2020-06-17 11:42:33 +00:00
|
|
|
let mut r = Value::simple_record("Says", 2);
|
|
|
|
r.fields_vec_mut().push(who);
|
|
|
|
r.fields_vec_mut().push(what);
|
|
|
|
r.finish().wrap()
|
|
|
|
}
|
|
|
|
|
2021-07-15 07:13:31 +00:00
|
|
|
struct ShutdownEntity;
|
|
|
|
|
2021-07-22 14:53:56 +00:00
|
|
|
impl Entity<_Any> for ShutdownEntity {
|
2021-07-15 07:13:31 +00:00
|
|
|
fn message(&mut self, t: &mut Activation, _m: _Any) -> ActorResult {
|
2021-07-24 21:22:01 +00:00
|
|
|
t.state.shutdown();
|
2021-07-15 07:13:31 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-06-17 11:42:33 +00:00
|
|
|
pub fn bench_pub(c: &mut Criterion) {
|
|
|
|
let filter = tracing_subscriber::filter::EnvFilter::from_default_env()
|
|
|
|
.add_directive(tracing_subscriber::filter::LevelFilter::INFO.into());
|
|
|
|
let subscriber = tracing_subscriber::FmtSubscriber::builder()
|
|
|
|
.with_ansi(true)
|
|
|
|
.with_max_level(Level::TRACE)
|
|
|
|
.with_env_filter(filter)
|
|
|
|
.finish();
|
|
|
|
tracing::subscriber::set_global_default(subscriber)
|
|
|
|
.expect("Could not set tracing global subscriber");
|
|
|
|
|
2021-07-15 07:13:31 +00:00
|
|
|
let rt = Runtime::new().unwrap();
|
|
|
|
|
2020-06-17 11:42:33 +00:00
|
|
|
c.bench_function("publication alone", |b| {
|
|
|
|
b.iter_custom(|iters| {
|
|
|
|
let start = Instant::now();
|
2021-07-15 07:13:31 +00:00
|
|
|
rt.block_on(async move {
|
2021-07-24 21:22:01 +00:00
|
|
|
Actor::new().boot(syndicate::name!("dataspace"), move |t| {
|
|
|
|
let ds = t.state.create(Dataspace::new());
|
|
|
|
let shutdown = t.state.create(ShutdownEntity);
|
|
|
|
let debtor = Debtor::new(syndicate::name!("sender-debtor"));
|
|
|
|
t.state.linked_task(syndicate::name!("sender"), async move {
|
|
|
|
for _ in 0..iters {
|
|
|
|
let ds = Arc::clone(&ds);
|
|
|
|
external_event(&Arc::clone(&ds.mailbox), &debtor, Box::new(
|
|
|
|
move |t| ds.with_entity(
|
|
|
|
|e| e.message(t, says(_Any::new("bench_pub"),
|
|
|
|
Value::ByteString(vec![]).wrap())))))?
|
|
|
|
}
|
|
|
|
external_event(&Arc::clone(&shutdown.mailbox), &debtor, Box::new(
|
|
|
|
move |t| shutdown.with_entity(
|
|
|
|
|e| e.message(t, _Any::new(true)))))?;
|
|
|
|
Ok(())
|
|
|
|
});
|
2021-07-15 07:13:31 +00:00
|
|
|
Ok(())
|
2021-07-24 21:22:01 +00:00
|
|
|
}).await.unwrap().unwrap();
|
2021-07-15 07:13:31 +00:00
|
|
|
});
|
2020-06-17 11:42:33 +00:00
|
|
|
start.elapsed()
|
|
|
|
})
|
|
|
|
});
|
|
|
|
|
|
|
|
c.bench_function("publication and subscription", |b| {
|
|
|
|
b.iter_custom(|iters| {
|
2021-07-15 07:13:31 +00:00
|
|
|
let start = Instant::now();
|
|
|
|
rt.block_on(async move {
|
2021-07-24 21:22:01 +00:00
|
|
|
let ds = Cap::new(
|
|
|
|
&Actor::create_and_start(syndicate::name!("dataspace"), Dataspace::new()));
|
2021-07-15 07:13:31 +00:00
|
|
|
let turn_count = Arc::new(AtomicU64::new(0));
|
|
|
|
|
|
|
|
{
|
|
|
|
let iters = iters.clone();
|
2021-07-24 21:22:01 +00:00
|
|
|
let turn_count = Arc::clone(&turn_count);
|
|
|
|
Actor::new().boot(syndicate::name!("consumer"), move |t| {
|
|
|
|
struct Receiver(Arc<AtomicU64>);
|
|
|
|
impl Entity<_Any> for Receiver {
|
|
|
|
fn message(&mut self, _t: &mut Activation, _m: _Any) -> ActorResult {
|
|
|
|
self.0.fetch_add(1, Ordering::Relaxed);
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
let shutdown = Cap::new(&t.state.create(ShutdownEntity));
|
|
|
|
let receiver = Cap::new(&t.state.create(Receiver(Arc::clone(&turn_count))));
|
|
|
|
|
2021-07-22 14:53:56 +00:00
|
|
|
ds.assert(t, &Observe {
|
2021-07-15 07:13:31 +00:00
|
|
|
pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec {
|
|
|
|
ctor: Box::new(p::CRec {
|
|
|
|
label: Value::symbol("Says").wrap(),
|
|
|
|
arity: 2.into(),
|
|
|
|
}),
|
|
|
|
members: Map::from_iter(vec![
|
|
|
|
(0.into(), p::Pattern::DLit(Box::new(p::DLit {
|
|
|
|
value: _Any::new("bench_pub"),
|
|
|
|
}))),
|
|
|
|
(1.into(), p::Pattern::DBind(Box::new(p::DBind {
|
|
|
|
name: "what".to_owned(),
|
|
|
|
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)),
|
|
|
|
}))),
|
|
|
|
].into_iter()),
|
|
|
|
})),
|
|
|
|
observer: receiver,
|
|
|
|
});
|
2021-07-22 14:53:56 +00:00
|
|
|
ds.assert(t, &Observe {
|
2021-07-15 07:13:31 +00:00
|
|
|
pattern: p::Pattern::DBind(Box::new(p::DBind {
|
|
|
|
name: "shutdownTrigger".to_owned(),
|
|
|
|
pattern: p::Pattern::DLit(Box::new(p::DLit {
|
|
|
|
value: _Any::new(true),
|
|
|
|
})),
|
|
|
|
})),
|
|
|
|
observer: shutdown,
|
|
|
|
});
|
2021-07-24 21:22:01 +00:00
|
|
|
let debtor = Arc::clone(t.debtor());
|
|
|
|
t.state.linked_task(syndicate::name!("sender"), async move {
|
2021-07-15 07:13:31 +00:00
|
|
|
for _ in 0..iters {
|
2021-07-22 07:56:21 +00:00
|
|
|
let ds = Arc::clone(&ds);
|
2021-07-22 14:53:56 +00:00
|
|
|
external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new(
|
|
|
|
move |t| ds.underlying.with_entity(
|
2021-07-22 07:56:21 +00:00
|
|
|
|e| e.message(t, says(_Any::new("bench_pub"),
|
|
|
|
Value::ByteString(vec![]).wrap())))))?
|
|
|
|
}
|
|
|
|
{
|
|
|
|
let ds = Arc::clone(&ds);
|
2021-07-22 14:53:56 +00:00
|
|
|
external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new(
|
|
|
|
move |t| ds.underlying.with_entity(
|
2021-07-22 07:56:21 +00:00
|
|
|
|e| e.message(t, _Any::new(true)))))?;
|
2020-06-17 11:42:33 +00:00
|
|
|
}
|
2021-07-15 07:13:31 +00:00
|
|
|
Ok(())
|
2020-06-17 11:42:33 +00:00
|
|
|
});
|
2021-07-15 07:13:31 +00:00
|
|
|
Ok(())
|
2021-07-24 21:22:01 +00:00
|
|
|
}).await.unwrap().unwrap();
|
2021-07-15 07:13:31 +00:00
|
|
|
}
|
2021-07-24 21:22:01 +00:00
|
|
|
|
2021-07-15 07:13:31 +00:00
|
|
|
let actual_turns = turn_count.load(Ordering::SeqCst);
|
|
|
|
if actual_turns != iters {
|
|
|
|
panic!("Expected {}, got {} messages", iters, actual_turns);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
start.elapsed()
|
2020-06-17 11:42:33 +00:00
|
|
|
})
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
criterion_group!(publish, bench_pub);
|
|
|
|
criterion_main!(publish);
|