diff --git a/syndicate-macros/examples/box-and-client.rs b/syndicate-macros/examples/box-and-client.rs index 0fe5b97..c1c3124 100644 --- a/syndicate-macros/examples/box-and-client.rs +++ b/syndicate-macros/examples/box-and-client.rs @@ -9,7 +9,7 @@ use syndicate::value::NestedValue; async fn main() -> Result<(), Box> { syndicate::convenient_logging()?; Actor::new(None).boot(tracing::Span::current(), |t| { - let ds = Cap::new(&t.create(Dataspace::new())); + let ds = Cap::new(&t.create(Dataspace::new(None))); let _ = t.prevent_inert_check(); t.spawn(syndicate::name!("box"), enclose!((ds) move |t| { diff --git a/syndicate-server/src/protocol.rs b/syndicate-server/src/protocol.rs index d19606e..3b439cc 100644 --- a/syndicate-server/src/protocol.rs +++ b/syndicate-server/src/protocol.rs @@ -41,9 +41,9 @@ pub fn run_connection( i: relay::Input, o: relay::Output, initial_ref: Arc, -) -> ActorResult { +) { facet.activate(Account::new(syndicate::name!("start-session")), - |t| run_io_relay(t, i, o, initial_ref)) + |t| run_io_relay(t, i, o, initial_ref)); } pub async fn detect_protocol( @@ -76,7 +76,8 @@ pub async fn detect_protocol( _ => unreachable!() } }; - run_connection(facet, i, o, gateway) + run_connection(facet, i, o, gateway); + Ok(()) } fn message_error(e: E) -> Error { diff --git a/syndicate-server/src/services/config_watcher.rs b/syndicate-server/src/services/config_watcher.rs index a6825db..21e144d 100644 --- a/syndicate-server/src/services/config_watcher.rs +++ b/syndicate-server/src/services/config_watcher.rs @@ -181,14 +181,17 @@ fn run( let mut path_state: Map = Map::new(); - { - facet.activate(Account::new(syndicate::name!("initial_scan")), |t| { + if !facet.activate( + Account::new(syndicate::name!("initial_scan")), + |t| { initial_scan(t, &mut path_state, &config_ds, env.clone()); config_ds.assert(t, language(), &lifecycle::ready(&spec)); Ok(()) - }).unwrap(); - tracing::trace!("initial_scan complete"); + }) + { + return; } + tracing::trace!("initial_scan complete"); let mut rescan = |paths: Vec| { facet.activate(Account::new(syndicate::name!("rescan")), |t| { @@ -209,15 +212,15 @@ fn run( t.stop_facet(facet_id); } Ok(()) - }).unwrap() + }) }; while let Ok(event) = rx.recv() { tracing::trace!("notification: {:?}", &event); - match event { + let keep_running = match event { DebouncedEvent::NoticeWrite(_p) | DebouncedEvent::NoticeRemove(_p) => - (), + true, DebouncedEvent::Create(p) | DebouncedEvent::Write(p) | DebouncedEvent::Chmod(p) | @@ -225,12 +228,15 @@ fn run( rescan(vec![p]), DebouncedEvent::Rename(p, q) => rescan(vec![p, q]), - _ => - tracing::info!("{:?}", event), - } + _ => { + tracing::info!("{:?}", event); + true + } + }; + if !keep_running { break; } } - let _ = facet.activate(Account::new(syndicate::name!("termination")), |t| { + facet.activate(Account::new(syndicate::name!("termination")), |t| { tracing::trace!("linked thread terminating associated facet"); Ok(t.stop()) }); diff --git a/syndicate-server/src/services/daemon.rs b/syndicate-server/src/services/daemon.rs index 79c22bd..dca5ffb 100644 --- a/syndicate-server/src/services/daemon.rs +++ b/syndicate-server/src/services/daemon.rs @@ -249,7 +249,7 @@ impl DaemonInstance { Err(_) => AnyValue::bytestring(buf), }; let now = AnyValue::new(chrono::Utc::now().to_rfc3339()); - if facet.activate( + if !facet.activate( Account::new(tracing::Span::current()), enclose!((pid, service, kind) |t| { log_ds.message(t, &(), &syndicate_macros::template!( @@ -260,7 +260,7 @@ impl DaemonInstance { line: =buf, }>")); Ok(()) - })).is_err() + })) { break; } @@ -313,7 +313,7 @@ impl DaemonInstance { facet.activate(Account::new(syndicate::name!("instance-terminated")), |t| { let m = if status.success() { None } else { Some(format!("{}", status)) }; self.handle_exit(t, m) - })?; + }); Ok(LinkedTaskTermination::Normal) })); Ok(()) @@ -443,7 +443,7 @@ fn run( facet.activate(Account::new(syndicate::name!("instance-startup")), |t| { daemon_instance.start(t) - })?; + }); Ok(LinkedTaskTermination::KeepFacet) }); Ok(()) diff --git a/syndicate-server/src/services/tcp_relay_listener.rs b/syndicate-server/src/services/tcp_relay_listener.rs index 40cf177..f1bad04 100644 --- a/syndicate-server/src/services/tcp_relay_listener.rs +++ b/syndicate-server/src/services/tcp_relay_listener.rs @@ -36,27 +36,38 @@ fn run(t: &mut Activation, ds: Arc, spec: TcpRelayListener) -> ActorResult t.linked_task(syndicate::name!("listener"), async move { let listen_addr = format!("{}:{}", host, port); let listener = TcpListener::bind(listen_addr).await?; - facet.activate(Account::new(syndicate::name!("readiness")), |t| { - tracing::info!("listening"); - ds.assert(t, language(), &lifecycle::ready(&spec)); - Ok(()) - })?; + + if !facet.activate( + Account::new(syndicate::name!("readiness")), |t| { + tracing::info!("listening"); + ds.assert(t, language(), &lifecycle::ready(&spec)); + Ok(()) + }) + { + return Ok(LinkedTaskTermination::Normal); + } + loop { let (stream, addr) = listener.accept().await?; let gatekeeper = spec.gatekeeper.clone(); let name = syndicate::name!(parent: parent_span.clone(), "conn"); - facet.activate(Account::new(name.clone()), move |t| { - t.spawn(name, move |t| { - Ok(t.linked_task(tracing::Span::current(), { - let facet = t.facet.clone(); - async move { - detect_protocol(facet, stream, gatekeeper, addr).await?; - Ok(LinkedTaskTermination::KeepFacet) - } - })) - }); - Ok(()) - })?; + if !facet.activate( + Account::new(name.clone()), + move |t| { + t.spawn(name, move |t| { + Ok(t.linked_task(tracing::Span::current(), { + let facet = t.facet.clone(); + async move { + detect_protocol(facet, stream, gatekeeper, addr).await?; + Ok(LinkedTaskTermination::KeepFacet) + } + })) + }); + Ok(()) + }) + { + return Ok(LinkedTaskTermination::Normal); + } } }); Ok(()) diff --git a/syndicate-server/src/services/unix_relay_listener.rs b/syndicate-server/src/services/unix_relay_listener.rs index 1aedb4e..4efc1f2 100644 --- a/syndicate-server/src/services/unix_relay_listener.rs +++ b/syndicate-server/src/services/unix_relay_listener.rs @@ -38,11 +38,17 @@ fn run(t: &mut Activation, ds: Arc, spec: UnixRelayListener) -> ActorResult let facet = t.facet.clone(); t.linked_task(syndicate::name!("listener"), async move { let listener = bind_unix_listener(&PathBuf::from(path_str)).await?; - facet.activate(Account::new(syndicate::name!("readiness")), |t| { - tracing::info!("listening"); - ds.assert(t, language(), &lifecycle::ready(&spec)); - Ok(()) - })?; + + if !facet.activate( + Account::new(syndicate::name!("readiness")), |t| { + tracing::info!("listening"); + ds.assert(t, language(), &lifecycle::ready(&spec)); + Ok(()) + }) + { + return Ok(LinkedTaskTermination::Normal); + } + loop { let (stream, _addr) = listener.accept().await?; let peer = stream.peer_cred()?; @@ -50,23 +56,28 @@ fn run(t: &mut Activation, ds: Arc, spec: UnixRelayListener) -> ActorResult let name = syndicate::name!(parent: parent_span.clone(), "conn", pid = ?peer.pid().unwrap_or(-1), uid = peer.uid()); - facet.activate(Account::new(name.clone()), move |t| { - t.spawn(name, |t| { - Ok(t.linked_task(tracing::Span::current(), { - let facet = t.facet.clone(); - async move { - tracing::info!(protocol = %"unix"); - let (i, o) = stream.into_split(); - run_connection(facet, - relay::Input::Bytes(Box::pin(i)), - relay::Output::Bytes(Box::pin(o)), - gatekeeper)?; - Ok(LinkedTaskTermination::KeepFacet) - } - })) - }); - Ok(()) - })?; + if !facet.activate( + Account::new(name.clone()), + move |t| { + t.spawn(name, |t| { + Ok(t.linked_task(tracing::Span::current(), { + let facet = t.facet.clone(); + async move { + tracing::info!(protocol = %"unix"); + let (i, o) = stream.into_split(); + run_connection(facet, + relay::Input::Bytes(Box::pin(i)), + relay::Output::Bytes(Box::pin(o)), + gatekeeper); + Ok(LinkedTaskTermination::KeepFacet) + } + })) + }); + Ok(()) + }) + { + return Ok(LinkedTaskTermination::Normal); + } } }); Ok(()) diff --git a/syndicate/benches/bench_dataspace.rs b/syndicate/benches/bench_dataspace.rs index 4304869..ce59a4f 100644 --- a/syndicate/benches/bench_dataspace.rs +++ b/syndicate/benches/bench_dataspace.rs @@ -53,7 +53,7 @@ pub fn bench_pub(c: &mut Criterion) { let start = Instant::now(); rt.block_on(async move { Actor::new(None).boot(syndicate::name!("dataspace"), move |t| { - let ds = t.create(Dataspace::new()); + let ds = t.create(Dataspace::new(None)); let shutdown = t.create(ShutdownEntity); let account = Account::new(syndicate::name!("sender-account")); t.linked_task(syndicate::name!("sender"), async move { @@ -88,7 +88,7 @@ pub fn bench_pub(c: &mut Criterion) { let turn_count = Arc::clone(&turn_count); move |t| { - let ds = Cap::new(&t.create(Dataspace::new())); + let ds = Cap::new(&t.create(Dataspace::new(None))); let shutdown = entity(()) .on_asserted(|_, _, _| Ok(Some(Box::new(|_, t| Ok(t.stop()))))) diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index 8eeb39f..3e179d6 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -7,7 +7,6 @@ use super::dataflow::Graph; use super::error::Error; -use super::error::encode_error; use super::error::error; use super::rewrite::CaveatError; use super::rewrite::CheckedCaveat; @@ -556,12 +555,14 @@ impl FacetRef { /// [commits the turn][Activation::deliver] and performs the buffered actions; otherwise, /// [abandons the turn][Activation::clear] and discards the buffered actions. /// + /// Returns `true` if, at the end of the activation, `actor` had not yet terminated. + /// /// Bills any activity to `account`. pub fn activate( &self, account: Arc, f: F, - ) -> ActorResult where + ) -> bool where F: FnOnce(&mut Activation) -> ActorResult, { self.activate_exit(account, |t| f(t).into()) @@ -572,25 +573,26 @@ impl FacetRef { /// returns `None`, leaves `actor` in runnable state. [Commits buffered /// actions][Activation::deliver] unless `actor` terminates with an `Err` status. /// + /// Returns `true` if, at the end of the activation, `actor` had not yet terminated. + /// /// Bills any activity to `account`. pub fn activate_exit( &self, account: Arc, f: F, - ) -> ActorResult where + ) -> bool where F: FnOnce(&mut Activation) -> RunDisposition, { let mut g = self.actor.state.lock(); match &mut *g { - ActorState::Terminated { exit_status } => - Err(error("Could not activate terminated actor", - encode_error((**exit_status).clone()))), + ActorState::Terminated { .. } => + false, ActorState::Running(state) => { tracing::trace!(actor_id=?self.actor.actor_id, "activate"); let mut activation = Activation::make(self, account, state); let f_result = f(&mut activation); - let result = match activation.restore_invariants(f_result) { - RunDisposition::Continue => Ok(()), + let is_alive = match activation.restore_invariants(f_result) { + RunDisposition::Continue => true, RunDisposition::Terminate(exit_status) => { if exit_status.is_err() { activation.clear(); @@ -598,14 +600,12 @@ impl FacetRef { drop(activation); let exit_status = Arc::new(exit_status); state.cleanup(&self.actor, &exit_status); - *g = ActorState::Terminated { - exit_status: Arc::clone(&exit_status), - }; - (*exit_status).clone() + *g = ActorState::Terminated { exit_status }; + false } }; tracing::trace!(actor_id=?self.actor.actor_id, "deactivate"); - result + is_alive } } } @@ -933,17 +933,16 @@ impl<'activation> Activation<'activation> { } } }; - let _ = facet.activate( - Account::new(crate::name!("release_linked_task")), |t| { - if let Some(f) = t.active_facet() { - tracing::trace!(task_id, "cancellation token removed"); - f.linked_tasks.remove(&task_id); - } - if let LinkedTaskTermination::Normal = result { - t.stop(); - } - Ok(()) - }); + facet.activate(Account::new(crate::name!("release_linked_task")), |t| { + if let Some(f) = t.active_facet() { + tracing::trace!(task_id, "cancellation token removed"); + f.linked_tasks.remove(&task_id); + } + if let LinkedTaskTermination::Normal = result { + t.stop(); + } + Ok(()) + }); Ok::<(), Error>(()) }.instrument(name)); } @@ -976,8 +975,9 @@ impl<'activation> Activation<'activation> { loop { timer.tick().await; let _entry = span.enter(); - facet.activate(Arc::clone(&account), |t| a(t))?; + if !facet.activate(Arc::clone(&account), |t| a(t)) { break; } } + Ok(LinkedTaskTermination::Normal) }); Ok(()) } @@ -996,7 +996,7 @@ impl<'activation> Activation<'activation> { self.linked_task(crate::name!(parent: None, "Activation::at"), async move { tokio::time::sleep_until(instant.into()).await; let _entry = span.enter(); - facet.activate(account, a)?; + facet.activate(account, a); Ok(LinkedTaskTermination::KeepFacet) }); } @@ -1031,7 +1031,7 @@ impl<'activation> Activation<'activation> { let facet_id = self.facet.facet_id; self.pending.for_myself.push(Box::new(move |t| { t.with_facet(true, facet_id, move |t| { - ac.link(t).boot(name, boot); + ac.link(t)?.boot(name, boot); Ok(()) }) })); @@ -1526,16 +1526,21 @@ impl Actor { Actor { rx, ac_ref } } - fn link(self, t_parent: &mut Activation) -> Self { + fn link(self, t_parent: &mut Activation) -> Result { if t_parent.active_facet().is_none() { panic!("No active facet when calling spawn_link"); } - self.ac_ref.root_facet_ref().activate(Account::new(crate::name!("link")), |t_child| { - t_parent.half_link(t_child); - t_child.half_link(t_parent); - Ok(()) - }).expect("Failed during link"); - self + let is_alive = self.ac_ref.root_facet_ref().activate( + Account::new(crate::name!("link")), |t_child| { + t_parent.half_link(t_child); + t_child.half_link(t_parent); + Ok(()) + }); + if is_alive { + Ok(self) + } else { + Err(error("spawn_link'd actor terminated before link could happen", AnyValue::new(false))) + } } /// Start the actor's mainloop. Takes ownership of `self`. The @@ -1571,11 +1576,11 @@ impl Actor { let root_facet_ref = self.ac_ref.root_facet_ref(); let terminate = |result: ActorResult| { - let _ = root_facet_ref.activate_exit(Account::new(crate::name!("shutdown")), - |_| RunDisposition::Terminate(result)); + root_facet_ref.activate_exit(Account::new(crate::name!("shutdown")), + |_| RunDisposition::Terminate(result)); }; - if root_facet_ref.activate(Account::new(crate::name!("boot")), boot).is_err() { + if !root_facet_ref.activate(Account::new(crate::name!("boot")), boot) { return; } @@ -1600,11 +1605,12 @@ impl Actor { SystemMessage::Turn(mut loaned_item) => { tracing::trace!(actor_id = ?self.ac_ref.actor_id, "SystemMessage::Turn"); let actions = std::mem::take(&mut loaned_item.item); - let r = root_facet_ref.activate(Arc::clone(&loaned_item.account), |t| { - for action in actions.into_iter() { action(t)? } - Ok(()) - }); - if r.is_err() { return; } + let is_alive = root_facet_ref.activate( + Arc::clone(&loaned_item.account), |t| { + for action in actions.into_iter() { action(t)? } + Ok(()) + }); + if !is_alive { return; } } SystemMessage::Crash(e) => { tracing::trace!(actor_id = ?self.ac_ref.actor_id, diff --git a/syndicate/src/relay.rs b/syndicate/src/relay.rs index eb4d732..17510ed 100644 --- a/syndicate/src/relay.rs +++ b/syndicate/src/relay.rs @@ -624,12 +624,15 @@ async fn input_loop( loop { account.ensure_clear_funds().await; match src.next().await { - None => return Ok(LinkedTaskTermination::Normal), - Some(bs) => facet.activate(Arc::clone(&account), |t| { - let mut g = relay.lock(); - let tr = g.as_mut().expect("initialized"); - tr.handle_inbound_datagram(t, &bs?) - })?, + None => break, + Some(bs) => { + let is_alive = facet.activate(Arc::clone(&account), |t| { + let mut g = relay.lock(); + let tr = g.as_mut().expect("initialized"); + tr.handle_inbound_datagram(t, &bs?) + }); + if !is_alive { break; } + } } } } @@ -643,22 +646,26 @@ async fn input_loop( Ok(n) => n, Err(e) => if e.kind() == io::ErrorKind::ConnectionReset { - return Ok(LinkedTaskTermination::Normal); + break; } else { return Err(e)?; }, }; match n { - 0 => return Ok(LinkedTaskTermination::Normal), - _ => facet.activate(Arc::clone(&account), |t| { - let mut g = relay.lock(); - let tr = g.as_mut().expect("initialized"); - tr.handle_inbound_stream(t, &mut buf) - })?, + 0 => break, + _ => { + let is_alive = facet.activate(Arc::clone(&account), |t| { + let mut g = relay.lock(); + let tr = g.as_mut().expect("initialized"); + tr.handle_inbound_stream(t, &mut buf) + }); + if !is_alive { break; } + } } } } } + Ok(LinkedTaskTermination::Normal) } async fn output_loop(