diff --git a/syndicate-server/examples/producer.rs b/syndicate-server/examples/producer.rs index f8ecc18..659e387 100644 --- a/syndicate-server/examples/producer.rs +++ b/syndicate-server/examples/producer.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use structopt::StructOpt; use syndicate::actor::*; +use syndicate::enclose; use syndicate::relay; use syndicate::sturdy; use syndicate::value::Value; @@ -49,11 +50,9 @@ async fn main() -> Result<(), Box> { account.ensure_clear_funds().await; let mut events: PendingEventQueue = Vec::new(); for _ in 0..action_count { - let ds = Arc::clone(&ds); - let padding = padding.clone(); - events.push(Box::new(move |t| t.with_entity( - &ds.underlying, - |t, e| e.message(t, says(Value::from("producer").wrap(), padding))))); + events.push(Box::new(enclose!((ds, padding) move |t| t.with_entity( + &ds.underlying, |t, e| e.message( + t, says(Value::from("producer").wrap(), padding)))))); } external_events(&ds.underlying.mailbox, &account, events)?; } diff --git a/syndicate-server/examples/state-producer.rs b/syndicate-server/examples/state-producer.rs index 6376ed4..e03a05f 100644 --- a/syndicate-server/examples/state-producer.rs +++ b/syndicate-server/examples/state-producer.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use structopt::StructOpt; use syndicate::actor::*; +use syndicate::enclose; use syndicate::relay; use syndicate::sturdy; use syndicate::value::Value; @@ -34,17 +35,16 @@ async fn main() -> Result<(), Box> { Value::from(std::process::id()).wrap()).wrap(); let handle = syndicate::actor::next_handle(); let assert_e = || { - let ds = Arc::clone(&ds); - let presence = presence.clone(); - let handle = handle.clone(); - external_event(&Arc::clone(&ds.underlying.mailbox), &account, Box::new( - move |t| t.with_entity(&ds.underlying, |t, e| e.assert(t, presence, handle)))) + external_event( + &Arc::clone(&ds.underlying.mailbox), &account, Box::new(enclose!( + (ds, presence, handle) move |t| t.with_entity( + &ds.underlying, |t, e| e.assert(t, presence, handle))))) }; let retract_e = || { - let ds = Arc::clone(&ds); - let handle = handle.clone(); - external_event(&Arc::clone(&ds.underlying.mailbox), &account, Box::new( - move |t| t.with_entity(&ds.underlying, |t, e| e.retract(t, handle)))) + external_event( + &Arc::clone(&ds.underlying.mailbox), &account, Box::new(enclose!( + (ds, handle) move |t| t.with_entity( + &ds.underlying, |t, e| e.retract(t, handle))))) }; assert_e()?; loop { diff --git a/syndicate-server/src/main.rs b/syndicate-server/src/main.rs index 2564858..7c72591 100644 --- a/syndicate-server/src/main.rs +++ b/syndicate-server/src/main.rs @@ -7,6 +7,7 @@ use structopt::StructOpt; use syndicate::actor::*; use syndicate::dataspace::*; +use syndicate::enclose; use syndicate::relay; use syndicate::schemas::service; use syndicate::schemas::transport_address; @@ -79,12 +80,11 @@ async fn main() -> Result<(), Box> { if config.inferior { tracing::info!("inferior server instance"); - let root_ds = Arc::clone(&root_ds); - t.spawn(syndicate::name!("parent"), move |t| protocol::run_io_relay( + t.spawn(syndicate::name!("parent"), enclose!((root_ds) move |t| protocol::run_io_relay( t, relay::Input::Bytes(Box::pin(tokio::io::stdin())), relay::Output::Bytes(Box::pin(tokio::io::stdout())), - root_ds)); + root_ds))); } let server_config_ds = Cap::new(&t.create(Dataspace::new())); diff --git a/syndicate-server/src/services/config_watcher.rs b/syndicate-server/src/services/config_watcher.rs index cd15e84..f020bd6 100644 --- a/syndicate-server/src/services/config_watcher.rs +++ b/syndicate-server/src/services/config_watcher.rs @@ -15,6 +15,7 @@ use std::thread; use std::time::Duration; use syndicate::actor::*; +use syndicate::enclose; use syndicate::value::BinarySource; use syndicate::value::IOBinarySource; use syndicate::value::Map; @@ -33,9 +34,8 @@ pub fn on_demand(t: &mut Activation, ds: Arc) { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { Ok(during!(t, ds, language(), , |t: &mut Activation| { - let ds = Arc::clone(&ds); t.spawn_link(syndicate::name!(parent: None, "config", spec = ?spec), - |t| run(t, ds, spec)); + enclose!((ds) |t| run(t, ds, spec))); Ok(()) })) }); diff --git a/syndicate-server/src/services/daemon.rs b/syndicate-server/src/services/daemon.rs index 51d1728..62b5966 100644 --- a/syndicate-server/src/services/daemon.rs +++ b/syndicate-server/src/services/daemon.rs @@ -3,6 +3,7 @@ use preserves_schema::Codec; use std::sync::Arc; use syndicate::actor::*; +use syndicate::enclose; use syndicate::supervise::{Supervisor, SupervisorConfiguration}; use tokio::process; @@ -15,13 +16,12 @@ use syndicate_macros::during; pub fn on_demand(t: &mut Activation, config_ds: Arc, root_ds: Arc) { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { Ok(during!(t, config_ds, language(), , |t| { - let config_ds = Arc::clone(&config_ds); - let root_ds = Arc::clone(&root_ds); Ok(Supervisor::start( t, syndicate::name!(parent: None, "daemon", service = ?spec), SupervisorConfiguration::default(), - move |t| run(t, Arc::clone(&config_ds), Arc::clone(&root_ds), spec.clone()))) + enclose!((config_ds, root_ds) move |t| + enclose!((config_ds, root_ds, spec) run(t, config_ds, root_ds, spec))))) })) }); } diff --git a/syndicate-server/src/services/debt_reporter.rs b/syndicate-server/src/services/debt_reporter.rs index 6932de4..f5934c6 100644 --- a/syndicate-server/src/services/debt_reporter.rs +++ b/syndicate-server/src/services/debt_reporter.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use syndicate::actor::*; +use syndicate::enclose; use syndicate::preserves_schema::Codec; use crate::language::language; @@ -11,8 +12,7 @@ use syndicate_macros::during; pub fn on_demand(t: &mut Activation, ds: Arc) { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { Ok(during!(t, ds, language(), , |t: &mut Activation| { - let ds = Arc::clone(&ds); - t.spawn_link(tracing::Span::current(), |t| run(t, ds)); + t.spawn_link(tracing::Span::current(), enclose!((ds) |t| run(t, ds))); Ok(()) })) }); diff --git a/syndicate-server/src/services/tcp_relay_listener.rs b/syndicate-server/src/services/tcp_relay_listener.rs index ff0cd05..9e13423 100644 --- a/syndicate-server/src/services/tcp_relay_listener.rs +++ b/syndicate-server/src/services/tcp_relay_listener.rs @@ -4,6 +4,7 @@ use std::convert::TryFrom; use std::sync::Arc; use syndicate::actor::*; +use syndicate::enclose; use syndicate::supervise::{Supervisor, SupervisorConfiguration}; use tokio::net::TcpListener; @@ -18,13 +19,12 @@ pub fn on_demand(t: &mut Activation, ds: Arc, gateway: Arc) { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { Ok(during!(t, ds, language(), , |t| { - let ds = Arc::clone(&ds); - let gateway = Arc::clone(&gateway); Supervisor::start( t, syndicate::name!(parent: None, "relay", addr = ?spec), SupervisorConfiguration::default(), - move |t| run(t, Arc::clone(&ds), Arc::clone(&gateway), spec.clone())); + enclose!((ds, gateway) move |t| + enclose!((ds, gateway, spec) run(t, ds, gateway, spec)))); Ok(()) })) }); @@ -49,11 +49,10 @@ fn run( tracing::info!("listening"); loop { let (stream, addr) = listener.accept().await?; - let gateway = Arc::clone(&gateway); Actor::new().boot(syndicate::name!(parent: parent_span.clone(), "conn"), - move |t| Ok(t.linked_task( + enclose!((gateway) move |t| Ok(t.linked_task( tracing::Span::current(), - detect_protocol(t.facet.clone(), stream, gateway, addr)))); + detect_protocol(t.facet.clone(), stream, gateway, addr))))); } }); Ok(()) diff --git a/syndicate-server/src/services/unix_relay_listener.rs b/syndicate-server/src/services/unix_relay_listener.rs index 76702a2..7ff27ac 100644 --- a/syndicate-server/src/services/unix_relay_listener.rs +++ b/syndicate-server/src/services/unix_relay_listener.rs @@ -5,6 +5,7 @@ use std::path::PathBuf; use std::sync::Arc; use syndicate::actor::*; +use syndicate::enclose; use syndicate::error::Error; use syndicate::relay; @@ -21,10 +22,8 @@ pub fn on_demand(t: &mut Activation, ds: Arc, gateway: Arc) { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { Ok(during!(t, ds, language(), , |t: &mut Activation| { - let ds = Arc::clone(&ds); - let gateway = Arc::clone(&gateway); t.spawn_link(syndicate::name!(parent: None, "relay", addr = ?spec), - |t| run(t, ds, gateway, spec)); + enclose!((ds, gateway) |t| run(t, ds, gateway, spec))); Ok(()) })) }); @@ -48,12 +47,11 @@ fn run( loop { let (stream, _addr) = listener.accept().await?; let peer = stream.peer_cred()?; - let gateway = Arc::clone(&gateway); Actor::new().boot( syndicate::name!(parent: parent_span.clone(), "conn", pid = ?peer.pid().unwrap_or(-1), uid = peer.uid()), - |t| Ok(t.linked_task( + enclose!((gateway) |t| Ok(t.linked_task( tracing::Span::current(), { let facet = t.facet.clone(); @@ -65,7 +63,7 @@ fn run( relay::Output::Bytes(Box::pin(o)), gateway) } - }))); + })))); } }); Ok(())