From d8fa812bb1ebcf4b895895c241c8ee849c27ea1c Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 23 Sep 2021 21:44:19 +0200 Subject: [PATCH] Box-and-client dataflow example --- Cargo.lock | 2 + syndicate-macros/Cargo.toml | 4 + syndicate-macros/examples/box-and-client.rs | 95 +++++++++++++++++++++ 3 files changed, 101 insertions(+) create mode 100644 syndicate-macros/examples/box-and-client.rs diff --git a/Cargo.lock b/Cargo.lock index 425696f..fa778ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1494,6 +1494,8 @@ dependencies = [ "quote", "syn", "syndicate", + "tokio", + "tracing", ] [[package]] diff --git a/syndicate-macros/Cargo.toml b/syndicate-macros/Cargo.toml index a6b93d4..c426d60 100644 --- a/syndicate-macros/Cargo.toml +++ b/syndicate-macros/Cargo.toml @@ -18,3 +18,7 @@ syndicate = { path = "../syndicate", version = "^0.12.0"} proc-macro2 = { version = "^1.0", features = ["span-locations"] } quote = "^1.0" syn = "^1.0" + +[dev-dependencies] +tokio = { version = "1.10", features = ["io-std"] } +tracing = "0.1" diff --git a/syndicate-macros/examples/box-and-client.rs b/syndicate-macros/examples/box-and-client.rs new file mode 100644 index 0000000..42e0f67 --- /dev/null +++ b/syndicate-macros/examples/box-and-client.rs @@ -0,0 +1,95 @@ +use std::sync::Arc; + +use syndicate::actor::*; +use syndicate::enclose; +use syndicate::dataspace::Dataspace; +use syndicate::language; +use syndicate::schemas::dataspace::Observe; +use syndicate::value::NestedValue; + +#[tokio::main] +async fn main() -> Result<(), Box> { + syndicate::convenient_logging()?; + Actor::new().boot(tracing::Span::current(), |t| { + let ds = Cap::new(&t.create(Dataspace::new())); + let _ = t.prevent_inert_check(); + + Actor::new().boot(syndicate::name!("box"), enclose!((ds) move |t| { + let current_value = t.field(0u64); + + t.dataflow({ + let mut state_assertion_handle = None; + let ds = ds.clone(); + let current_value = Arc::clone(¤t_value); + move |t| { + let v = AnyValue::new(*t.get(¤t_value)); + tracing::info!(?v, "asserting"); + ds.update(t, &mut state_assertion_handle, &(), + Some(&syndicate_macros::template!(""))); + Ok(()) + } + })?; + + let set_box_handler = syndicate::entity(()) + .on_message({ + let current_value = Arc::clone(¤t_value); + move |(), t, captures: AnyValue| { + let v = captures.value().to_sequence()?[0].value().to_u64()?; + tracing::info!(?v, "from set-box"); + t.set(¤t_value, v); + Ok(()) + } + }) + .create_cap(t); + ds.assert(t, language(), &Observe { + pattern: syndicate_macros::pattern!{}, + observer: set_box_handler, + }); + + t.dataflow({ + let current_value = Arc::clone(¤t_value); + move |t| { + if *t.get(¤t_value) == 1000000 { + t.stop(); + } + Ok(()) + } + })?; + + Ok(()) + })); + + Actor::new().boot(syndicate::name!("client"), enclose!((ds) move |t| { + let box_state_handler = syndicate::entity(0u32) + .on_asserted({ + let ds = ds.clone(); + move |count, t, captures: AnyValue| { + *count = *count + 1; + let value = captures.value().to_sequence()?[0].value().to_u64()?; + tracing::info!(?value); + let next = AnyValue::new(value + 1); + tracing::info!(?next, "sending"); + ds.message(t, &(), &syndicate_macros::template!("")); + Ok(Some(Box::new(|count, t| { + *count = *count - 1; + if *count == 0 { + tracing::info!("box state retracted"); + t.stop(); + } + Ok(()) + }))) + } + }) + .create_cap(t); + ds.assert(t, language(), &Observe { + pattern: syndicate_macros::pattern!{}, + observer: box_state_handler, + }); + + Ok(()) + })); + + Ok(()) + }).await??; + Ok(()) +}