Box-and-client dataflow example
This commit is contained in:
parent
531d66205b
commit
d8fa812bb1
|
@ -1494,6 +1494,8 @@ dependencies = [
|
|||
"quote",
|
||||
"syn",
|
||||
"syndicate",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
@ -18,3 +18,7 @@ syndicate = { path = "../syndicate", version = "^0.12.0"}
|
|||
proc-macro2 = { version = "^1.0", features = ["span-locations"] }
|
||||
quote = "^1.0"
|
||||
syn = "^1.0"
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "1.10", features = ["io-std"] }
|
||||
tracing = "0.1"
|
||||
|
|
|
@ -0,0 +1,95 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use syndicate::actor::*;
|
||||
use syndicate::enclose;
|
||||
use syndicate::dataspace::Dataspace;
|
||||
use syndicate::language;
|
||||
use syndicate::schemas::dataspace::Observe;
|
||||
use syndicate::value::NestedValue;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
syndicate::convenient_logging()?;
|
||||
Actor::new().boot(tracing::Span::current(), |t| {
|
||||
let ds = Cap::new(&t.create(Dataspace::new()));
|
||||
let _ = t.prevent_inert_check();
|
||||
|
||||
Actor::new().boot(syndicate::name!("box"), enclose!((ds) move |t| {
|
||||
let current_value = t.field(0u64);
|
||||
|
||||
t.dataflow({
|
||||
let mut state_assertion_handle = None;
|
||||
let ds = ds.clone();
|
||||
let current_value = Arc::clone(¤t_value);
|
||||
move |t| {
|
||||
let v = AnyValue::new(*t.get(¤t_value));
|
||||
tracing::info!(?v, "asserting");
|
||||
ds.update(t, &mut state_assertion_handle, &(),
|
||||
Some(&syndicate_macros::template!("<box-state =v>")));
|
||||
Ok(())
|
||||
}
|
||||
})?;
|
||||
|
||||
let set_box_handler = syndicate::entity(())
|
||||
.on_message({
|
||||
let current_value = Arc::clone(¤t_value);
|
||||
move |(), t, captures: AnyValue| {
|
||||
let v = captures.value().to_sequence()?[0].value().to_u64()?;
|
||||
tracing::info!(?v, "from set-box");
|
||||
t.set(¤t_value, v);
|
||||
Ok(())
|
||||
}
|
||||
})
|
||||
.create_cap(t);
|
||||
ds.assert(t, language(), &Observe {
|
||||
pattern: syndicate_macros::pattern!{<set-box $>},
|
||||
observer: set_box_handler,
|
||||
});
|
||||
|
||||
t.dataflow({
|
||||
let current_value = Arc::clone(¤t_value);
|
||||
move |t| {
|
||||
if *t.get(¤t_value) == 1000000 {
|
||||
t.stop();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}));
|
||||
|
||||
Actor::new().boot(syndicate::name!("client"), enclose!((ds) move |t| {
|
||||
let box_state_handler = syndicate::entity(0u32)
|
||||
.on_asserted({
|
||||
let ds = ds.clone();
|
||||
move |count, t, captures: AnyValue| {
|
||||
*count = *count + 1;
|
||||
let value = captures.value().to_sequence()?[0].value().to_u64()?;
|
||||
tracing::info!(?value);
|
||||
let next = AnyValue::new(value + 1);
|
||||
tracing::info!(?next, "sending");
|
||||
ds.message(t, &(), &syndicate_macros::template!("<set-box =next>"));
|
||||
Ok(Some(Box::new(|count, t| {
|
||||
*count = *count - 1;
|
||||
if *count == 0 {
|
||||
tracing::info!("box state retracted");
|
||||
t.stop();
|
||||
}
|
||||
Ok(())
|
||||
})))
|
||||
}
|
||||
})
|
||||
.create_cap(t);
|
||||
ds.assert(t, language(), &Observe {
|
||||
pattern: syndicate_macros::pattern!{<box-state $>},
|
||||
observer: box_state_handler,
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}));
|
||||
|
||||
Ok(())
|
||||
}).await??;
|
||||
Ok(())
|
||||
}
|
Loading…
Reference in New Issue