diff --git a/syndicate-macros/src/lib.rs b/syndicate-macros/src/lib.rs index e367106..5578d72 100644 --- a/syndicate-macros/src/lib.rs +++ b/syndicate-macros/src/lib.rs @@ -106,6 +106,8 @@ fn compile_pattern(v: &IOValue) -> TokenStream { let P_: TokenStream = quote!(syndicate::schemas::dataspace_patterns); #[allow(non_snake_case)] let V_: TokenStream = quote!(syndicate::value); + #[allow(non_snake_case)] + let MapFromIterator_: TokenStream = quote!(<#V_::Map<_, _> as std::iter::FromIterator<_>>::from_iter); match v.value() { Value::Symbol(s) => match s.as_str() { @@ -139,7 +141,7 @@ fn compile_pattern(v: &IOValue) -> TokenStream { label: #label_stx, arity: #arity .into(), }), - members: #V_::Map::from_iter(vec![#(#members),*]) + members: #MapFromIterator_(vec![#(#members),*]) }))) } } @@ -151,7 +153,7 @@ fn compile_pattern(v: &IOValue) -> TokenStream { ctor: Box::new(#P_::CArr { arity: #arity .into(), }), - members: #V_::Map::from_iter(vec![#(#members),*]) + members: #MapFromIterator_(vec![#(#members),*]) }))) } Value::Set(_) => @@ -164,7 +166,7 @@ fn compile_pattern(v: &IOValue) -> TokenStream { }).collect::>(); quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Dict { ctor: Box::new(#P_::CDict), - members: #V_::Map::from_iter(vec![#(#members),*]) + members: #MapFromIterator_(vec![#(#members),*]) }))) } _ => lit(compile_value(v)), diff --git a/syndicate-server/examples/consumer.rs b/syndicate-server/examples/consumer.rs index 6d93b63..54dda3c 100644 --- a/syndicate-server/examples/consumer.rs +++ b/syndicate-server/examples/consumer.rs @@ -1,4 +1,3 @@ -use std::iter::FromIterator; use std::sync::Arc; use structopt::StructOpt; diff --git a/syndicate-server/examples/pingpong.rs b/syndicate-server/examples/pingpong.rs index f579645..f9869ce 100644 --- a/syndicate-server/examples/pingpong.rs +++ b/syndicate-server/examples/pingpong.rs @@ -1,4 +1,3 @@ -use std::iter::FromIterator; use std::sync::Arc; use std::time::SystemTime; diff --git a/syndicate-server/examples/state-consumer.rs b/syndicate-server/examples/state-consumer.rs index fdbc01c..e3cb742 100644 --- a/syndicate-server/examples/state-consumer.rs +++ b/syndicate-server/examples/state-consumer.rs @@ -1,4 +1,3 @@ -use std::iter::FromIterator; use std::sync::Arc; use structopt::StructOpt; diff --git a/syndicate-server/src/gatekeeper.rs b/syndicate-server/src/gatekeeper.rs new file mode 100644 index 0000000..dffc62d --- /dev/null +++ b/syndicate-server/src/gatekeeper.rs @@ -0,0 +1,50 @@ +use std::sync::Arc; + +use syndicate::actor::*; +use syndicate::during::DuringResult; +use syndicate::schemas::gatekeeper; +use syndicate::value::NestedValue; + +pub fn handle_resolve( + ds: &mut Arc, + t: &mut Activation, + a: gatekeeper::Resolve, +) -> DuringResult> { + use syndicate::schemas::dataspace; + + let gatekeeper::Resolve { sturdyref, observer } = a; + let queried_oid = sturdyref.oid.clone(); + let handler = syndicate::entity(observer) + .on_asserted(move |observer, t, a: AnyValue| { + let bindings = a.value().to_sequence()?; + let key = bindings[0].value().to_bytestring()?; + let unattenuated_target = bindings[1].value().to_embedded()?; + match sturdyref.validate_and_attenuate(key, unattenuated_target) { + Err(e) => { + tracing::warn!(sturdyref = debug(&AnyValue::from(&sturdyref)), + "sturdyref failed validation: {}", e); + Ok(None) + }, + Ok(target) => { + tracing::trace!(sturdyref = debug(&AnyValue::from(&sturdyref)), + target = debug(&target), + "sturdyref resolved"); + if let Some(h) = observer.assert(t, AnyValue::domain(target)) { + Ok(Some(Box::new(move |_observer, t| Ok(t.retract(h))))) + } else { + Ok(None) + } + } + } + }) + .create_cap(t); + if let Some(oh) = ds.assert(t, &dataspace::Observe { + // TODO: codegen plugin to generate pattern constructors + pattern: syndicate_macros::pattern!(""), + observer: handler, + }) { + Ok(Some(Box::new(move |_ds, t| Ok(t.retract(oh))))) + } else { + Ok(None) + } +} diff --git a/syndicate-server/src/main.rs b/syndicate-server/src/main.rs index 53d1c1f..ad3525e 100644 --- a/syndicate-server/src/main.rs +++ b/syndicate-server/src/main.rs @@ -1,9 +1,3 @@ -use futures::SinkExt; -use futures::StreamExt; - -use std::future::ready; -use std::io; -use std::iter::FromIterator; use std::path::PathBuf; use std::sync::Arc; @@ -11,22 +5,13 @@ use structopt::StructOpt; use syndicate::actor::*; use syndicate::dataspace::*; -use syndicate::during::DuringResult; -use syndicate::error::Error; -use syndicate::error::error; -use syndicate::relay; -use syndicate::schemas::internal_protocol::_Any; -use syndicate::schemas::gatekeeper; use syndicate::sturdy; use syndicate::value::NestedValue; -use tokio::net::TcpListener; -use tokio::net::TcpStream; -use tokio::net::UnixListener; -use tokio::net::UnixStream; - -use tungstenite::Message; +mod gatekeeper; +mod protocol; +mod services; #[derive(Clone, StructOpt)] struct ServerConfig { @@ -74,61 +59,35 @@ async fn main() -> Result<(), Box> { tracing::trace!("startup"); - if config.debt_reporter { - Actor::new().boot(syndicate::name!("debt-reporter"), |t| { - t.linked_task(syndicate::name!("tick"), async { - let mut timer = tokio::time::interval(core::time::Duration::from_secs(1)); - loop { - timer.tick().await; - for (id, (name, debt)) in syndicate::actor::ACCOUNTS.read().unwrap().iter() { - let _enter = name.enter(); - tracing::info!(id, debt = debug( - debt.load(std::sync::atomic::Ordering::Relaxed))); - } - } - }); - Ok(()) - }); - } - Actor::new().boot(syndicate::name!("dataspace"), move |t| { let ds = Cap::new(&t.create(Dataspace::new())); { use syndicate::schemas::gatekeeper; let key = vec![0; 16]; - let sr = sturdy::SturdyRef::mint(_Any::new("syndicate"), &key); - tracing::info!(rootcap = debug(&_Any::from(&sr))); + let sr = sturdy::SturdyRef::mint(AnyValue::new("syndicate"), &key); + tracing::info!(rootcap = debug(&AnyValue::from(&sr))); tracing::info!(rootcap = display(sr.to_hex())); ds.assert(t, &gatekeeper::Bind { oid: sr.oid.clone(), key, target: ds.clone() }); } + if config.debt_reporter { + services::debt_reporter::spawn(t); + } + if config.inferior { - let ds = Arc::clone(&ds); - Actor::new().boot(syndicate::name!("parent"), move |t| run_io_relay( - t, - relay::Input::Bytes(Box::pin(tokio::io::stdin())), - relay::Output::Bytes(Box::pin(tokio::io::stdout())), - ds)); + services::stdio_relay_listener::spawn(t, Arc::clone(&ds)); } let gateway = Cap::guard(&t.create( - syndicate::entity(Arc::clone(&ds)).on_asserted(handle_resolve))); + syndicate::entity(Arc::clone(&ds)).on_asserted(gatekeeper::handle_resolve))); for port in config.ports.clone() { - let gateway = Arc::clone(&gateway); - Actor::new().boot( - syndicate::name!("tcp", port), - move |t| Ok(t.linked_task(syndicate::name!("listener"), - run_tcp_listener(gateway, port)))); + services::tcp_relay_listener::spawn(t, Arc::clone(&gateway), port); } for path in config.sockets.clone() { - let gateway = Arc::clone(&gateway); - Actor::new().boot( - syndicate::name!("unix", socket = debug(path.to_str().expect("representable UnixListener path"))), - move |t| Ok(t.linked_task(syndicate::name!("listener"), - run_unix_listener(gateway, path)))); + services::unix_relay_listener::spawn(t, Arc::clone(&gateway), path); } Ok(()) @@ -136,213 +95,3 @@ async fn main() -> Result<(), Box> { Ok(()) } - -//--------------------------------------------------------------------------- - -fn message_error(e: E) -> Error { - error(&e.to_string(), _Any::new(false)) -} - -fn extract_binary_packets( - r: Result, -) -> Result>, Error> { - match r { - Ok(m) => match m { - Message::Text(_) => - Err("Text websocket frames are not accepted")?, - Message::Binary(bs) => - Ok(Some(bs)), - Message::Ping(_) => - Ok(None), // pings are handled by tungstenite before we see them - Message::Pong(_) => - Ok(None), // unsolicited pongs are to be ignored - Message::Close(_) => - Ok(None), // we're about to see the end of the stream, so ignore this - }, - Err(e) => Err(message_error(e)), - } -} - -#[doc(hidden)] -struct ExitListener; - -impl Entity<()> for ExitListener { - fn exit_hook(&mut self, _t: &mut Activation, exit_status: &Arc) -> ActorResult { - tracing::info!(exit_status = debug(exit_status), "disconnect"); - Ok(()) - } -} - -fn run_io_relay( - t: &mut Activation, - i: relay::Input, - o: relay::Output, - initial_ref: Arc, -) -> ActorResult { - let exit_listener = t.create(ExitListener); - t.state.add_exit_hook(&exit_listener); - relay::TunnelRelay::run(t, i, o, Some(initial_ref), None); - Ok(()) -} - -fn run_connection( - facet: FacetRef, - i: relay::Input, - o: relay::Output, - initial_ref: Arc, -) -> ActorResult { - facet.activate(Account::new(syndicate::name!("start-session")), - |t| run_io_relay(t, i, o, initial_ref)) -} - -async fn detect_protocol( - facet: FacetRef, - stream: TcpStream, - gateway: Arc, - addr: std::net::SocketAddr, -) -> ActorResult { - let (i, o) = { - let mut buf = [0; 1]; // peek at the first byte to see what kind of connection to expect - match stream.peek(&mut buf).await? { - 1 => match buf[0] { - b'G' /* ASCII 'G' for "GET" */ => { - tracing::info!(protocol = display("websocket"), peer = debug(addr)); - let s = tokio_tungstenite::accept_async(stream).await - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - let (o, i) = s.split(); - let i = i.filter_map(|r| ready(extract_binary_packets(r).transpose())); - let o = o.sink_map_err(message_error).with(|bs| ready(Ok(Message::Binary(bs)))); - (relay::Input::Packets(Box::pin(i)), relay::Output::Packets(Box::pin(o))) - }, - _ => { - tracing::info!(protocol = display("raw"), peer = debug(addr)); - let (i, o) = stream.into_split(); - (relay::Input::Bytes(Box::pin(i)), - relay::Output::Bytes(Box::pin(o /* BufWriter::new(o) */))) - } - } - 0 => Err(error("closed before starting", _Any::new(false)))?, - _ => unreachable!() - } - }; - run_connection(facet, i, o, gateway) -} - -async fn run_tcp_listener( - gateway: Arc, - port: u16, -) -> ActorResult { - let listen_addr = format!("0.0.0.0:{}", port); - tracing::info!("Listening on {}", listen_addr); - let listener = TcpListener::bind(listen_addr).await?; - loop { - let (stream, addr) = listener.accept().await?; - let gateway = Arc::clone(&gateway); - let ac = Actor::new(); - ac.boot(syndicate::name!(parent: None, "tcp"), - move |t| Ok(t.linked_task( - tracing::Span::current(), - detect_protocol(t.facet.clone(), stream, gateway, addr)))); - } -} - -async fn run_unix_listener( - gateway: Arc, - path: PathBuf, -) -> ActorResult { - let path_str = path.to_str().expect("representable UnixListener path"); - tracing::info!("Listening on {:?}", path_str); - let listener = bind_unix_listener(&path).await?; - loop { - let (stream, _addr) = listener.accept().await?; - let peer = stream.peer_cred()?; - let gateway = Arc::clone(&gateway); - Actor::new().boot( - syndicate::name!(parent: None, "unix", pid = debug(peer.pid().unwrap_or(-1)), uid = peer.uid()), - |t| Ok(t.linked_task( - tracing::Span::current(), - { - let facet = t.facet.clone(); - async move { - tracing::info!(protocol = display("unix")); - let (i, o) = stream.into_split(); - run_connection(facet, - relay::Input::Bytes(Box::pin(i)), - relay::Output::Bytes(Box::pin(o)), - gateway) - } - }))); - } -} - -async fn bind_unix_listener(path: &PathBuf) -> Result { - match UnixListener::bind(path) { - Ok(s) => Ok(s), - Err(e) if e.kind() == io::ErrorKind::AddrInUse => { - // Potentially-stale socket file sitting around. Try - // connecting to it to see if it is alive, and remove it - // if not. - match UnixStream::connect(path).await { - Ok(_probe) => Err(e)?, // Someone's already there! Give up. - Err(f) if f.kind() == io::ErrorKind::ConnectionRefused => { - // Try to steal the socket. - tracing::info!("Cleaning stale socket"); - std::fs::remove_file(path)?; - Ok(UnixListener::bind(path)?) - } - Err(f) => { - tracing::error!(error = debug(f), - "Problem while probing potentially-stale socket"); - return Err(e)? // signal the *original* error, not the probe error - } - } - }, - Err(e) => Err(e)?, - } -} - -//--------------------------------------------------------------------------- - -fn handle_resolve( - ds: &mut Arc, - t: &mut Activation, - a: gatekeeper::Resolve, -) -> DuringResult> { - use syndicate::schemas::dataspace; - - let gatekeeper::Resolve { sturdyref, observer } = a; - let queried_oid = sturdyref.oid.clone(); - let handler = syndicate::entity(observer) - .on_asserted(move |observer, t, a: _Any| { - let bindings = a.value().to_sequence()?; - let key = bindings[0].value().to_bytestring()?; - let unattenuated_target = bindings[1].value().to_embedded()?; - match sturdyref.validate_and_attenuate(key, unattenuated_target) { - Err(e) => { - tracing::warn!(sturdyref = debug(&_Any::from(&sturdyref)), - "sturdyref failed validation: {}", e); - Ok(None) - }, - Ok(target) => { - tracing::trace!(sturdyref = debug(&_Any::from(&sturdyref)), - target = debug(&target), - "sturdyref resolved"); - if let Some(h) = observer.assert(t, _Any::domain(target)) { - Ok(Some(Box::new(move |_observer, t| Ok(t.retract(h))))) - } else { - Ok(None) - } - } - } - }) - .create_cap(t); - if let Some(oh) = ds.assert(t, &dataspace::Observe { - // TODO: codegen plugin to generate pattern constructors - pattern: syndicate_macros::pattern!(""), - observer: handler, - }) { - Ok(Some(Box::new(move |_ds, t| Ok(t.retract(oh))))) - } else { - Ok(None) - } -} diff --git a/syndicate-server/src/protocol.rs b/syndicate-server/src/protocol.rs new file mode 100644 index 0000000..089c097 --- /dev/null +++ b/syndicate-server/src/protocol.rs @@ -0,0 +1,104 @@ +use futures::SinkExt; +use futures::StreamExt; + +use std::future::ready; +use std::io; +use std::sync::Arc; + +use syndicate::actor::*; +use syndicate::error::Error; +use syndicate::error::error; +use syndicate::relay; +use syndicate::value::NestedValue; + +use tokio::net::TcpStream; + +use tungstenite::Message; + +struct ExitListener; + +impl Entity<()> for ExitListener { + fn exit_hook(&mut self, _t: &mut Activation, exit_status: &Arc) -> ActorResult { + tracing::info!(exit_status = debug(exit_status), "disconnect"); + Ok(()) + } +} + +pub fn run_io_relay( + t: &mut Activation, + i: relay::Input, + o: relay::Output, + initial_ref: Arc, +) -> ActorResult { + let exit_listener = t.create(ExitListener); + t.state.add_exit_hook(&exit_listener); + relay::TunnelRelay::run(t, i, o, Some(initial_ref), None); + Ok(()) +} + +pub fn run_connection( + facet: FacetRef, + i: relay::Input, + o: relay::Output, + initial_ref: Arc, +) -> ActorResult { + facet.activate(Account::new(syndicate::name!("start-session")), + |t| run_io_relay(t, i, o, initial_ref)) +} + +pub async fn detect_protocol( + facet: FacetRef, + stream: TcpStream, + gateway: Arc, + addr: std::net::SocketAddr, +) -> ActorResult { + let (i, o) = { + let mut buf = [0; 1]; // peek at the first byte to see what kind of connection to expect + match stream.peek(&mut buf).await? { + 1 => match buf[0] { + b'G' /* ASCII 'G' for "GET" */ => { + tracing::info!(protocol = display("websocket"), peer = debug(addr)); + let s = tokio_tungstenite::accept_async(stream).await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + let (o, i) = s.split(); + let i = i.filter_map(|r| ready(extract_binary_packets(r).transpose())); + let o = o.sink_map_err(message_error).with(|bs| ready(Ok(Message::Binary(bs)))); + (relay::Input::Packets(Box::pin(i)), relay::Output::Packets(Box::pin(o))) + }, + _ => { + tracing::info!(protocol = display("raw"), peer = debug(addr)); + let (i, o) = stream.into_split(); + (relay::Input::Bytes(Box::pin(i)), + relay::Output::Bytes(Box::pin(o /* BufWriter::new(o) */))) + } + } + 0 => Err(error("closed before starting", AnyValue::new(false)))?, + _ => unreachable!() + } + }; + run_connection(facet, i, o, gateway) +} + +fn message_error(e: E) -> Error { + error(&e.to_string(), AnyValue::new(false)) +} + +fn extract_binary_packets( + r: Result, +) -> Result>, Error> { + match r { + Ok(m) => match m { + Message::Text(_) => + Err("Text websocket frames are not accepted")?, + Message::Binary(bs) => + Ok(Some(bs)), + Message::Ping(_) => + Ok(None), // pings are handled by tungstenite before we see them + Message::Pong(_) => + Ok(None), // unsolicited pongs are to be ignored + Message::Close(_) => + Ok(None), // we're about to see the end of the stream, so ignore this + }, + Err(e) => Err(message_error(e)), + } +} diff --git a/syndicate-server/src/services/debt_reporter.rs b/syndicate-server/src/services/debt_reporter.rs new file mode 100644 index 0000000..139e442 --- /dev/null +++ b/syndicate-server/src/services/debt_reporter.rs @@ -0,0 +1,18 @@ +use syndicate::actor::*; + +pub fn spawn(t: &mut Activation) { + t.spawn(syndicate::name!("debt-reporter"), |t| { + t.linked_task(syndicate::name!("tick"), async { + let mut timer = tokio::time::interval(core::time::Duration::from_secs(1)); + loop { + timer.tick().await; + for (id, (name, debt)) in syndicate::actor::ACCOUNTS.read().unwrap().iter() { + let _enter = name.enter(); + tracing::info!(id, debt = debug( + debt.load(std::sync::atomic::Ordering::Relaxed))); + } + } + }); + Ok(()) + }); +} diff --git a/syndicate-server/src/services/mod.rs b/syndicate-server/src/services/mod.rs new file mode 100644 index 0000000..a0dfb57 --- /dev/null +++ b/syndicate-server/src/services/mod.rs @@ -0,0 +1,4 @@ +pub mod debt_reporter; +pub mod stdio_relay_listener; +pub mod tcp_relay_listener; +pub mod unix_relay_listener; diff --git a/syndicate-server/src/services/stdio_relay_listener.rs b/syndicate-server/src/services/stdio_relay_listener.rs new file mode 100644 index 0000000..539959f --- /dev/null +++ b/syndicate-server/src/services/stdio_relay_listener.rs @@ -0,0 +1,14 @@ +use std::sync::Arc; + +use syndicate::actor::*; +use syndicate::relay; + +use crate::protocol::run_io_relay; + +pub fn spawn(t: &mut Activation, ds: Arc) { + t.spawn(syndicate::name!("parent"), move |t| run_io_relay( + t, + relay::Input::Bytes(Box::pin(tokio::io::stdin())), + relay::Output::Bytes(Box::pin(tokio::io::stdout())), + ds)) +} diff --git a/syndicate-server/src/services/tcp_relay_listener.rs b/syndicate-server/src/services/tcp_relay_listener.rs new file mode 100644 index 0000000..2c36431 --- /dev/null +++ b/syndicate-server/src/services/tcp_relay_listener.rs @@ -0,0 +1,29 @@ +use std::sync::Arc; + +use syndicate::actor::*; + +use tokio::net::TcpListener; + +use crate::protocol::detect_protocol; + +pub fn spawn(t: &mut Activation, gateway: Arc, port: u16) { + t.spawn(syndicate::name!("tcp", port), + move |t| Ok(t.linked_task(syndicate::name!("listener"), run(gateway, port)))) +} + +pub async fn run( + gateway: Arc, + port: u16, +) -> ActorResult { + let listen_addr = format!("0.0.0.0:{}", port); + tracing::info!("Listening on {}", listen_addr); + let listener = TcpListener::bind(listen_addr).await?; + loop { + let (stream, addr) = listener.accept().await?; + let gateway = Arc::clone(&gateway); + Actor::new().boot(syndicate::name!(parent: None, "tcp"), + move |t| Ok(t.linked_task( + tracing::Span::current(), + detect_protocol(t.facet.clone(), stream, gateway, addr)))); + } +} diff --git a/syndicate-server/src/services/unix_relay_listener.rs b/syndicate-server/src/services/unix_relay_listener.rs new file mode 100644 index 0000000..92fafc5 --- /dev/null +++ b/syndicate-server/src/services/unix_relay_listener.rs @@ -0,0 +1,72 @@ +use std::io; +use std::path::PathBuf; +use std::sync::Arc; + +use syndicate::actor::*; +use syndicate::error::Error; +use syndicate::relay; + +use tokio::net::UnixListener; +use tokio::net::UnixStream; + +use crate::protocol::run_connection; + +pub fn spawn(t: &mut Activation, gateway: Arc, path: PathBuf) { + t.spawn(syndicate::name!("unix", socket = debug(path.to_str().expect("representable UnixListener path"))), + |t| Ok(t.linked_task(syndicate::name!("listener!"), run(gateway, path)))) +} + +pub async fn run( + gateway: Arc, + path: PathBuf, +) -> ActorResult { + let path_str = path.to_str().expect("representable UnixListener path"); + tracing::info!("Listening on {:?}", path_str); + let listener = bind_unix_listener(&path).await?; + loop { + let (stream, _addr) = listener.accept().await?; + let peer = stream.peer_cred()?; + let gateway = Arc::clone(&gateway); + Actor::new().boot( + syndicate::name!(parent: None, "unix", pid = debug(peer.pid().unwrap_or(-1)), uid = peer.uid()), + |t| Ok(t.linked_task( + tracing::Span::current(), + { + let facet = t.facet.clone(); + async move { + tracing::info!(protocol = display("unix")); + let (i, o) = stream.into_split(); + run_connection(facet, + relay::Input::Bytes(Box::pin(i)), + relay::Output::Bytes(Box::pin(o)), + gateway) + } + }))); + } +} + +pub async fn bind_unix_listener(path: &PathBuf) -> Result { + match UnixListener::bind(path) { + Ok(s) => Ok(s), + Err(e) if e.kind() == io::ErrorKind::AddrInUse => { + // Potentially-stale socket file sitting around. Try + // connecting to it to see if it is alive, and remove it + // if not. + match UnixStream::connect(path).await { + Ok(_probe) => Err(e)?, // Someone's already there! Give up. + Err(f) if f.kind() == io::ErrorKind::ConnectionRefused => { + // Try to steal the socket. + tracing::info!("Cleaning stale socket"); + std::fs::remove_file(path)?; + Ok(UnixListener::bind(path)?) + } + Err(f) => { + tracing::error!(error = debug(f), + "Problem while probing potentially-stale socket"); + return Err(e)? // signal the *original* error, not the probe error + } + } + }, + Err(e) => Err(e)?, + } +} diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index 4651b1b..bff25f5 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -765,6 +765,23 @@ impl<'activation> Activation<'activation> { } } + fn enqueue_for_myself_at_commit(&mut self, action: Action) { + let mailbox = self.state.mailbox(); + self.pending.queue_for_mailbox(&mailbox).push(action); + } + + /// Schedule the creation of a new actor when the Activation commits. + pub fn spawn ActorResult>( + &mut self, + name: tracing::Span, + boot: F, + ) { + self.enqueue_for_myself_at_commit(Box::new(move |_| { + Actor::new().boot(name, boot); + Ok(()) + })); + } + /// Create a new subfacet of the currently-active facet. Runs `boot` in the new facet's /// context. If `boot` returns leaving the new facet [inert][Facet#inert-facets], pub fn facet ActorResult>( @@ -810,18 +827,16 @@ impl<'activation> Activation<'activation> { /// yet, none of the shutdown handlers yields an error, and the facet's parent facet is /// alive, executes `continuation` in the parent facet's context. pub fn stop_facet(&mut self, facet_id: FacetId, continuation: Option) { - let mailbox = self.state.mailbox(); let maybe_parent_id = self.active_facet().and_then(|f| f.parent_facet_id); - self.pending.queue_for_mailbox(&mailbox).push(Box::new( - move |t| { - t._terminate_facet(facet_id, true)?; - if let Some(k) = continuation { - if let Some(parent_id) = maybe_parent_id { - t.with_facet(true, parent_id, k)?; - } + self.enqueue_for_myself_at_commit(Box::new(move |t| { + t._terminate_facet(facet_id, true)?; + if let Some(k) = continuation { + if let Some(parent_id) = maybe_parent_id { + t.with_facet(true, parent_id, k)?; } - Ok(()) - })); + } + Ok(()) + })); } /// Arranges for the active facet to be stopped cleanly when `self` commits.