Simplify examples

This commit is contained in:
Tony Garnock-Jones 2021-09-28 13:00:48 +02:00
parent 013e99af70
commit 982a258a8c
5 changed files with 258 additions and 296 deletions

View File

@ -23,49 +23,42 @@ pub struct Config {
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
syndicate::convenient_logging()?; syndicate::convenient_logging()?;
let config = Config::from_args();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Actor::new().boot(syndicate::name!("consumer"), |t| { Actor::new().boot(syndicate::name!("consumer"), |t| {
let facet = t.facet.clone(); relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| {
let boot_account = Arc::clone(t.account()); let consumer = syndicate::entity(0)
Ok(t.linked_task(tracing::Span::current(), async move { .on_message(|message_count, _t, m: AnyValue| {
let config = Config::from_args(); if m.value().is_boolean() {
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; tracing::info!("{:?} messages in the last second", message_count);
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); *message_count = 0;
facet.activate(boot_account, |t| { } else {
relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| { *message_count += 1;
let consumer = syndicate::entity(0) }
.on_message(|message_count, _t, m: AnyValue| { Ok(())
if m.value().is_boolean() { })
tracing::info!("{:?} messages in the last second", message_count); .create_cap(t);
*message_count = 0; ds.assert(t, language(), &Observe {
} else { pattern: syndicate_macros::pattern!{<Says $ $>},
*message_count += 1; observer: Arc::clone(&consumer),
} });
Ok(())
})
.create_cap(t);
ds.assert(t, language(), &Observe {
pattern: syndicate_macros::pattern!{<Says $ $>},
observer: Arc::clone(&consumer),
});
t.linked_task(syndicate::name!("tick"), async move { t.linked_task(syndicate::name!("tick"), async move {
let mut stats_timer = interval(Duration::from_secs(1)); let mut stats_timer = interval(Duration::from_secs(1));
loop { loop {
stats_timer.tick().await; stats_timer.tick().await;
let consumer = Arc::clone(&consumer); let consumer = Arc::clone(&consumer);
external_event(&Arc::clone(&consumer.underlying.mailbox), external_event(&Arc::clone(&consumer.underlying.mailbox),
&Account::new(syndicate::name!("account")), &Account::new(syndicate::name!("account")),
Box::new(move |t| t.with_entity( Box::new(move |t| t.with_entity(
&consumer.underlying, &consumer.underlying,
|t, e| e.message(t, AnyValue::new(true)))))?; |t, e| e.message(t, AnyValue::new(true)))))?;
} }
}); });
Ok(None) Ok(None)
}); });
Ok(()) Ok(())
})?;
Ok(LinkedTaskTermination::KeepFacet)
}))
}).await??; }).await??;
Ok(()) Ok(())
} }

View File

@ -90,136 +90,128 @@ fn report_latencies(rtt_ns_samples: &Vec<u64>) {
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
syndicate::convenient_logging()?; syndicate::convenient_logging()?;
let config = Config::from_args();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Actor::new().boot(syndicate::name!("pingpong"), |t| { Actor::new().boot(syndicate::name!("pingpong"), |t| {
let facet = t.facet.clone(); relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| {
let boot_account = Arc::clone(t.account());
Ok(t.linked_task(tracing::Span::current(), async move {
let config = Config::from_args();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
facet.activate(boot_account, |t| {
relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| {
let (send_label, recv_label, report_latency_every, should_echo, bytes_padding) = let (send_label, recv_label, report_latency_every, should_echo, bytes_padding) =
match config.mode { match config.mode {
PingPongMode::Ping(ref c) => PingPongMode::Ping(ref c) =>
("Ping", "Pong", c.report_latency_every, false, c.bytes_padding), ("Ping", "Pong", c.report_latency_every, false, c.bytes_padding),
PingPongMode::Pong => PingPongMode::Pong =>
("Pong", "Ping", 0, true, 0), ("Pong", "Ping", 0, true, 0),
}; };
let consumer = { let consumer = {
let ds = Arc::clone(&ds); let ds = Arc::clone(&ds);
let mut turn_counter: u64 = 0; let mut turn_counter: u64 = 0;
let mut event_counter: u64 = 0; let mut event_counter: u64 = 0;
let mut rtt_ns_samples: Vec<u64> = vec![0; report_latency_every]; let mut rtt_ns_samples: Vec<u64> = vec![0; report_latency_every];
let mut rtt_batch_count: usize = 0; let mut rtt_batch_count: usize = 0;
let mut current_reply = None; let mut current_reply = None;
let self_ref = t.create_inert(); let self_ref = t.create_inert();
self_ref.become_entity( self_ref.become_entity(
syndicate::entity(Arc::clone(&self_ref)) syndicate::entity(Arc::clone(&self_ref))
.on_message(move |self_ref, t, m: AnyValue| { .on_message(move |self_ref, t, m: AnyValue| {
match m.value().as_boolean() { match m.value().as_boolean() {
Some(true) => { Some(true) => {
tracing::info!("{:?} turns, {:?} events in the last second", tracing::info!("{:?} turns, {:?} events in the last second",
turn_counter, turn_counter,
event_counter); event_counter);
turn_counter = 0; turn_counter = 0;
event_counter = 0; event_counter = 0;
}
Some(false) => {
current_reply = None;
}
None => {
event_counter += 1;
let bindings = m.value().to_sequence()?;
let timestamp = &bindings[0];
let padding = &bindings[1];
if should_echo || (report_latency_every == 0) {
ds.message(t, &(), &simple_record2(&send_label,
timestamp.clone(),
padding.clone()));
} else {
if let None = current_reply {
turn_counter += 1;
t.message_for_myself(&self_ref, AnyValue::new(false));
let rtt_ns = now() - timestamp.value().to_u64()?;
rtt_ns_samples[rtt_batch_count] = rtt_ns;
rtt_batch_count += 1;
if rtt_batch_count == report_latency_every {
rtt_ns_samples.sort();
report_latencies(&rtt_ns_samples);
rtt_batch_count = 0;
}
current_reply = Some(
simple_record2(&send_label,
Value::from(now()).wrap(),
padding.clone()));
}
ds.message(t, &(), current_reply.as_ref().expect("some reply"));
}
}
}
Ok(())
}));
Cap::new(&self_ref)
};
ds.assert(t, language(), &Observe {
pattern: {
let recv_label = AnyValue::symbol(recv_label);
syndicate_macros::pattern!{<#(recv_label) $ $>}
},
observer: Arc::clone(&consumer),
});
t.linked_task(syndicate::name!("tick"), async move {
let mut stats_timer = interval(Duration::from_secs(1));
loop {
stats_timer.tick().await;
let consumer = Arc::clone(&consumer);
external_event(&Arc::clone(&consumer.underlying.mailbox),
&Account::new(syndicate::name!("account")),
Box::new(move |t| t.with_entity(
&consumer.underlying,
|t, e| e.message(t, AnyValue::new(true)))))?;
}
});
if let PingPongMode::Ping(c) = &config.mode {
let turn_count = c.turn_count;
let action_count = c.action_count;
let account = Arc::clone(t.account());
t.linked_task(syndicate::name!("boot-ping"), async move {
let padding = AnyValue::bytestring(vec![0; bytes_padding]);
for _ in 0..turn_count {
let mut events: PendingEventQueue = vec![];
let current_rec = simple_record2(send_label,
Value::from(now()).wrap(),
padding.clone());
for _ in 0..action_count {
let ds = Arc::clone(&ds);
let current_rec = current_rec.clone();
events.push(Box::new(move |t| t.with_entity(
&ds.underlying,
|t, e| e.message(t, current_rec))));
} }
external_events(&ds.underlying.mailbox, &account, events)? Some(false) => {
} current_reply = None;
Ok(LinkedTaskTermination::KeepFacet) }
}); None => {
} event_counter += 1;
let bindings = m.value().to_sequence()?;
let timestamp = &bindings[0];
let padding = &bindings[1];
Ok(None) if should_echo || (report_latency_every == 0) {
ds.message(t, &(), &simple_record2(&send_label,
timestamp.clone(),
padding.clone()));
} else {
if let None = current_reply {
turn_counter += 1;
t.message_for_myself(&self_ref, AnyValue::new(false));
let rtt_ns = now() - timestamp.value().to_u64()?;
rtt_ns_samples[rtt_batch_count] = rtt_ns;
rtt_batch_count += 1;
if rtt_batch_count == report_latency_every {
rtt_ns_samples.sort();
report_latencies(&rtt_ns_samples);
rtt_batch_count = 0;
}
current_reply = Some(
simple_record2(&send_label,
Value::from(now()).wrap(),
padding.clone()));
}
ds.message(t, &(), current_reply.as_ref().expect("some reply"));
}
}
}
Ok(())
}));
Cap::new(&self_ref)
};
ds.assert(t, language(), &Observe {
pattern: {
let recv_label = AnyValue::symbol(recv_label);
syndicate_macros::pattern!{<#(recv_label) $ $>}
},
observer: Arc::clone(&consumer),
});
t.linked_task(syndicate::name!("tick"), async move {
let mut stats_timer = interval(Duration::from_secs(1));
loop {
stats_timer.tick().await;
let consumer = Arc::clone(&consumer);
external_event(&Arc::clone(&consumer.underlying.mailbox),
&Account::new(syndicate::name!("account")),
Box::new(move |t| t.with_entity(
&consumer.underlying,
|t, e| e.message(t, AnyValue::new(true)))))?;
}
});
if let PingPongMode::Ping(c) = &config.mode {
let turn_count = c.turn_count;
let action_count = c.action_count;
let account = Arc::clone(t.account());
t.linked_task(syndicate::name!("boot-ping"), async move {
let padding = AnyValue::bytestring(vec![0; bytes_padding]);
for _ in 0..turn_count {
let mut events: PendingEventQueue = vec![];
let current_rec = simple_record2(send_label,
Value::from(now()).wrap(),
padding.clone());
for _ in 0..action_count {
let ds = Arc::clone(&ds);
let current_rec = current_rec.clone();
events.push(Box::new(move |t| t.with_entity(
&ds.underlying,
|t, e| e.message(t, current_rec))));
}
external_events(&ds.underlying.mailbox, &account, events)?
}
Ok(LinkedTaskTermination::KeepFacet)
}); });
Ok(()) }
})?;
Ok(LinkedTaskTermination::KeepFacet) Ok(None)
})) });
Ok(())
}).await??; }).await??;
Ok(()) Ok(())
} }

View File

@ -1,5 +1,3 @@
use std::sync::Arc;
use structopt::StructOpt; use structopt::StructOpt;
use syndicate::actor::*; use syndicate::actor::*;
@ -33,36 +31,29 @@ fn says(who: AnyValue, what: AnyValue) -> AnyValue {
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
syndicate::convenient_logging()?; syndicate::convenient_logging()?;
let config = Config::from_args();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Actor::new().boot(syndicate::name!("producer"), |t| { Actor::new().boot(syndicate::name!("producer"), |t| {
let facet = t.facet.clone(); relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| {
let boot_account = Arc::clone(t.account()); let padding: AnyValue = Value::ByteString(vec![0; config.bytes_padding]).wrap();
Ok(t.linked_task(tracing::Span::current(), async move { let action_count = config.action_count;
let config = Config::from_args(); let account = Account::new(syndicate::name!("account"));
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; t.linked_task(syndicate::name!("sender"), async move {
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); loop {
facet.activate(boot_account, |t| { account.ensure_clear_funds().await;
relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { let mut events: PendingEventQueue = Vec::new();
let padding: AnyValue = Value::ByteString(vec![0; config.bytes_padding]).wrap(); for _ in 0..action_count {
let action_count = config.action_count; events.push(Box::new(enclose!((ds, padding) move |t| t.with_entity(
let account = Account::new(syndicate::name!("account")); &ds.underlying, |t, e| e.message(
t.linked_task(syndicate::name!("sender"), async move { t, says(Value::from("producer").wrap(), padding))))));
loop { }
account.ensure_clear_funds().await; external_events(&ds.underlying.mailbox, &account, events)?;
let mut events: PendingEventQueue = Vec::new(); }
for _ in 0..action_count { });
events.push(Box::new(enclose!((ds, padding) move |t| t.with_entity( Ok(None)
&ds.underlying, |t, e| e.message( });
t, says(Value::from("producer").wrap(), padding)))))); Ok(())
}
external_events(&ds.underlying.mailbox, &account, events)?;
}
});
Ok(None)
});
Ok(())
})?;
Ok(LinkedTaskTermination::KeepFacet)
}))
}).await??; }).await??;
Ok(()) Ok(())
} }

View File

@ -23,70 +23,63 @@ pub struct Config {
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
syndicate::convenient_logging()?; syndicate::convenient_logging()?;
let config = Config::from_args();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Actor::new().boot(syndicate::name!("state-consumer"), |t| { Actor::new().boot(syndicate::name!("state-consumer"), |t| {
let facet = t.facet.clone(); relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| {
let boot_account = Arc::clone(t.account()); let consumer = {
Ok(t.linked_task(tracing::Span::current(), async move { #[derive(Default)]
let config = Config::from_args(); struct State {
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; event_counter: u64,
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); arrival_counter: u64,
facet.activate(boot_account, |t| { departure_counter: u64,
relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| { occupancy: u64,
let consumer = { }
#[derive(Default)] syndicate::entity(State::default()).on_asserted(move |s, _, _| {
struct State { s.event_counter += 1;
event_counter: u64, s.arrival_counter += 1;
arrival_counter: u64, s.occupancy += 1;
departure_counter: u64, Ok(Some(Box::new(|s, _| {
occupancy: u64, s.event_counter += 1;
} s.departure_counter += 1;
syndicate::entity(State::default()).on_asserted(move |s, _, _| { s.occupancy -= 1;
s.event_counter += 1; Ok(())
s.arrival_counter += 1; })))
s.occupancy += 1; }).on_message(move |s, _, _| {
Ok(Some(Box::new(|s, _| { tracing::info!(
s.event_counter += 1; "{:?} events, {:?} arrivals, {:?} departures, {:?} present in the last second",
s.departure_counter += 1; s.event_counter,
s.occupancy -= 1; s.arrival_counter,
Ok(()) s.departure_counter,
}))) s.occupancy);
}).on_message(move |s, _, _| { s.event_counter = 0;
tracing::info!( s.arrival_counter = 0;
"{:?} events, {:?} arrivals, {:?} departures, {:?} present in the last second", s.departure_counter = 0;
s.event_counter, Ok(())
s.arrival_counter, }).create_cap(t)
s.departure_counter, };
s.occupancy);
s.event_counter = 0;
s.arrival_counter = 0;
s.departure_counter = 0;
Ok(())
}).create_cap(t)
};
ds.assert(t, language(), &Observe { ds.assert(t, language(), &Observe {
pattern: syndicate_macros::pattern!{<Present $>}, pattern: syndicate_macros::pattern!{<Present $>},
observer: Arc::clone(&consumer), observer: Arc::clone(&consumer),
}); });
t.linked_task(syndicate::name!("tick"), async move { t.linked_task(syndicate::name!("tick"), async move {
let mut stats_timer = interval(Duration::from_secs(1)); let mut stats_timer = interval(Duration::from_secs(1));
loop { loop {
stats_timer.tick().await; stats_timer.tick().await;
let consumer = Arc::clone(&consumer); let consumer = Arc::clone(&consumer);
external_event(&Arc::clone(&consumer.underlying.mailbox), external_event(&Arc::clone(&consumer.underlying.mailbox),
&Account::new(syndicate::name!("account")), &Account::new(syndicate::name!("account")),
Box::new(move |t| t.with_entity( Box::new(move |t| t.with_entity(
&consumer.underlying, &consumer.underlying,
|t, e| e.message(t, AnyValue::new(true)))))?; |t, e| e.message(t, AnyValue::new(true)))))?;
} }
}); });
Ok(None) Ok(None)
}); });
Ok(()) Ok(())
})?;
Ok(LinkedTaskTermination::KeepFacet)
}))
}).await??; }).await??;
Ok(()) Ok(())
} }

View File

@ -19,46 +19,39 @@ pub struct Config {
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
syndicate::convenient_logging()?; syndicate::convenient_logging()?;
let config = Config::from_args();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Actor::new().boot(syndicate::name!("state-producer"), |t| { Actor::new().boot(syndicate::name!("state-producer"), |t| {
let facet = t.facet.clone(); relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| {
let boot_account = Arc::clone(t.account()); let account = Account::new(syndicate::name!("account"));
Ok(t.linked_task(tracing::Span::current(), async move { t.linked_task(syndicate::name!("sender"), async move {
let config = Config::from_args(); let presence: AnyValue = Value::simple_record1(
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; "Present",
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); Value::from(std::process::id()).wrap()).wrap();
facet.activate(boot_account, |t| { let handle = syndicate::actor::next_handle();
relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { let assert_e = || {
let account = Account::new(syndicate::name!("account")); external_event(
t.linked_task(syndicate::name!("sender"), async move { &Arc::clone(&ds.underlying.mailbox), &account, Box::new(enclose!(
let presence: AnyValue = Value::simple_record1( (ds, presence, handle) move |t| t.with_entity(
"Present", &ds.underlying, |t, e| e.assert(t, presence, handle)))))
Value::from(std::process::id()).wrap()).wrap(); };
let handle = syndicate::actor::next_handle(); let retract_e = || {
let assert_e = || { external_event(
external_event( &Arc::clone(&ds.underlying.mailbox), &account, Box::new(enclose!(
&Arc::clone(&ds.underlying.mailbox), &account, Box::new(enclose!( (ds, handle) move |t| t.with_entity(
(ds, presence, handle) move |t| t.with_entity( &ds.underlying, |t, e| e.retract(t, handle)))))
&ds.underlying, |t, e| e.assert(t, presence, handle))))) };
}; assert_e()?;
let retract_e = || { loop {
external_event( account.ensure_clear_funds().await;
&Arc::clone(&ds.underlying.mailbox), &account, Box::new(enclose!( retract_e()?;
(ds, handle) move |t| t.with_entity( assert_e()?;
&ds.underlying, |t, e| e.retract(t, handle))))) }
}; });
assert_e()?; Ok(None)
loop { });
account.ensure_clear_funds().await; Ok(())
retract_e()?;
assert_e()?;
}
});
Ok(None)
});
Ok(())
})?;
Ok(LinkedTaskTermination::KeepFacet)
}))
}).await??; }).await??;
Ok(()) Ok(())
} }