diff --git a/syndicate-server/examples/consumer.rs b/syndicate-server/examples/consumer.rs index ee458f8..703fecf 100644 --- a/syndicate-server/examples/consumer.rs +++ b/syndicate-server/examples/consumer.rs @@ -23,49 +23,42 @@ pub struct Config { #[tokio::main] async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; + let config = Config::from_args(); + let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; + let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); Actor::new().boot(syndicate::name!("consumer"), |t| { - let facet = t.facet.clone(); - let boot_account = Arc::clone(t.account()); - Ok(t.linked_task(tracing::Span::current(), async move { - let config = Config::from_args(); - let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; - let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - facet.activate(boot_account, |t| { - relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| { - let consumer = syndicate::entity(0) - .on_message(|message_count, _t, m: AnyValue| { - if m.value().is_boolean() { - tracing::info!("{:?} messages in the last second", message_count); - *message_count = 0; - } else { - *message_count += 1; - } - Ok(()) - }) - .create_cap(t); - ds.assert(t, language(), &Observe { - pattern: syndicate_macros::pattern!{}, - observer: Arc::clone(&consumer), - }); + relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| { + let consumer = syndicate::entity(0) + .on_message(|message_count, _t, m: AnyValue| { + if m.value().is_boolean() { + tracing::info!("{:?} messages in the last second", message_count); + *message_count = 0; + } else { + *message_count += 1; + } + Ok(()) + }) + .create_cap(t); + ds.assert(t, language(), &Observe { + pattern: syndicate_macros::pattern!{}, + observer: Arc::clone(&consumer), + }); - t.linked_task(syndicate::name!("tick"), async move { - let mut stats_timer = interval(Duration::from_secs(1)); - loop { - stats_timer.tick().await; - let consumer = Arc::clone(&consumer); - external_event(&Arc::clone(&consumer.underlying.mailbox), - &Account::new(syndicate::name!("account")), - Box::new(move |t| t.with_entity( - &consumer.underlying, - |t, e| e.message(t, AnyValue::new(true)))))?; - } - }); - Ok(None) - }); - Ok(()) - })?; - Ok(LinkedTaskTermination::KeepFacet) - })) + t.linked_task(syndicate::name!("tick"), async move { + let mut stats_timer = interval(Duration::from_secs(1)); + loop { + stats_timer.tick().await; + let consumer = Arc::clone(&consumer); + external_event(&Arc::clone(&consumer.underlying.mailbox), + &Account::new(syndicate::name!("account")), + Box::new(move |t| t.with_entity( + &consumer.underlying, + |t, e| e.message(t, AnyValue::new(true)))))?; + } + }); + Ok(None) + }); + Ok(()) }).await??; Ok(()) } diff --git a/syndicate-server/examples/pingpong.rs b/syndicate-server/examples/pingpong.rs index 0a5d131..0b015ff 100644 --- a/syndicate-server/examples/pingpong.rs +++ b/syndicate-server/examples/pingpong.rs @@ -90,136 +90,128 @@ fn report_latencies(rtt_ns_samples: &Vec) { #[tokio::main] async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; - + let config = Config::from_args(); + let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; + let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); Actor::new().boot(syndicate::name!("pingpong"), |t| { - let facet = t.facet.clone(); - let boot_account = Arc::clone(t.account()); - Ok(t.linked_task(tracing::Span::current(), async move { - let config = Config::from_args(); - let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; - let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - facet.activate(boot_account, |t| { - relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { + relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { - let (send_label, recv_label, report_latency_every, should_echo, bytes_padding) = - match config.mode { - PingPongMode::Ping(ref c) => - ("Ping", "Pong", c.report_latency_every, false, c.bytes_padding), - PingPongMode::Pong => - ("Pong", "Ping", 0, true, 0), - }; + let (send_label, recv_label, report_latency_every, should_echo, bytes_padding) = + match config.mode { + PingPongMode::Ping(ref c) => + ("Ping", "Pong", c.report_latency_every, false, c.bytes_padding), + PingPongMode::Pong => + ("Pong", "Ping", 0, true, 0), + }; - let consumer = { - let ds = Arc::clone(&ds); - let mut turn_counter: u64 = 0; - let mut event_counter: u64 = 0; - let mut rtt_ns_samples: Vec = vec![0; report_latency_every]; - let mut rtt_batch_count: usize = 0; - let mut current_reply = None; - let self_ref = t.create_inert(); - self_ref.become_entity( - syndicate::entity(Arc::clone(&self_ref)) - .on_message(move |self_ref, t, m: AnyValue| { - match m.value().as_boolean() { - Some(true) => { - tracing::info!("{:?} turns, {:?} events in the last second", - turn_counter, - event_counter); - turn_counter = 0; - event_counter = 0; - } - Some(false) => { - current_reply = None; - } - None => { - event_counter += 1; - let bindings = m.value().to_sequence()?; - let timestamp = &bindings[0]; - let padding = &bindings[1]; - - if should_echo || (report_latency_every == 0) { - ds.message(t, &(), &simple_record2(&send_label, - timestamp.clone(), - padding.clone())); - } else { - if let None = current_reply { - turn_counter += 1; - t.message_for_myself(&self_ref, AnyValue::new(false)); - let rtt_ns = now() - timestamp.value().to_u64()?; - rtt_ns_samples[rtt_batch_count] = rtt_ns; - rtt_batch_count += 1; - - if rtt_batch_count == report_latency_every { - rtt_ns_samples.sort(); - report_latencies(&rtt_ns_samples); - rtt_batch_count = 0; - } - - current_reply = Some( - simple_record2(&send_label, - Value::from(now()).wrap(), - padding.clone())); - } - ds.message(t, &(), current_reply.as_ref().expect("some reply")); - } - } - } - Ok(()) - })); - Cap::new(&self_ref) - }; - - ds.assert(t, language(), &Observe { - pattern: { - let recv_label = AnyValue::symbol(recv_label); - syndicate_macros::pattern!{<#(recv_label) $ $>} - }, - observer: Arc::clone(&consumer), - }); - - t.linked_task(syndicate::name!("tick"), async move { - let mut stats_timer = interval(Duration::from_secs(1)); - loop { - stats_timer.tick().await; - let consumer = Arc::clone(&consumer); - external_event(&Arc::clone(&consumer.underlying.mailbox), - &Account::new(syndicate::name!("account")), - Box::new(move |t| t.with_entity( - &consumer.underlying, - |t, e| e.message(t, AnyValue::new(true)))))?; - } - }); - - if let PingPongMode::Ping(c) = &config.mode { - let turn_count = c.turn_count; - let action_count = c.action_count; - let account = Arc::clone(t.account()); - t.linked_task(syndicate::name!("boot-ping"), async move { - let padding = AnyValue::bytestring(vec![0; bytes_padding]); - for _ in 0..turn_count { - let mut events: PendingEventQueue = vec![]; - let current_rec = simple_record2(send_label, - Value::from(now()).wrap(), - padding.clone()); - for _ in 0..action_count { - let ds = Arc::clone(&ds); - let current_rec = current_rec.clone(); - events.push(Box::new(move |t| t.with_entity( - &ds.underlying, - |t, e| e.message(t, current_rec)))); + let consumer = { + let ds = Arc::clone(&ds); + let mut turn_counter: u64 = 0; + let mut event_counter: u64 = 0; + let mut rtt_ns_samples: Vec = vec![0; report_latency_every]; + let mut rtt_batch_count: usize = 0; + let mut current_reply = None; + let self_ref = t.create_inert(); + self_ref.become_entity( + syndicate::entity(Arc::clone(&self_ref)) + .on_message(move |self_ref, t, m: AnyValue| { + match m.value().as_boolean() { + Some(true) => { + tracing::info!("{:?} turns, {:?} events in the last second", + turn_counter, + event_counter); + turn_counter = 0; + event_counter = 0; } - external_events(&ds.underlying.mailbox, &account, events)? - } - Ok(LinkedTaskTermination::KeepFacet) - }); - } + Some(false) => { + current_reply = None; + } + None => { + event_counter += 1; + let bindings = m.value().to_sequence()?; + let timestamp = &bindings[0]; + let padding = &bindings[1]; - Ok(None) + if should_echo || (report_latency_every == 0) { + ds.message(t, &(), &simple_record2(&send_label, + timestamp.clone(), + padding.clone())); + } else { + if let None = current_reply { + turn_counter += 1; + t.message_for_myself(&self_ref, AnyValue::new(false)); + let rtt_ns = now() - timestamp.value().to_u64()?; + rtt_ns_samples[rtt_batch_count] = rtt_ns; + rtt_batch_count += 1; + + if rtt_batch_count == report_latency_every { + rtt_ns_samples.sort(); + report_latencies(&rtt_ns_samples); + rtt_batch_count = 0; + } + + current_reply = Some( + simple_record2(&send_label, + Value::from(now()).wrap(), + padding.clone())); + } + ds.message(t, &(), current_reply.as_ref().expect("some reply")); + } + } + } + Ok(()) + })); + Cap::new(&self_ref) + }; + + ds.assert(t, language(), &Observe { + pattern: { + let recv_label = AnyValue::symbol(recv_label); + syndicate_macros::pattern!{<#(recv_label) $ $>} + }, + observer: Arc::clone(&consumer), + }); + + t.linked_task(syndicate::name!("tick"), async move { + let mut stats_timer = interval(Duration::from_secs(1)); + loop { + stats_timer.tick().await; + let consumer = Arc::clone(&consumer); + external_event(&Arc::clone(&consumer.underlying.mailbox), + &Account::new(syndicate::name!("account")), + Box::new(move |t| t.with_entity( + &consumer.underlying, + |t, e| e.message(t, AnyValue::new(true)))))?; + } + }); + + if let PingPongMode::Ping(c) = &config.mode { + let turn_count = c.turn_count; + let action_count = c.action_count; + let account = Arc::clone(t.account()); + t.linked_task(syndicate::name!("boot-ping"), async move { + let padding = AnyValue::bytestring(vec![0; bytes_padding]); + for _ in 0..turn_count { + let mut events: PendingEventQueue = vec![]; + let current_rec = simple_record2(send_label, + Value::from(now()).wrap(), + padding.clone()); + for _ in 0..action_count { + let ds = Arc::clone(&ds); + let current_rec = current_rec.clone(); + events.push(Box::new(move |t| t.with_entity( + &ds.underlying, + |t, e| e.message(t, current_rec)))); + } + external_events(&ds.underlying.mailbox, &account, events)? + } + Ok(LinkedTaskTermination::KeepFacet) }); - Ok(()) - })?; - Ok(LinkedTaskTermination::KeepFacet) - })) + } + + Ok(None) + }); + Ok(()) }).await??; Ok(()) } diff --git a/syndicate-server/examples/producer.rs b/syndicate-server/examples/producer.rs index 65a1286..dd74579 100644 --- a/syndicate-server/examples/producer.rs +++ b/syndicate-server/examples/producer.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use structopt::StructOpt; use syndicate::actor::*; @@ -33,36 +31,29 @@ fn says(who: AnyValue, what: AnyValue) -> AnyValue { #[tokio::main] async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; + let config = Config::from_args(); + let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; + let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); Actor::new().boot(syndicate::name!("producer"), |t| { - let facet = t.facet.clone(); - let boot_account = Arc::clone(t.account()); - Ok(t.linked_task(tracing::Span::current(), async move { - let config = Config::from_args(); - let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; - let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - facet.activate(boot_account, |t| { - relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { - let padding: AnyValue = Value::ByteString(vec![0; config.bytes_padding]).wrap(); - let action_count = config.action_count; - let account = Account::new(syndicate::name!("account")); - t.linked_task(syndicate::name!("sender"), async move { - loop { - account.ensure_clear_funds().await; - let mut events: PendingEventQueue = Vec::new(); - for _ in 0..action_count { - events.push(Box::new(enclose!((ds, padding) move |t| t.with_entity( - &ds.underlying, |t, e| e.message( - t, says(Value::from("producer").wrap(), padding)))))); - } - external_events(&ds.underlying.mailbox, &account, events)?; - } - }); - Ok(None) - }); - Ok(()) - })?; - Ok(LinkedTaskTermination::KeepFacet) - })) + relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { + let padding: AnyValue = Value::ByteString(vec![0; config.bytes_padding]).wrap(); + let action_count = config.action_count; + let account = Account::new(syndicate::name!("account")); + t.linked_task(syndicate::name!("sender"), async move { + loop { + account.ensure_clear_funds().await; + let mut events: PendingEventQueue = Vec::new(); + for _ in 0..action_count { + events.push(Box::new(enclose!((ds, padding) move |t| t.with_entity( + &ds.underlying, |t, e| e.message( + t, says(Value::from("producer").wrap(), padding)))))); + } + external_events(&ds.underlying.mailbox, &account, events)?; + } + }); + Ok(None) + }); + Ok(()) }).await??; Ok(()) } diff --git a/syndicate-server/examples/state-consumer.rs b/syndicate-server/examples/state-consumer.rs index 362c77b..f602bf1 100644 --- a/syndicate-server/examples/state-consumer.rs +++ b/syndicate-server/examples/state-consumer.rs @@ -23,70 +23,63 @@ pub struct Config { #[tokio::main] async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; + let config = Config::from_args(); + let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; + let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); Actor::new().boot(syndicate::name!("state-consumer"), |t| { - let facet = t.facet.clone(); - let boot_account = Arc::clone(t.account()); - Ok(t.linked_task(tracing::Span::current(), async move { - let config = Config::from_args(); - let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; - let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - facet.activate(boot_account, |t| { - relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| { - let consumer = { - #[derive(Default)] - struct State { - event_counter: u64, - arrival_counter: u64, - departure_counter: u64, - occupancy: u64, - } - syndicate::entity(State::default()).on_asserted(move |s, _, _| { - s.event_counter += 1; - s.arrival_counter += 1; - s.occupancy += 1; - Ok(Some(Box::new(|s, _| { - s.event_counter += 1; - s.departure_counter += 1; - s.occupancy -= 1; - Ok(()) - }))) - }).on_message(move |s, _, _| { - tracing::info!( - "{:?} events, {:?} arrivals, {:?} departures, {:?} present in the last second", - s.event_counter, - s.arrival_counter, - s.departure_counter, - s.occupancy); - s.event_counter = 0; - s.arrival_counter = 0; - s.departure_counter = 0; - Ok(()) - }).create_cap(t) - }; + relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| { + let consumer = { + #[derive(Default)] + struct State { + event_counter: u64, + arrival_counter: u64, + departure_counter: u64, + occupancy: u64, + } + syndicate::entity(State::default()).on_asserted(move |s, _, _| { + s.event_counter += 1; + s.arrival_counter += 1; + s.occupancy += 1; + Ok(Some(Box::new(|s, _| { + s.event_counter += 1; + s.departure_counter += 1; + s.occupancy -= 1; + Ok(()) + }))) + }).on_message(move |s, _, _| { + tracing::info!( + "{:?} events, {:?} arrivals, {:?} departures, {:?} present in the last second", + s.event_counter, + s.arrival_counter, + s.departure_counter, + s.occupancy); + s.event_counter = 0; + s.arrival_counter = 0; + s.departure_counter = 0; + Ok(()) + }).create_cap(t) + }; - ds.assert(t, language(), &Observe { - pattern: syndicate_macros::pattern!{}, - observer: Arc::clone(&consumer), - }); + ds.assert(t, language(), &Observe { + pattern: syndicate_macros::pattern!{}, + observer: Arc::clone(&consumer), + }); - t.linked_task(syndicate::name!("tick"), async move { - let mut stats_timer = interval(Duration::from_secs(1)); - loop { - stats_timer.tick().await; - let consumer = Arc::clone(&consumer); - external_event(&Arc::clone(&consumer.underlying.mailbox), - &Account::new(syndicate::name!("account")), - Box::new(move |t| t.with_entity( - &consumer.underlying, - |t, e| e.message(t, AnyValue::new(true)))))?; - } - }); - Ok(None) - }); - Ok(()) - })?; - Ok(LinkedTaskTermination::KeepFacet) - })) + t.linked_task(syndicate::name!("tick"), async move { + let mut stats_timer = interval(Duration::from_secs(1)); + loop { + stats_timer.tick().await; + let consumer = Arc::clone(&consumer); + external_event(&Arc::clone(&consumer.underlying.mailbox), + &Account::new(syndicate::name!("account")), + Box::new(move |t| t.with_entity( + &consumer.underlying, + |t, e| e.message(t, AnyValue::new(true)))))?; + } + }); + Ok(None) + }); + Ok(()) }).await??; Ok(()) } diff --git a/syndicate-server/examples/state-producer.rs b/syndicate-server/examples/state-producer.rs index 7537cac..805e993 100644 --- a/syndicate-server/examples/state-producer.rs +++ b/syndicate-server/examples/state-producer.rs @@ -19,46 +19,39 @@ pub struct Config { #[tokio::main] async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; + let config = Config::from_args(); + let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; + let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); Actor::new().boot(syndicate::name!("state-producer"), |t| { - let facet = t.facet.clone(); - let boot_account = Arc::clone(t.account()); - Ok(t.linked_task(tracing::Span::current(), async move { - let config = Config::from_args(); - let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?; - let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); - facet.activate(boot_account, |t| { - relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { - let account = Account::new(syndicate::name!("account")); - t.linked_task(syndicate::name!("sender"), async move { - let presence: AnyValue = Value::simple_record1( - "Present", - Value::from(std::process::id()).wrap()).wrap(); - let handle = syndicate::actor::next_handle(); - let assert_e = || { - external_event( - &Arc::clone(&ds.underlying.mailbox), &account, Box::new(enclose!( - (ds, presence, handle) move |t| t.with_entity( - &ds.underlying, |t, e| e.assert(t, presence, handle))))) - }; - let retract_e = || { - external_event( - &Arc::clone(&ds.underlying.mailbox), &account, Box::new(enclose!( - (ds, handle) move |t| t.with_entity( - &ds.underlying, |t, e| e.retract(t, handle))))) - }; - assert_e()?; - loop { - account.ensure_clear_funds().await; - retract_e()?; - assert_e()?; - } - }); - Ok(None) - }); - Ok(()) - })?; - Ok(LinkedTaskTermination::KeepFacet) - })) + relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { + let account = Account::new(syndicate::name!("account")); + t.linked_task(syndicate::name!("sender"), async move { + let presence: AnyValue = Value::simple_record1( + "Present", + Value::from(std::process::id()).wrap()).wrap(); + let handle = syndicate::actor::next_handle(); + let assert_e = || { + external_event( + &Arc::clone(&ds.underlying.mailbox), &account, Box::new(enclose!( + (ds, presence, handle) move |t| t.with_entity( + &ds.underlying, |t, e| e.assert(t, presence, handle))))) + }; + let retract_e = || { + external_event( + &Arc::clone(&ds.underlying.mailbox), &account, Box::new(enclose!( + (ds, handle) move |t| t.with_entity( + &ds.underlying, |t, e| e.retract(t, handle))))) + }; + assert_e()?; + loop { + account.ensure_clear_funds().await; + retract_e()?; + assert_e()?; + } + }); + Ok(None) + }); + Ok(()) }).await??; Ok(()) }