From b2ee8daf4b1c71ed506849ad27c553ecfe6efda0 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sat, 15 Jan 2022 23:24:34 +0100 Subject: [PATCH] (Failed?) Experiment: alternate approach to reporting Facet::activate failures --- syndicate-server/src/protocol.rs | 2 +- .../src/services/config_watcher.rs | 44 ++++++---- syndicate-server/src/services/daemon.rs | 8 +- .../src/services/tcp_relay_listener.rs | 8 +- .../src/services/unix_relay_listener.rs | 8 +- syndicate/src/actor.rs | 82 +++++++++++++++---- syndicate/src/relay.rs | 26 +++--- 7 files changed, 122 insertions(+), 56 deletions(-) diff --git a/syndicate-server/src/protocol.rs b/syndicate-server/src/protocol.rs index d19606e..2a3eea2 100644 --- a/syndicate-server/src/protocol.rs +++ b/syndicate-server/src/protocol.rs @@ -43,7 +43,7 @@ pub fn run_connection( initial_ref: Arc, ) -> ActorResult { facet.activate(Account::new(syndicate::name!("start-session")), - |t| run_io_relay(t, i, o, initial_ref)) + |t| run_io_relay(t, i, o, initial_ref)).into() } pub async fn detect_protocol( diff --git a/syndicate-server/src/services/config_watcher.rs b/syndicate-server/src/services/config_watcher.rs index a6825db..45206be 100644 --- a/syndicate-server/src/services/config_watcher.rs +++ b/syndicate-server/src/services/config_watcher.rs @@ -181,14 +181,16 @@ fn run( let mut path_state: Map = Map::new(); - { - facet.activate(Account::new(syndicate::name!("initial_scan")), |t| { + let initial_scan_result = facet.activate( + Account::new(syndicate::name!("initial_scan")), |t| { initial_scan(t, &mut path_state, &config_ds, env.clone()); config_ds.assert(t, language(), &lifecycle::ready(&spec)); Ok(()) - }).unwrap(); - tracing::trace!("initial_scan complete"); + }); + if initial_scan_result.is_already_terminated() { + return; } + tracing::trace!("initial_scan complete"); let mut rescan = |paths: Vec| { facet.activate(Account::new(syndicate::name!("rescan")), |t| { @@ -209,24 +211,30 @@ fn run( t.stop_facet(facet_id); } Ok(()) - }).unwrap() + }).as_result() }; while let Ok(event) = rx.recv() { tracing::trace!("notification: {:?}", &event); - match event { - DebouncedEvent::NoticeWrite(_p) | - DebouncedEvent::NoticeRemove(_p) => - (), - DebouncedEvent::Create(p) | - DebouncedEvent::Write(p) | - DebouncedEvent::Chmod(p) | - DebouncedEvent::Remove(p) => - rescan(vec![p]), - DebouncedEvent::Rename(p, q) => - rescan(vec![p, q]), - _ => - tracing::info!("{:?}", event), + if + match event { + DebouncedEvent::NoticeWrite(_p) | + DebouncedEvent::NoticeRemove(_p) => + Ok(()), + DebouncedEvent::Create(p) | + DebouncedEvent::Write(p) | + DebouncedEvent::Chmod(p) | + DebouncedEvent::Remove(p) => + rescan(vec![p]), + DebouncedEvent::Rename(p, q) => + rescan(vec![p, q]), + _ => { + tracing::info!("{:?}", event); + Ok(()) + } + }.is_err() + { + return; } } diff --git a/syndicate-server/src/services/daemon.rs b/syndicate-server/src/services/daemon.rs index 79c22bd..2db9472 100644 --- a/syndicate-server/src/services/daemon.rs +++ b/syndicate-server/src/services/daemon.rs @@ -249,7 +249,7 @@ impl DaemonInstance { Err(_) => AnyValue::bytestring(buf), }; let now = AnyValue::new(chrono::Utc::now().to_rfc3339()); - if facet.activate( + if !facet.activate( Account::new(tracing::Span::current()), enclose!((pid, service, kind) |t| { log_ds.message(t, &(), &syndicate_macros::template!( @@ -260,7 +260,7 @@ impl DaemonInstance { line: =buf, }>")); Ok(()) - })).is_err() + })).is_success() { break; } @@ -313,7 +313,7 @@ impl DaemonInstance { facet.activate(Account::new(syndicate::name!("instance-terminated")), |t| { let m = if status.success() { None } else { Some(format!("{}", status)) }; self.handle_exit(t, m) - })?; + }).ignore_termination()?; Ok(LinkedTaskTermination::Normal) })); Ok(()) @@ -443,7 +443,7 @@ fn run( facet.activate(Account::new(syndicate::name!("instance-startup")), |t| { daemon_instance.start(t) - })?; + }).ignore_termination()?; Ok(LinkedTaskTermination::KeepFacet) }); Ok(()) diff --git a/syndicate-server/src/services/tcp_relay_listener.rs b/syndicate-server/src/services/tcp_relay_listener.rs index 40cf177..ffc8859 100644 --- a/syndicate-server/src/services/tcp_relay_listener.rs +++ b/syndicate-server/src/services/tcp_relay_listener.rs @@ -36,16 +36,16 @@ fn run(t: &mut Activation, ds: Arc, spec: TcpRelayListener) -> ActorResult t.linked_task(syndicate::name!("listener"), async move { let listen_addr = format!("{}:{}", host, port); let listener = TcpListener::bind(listen_addr).await?; - facet.activate(Account::new(syndicate::name!("readiness")), |t| { + if !facet.activate(Account::new(syndicate::name!("readiness")), |t| { tracing::info!("listening"); ds.assert(t, language(), &lifecycle::ready(&spec)); Ok(()) - })?; + }).is_success() { return Ok(LinkedTaskTermination::Normal); } loop { let (stream, addr) = listener.accept().await?; let gatekeeper = spec.gatekeeper.clone(); let name = syndicate::name!(parent: parent_span.clone(), "conn"); - facet.activate(Account::new(name.clone()), move |t| { + if !facet.activate(Account::new(name.clone()), move |t| { t.spawn(name, move |t| { Ok(t.linked_task(tracing::Span::current(), { let facet = t.facet.clone(); @@ -56,7 +56,7 @@ fn run(t: &mut Activation, ds: Arc, spec: TcpRelayListener) -> ActorResult })) }); Ok(()) - })?; + }).is_success() { return Ok(LinkedTaskTermination::Normal); } } }); Ok(()) diff --git a/syndicate-server/src/services/unix_relay_listener.rs b/syndicate-server/src/services/unix_relay_listener.rs index 1aedb4e..ba4b039 100644 --- a/syndicate-server/src/services/unix_relay_listener.rs +++ b/syndicate-server/src/services/unix_relay_listener.rs @@ -38,11 +38,11 @@ fn run(t: &mut Activation, ds: Arc, spec: UnixRelayListener) -> ActorResult let facet = t.facet.clone(); t.linked_task(syndicate::name!("listener"), async move { let listener = bind_unix_listener(&PathBuf::from(path_str)).await?; - facet.activate(Account::new(syndicate::name!("readiness")), |t| { + if !facet.activate(Account::new(syndicate::name!("readiness")), |t| { tracing::info!("listening"); ds.assert(t, language(), &lifecycle::ready(&spec)); Ok(()) - })?; + }).is_success() { return Ok(LinkedTaskTermination::Normal); } loop { let (stream, _addr) = listener.accept().await?; let peer = stream.peer_cred()?; @@ -50,7 +50,7 @@ fn run(t: &mut Activation, ds: Arc, spec: UnixRelayListener) -> ActorResult let name = syndicate::name!(parent: parent_span.clone(), "conn", pid = ?peer.pid().unwrap_or(-1), uid = peer.uid()); - facet.activate(Account::new(name.clone()), move |t| { + if !facet.activate(Account::new(name.clone()), move |t| { t.spawn(name, |t| { Ok(t.linked_task(tracing::Span::current(), { let facet = t.facet.clone(); @@ -66,7 +66,7 @@ fn run(t: &mut Activation, ds: Arc, spec: UnixRelayListener) -> ActorResult })) }); Ok(()) - })?; + }).is_success() { return Ok(LinkedTaskTermination::Normal); } } }); Ok(()) diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index 8eeb39f..9bbf06c 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -7,7 +7,6 @@ use super::dataflow::Graph; use super::error::Error; -use super::error::encode_error; use super::error::error; use super::rewrite::CaveatError; use super::rewrite::CheckedCaveat; @@ -455,6 +454,18 @@ pub enum RunDisposition { Terminate(ActorResult), } +/// Returned from [`Facet::activate`] and [`Facet::activate_exit`]. +#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[must_use] +pub enum ActivationResult { + /// The actor to be activated had already terminated by the time of the activation attempt. + AlreadyTerminated, + /// The activation succeeded. + Success, + /// The activation failed, + Failure(Error), +} + /// [Linked tasks][Activation::linked_task] terminate yielding values of this type. pub enum LinkedTaskTermination { /// Causes the task's associated [Facet] to be [stop][Activation::stop]ped when the task @@ -561,7 +572,7 @@ impl FacetRef { &self, account: Arc, f: F, - ) -> ActorResult where + ) -> ActivationResult where F: FnOnce(&mut Activation) -> ActorResult, { self.activate_exit(account, |t| f(t).into()) @@ -577,14 +588,15 @@ impl FacetRef { &self, account: Arc, f: F, - ) -> ActorResult where + ) -> ActivationResult where F: FnOnce(&mut Activation) -> RunDisposition, { let mut g = self.actor.state.lock(); match &mut *g { - ActorState::Terminated { exit_status } => - Err(error("Could not activate terminated actor", - encode_error((**exit_status).clone()))), + ActorState::Terminated { exit_status } => { + tracing::debug!(?exit_status, "Could not activate terminated actor"); + ActivationResult::AlreadyTerminated + } ActorState::Running(state) => { tracing::trace!(actor_id=?self.actor.actor_id, "activate"); let mut activation = Activation::make(self, account, state); @@ -605,7 +617,10 @@ impl FacetRef { } }; tracing::trace!(actor_id=?self.actor.actor_id, "deactivate"); - result + match result { + Ok(()) => ActivationResult::Success, + Err(e) => ActivationResult::Failure(e), + } } } } @@ -976,7 +991,9 @@ impl<'activation> Activation<'activation> { loop { timer.tick().await; let _entry = span.enter(); - facet.activate(Arc::clone(&account), |t| a(t))?; + if facet.activate(Arc::clone(&account), |t| a(t)).as_result().is_err() { + return Ok(LinkedTaskTermination::Normal); + } } }); Ok(()) @@ -996,7 +1013,7 @@ impl<'activation> Activation<'activation> { self.linked_task(crate::name!(parent: None, "Activation::at"), async move { tokio::time::sleep_until(instant.into()).await; let _entry = span.enter(); - facet.activate(account, a)?; + facet.activate(account, a).ignore_termination()?; Ok(LinkedTaskTermination::KeepFacet) }); } @@ -1031,7 +1048,7 @@ impl<'activation> Activation<'activation> { let facet_id = self.facet.facet_id; self.pending.for_myself.push(Box::new(move |t| { t.with_facet(true, facet_id, move |t| { - ac.link(t).boot(name, boot); + ac.link(t)?.boot(name, boot); Ok(()) }) })); @@ -1526,7 +1543,7 @@ impl Actor { Actor { rx, ac_ref } } - fn link(self, t_parent: &mut Activation) -> Self { + fn link(self, t_parent: &mut Activation) -> Result { if t_parent.active_facet().is_none() { panic!("No active facet when calling spawn_link"); } @@ -1534,8 +1551,8 @@ impl Actor { t_parent.half_link(t_child); t_child.half_link(t_parent); Ok(()) - }).expect("Failed during link"); - self + }).as_result()?; + Ok(self) } /// Start the actor's mainloop. Takes ownership of `self`. The @@ -1575,7 +1592,7 @@ impl Actor { |_| RunDisposition::Terminate(result)); }; - if root_facet_ref.activate(Account::new(crate::name!("boot")), boot).is_err() { + if !root_facet_ref.activate(Account::new(crate::name!("boot")), boot).is_success() { return; } @@ -1604,7 +1621,7 @@ impl Actor { for action in actions.into_iter() { action(t)? } Ok(()) }); - if r.is_err() { return; } + if !r.is_success() { return; } } SystemMessage::Crash(e) => { tracing::trace!(actor_id = ?self.ac_ref.actor_id, @@ -2069,6 +2086,41 @@ impl Entity for StopOnRetract { } } +impl ActivationResult { + pub fn is_success(&self) -> bool { + self == &ActivationResult::Success + } + + pub fn as_result(self) -> ActorResult { + self.into() + } + + pub fn ignore_termination(self) -> ActorResult { + match self { + ActivationResult::AlreadyTerminated => Ok(()), + ActivationResult::Failure(e) => Err(e), + ActivationResult::Success => Ok(()), + } + } + + pub fn is_already_terminated(&self) -> bool { + self == &ActivationResult::AlreadyTerminated + } +} + +impl From for ActorResult { + fn from(r: ActivationResult) -> ActorResult { + match r { + ActivationResult::AlreadyTerminated => + Err(error("New actor crashed unexpectedly in spawn_link", AnyValue::new(false))), + ActivationResult::Failure(e) => + Err(e), + ActivationResult::Success => + Ok(()), + } + } +} + impl ActorResult> Entity for F { fn message(&mut self, t: &mut Activation, _m: Synced) -> ActorResult { self(t) diff --git a/syndicate/src/relay.rs b/syndicate/src/relay.rs index eb4d732..d91ced1 100644 --- a/syndicate/src/relay.rs +++ b/syndicate/src/relay.rs @@ -625,11 +625,14 @@ async fn input_loop( account.ensure_clear_funds().await; match src.next().await { None => return Ok(LinkedTaskTermination::Normal), - Some(bs) => facet.activate(Arc::clone(&account), |t| { - let mut g = relay.lock(); - let tr = g.as_mut().expect("initialized"); - tr.handle_inbound_datagram(t, &bs?) - })?, + Some(bs) => { + let r = facet.activate(Arc::clone(&account), |t| { + let mut g = relay.lock(); + let tr = g.as_mut().expect("initialized"); + tr.handle_inbound_datagram(t, &bs?) + }); + if !r.is_success() { return Ok(LinkedTaskTermination::Normal); } + } } } } @@ -650,11 +653,14 @@ async fn input_loop( }; match n { 0 => return Ok(LinkedTaskTermination::Normal), - _ => facet.activate(Arc::clone(&account), |t| { - let mut g = relay.lock(); - let tr = g.as_mut().expect("initialized"); - tr.handle_inbound_stream(t, &mut buf) - })?, + _ => { + let r = facet.activate(Arc::clone(&account), |t| { + let mut g = relay.lock(); + let tr = g.as_mut().expect("initialized"); + tr.handle_inbound_stream(t, &mut buf) + }); + if !r.is_success() { return Ok(LinkedTaskTermination::Normal); } + } } } }