Use `enclose!` macro

This commit is contained in:
Tony Garnock-Jones 2021-09-23 21:46:10 +02:00
parent d8fa812bb1
commit b81e936caf
8 changed files with 32 additions and 36 deletions

View File

@ -3,6 +3,7 @@ use std::sync::Arc;
use structopt::StructOpt; use structopt::StructOpt;
use syndicate::actor::*; use syndicate::actor::*;
use syndicate::enclose;
use syndicate::relay; use syndicate::relay;
use syndicate::sturdy; use syndicate::sturdy;
use syndicate::value::Value; use syndicate::value::Value;
@ -49,11 +50,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
account.ensure_clear_funds().await; account.ensure_clear_funds().await;
let mut events: PendingEventQueue = Vec::new(); let mut events: PendingEventQueue = Vec::new();
for _ in 0..action_count { for _ in 0..action_count {
let ds = Arc::clone(&ds); events.push(Box::new(enclose!((ds, padding) move |t| t.with_entity(
let padding = padding.clone(); &ds.underlying, |t, e| e.message(
events.push(Box::new(move |t| t.with_entity( t, says(Value::from("producer").wrap(), padding))))));
&ds.underlying,
|t, e| e.message(t, says(Value::from("producer").wrap(), padding)))));
} }
external_events(&ds.underlying.mailbox, &account, events)?; external_events(&ds.underlying.mailbox, &account, events)?;
} }

View File

@ -3,6 +3,7 @@ use std::sync::Arc;
use structopt::StructOpt; use structopt::StructOpt;
use syndicate::actor::*; use syndicate::actor::*;
use syndicate::enclose;
use syndicate::relay; use syndicate::relay;
use syndicate::sturdy; use syndicate::sturdy;
use syndicate::value::Value; use syndicate::value::Value;
@ -34,17 +35,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Value::from(std::process::id()).wrap()).wrap(); Value::from(std::process::id()).wrap()).wrap();
let handle = syndicate::actor::next_handle(); let handle = syndicate::actor::next_handle();
let assert_e = || { let assert_e = || {
let ds = Arc::clone(&ds); external_event(
let presence = presence.clone(); &Arc::clone(&ds.underlying.mailbox), &account, Box::new(enclose!(
let handle = handle.clone(); (ds, presence, handle) move |t| t.with_entity(
external_event(&Arc::clone(&ds.underlying.mailbox), &account, Box::new( &ds.underlying, |t, e| e.assert(t, presence, handle)))))
move |t| t.with_entity(&ds.underlying, |t, e| e.assert(t, presence, handle))))
}; };
let retract_e = || { let retract_e = || {
let ds = Arc::clone(&ds); external_event(
let handle = handle.clone(); &Arc::clone(&ds.underlying.mailbox), &account, Box::new(enclose!(
external_event(&Arc::clone(&ds.underlying.mailbox), &account, Box::new( (ds, handle) move |t| t.with_entity(
move |t| t.with_entity(&ds.underlying, |t, e| e.retract(t, handle)))) &ds.underlying, |t, e| e.retract(t, handle)))))
}; };
assert_e()?; assert_e()?;
loop { loop {

View File

@ -7,6 +7,7 @@ use structopt::StructOpt;
use syndicate::actor::*; use syndicate::actor::*;
use syndicate::dataspace::*; use syndicate::dataspace::*;
use syndicate::enclose;
use syndicate::relay; use syndicate::relay;
use syndicate::schemas::service; use syndicate::schemas::service;
use syndicate::schemas::transport_address; use syndicate::schemas::transport_address;
@ -79,12 +80,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
if config.inferior { if config.inferior {
tracing::info!("inferior server instance"); tracing::info!("inferior server instance");
let root_ds = Arc::clone(&root_ds); t.spawn(syndicate::name!("parent"), enclose!((root_ds) move |t| protocol::run_io_relay(
t.spawn(syndicate::name!("parent"), move |t| protocol::run_io_relay(
t, t,
relay::Input::Bytes(Box::pin(tokio::io::stdin())), relay::Input::Bytes(Box::pin(tokio::io::stdin())),
relay::Output::Bytes(Box::pin(tokio::io::stdout())), relay::Output::Bytes(Box::pin(tokio::io::stdout())),
root_ds)); root_ds)));
} }
let server_config_ds = Cap::new(&t.create(Dataspace::new())); let server_config_ds = Cap::new(&t.create(Dataspace::new()));

View File

@ -15,6 +15,7 @@ use std::thread;
use std::time::Duration; use std::time::Duration;
use syndicate::actor::*; use syndicate::actor::*;
use syndicate::enclose;
use syndicate::value::BinarySource; use syndicate::value::BinarySource;
use syndicate::value::IOBinarySource; use syndicate::value::IOBinarySource;
use syndicate::value::Map; use syndicate::value::Map;
@ -33,9 +34,8 @@ pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| {
Ok(during!(t, ds, language(), <require-service $spec: internal_services::ConfigWatcher>, Ok(during!(t, ds, language(), <require-service $spec: internal_services::ConfigWatcher>,
|t: &mut Activation| { |t: &mut Activation| {
let ds = Arc::clone(&ds);
t.spawn_link(syndicate::name!(parent: None, "config", spec = ?spec), t.spawn_link(syndicate::name!(parent: None, "config", spec = ?spec),
|t| run(t, ds, spec)); enclose!((ds) |t| run(t, ds, spec)));
Ok(()) Ok(())
})) }))
}); });

View File

@ -3,6 +3,7 @@ use preserves_schema::Codec;
use std::sync::Arc; use std::sync::Arc;
use syndicate::actor::*; use syndicate::actor::*;
use syndicate::enclose;
use syndicate::supervise::{Supervisor, SupervisorConfiguration}; use syndicate::supervise::{Supervisor, SupervisorConfiguration};
use tokio::process; use tokio::process;
@ -15,13 +16,12 @@ use syndicate_macros::during;
pub fn on_demand(t: &mut Activation, config_ds: Arc<Cap>, root_ds: Arc<Cap>) { pub fn on_demand(t: &mut Activation, config_ds: Arc<Cap>, root_ds: Arc<Cap>) {
t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| {
Ok(during!(t, config_ds, language(), <require-service $spec: DaemonService>, |t| { Ok(during!(t, config_ds, language(), <require-service $spec: DaemonService>, |t| {
let config_ds = Arc::clone(&config_ds);
let root_ds = Arc::clone(&root_ds);
Ok(Supervisor::start( Ok(Supervisor::start(
t, t,
syndicate::name!(parent: None, "daemon", service = ?spec), syndicate::name!(parent: None, "daemon", service = ?spec),
SupervisorConfiguration::default(), SupervisorConfiguration::default(),
move |t| run(t, Arc::clone(&config_ds), Arc::clone(&root_ds), spec.clone()))) enclose!((config_ds, root_ds) move |t|
enclose!((config_ds, root_ds, spec) run(t, config_ds, root_ds, spec)))))
})) }))
}); });
} }

View File

@ -1,6 +1,7 @@
use std::sync::Arc; use std::sync::Arc;
use syndicate::actor::*; use syndicate::actor::*;
use syndicate::enclose;
use syndicate::preserves_schema::Codec; use syndicate::preserves_schema::Codec;
use crate::language::language; use crate::language::language;
@ -11,8 +12,7 @@ use syndicate_macros::during;
pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) { pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| {
Ok(during!(t, ds, language(), <require-service $_spec: DebtReporter>, |t: &mut Activation| { Ok(during!(t, ds, language(), <require-service $_spec: DebtReporter>, |t: &mut Activation| {
let ds = Arc::clone(&ds); t.spawn_link(tracing::Span::current(), enclose!((ds) |t| run(t, ds)));
t.spawn_link(tracing::Span::current(), |t| run(t, ds));
Ok(()) Ok(())
})) }))
}); });

View File

@ -4,6 +4,7 @@ use std::convert::TryFrom;
use std::sync::Arc; use std::sync::Arc;
use syndicate::actor::*; use syndicate::actor::*;
use syndicate::enclose;
use syndicate::supervise::{Supervisor, SupervisorConfiguration}; use syndicate::supervise::{Supervisor, SupervisorConfiguration};
use tokio::net::TcpListener; use tokio::net::TcpListener;
@ -18,13 +19,12 @@ pub fn on_demand(t: &mut Activation, ds: Arc<Cap>, gateway: Arc<Cap>) {
t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| {
Ok(during!(t, ds, language(), <require-service $spec: internal_services::TcpRelayListener>, Ok(during!(t, ds, language(), <require-service $spec: internal_services::TcpRelayListener>,
|t| { |t| {
let ds = Arc::clone(&ds);
let gateway = Arc::clone(&gateway);
Supervisor::start( Supervisor::start(
t, t,
syndicate::name!(parent: None, "relay", addr = ?spec), syndicate::name!(parent: None, "relay", addr = ?spec),
SupervisorConfiguration::default(), SupervisorConfiguration::default(),
move |t| run(t, Arc::clone(&ds), Arc::clone(&gateway), spec.clone())); enclose!((ds, gateway) move |t|
enclose!((ds, gateway, spec) run(t, ds, gateway, spec))));
Ok(()) Ok(())
})) }))
}); });
@ -49,11 +49,10 @@ fn run(
tracing::info!("listening"); tracing::info!("listening");
loop { loop {
let (stream, addr) = listener.accept().await?; let (stream, addr) = listener.accept().await?;
let gateway = Arc::clone(&gateway);
Actor::new().boot(syndicate::name!(parent: parent_span.clone(), "conn"), Actor::new().boot(syndicate::name!(parent: parent_span.clone(), "conn"),
move |t| Ok(t.linked_task( enclose!((gateway) move |t| Ok(t.linked_task(
tracing::Span::current(), tracing::Span::current(),
detect_protocol(t.facet.clone(), stream, gateway, addr)))); detect_protocol(t.facet.clone(), stream, gateway, addr)))));
} }
}); });
Ok(()) Ok(())

View File

@ -5,6 +5,7 @@ use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use syndicate::actor::*; use syndicate::actor::*;
use syndicate::enclose;
use syndicate::error::Error; use syndicate::error::Error;
use syndicate::relay; use syndicate::relay;
@ -21,10 +22,8 @@ pub fn on_demand(t: &mut Activation, ds: Arc<Cap>, gateway: Arc<Cap>) {
t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| {
Ok(during!(t, ds, language(), <require-service $spec: internal_services::UnixRelayListener>, Ok(during!(t, ds, language(), <require-service $spec: internal_services::UnixRelayListener>,
|t: &mut Activation| { |t: &mut Activation| {
let ds = Arc::clone(&ds);
let gateway = Arc::clone(&gateway);
t.spawn_link(syndicate::name!(parent: None, "relay", addr = ?spec), t.spawn_link(syndicate::name!(parent: None, "relay", addr = ?spec),
|t| run(t, ds, gateway, spec)); enclose!((ds, gateway) |t| run(t, ds, gateway, spec)));
Ok(()) Ok(())
})) }))
}); });
@ -48,12 +47,11 @@ fn run(
loop { loop {
let (stream, _addr) = listener.accept().await?; let (stream, _addr) = listener.accept().await?;
let peer = stream.peer_cred()?; let peer = stream.peer_cred()?;
let gateway = Arc::clone(&gateway);
Actor::new().boot( Actor::new().boot(
syndicate::name!(parent: parent_span.clone(), "conn", syndicate::name!(parent: parent_span.clone(), "conn",
pid = ?peer.pid().unwrap_or(-1), pid = ?peer.pid().unwrap_or(-1),
uid = peer.uid()), uid = peer.uid()),
|t| Ok(t.linked_task( enclose!((gateway) |t| Ok(t.linked_task(
tracing::Span::current(), tracing::Span::current(),
{ {
let facet = t.facet.clone(); let facet = t.facet.clone();
@ -65,7 +63,7 @@ fn run(
relay::Output::Bytes(Box::pin(o)), relay::Output::Bytes(Box::pin(o)),
gateway) gateway)
} }
}))); }))));
} }
}); });
Ok(()) Ok(())