diff --git a/syndicate-server/examples/pingpong.rs b/syndicate-server/examples/pingpong.rs index b9b248f..3dc0a37 100644 --- a/syndicate-server/examples/pingpong.rs +++ b/syndicate-server/examples/pingpong.rs @@ -1,9 +1,11 @@ use std::sync::Arc; +use std::sync::Mutex; use std::time::SystemTime; use structopt::StructOpt; use syndicate::actor::*; +use syndicate::enclose; use syndicate::language; use syndicate::relay; use syndicate::schemas::dataspace::Observe; @@ -109,22 +111,18 @@ async fn main() -> Result<(), Box> { let mut event_counter: u64 = 0; let mut rtt_ns_samples: Vec = vec![0; report_latency_every]; let mut rtt_batch_count: usize = 0; - let mut current_reply = None; - let self_ref = t.create_inert(); - self_ref.become_entity( - syndicate::entity(Arc::clone(&self_ref)) - .on_message(move |self_ref, t, m: AnyValue| { + let current_reply = Arc::new(Mutex::new(None)); + Cap::new(&t.create( + syndicate::entity(()) + .on_message(move |(), t, m: AnyValue| { match m.value().as_boolean() { - Some(true) => { + Some(_) => { tracing::info!("{:?} turns, {:?} events in the last second", turn_counter, event_counter); turn_counter = 0; event_counter = 0; } - Some(false) => { - current_reply = None; - } None => { event_counter += 1; let bindings = m.value().to_sequence()?; @@ -136,9 +134,13 @@ async fn main() -> Result<(), Box> { timestamp.clone(), padding.clone())); } else { - if let None = current_reply { + let mut g = current_reply.lock().expect("unpoisoned"); + if let None = *g { turn_counter += 1; - t.message_for_myself(&self_ref, AnyValue::new(false)); + t.pre_commit(enclose!((current_reply) move |_| { + *current_reply.lock().expect("unpoisoned") = None; + Ok(()) + })); let rtt_ns = now() - timestamp.value().to_u64()?; rtt_ns_samples[rtt_batch_count] = rtt_ns; rtt_batch_count += 1; @@ -149,18 +151,16 @@ async fn main() -> Result<(), Box> { rtt_batch_count = 0; } - current_reply = Some( - simple_record2(&send_label, - Value::from(now()).wrap(), - padding.clone())); + *g = Some(simple_record2(&send_label, + Value::from(now()).wrap(), + padding.clone())); } - ds.message(t, &(), current_reply.as_ref().expect("some reply")); + ds.message(t, &(), g.as_ref().expect("some reply")); } } } Ok(()) - })); - Cap::new(&self_ref) + }))) }; ds.assert(t, language(), &Observe { diff --git a/syndicate-server/src/protocol.rs b/syndicate-server/src/protocol.rs index 5187d96..d6ae3d3 100644 --- a/syndicate-server/src/protocol.rs +++ b/syndicate-server/src/protocol.rs @@ -19,9 +19,8 @@ use tungstenite::Message; struct ExitListener; impl Entity<()> for ExitListener { - fn exit_hook(&mut self, _t: &mut Activation, exit_status: &Arc) -> ActorResult { + fn exit_hook(&mut self, _t: &mut Activation, exit_status: &Arc) { tracing::info!(?exit_status, "disconnect"); - Ok(()) } } diff --git a/syndicate-server/src/services/debt_reporter.rs b/syndicate-server/src/services/debt_reporter.rs index 8836f31..59c5a4d 100644 --- a/syndicate-server/src/services/debt_reporter.rs +++ b/syndicate-server/src/services/debt_reporter.rs @@ -1,6 +1,7 @@ use preserves_schema::Codec; use std::sync::Arc; +use std::sync::atomic::Ordering; use syndicate::actor::*; use syndicate::enclose; @@ -27,9 +28,38 @@ fn run(t: &mut Activation, ds: Arc, spec: DebtReporter) -> ActorResult { ds.assert(t, language(), &lifecycle::started(&spec)); ds.assert(t, language(), &lifecycle::ready(&spec)); t.every(core::time::Duration::from_millis((spec.interval_seconds.0 * 1000.0) as u64), |_t| { - for (id, (name, debt)) in syndicate::actor::ACCOUNTS.read().iter() { - tracing::info!(id, ?name, debt = ?debt.load(std::sync::atomic::Ordering::Relaxed)); + for (account_id, (name, debt)) in syndicate::actor::ACCOUNTS.read().iter() { + tracing::info!(account_id, ?name, debt = ?debt.load(Ordering::Relaxed)); } + + // let snapshot = syndicate::actor::ACTORS.read().clone(); + // for (id, (name, ac_ref)) in snapshot.iter() { + // if *id == _t.state.actor_id { + // tracing::debug!("skipping report on the reporting actor, to avoid deadlock"); + // continue; + // } + // tracing::trace!(?id, "about to lock"); + // tracing::info_span!("actor", id, ?name).in_scope(|| match &*ac_ref.state.lock() { + // ActorState::Terminated { exit_status } => + // tracing::info!(?exit_status, "terminated"), + // ActorState::Running(state) => { + // tracing::info!(field_count = ?state.fields.len(), + // outbound_assertion_count = ?state.outbound_assertions.len(), + // facet_count = ?state.facet_nodes.len()); + // tracing::info_span!("facets").in_scope(|| { + // for (facet_id, f) in state.facet_nodes.iter() { + // tracing::info!( + // ?facet_id, + // parent_id = ?f.parent_facet_id, + // outbound_handle_count = ?f.outbound_handles.len(), + // linked_task_count = ?f.linked_tasks.len(), + // inert_check_preventers = ?f.inert_check_preventers.load(Ordering::Relaxed)); + // } + // }); + // } + // }); + // } + Ok(()) }) } diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index b795e2c..dcb45a4 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -207,8 +207,7 @@ pub trait Entity: Send { /// [RunningActor::add_exit_hook]. /// /// The default implementation does nothing. - fn exit_hook(&mut self, turn: &mut Activation, exit_status: &Arc) -> ActorResult { - Ok(()) + fn exit_hook(&mut self, turn: &mut Activation, exit_status: &Arc) { } } @@ -219,16 +218,20 @@ pub struct InertEntity; impl Entity for InertEntity {} type TracedAction = (Option, Action); - -enum CleanupAction { - ForMyself(TracedAction), - ForAnother(Arc, TracedAction), -} - -type CleanupActions = Map; - type Action = Box ActorResult>; type Block = Box ActorResult>; +type InfallibleAction = Box; + +#[doc(hidden)] +pub struct OutboundAssertion { + established: bool, + asserting_facet_id: FacetId, + peer: Arc, + retractor: TracedAction, +} + +#[doc(hidden)] +pub type OutboundAssertions = Map>>>; #[doc(hidden)] pub type PendingEventQueue = Vec; @@ -256,6 +259,9 @@ pub struct Activation<'activation> { pub state: &'activation mut RunningActor, active_block: Option, pending: EventBuffer, + pre_commit_actions: Vec, + rollback_actions: Vec, + commit_actions: Vec, } struct EventBuffer { @@ -264,8 +270,6 @@ struct EventBuffer { pub trace_collector: Option, pub account: Arc, queues: HashMap, PendingEventQueue)>, - for_myself: PendingEventQueue, - final_actions: Vec>, } /// An `Account` records a "debt" in terms of outstanding work items. @@ -325,7 +329,11 @@ pub struct Actor { pub struct ActorRef { /// The ID of the referenced actor. pub actor_id: ActorId, - state: Arc>, + + // Not intended for ordinary use! You'll break a bunch of invariants if you do! + // Intended for reflective/debug use. + #[doc(hidden)] + pub state: Arc>, } /// A combination of an [`ActorRef`] with a [`FacetId`], acting as a capability to enter the @@ -349,19 +357,25 @@ pub enum ActorState { } /// State associated with each non-terminated [`Actor`]. +// Do not poke at exposed but doc(hidden) fields here! You will violate invariants if you do! +// They are exposed for debug/reflective purposes. pub struct RunningActor { /// The ID of the actor this state belongs to. pub actor_id: ActorId, tx: UnboundedSender, mailbox: Weak, dataflow: Graph, - fields: HashMap>, + #[doc(hidden)] + pub fields: HashMap>, blocks: HashMap, - exit_hooks: Vec) -> ActorResult>>, - cleanup_actions: CleanupActions, - facet_nodes: Map, - facet_children: Map>, - root: FacetId, + exit_hooks: Vec)>>, + #[doc(hidden)] + pub outbound_assertions: OutboundAssertions, + #[doc(hidden)] + pub facet_nodes: Map, + #[doc(hidden)] + pub facet_children: Map>, + pub root: FacetId, } /// The type of process-unique task IDs. @@ -385,12 +399,14 @@ pub struct Field { /// /// # Inert facets /// -/// A facet is considered *inert* if: +/// A facet is considered *inert* if either: /// -/// 1. it has no child facets; -/// 2. it has no cleanup actions (that is, no assertions placed by any of its entities); -/// 3. it has no linked tasks; and -/// 4. it has no "inert check preventers" (see [Activation::prevent_inert_check]). +/// 1. it has a parent facet and that parent facet is terminated; or +/// 2. it is either the root facet or its parent is not yet terminated, and it: +/// 1. has no child facets; +/// 2. has no cleanup actions (that is, no assertions placed by any of its entities); +/// 3. has no linked tasks; and +/// 4. has no "inert check preventers" (see [Activation::prevent_inert_check]). /// /// If a facet is created and is inert at the moment that its `boot` function returns, it is /// automatically terminated. @@ -400,15 +416,20 @@ pub struct Field { /// If the root facet in an actor is terminated, the entire actor is terminated (with exit /// status `Ok(())`). /// +// Do not poke at exposed but doc(hidden) fields here! You will violate invariants if you do! +// They are exposed for debug/reflective purposes. pub struct Facet { /// The ID of the facet. pub facet_id: FacetId, /// The ID of the facet's parent facet, if any; if None, this facet is the `Actor`'s root facet. pub parent_facet_id: Option, - outbound_handles: Set, + #[doc(hidden)] + pub outbound_handles: Set, stop_actions: Vec, - linked_tasks: Map, - inert_check_preventers: Arc, + #[doc(hidden)] + pub linked_tasks: Map, + #[doc(hidden)] + pub inert_check_preventers: Arc, } /// A reference to an object that expects messages/assertions of type @@ -465,13 +486,6 @@ where /// subsequently retracted. pub struct StopOnRetract; -/// Returned from the function given to [`FacetRef::activate_exit`] to indicate how the actor -/// should proceed. -pub enum RunDisposition { - Continue, - Terminate(ActorResult), -} - /// [Linked tasks][Activation::linked_task] terminate yielding values of this type. pub enum LinkedTaskTermination { /// Causes the task's associated [Facet] to be [stop][Activation::stop]ped when the task @@ -482,6 +496,12 @@ pub enum LinkedTaskTermination { KeepFacet, } +#[derive(Debug, Clone, Copy)] +enum TerminationDirection { + BelowStartingPoint, + AtOrAboveStartingPoint, +} + //--------------------------------------------------------------------------- const BUMP_AMOUNT: u8 = 10; @@ -561,11 +581,26 @@ impl<'a> Unparse<&'a (), AnyValue> for Synced { } } -impl From for RunDisposition { - fn from(v: ActorResult) -> Self { - match v { - Ok(()) => RunDisposition::Continue, - Err(e) => RunDisposition::Terminate(Err(e)), +mod panic_guard { + use super::*; + pub struct PanicGuard(Option>); + impl PanicGuard { + pub(super) fn new(tx: UnboundedSender) -> Self { + tracing::trace!("Panic guard armed"); + PanicGuard(Some(tx)) + } + pub fn disarm(&mut self) { + tracing::trace!("Panic guard disarmed"); + self.0 = None; + } + } + impl Drop for PanicGuard { + fn drop(&mut self) { + if let Some(tx) = &self.0 { + tracing::trace!("Panic guard triggering"); + let _ = tx.send(SystemMessage::Crash( + error("Actor panicked during activation", AnyValue::new(false)))); + } } } } @@ -585,31 +620,14 @@ impl FacetRef { f: F, ) -> bool where F: FnOnce(&mut Activation) -> ActorResult, - { - self.activate_exit(account, cause, |t| f(t).into()) - } - - /// Executes `f` in a new "[turn][Activation]" for `actor`. If `f` returns - /// `Some(exit_status)`, terminates `actor` with that `exit_status`. Otherwise, if `f` - /// returns `None`, leaves `actor` in runnable state. [Commits buffered - /// actions][Activation::commit] unless `actor` terminates with an `Err` status. - /// - /// Returns `true` if, at the end of the activation, `actor` had not yet terminated. - /// - /// Bills any activity to `account`. - pub fn activate_exit( - &self, - account: &Arc, - cause: Option, - f: F, - ) -> bool where - F: FnOnce(&mut Activation) -> RunDisposition, { let mut g = self.actor.state.lock(); match &mut *g { ActorState::Terminated { .. } => false, ActorState::Running(state) => { + let mut panic_guard = panic_guard::PanicGuard::new(state.tx.clone()); + // let _entry = tracing::info_span!(parent: None, "actor", actor_id = ?self.actor.actor_id).entered(); let mut activation = Activation::make(self, @@ -618,24 +636,39 @@ impl FacetRef { account.trace_collector.clone(), state); let f_result = f(&mut activation); - let is_alive = match activation.restore_invariants(f_result) { - RunDisposition::Continue => { - activation.commit(); - true - } - RunDisposition::Terminate(exit_status) => { - if exit_status.is_ok() { - activation.commit(); - } - let exit_status = Arc::new(exit_status); - state.cleanup(&self.actor, - &exit_status, - account.trace_collector.clone()); - *g = ActorState::Terminated { exit_status }; - false + + let maybe_exit_status = match f_result { + Ok(()) => + activation.commit().map_or_else( + |e| Some(Err(e)), + |_| { + // If we would otherwise continue, check the root facet: is it + // still alive? If not, then the whole actor should terminate now. + if let None = activation.state.get_facet(activation.state.root) { + tracing::trace!( + "terminating actor because root facet no longer exists"); + Some(Ok(())) + } else { + None + } + }), + Err(e) => { + activation.rollback(); + Some(Err(e)) } }; - is_alive + + panic_guard.disarm(); + drop(panic_guard); + drop(activation); + + match maybe_exit_status { + None => true, + Some(exit_status) => { + g.terminate(exit_status, &self.actor, &account.trace_collector); + false + } + } } } } @@ -660,6 +693,9 @@ impl<'activation> Activation<'activation> { NEXT_ACTIVATION_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed), c)), trace_collector), + pre_commit_actions: Vec::new(), + rollback_actions: Vec::new(), + commit_actions: Vec::new(), } } @@ -673,12 +709,6 @@ impl<'activation> Activation<'activation> { Account::new(name, self.trace_collector()) } - fn immediate_oid(&self, r: &Arc>) { - if r.mailbox.actor_id != self.facet.actor.actor_id { - panic!("Cannot use for_myself to send to remote peers"); - } - } - fn with_facet(&mut self, facet_id: FacetId, f: F) -> ActorResult where F: FnOnce(&mut Activation) -> ActorResult, @@ -747,92 +777,76 @@ impl<'activation> Activation<'activation> { } } + fn insert_outbound_assertion( + &mut self, + r: &Arc>, + handle: Handle, + description: Option, + ) -> bool { + let asserting_facet_id = self.facet.facet_id; + match self.state.get_facet(asserting_facet_id) { + None => false, + Some(f) => { + f.outbound_handles.insert(handle); + drop(f); + + let r = Arc::clone(r); + let details = OutboundAssertion { + established: false, + asserting_facet_id, + peer: Arc::clone(&r.mailbox), + retractor: ( + description, + Box::new(move |remote_t| remote_t.with_entity(&r, |t, e| { + tracing::trace!(?handle, "retracted"); + e.retract(t, handle) + }))), + }; + + self.state.outbound_assertions.insert(handle, Arc::new(Mutex::new(Some(details)))); + + self.on_rollback(move |t| { + if let Some(f) = t.state.get_facet(asserting_facet_id) { + f.outbound_handles.remove(&handle); + } + t.state.outbound_assertions.remove(&handle); + }); + self.on_commit(move |t| { + if let Some(oa_handle) = t.state.outbound_assertions.get_mut(&handle) { + oa_handle.lock().as_mut().expect("OutboundAssertion").established = true; + } + }); + + true + } + } + } + /// Core API: assert `a` at recipient `r`. /// /// Returns the [`Handle`] for the new assertion. pub fn assert(&mut self, r: &Arc>, a: M) -> Handle { let handle = next_handle(); - if let Some(f) = self.state.get_facet(self.facet.facet_id) { + if self.insert_outbound_assertion(r, handle, self.pending.desc.as_ref().map( + enclose!((r) move |_| trace::TargetedTurnEvent { + target: r.as_ref().into(), + detail: trace::TurnEvent::Retract { + handle: Box::new(protocol::Handle(handle.into())), + }, + }))) + { tracing::trace!(?r, ?handle, ?a, "assert"); - f.outbound_handles.insert(handle); - drop(f); - self.state.insert_retract_cleanup_action(&r, handle, self.pending.desc.as_ref().map( - enclose!((r) move |_| trace::TargetedTurnEvent { - target: r.as_ref().into(), - detail: trace::TurnEvent::Retract { - handle: Box::new(protocol::Handle(handle.into())), - }, - }))); - { - let r = Arc::clone(r); - let description = self.pending.trace_targeted(false, &r, || trace::TurnEvent::Assert { - assertion: Box::new((&a).into()), - handle: Box::new(protocol::Handle(handle.into())), - }); - self.pending.queue_for(&r).push(( - description, - Box::new(move |t| t.with_entity(&r, |t, e| { - tracing::trace!(?handle, ?a, "asserted"); - e.assert(t, a, handle) - })))); - } - } - handle - } - - /// Core API: assert `a` at `r`, which must be a `Ref` within the active actor. - /// - /// It's perfectly OK to use method [`assert`][Self::assert] even - /// for `Ref`s that are part of the active actor. The difference - /// between `assert` and `assert_for_myself` is that `r`'s handler - /// for `assert` runs in a separate, later [`Activation`], while - /// `r`'s handler for `assert_for_myself` runs in *this* - /// [`Activation`], before it commits. - /// - /// Returns the [`Handle`] for the new assertion. - /// - /// # Panics - /// - /// Panics if `r` is not part of the active actor. - pub fn assert_for_myself(&mut self, r: &Arc>, a: M) -> Handle { - self.immediate_oid(r); - let handle = next_handle(); - if let Some(f) = self.active_facet() { - tracing::trace!(?r, ?handle, ?a, "assert_for_myself"); - f.outbound_handles.insert(handle); - drop(f); - { - let r = Arc::clone(r); - self.state.cleanup_actions.insert( - handle, - CleanupAction::ForMyself(( - self.pending.desc.as_ref().map(enclose!((r, handle) move |_| trace::TargetedTurnEvent { - target: r.as_ref().into(), - detail: trace::TurnEvent::Retract { - handle: Box::new(protocol::Handle(handle.into())), - } - })), - Box::new(move |t| t.with_entity(&r, |t, e| { - tracing::trace!(?handle, "retracted"); - if let Some(f) = t.active_facet() { - f.outbound_handles.remove(&handle); - } - e.retract(t, handle) - }))))); - } - { - let r = Arc::clone(r); - let description = self.pending.trace_targeted(true, &r, || trace::TurnEvent::Assert { - assertion: Box::new((&a).into()), - handle: Box::new(protocol::Handle(handle.into())), - }); - self.pending.for_myself.push(( - description, - Box::new(move |t| t.with_entity(&r, |t, e| { - tracing::trace!(?handle, ?a, "asserted"); - e.assert(t, a, handle) - })))); - } + let r = Arc::clone(r); + let description = self.pending.trace_targeted(false, &r, || trace::TurnEvent::Assert { + assertion: Box::new((&a).into()), + handle: Box::new(protocol::Handle(handle.into())), + }); + self.pending.queue_for(&r).push(( + description, + Box::new(move |t| t.with_entity(&r, |t, e| { + tracing::trace!(?handle, ?a, "asserted"); + e.assert(t, a, handle) + })))); } handle } @@ -841,16 +855,15 @@ impl<'activation> Activation<'activation> { let this_actor_id = self.state.actor_id; let entity_ref = t_other.create::(StopOnRetract); let handle = next_handle(); - tracing::trace!(?handle, ?entity_ref, "half_link"); - self.state.insert_retract_cleanup_action(&entity_ref, handle, self.pending.desc.as_ref().map( + assert!(self.insert_outbound_assertion(&entity_ref, handle, self.pending.desc.as_ref().map( enclose!((entity_ref) move |_| trace::TargetedTurnEvent { target: entity_ref.as_ref().into(), detail: trace::TurnEvent::BreakLink { source: Box::new(trace::ActorId(AnyValue::new(this_actor_id))), handle: Box::new(protocol::Handle(handle.into())), }, - }))); - self.active_facet().unwrap().outbound_handles.insert(handle); + })))); + tracing::trace!(?handle, ?entity_ref, "half_link"); t_other.with_entity(&entity_ref, |t, e| e.assert(t, AnyValue::new(true), handle)).unwrap(); handle } @@ -858,8 +871,33 @@ impl<'activation> Activation<'activation> { /// Core API: retract a previously-established assertion. pub fn retract(&mut self, handle: Handle) { tracing::trace!(?handle, "retract"); - if let Some(d) = self.state.cleanup_actions.remove(&handle) { - self.pending.execute_cleanup_action(d) + if let Some(oa_handle) = self.state.outbound_assertions.remove(&handle) { + self.on_rollback(enclose!((oa_handle) move |t| { + if oa_handle.lock().as_mut().expect("OutboundAssertion").established { + t.state.outbound_assertions.insert(handle, oa_handle); + } + })); + + let g = oa_handle.lock(); + let oa = g.as_ref().expect("Present OutboundAssertion"); + + if let Some(desc) = &oa.retractor.0 { + self.trace(|_| trace::ActionDescription::Enqueue { event: Box::new(desc.clone()) }); + } + + self.pending.queue_for_mailbox(&oa.peer).push(( + oa.retractor.0.clone(), + Box::new(enclose!((oa_handle) move |remote_t| { + let oa = oa_handle.lock().take().expect("Present OutboundAssertion"); + (oa.retractor.1)(remote_t) + })))); + + let asserting_facet_id = oa.asserting_facet_id; + self.on_commit(move |t| { + if let Some(f) = t.state.get_facet(asserting_facet_id) { + f.outbound_handles.remove(&handle); + } + }); } } @@ -894,27 +932,6 @@ impl<'activation> Activation<'activation> { })))) } - /// Core API: send message `m` to recipient `r`, which must be a - /// `Ref` within the active actor. - /// - /// Method `message_for_myself` is to [`message`][Self::message] - /// as [`assert_for_myself`][Self::assert_for_myself] is to - /// [`assert`][Self::assert]. - /// - /// # Panics - /// - /// Panics if `r` is not part of the active actor. - pub fn message_for_myself(&mut self, r: &Arc>, m: M) { - self.immediate_oid(r); - let r = Arc::clone(r); - let description = self.pending.trace_targeted(true, &r, || trace::TurnEvent::Message { - body: Box::new((&m).into()), - }); - self.pending.for_myself.push(( - description, - Box::new(move |t| t.with_entity(&r, |t, e| e.message(t, m))))) - } - /// Core API: begins a synchronisation with `r`. /// /// Once the synchronisation request reaches `r`'s actor, it will @@ -968,16 +985,60 @@ impl<'activation> Activation<'activation> { &self.pending.account } - /// Delivers all pending actions in this activation and resets it, ready for more. + /// Delivers all pending actions in this activation and resets it, ready for more. Succeeds + /// iff all [pre-commit][Activation::pre_commit] actions succeed. /// - /// # Panics + /// # Commit procedure /// - /// Panics if any pending "`final_actions`" or actions "`for_myself`" (resulting from - /// [`assert_for_myself`][Self::assert_for_myself] or - /// [`message_for_myself`][Self::message_for_myself]) are outstanding at the time of the - /// call. - pub fn commit(&mut self) { + /// If an [activation][FacetRef::activate]'s `f` function returns successfully, the + /// activation *commits* according to the following procedure: + /// + /// 1. While the dataflow graph needs repairing or outstanding + /// [pre-commit][Activation::pre_commit`] actions exist: + /// 1. repair the dataflow graph + /// 2. run all pre-commit actions + /// Note that graph repair or a pre-commit action may fail, causing the commit to + /// abort, or may further damage the dataflow graph or schedule another pre-commit + /// action, causing another go around the loop. + /// + /// 2. The commit becomes final. All queued events are sent; all internal accounting + /// actions are performed. + pub fn commit(&mut self) -> ActorResult { + tracing::trace!("Activation::commit"); + loop { + let mut should_loop = false; + if self.repair_dataflow()? { + should_loop = true; + } + if !self.pre_commit_actions.is_empty() { + should_loop = true; + for ac in std::mem::take(&mut self.pre_commit_actions) { + ac(self)? + } + } + if should_loop { + continue; + } else { + break; + } + } + tracing::trace!("Commit is final"); + std::mem::take(&mut self.rollback_actions); + // ^ drop 'em so they don't run next time we commit, if there is a next time + for ac in std::mem::take(&mut self.commit_actions) { + ac(self); + } self.pending.commit(); + tracing::trace!("Activation::commit complete"); + Ok(()) + } + + fn rollback(&mut self) { + tracing::trace!("Activation::rollback"); + for ac in std::mem::take(&mut self.rollback_actions) { + ac(self) + } + tracing::trace!("Activation::rollback complete"); } /// Construct an entity with behaviour [`InertEntity`] within the active facet. @@ -1127,24 +1188,29 @@ impl<'activation> Activation<'activation> { self.after(delay, a) } + /// Schedules the given action to run just prior to [commit][Activation::commit]. + pub fn pre_commit ActorResult>( + &mut self, + action: F, + ) { + self.pre_commit_actions.push(Box::new(action)); + } + + fn on_rollback(&mut self, action: F) { + self.rollback_actions.push(Box::new(action)); + } + + fn on_commit(&mut self, action: F) { + self.commit_actions.push(Box::new(action)); + } + /// Schedule the creation of a new actor when the Activation commits. pub fn spawn ActorResult>( &mut self, name: Name, boot: F, ) -> ActorRef { - let ac = Actor::new(Some(self.state.actor_id), self.trace_collector()); - let ac_ref = ac.ac_ref.clone(); - self.pending.trace(|| trace::ActionDescription::Spawn { - link: false, - id: Box::new(trace::ActorId(AnyValue::new(ac_ref.actor_id))), - }); - let cause = self.pending.desc.as_ref().map( - |d| trace::TurnCause::Turn { id: Box::new(d.id.clone()) }); - self.pending.final_actions.push(Box::new(move |t| { - ac.boot(name, Arc::clone(t.account()), cause, boot); - })); - ac_ref + self._spawn(name, boot, false) } /// Schedule the creation of a new actor when the Activation commits. @@ -1156,22 +1222,28 @@ impl<'activation> Activation<'activation> { &mut self, name: Name, boot: F, + ) -> ActorRef { + self._spawn(name, boot, true) + } + + fn _spawn ActorResult>( + &mut self, + name: Name, + boot: F, + link: bool, ) -> ActorRef { let ac = Actor::new(Some(self.state.actor_id), self.trace_collector()); let ac_ref = ac.ac_ref.clone(); - let facet_id = self.facet.facet_id; self.pending.trace(|| trace::ActionDescription::Spawn { - link: true, + link, id: Box::new(trace::ActorId(AnyValue::new(ac_ref.actor_id))), }); let cause = self.pending.desc.as_ref().map( |d| trace::TurnCause::Turn { id: Box::new(d.id.clone()) }); - self.pending.final_actions.push(Box::new(move |t| { - t.with_facet(facet_id, move |t| { - ac.link(t).boot(name, Arc::clone(t.account()), cause, boot); - Ok(()) - }).unwrap() - })); + let ac = if link { ac.link(self) } else { ac }; + self.on_commit(move |t| { + ac.boot(name, Arc::clone(t.account()), cause, boot); + }); ac_ref } @@ -1190,7 +1262,7 @@ impl<'activation> Activation<'activation> { self.state.facet_nodes.insert(facet_id, f); tracing::trace!(parent_id = ?self.facet.facet_id, ?facet_id, - actor_facet_count = ?self.state.facet_nodes.len()); + new_actor_facet_count = ?self.state.facet_nodes.len()); self.state.facet_children.entry(self.facet.facet_id).or_default().insert(facet_id); self._with_facet(facet_id, move |t| { boot(t)?; @@ -1233,7 +1305,9 @@ impl<'activation> Activation<'activation> { if let Some(k) = continuation { self.on_facet_stop(facet_id, k); } - self._terminate_facet(facet_id, true, trace::FacetStopReason::ExplicitAction) + self._terminate_facet(facet_id, + TerminationDirection::AtOrAboveStartingPoint, + trace::FacetStopReason::ExplicitAction) } /// Arranges for the [`Facet`] named by `facet_id` to be stopped cleanly when `self` @@ -1263,71 +1337,83 @@ impl<'activation> Activation<'activation> { fn stop_if_inert(&mut self) { let facet_id = self.facet.facet_id; - self.pending.final_actions.push(Box::new(move |t| { + // Registering a pre-commit hook lets this run after the dataflow graph has been repaired. + self.pre_commit(move |t| { tracing::trace!("Checking inertness of facet {} from facet {}", facet_id, t.facet.facet_id); if t.state.facet_exists_and_is_inert(facet_id) { tracing::trace!(" - facet {} is inert, stopping it", facet_id); - t._terminate_facet(facet_id, true, trace::FacetStopReason::Inert) - .expect("Non-failing _terminate_facet in stop_if_inert"); + t._terminate_facet(facet_id, + TerminationDirection::AtOrAboveStartingPoint, + trace::FacetStopReason::Inert)?; } else { tracing::trace!(" - facet {} is not inert", facet_id); } - })) + Ok(()) + }) } fn _terminate_facet( &mut self, facet_id: FacetId, - alive: bool, + direction: TerminationDirection, reason: trace::FacetStopReason, ) -> ActorResult { if let Some(mut f) = self.state.facet_nodes.remove(&facet_id) { + let maybe_parent_id = f.parent_facet_id; + self.trace(|t| trace::ActionDescription::FacetStop { - path: t.facet_ids_for(&f).iter().map(|i| trace::FacetId(AnyValue::new(u64::from(*i)))).collect(), + path: t.facet_ids_for(&f).iter().map( + |i| trace::FacetId(AnyValue::new(u64::from(*i)))).collect(), reason: Box::new(reason), }); - tracing::trace!(actor_facet_count = ?self.state.facet_nodes.len(), - "{} termination of {:?}", - if alive { "living" } else { "post-exit" }, - facet_id); - if let Some(p) = f.parent_facet_id { - self.state.facet_children.get_mut(&p).map(|children| children.remove(&facet_id)); - } - self._with_facet(facet_id, |t| { - if let Some(children) = t.state.facet_children.remove(&facet_id) { - for child_id in children.into_iter() { - t._terminate_facet(child_id, alive, trace::FacetStopReason::ParentStopping)?; - } + tracing::trace!(remaining_actor_facet_count = ?self.state.facet_nodes.len(), + ?facet_id, + ?direction, + "stopping"); + + if let Some(children) = self.state.facet_children.remove(&facet_id) { + for child_id in children.into_iter() { + self._terminate_facet(child_id, + TerminationDirection::BelowStartingPoint, + trace::FacetStopReason::ParentStopping)?; } - if alive { - let parent_facet_id = f.parent_facet_id; - t._with_facet(parent_facet_id.unwrap_or(facet_id), |t| { - for action in std::mem::take(&mut f.stop_actions) { - action(t)?; - } - Ok(()) - })?; - f.retract_outbound(t); - // ^ we need retraction to happen right here so that child-facet - // cleanup-actions are performed before parent-facet cleanup-actions. - if let Some(p) = parent_facet_id { - if t.state.facet_exists_and_is_inert(p) { + } + + if let TerminationDirection::AtOrAboveStartingPoint = direction { + if let Some(p) = maybe_parent_id { + self.state.facet_children.get_mut(&p).map(|children| children.remove(&facet_id)); + } + } + + self._with_facet(maybe_parent_id.unwrap_or(facet_id), |t| { + for ac in std::mem::take(&mut f.stop_actions).into_iter() { + ac(t)? + } + Ok(()) + })?; + + for handle in std::mem::take(&mut f.outbound_handles).into_iter() { + tracing::trace!(h = ?handle, "retract on termination"); + self.retract(handle); + } + + if let TerminationDirection::AtOrAboveStartingPoint = direction { + match maybe_parent_id { + Some(p) => { + if self.state.facet_exists_and_is_inert(p) { tracing::trace!("terminating parent {:?} of facet {:?}", p, facet_id); - t._terminate_facet(p, true, trace::FacetStopReason::Inert)?; + self._terminate_facet(p, + TerminationDirection::AtOrAboveStartingPoint, + trace::FacetStopReason::Inert)?; } else { tracing::trace!("not terminating parent {:?} of facet {:?}", p, facet_id); } - } else { - tracing::trace!("no parent of root facet {:?} to terminate", facet_id); } - } else { - f.retract_outbound(t); + None => tracing::trace!("no parent of root facet {:?} to terminate", facet_id), } - Ok(()) - }) - } else { - Ok(()) + } } + Ok(()) } /// Create a new named dataflow variable (field) within the active [`Actor`]. @@ -1438,51 +1524,13 @@ impl<'activation> Activation<'activation> { } Ok(pass_number > 0) } +} - fn _restore_invariants(&mut self) -> ActorResult { - loop { - loop { - let actions = std::mem::take(&mut self.pending.for_myself); - if actions.is_empty() { break; } - for (maybe_desc, action) in actions.into_iter() { - if let Some(desc) = maybe_desc { - self.pending.trace(|| trace::ActionDescription::DequeueInternal { - event: Box::new(desc), - }); - } - action(self)? - } - } - if !self.repair_dataflow()? { - break; - } - } - for final_action in std::mem::take(&mut self.pending.final_actions) { - final_action(self); - } - Ok(()) - } - - fn restore_invariants(&mut self, d: RunDisposition) -> RunDisposition { - let d = match d { - RunDisposition::Continue => - self._restore_invariants().into(), - RunDisposition::Terminate(Ok(())) => - RunDisposition::Terminate(self._restore_invariants()), - RunDisposition::Terminate(Err(_)) => - d, - }; - - // If we would otherwise continue, check the root facet: is it still alive? - // If not, then the whole actor should terminate now. - if let RunDisposition::Continue = d { - if let None = self.state.get_facet(self.state.root) { - tracing::trace!("terminating actor because root facet no longer exists"); - return RunDisposition::Terminate(Ok(())); - } - } - - d +impl<'activation> Drop for Activation<'activation> { + fn drop(&mut self) { + tracing::trace!("Activation::drop"); + self.rollback(); + tracing::trace!("Activation::drop complete"); } } @@ -1499,29 +1547,6 @@ impl EventBuffer { trace_collector, account, queues: HashMap::new(), - for_myself: PendingEventQueue::new(), - final_actions: Vec::new(), - } - } - - fn execute_cleanup_action(&mut self, d: CleanupAction) { - match d { - CleanupAction::ForAnother(mailbox, (maybe_desc, action)) => { - if let Some(desc) = &maybe_desc { - self.trace(|| trace::ActionDescription::Enqueue { - event: Box::new(desc.clone()), - }) - } - self.queue_for_mailbox(&mailbox).push((maybe_desc, action)); - } - CleanupAction::ForMyself((maybe_desc, action)) => { - if let Some(desc) = &maybe_desc { - self.trace(|| trace::ActionDescription::EnqueueInternal { - event: Box::new(desc.clone()), - }) - } - self.for_myself.push((maybe_desc, action)); - } } } @@ -1535,13 +1560,7 @@ impl EventBuffer { } fn commit(&mut self) { - tracing::trace!("EventBuffer::deliver"); - if !self.final_actions.is_empty() { - panic!("Unprocessed final_actions at deliver() time"); - } - if !self.for_myself.is_empty() { - panic!("Unprocessed for_myself events remain at deliver() time"); - } + tracing::trace!("EventBuffer::commit"); if let Some(d) = &mut self.desc { if let Some(c) = &self.trace_collector { c.record(self.source_actor_id, trace::ActorActivation::Turn(Box::new(d.take()))); @@ -1725,11 +1744,11 @@ impl Actor { } /// Create a new actor. It still needs to be [`boot`ed][Self::boot]. - pub fn new(_parent_actor_id: Option, trace_collector: Option) -> Self { + pub fn new(parent_actor_id: Option, trace_collector: Option) -> Self { let (tx, rx) = unbounded_channel(); let actor_id = next_actor_id(); let root = Facet::new(None); - tracing::debug!(?actor_id, ?_parent_actor_id, root_facet_id = ?root.facet_id, "Actor::new"); + tracing::debug!(?actor_id, ?parent_actor_id, root_facet_id = ?root.facet_id, "Actor::new"); let mut st = RunningActor { actor_id, tx, @@ -1738,7 +1757,7 @@ impl Actor { fields: HashMap::new(), blocks: HashMap::new(), exit_hooks: Vec::new(), - cleanup_actions: Map::new(), + outbound_assertions: Map::new(), facet_nodes: Map::new(), facet_children: Map::new(), root: root.facet_id, @@ -1797,25 +1816,13 @@ impl Actor { }); } tokio::spawn(async move { - tracing::trace!("start"); + tracing::trace!(?actor_id, "start"); self.run(boot_account, boot_cause, move |t| { t.facet(boot)?; Ok(()) }).await; - let result = self.ac_ref.exit_status().expect("terminated"); - if let Some(c) = trace_collector { - c.record(actor_id, trace::ActorActivation::Stop { - status: Box::new(match &result { - Ok(()) => trace::ExitStatus::Ok, - Err(e) => trace::ExitStatus::Error(Box::new(e.clone())), - }), - }); - } - match &result { - Ok(()) => tracing::trace!("normal stop"), - Err(e) => tracing::error!("error stop: {}", e), - } - result + tracing::trace!(?actor_id, "stop"); + self.ac_ref.exit_status().expect("terminated") }) // }.instrument(tracing::info_span!(parent: None, "actor", ?actor_id).or_current())) } @@ -1828,26 +1835,29 @@ impl Actor { ) -> () { let root_facet_ref = self.ac_ref.root_facet_ref(); - let terminate = |result: ActorResult| { - root_facet_ref.activate_exit(&Account::new(None, None), - None, - |_| RunDisposition::Terminate(result)); + let terminate = |e: Error | { + assert!(!root_facet_ref.activate(&Account::new(None, None), None, |_| Err(e))); }; if !root_facet_ref.activate(&boot_account, boot_cause, boot) { return; } - loop { + 'mainloop: loop { tracing::trace!(actor_id = ?self.ac_ref.actor_id, "mainloop top"); match self.rx.recv().await { None => { - return terminate(Err(error("Unexpected channel close", AnyValue::new(false)))); + terminate(error("Unexpected channel close", AnyValue::new(false))); + break 'mainloop; } Some(m) => match m { SystemMessage::Release => { tracing::trace!(actor_id = ?self.ac_ref.actor_id, "SystemMessage::Release"); - return terminate(Ok(())); + assert!(!root_facet_ref.activate(&Account::new(None, None), None, |t| { + t.stop_root(); + Ok(()) + })); + break 'mainloop; } SystemMessage::ReleaseField(field_id) => { tracing::trace!(actor_id = ?self.ac_ref.actor_id, @@ -1872,13 +1882,14 @@ impl Actor { Ok(()) }) { - return; + break 'mainloop; } } SystemMessage::Crash(e) => { tracing::trace!(actor_id = ?self.ac_ref.actor_id, "SystemMessage::Crash({:?})", &e); - return terminate(Err(e)); + terminate(e); + break 'mainloop; } } } @@ -1912,13 +1923,6 @@ impl Facet { } } } - - fn retract_outbound(&mut self, t: &mut Activation) { - for handle in std::mem::take(&mut self.outbound_handles).into_iter() { - tracing::trace!(h = ?handle, "retract on termination"); - t.retract(handle); - } - } } impl ActorRef { @@ -1965,6 +1969,50 @@ impl std::fmt::Debug for ActorRef { } } +impl ActorState { + fn is_running(&self) -> bool { + match self { + ActorState::Terminated { .. } => false, + ActorState::Running(_) => true, + } + } + + fn terminate( + &mut self, + exit_status: ActorResult, + actor: &ActorRef, + trace_collector: &Option, + ) { + if !self.is_running() { + return; + } + + let exit_status = Arc::new(exit_status); + + let final_state = ActorState::Terminated { exit_status: Arc::clone(&exit_status) }; + match std::mem::replace(self, final_state) { + ActorState::Terminated { .. } => + unreachable!(), + ActorState::Running(state) => + state.cleanup(actor, Arc::clone(&exit_status), trace_collector.clone()), + } + + match &*exit_status { + Ok(()) => tracing::trace!(actor_id=?actor.actor_id, "normal stop"), + Err(e) => tracing::error!(actor_id=?actor.actor_id, %e, "error stop"), + } + + if let Some(c) = trace_collector { + c.record(actor.actor_id, trace::ActorActivation::Stop { + status: Box::new(match &*exit_status { + Ok(()) => trace::ExitStatus::Ok, + Err(e) => trace::ExitStatus::Error(Box::new(e.clone())), + }), + }); + } + } +} + impl RunningActor { /// Requests a shutdown of the actor. The shutdown request is /// handled by the actor's main loop, causing it to terminate with @@ -2011,59 +2059,58 @@ impl RunningActor { // The only outbound handle the root facet of an actor may have is a link // assertion, from [Activation::link]. This is not to be considered a "real" // assertion for purposes of keeping the facet alive! + let maybe_parent_id = f.parent_facet_id.clone(); let no_outbound_handles = f.outbound_handles.is_empty(); - let is_root_facet = f.parent_facet_id.is_none(); + let is_root_facet = maybe_parent_id.is_none(); let no_linked_tasks = f.linked_tasks.is_empty(); let no_inert_check_preventers = f.inert_check_preventers.load(Ordering::Relaxed) == 0; - tracing::trace!(?facet_id, ?no_kids, ?no_outbound_handles, ?is_root_facet, ?no_linked_tasks, ?no_inert_check_preventers); - no_kids && (no_outbound_handles || is_root_facet) && no_linked_tasks && no_inert_check_preventers + let parent_facet_missing = maybe_parent_id.map_or(false, |p| self.get_facet_immut(p).is_none()); + tracing::trace!(?facet_id, ?no_kids, ?no_outbound_handles, ?is_root_facet, ?no_linked_tasks, ?no_inert_check_preventers, ?parent_facet_missing); + parent_facet_missing || (no_kids && (no_outbound_handles || is_root_facet) && no_linked_tasks && no_inert_check_preventers) } else { tracing::trace!(?facet_id, exists = ?false); false } } - fn insert_retract_cleanup_action( - &mut self, - r: &Arc>, - handle: Handle, - description: Option, - ) { - let r = Arc::clone(r); - self.cleanup_actions.insert( - handle, - CleanupAction::ForAnother(Arc::clone(&r.mailbox), ( - description, - Box::new(move |t| t.with_entity(&r, |t, e| { - tracing::trace!(?handle, "retracted"); - if let Some(f) = t.active_facet() { - f.outbound_handles.remove(&handle); - } - e.retract(t, handle) - }))))); - } - fn cleanup( - &mut self, + mut self, ac_ref: &ActorRef, - exit_status: &Arc, + exit_status: Arc, trace_collector: Option, ) { + if exit_status.is_ok() { + assert!(self.get_facet(self.root).is_none()); + } + let cause = Some(trace::TurnCause::Cleanup); let account = Account::new(Some(AnyValue::symbol("cleanup")), trace_collector.clone()); - let mut t = Activation::make(&ac_ref.facet_ref(self.root), account, cause, trace_collector, self); - if let Err(err) = t._terminate_facet(t.state.root, exit_status.is_ok(), trace::FacetStopReason::ActorStopping) { - // This can only occur as the result of an internal error in this file's code. - tracing::error!(?err, "unexpected error from terminate_facet"); - panic!("Unexpected error result from terminate_facet"); + let mut t = Activation::make(&ac_ref.facet_ref(self.root), + account, + cause, + trace_collector, + &mut self); + + // NB. In descending order so that we retract newer handles first. Since + // facet-tree-order retraction is children before parents, this isn't quite right - a + // parent facet could have made an assertion after some child made an assertion. But + // given that this is for *unclean shutdown*, maybe it's OK? + // + let handles_descending: Vec = t.state.outbound_assertions.keys().rev().cloned().collect(); + tracing::trace!(actor_id=?t.state.actor_id, + handles=?handles_descending, + "remaining to retract at cleanup time"); + for handle in handles_descending.into_iter() { + t.retract(handle); } - // TODO: The linked_tasks are being cancelled above ^ when their Facets drop. - // TODO: We don't want that: we want (? do we?) exit hooks to run before linked_tasks are cancelled. - // TODO: Example: send an error message in an exit_hook that is processed and delivered by a linked_task. + + if let Err(err) = t.commit() { + // This can only happen through an internal error in this module + tracing::error!(?err, "Internal error during RunningActor::cleanup"); + } + for action in std::mem::take(&mut t.state.exit_hooks) { - if let Err(_err) = action(&mut t, &exit_status) { - tracing::error!(?_err, "error in exit hook"); - } + action(&mut t, &exit_status); } } } @@ -2094,12 +2141,10 @@ impl Drop for Actor { ACTORS.write().remove(&self.ac_ref.actor_id); // let _scope = tracing::info_span!(parent: None, "actor", actor_id = ?self.ac_ref.actor_id).entered(); let mut g = self.ac_ref.state.lock(); - if let ActorState::Running(ref mut state) = *g { - tracing::warn!("Force-terminated by Actor::drop"); - let exit_status = - Arc::new(Err(error("Force-terminated by Actor::drop", AnyValue::new(false)))); - state.cleanup(&self.ac_ref, &exit_status, self.trace_collector.clone()); - *g = ActorState::Terminated { exit_status }; + if g.is_running() { + g.terminate(Err(error("Force-terminated by Actor::drop", AnyValue::new(false))), + &self.ac_ref, + &self.trace_collector); } tracing::debug!("Actor::drop"); } @@ -2112,8 +2157,11 @@ impl Drop for Facet { } if !self.outbound_handles.is_empty() { - panic!("Internal error: outbound_handles for {:?} not empty at drop time", - self.facet_id); + tracing::warn!( + concat!("outbound_handles for {:?} not empty at drop time; ", + "retractions will happen at actor termination, ", + "but may not follow facet-tree order"), + self.facet_id); } tracing::trace!(facet_id = ?self.facet_id, "Facet::drop"); @@ -2330,7 +2378,7 @@ where fn stop(&mut self, t: &mut Activation) -> ActorResult { t.with_entity(&self.underlying, |t, e| e.stop(t)) } - fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc) -> ActorResult { + fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc) { self.underlying.internal_with_entity(|e| e.exit_hook(t, exit_status)) } } diff --git a/syndicate/src/during.rs b/syndicate/src/during.rs index 15969d6..87b4639 100644 --- a/syndicate/src/during.rs +++ b/syndicate/src/during.rs @@ -17,7 +17,7 @@ where Fa: 'static + Send + FnMut(&mut E, &mut Activation, M) -> DuringResult, Fm: 'static + Send + FnMut(&mut E, &mut Activation, M) -> ActorResult, Fs: 'static + Send + FnMut(&mut E, &mut Activation) -> ActorResult, - Fx: 'static + Send + FnMut(&mut E, &mut Activation, &Arc) -> ActorResult, + Fx: 'static + Send + FnMut(&mut E, &mut Activation, &Arc), { state: E, assertion_handler: Option, @@ -54,7 +54,7 @@ pub fn entity( fn (&mut E, &mut Activation, M) -> DuringResult, fn (&mut E, &mut Activation, M) -> ActorResult, fn (&mut E, &mut Activation) -> ActorResult, - fn (&mut E, &mut Activation, &Arc) -> ActorResult> + fn (&mut E, &mut Activation, &Arc)> where E: 'static + Send, { @@ -68,7 +68,7 @@ where Fa: 'static + Send + FnMut(&mut E, &mut Activation, M) -> DuringResult, Fm: 'static + Send + FnMut(&mut E, &mut Activation, M) -> ActorResult, Fs: 'static + Send + FnMut(&mut E, &mut Activation) -> ActorResult, - Fx: 'static + Send + FnMut(&mut E, &mut Activation, &Arc) -> ActorResult, + Fx: 'static + Send + FnMut(&mut E, &mut Activation, &Arc), { pub fn new( state: E, @@ -154,7 +154,7 @@ where pub fn on_exit(self, exit_handler: Fx1) -> DuringEntity where - Fx1: 'static + Send + FnMut(&mut E, &mut Activation, &Arc) -> ActorResult, + Fx1: 'static + Send + FnMut(&mut E, &mut Activation, &Arc), { DuringEntity { state: self.state, @@ -187,7 +187,7 @@ where Fa: 'static + Send + FnMut(&mut E, &mut Activation, AnyValue) -> DuringResult, Fm: 'static + Send + FnMut(&mut E, &mut Activation, AnyValue) -> ActorResult, Fs: 'static + Send + FnMut(&mut E, &mut Activation) -> ActorResult, - Fx: 'static + Send + FnMut(&mut E, &mut Activation, &Arc) -> ActorResult, + Fx: 'static + Send + FnMut(&mut E, &mut Activation, &Arc), { pub fn create_cap(self, t: &mut Activation) -> Arc { @@ -202,7 +202,7 @@ where Fa: 'static + Send + FnMut(&mut E, &mut Activation, M) -> DuringResult, Fm: 'static + Send + FnMut(&mut E, &mut Activation, M) -> ActorResult, Fs: 'static + Send + FnMut(&mut E, &mut Activation) -> ActorResult, - Fx: 'static + Send + FnMut(&mut E, &mut Activation, &Arc) -> ActorResult, + Fx: 'static + Send + FnMut(&mut E, &mut Activation, &Arc), { fn assert(&mut self, t: &mut Activation, a: M, h: Handle) -> ActorResult { match &mut self.assertion_handler { @@ -232,10 +232,9 @@ where } } - fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc) -> ActorResult { - match &mut self.exit_handler { - Some(handler) => handler(&mut self.state, t, exit_status), - None => Ok(()), + fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc) { + if let Some(handler) = &mut self.exit_handler { + handler(&mut self.state, t, exit_status); } } } diff --git a/syndicate/src/relay.rs b/syndicate/src/relay.rs index ef7582a..d9c3ad6 100644 --- a/syndicate/src/relay.rs +++ b/syndicate/src/relay.rs @@ -88,9 +88,6 @@ pub enum Output { type TunnelRelayRef = Arc>>; -#[derive(Debug)] -struct SendPendingTurn; - // There are other kinds of relay. This one has exactly two participants connected to each other. pub struct TunnelRelay { @@ -99,7 +96,6 @@ pub struct TunnelRelay outbound_assertions: Map>>, membranes: Membranes, pending_outbound: Vec>, - self_entity: Arc>, output: UnboundedSender>>, output_text: bool, } @@ -243,7 +239,6 @@ impl TunnelRelay { next_export_oid: 0, }, pending_outbound: Vec::new(), - self_entity: self_entity.clone(), }; if let Some(ir) = initial_ref { tr.membranes.export_ref(ir).inc_ref(); @@ -404,8 +399,7 @@ impl TunnelRelay { } } } - t.commit(); - Ok(()) + t.commit() } } } @@ -484,7 +478,19 @@ impl TunnelRelay { pub fn send_event(&mut self, t: &mut Activation, remote_oid: sturdy::Oid, event: P::Event) -> ActorResult { if self.pending_outbound.is_empty() { - t.message_for_myself(&self.self_entity, SendPendingTurn); + let self_ref = Arc::clone(&self.self_ref); + t.pre_commit(move |t| { + let mut g = self_ref.lock(); + let tr = g.as_mut().expect("initialized"); + let events = std::mem::take(&mut tr.pending_outbound); + tr.send_packet(&t.account(), + events.len(), + P::Packet::Turn(Box::new(P::Turn(events.clone()))))?; + for P::TurnEvent { oid, event } in events.into_iter() { + tr.outbound_event_bookkeeping(t, sturdy::Oid(oid.0), &event)?; + } + Ok(()) + }); } self.pending_outbound.push(P::TurnEvent { oid: P::Oid(remote_oid.0), event }); Ok(()) @@ -708,26 +714,16 @@ async fn output_loop( } } -impl Entity for TunnelRefEntity { - fn message(&mut self, t: &mut Activation, _m: SendPendingTurn) -> ActorResult { - let mut g = self.relay_ref.lock(); - let tr = g.as_mut().expect("initialized"); - let events = std::mem::take(&mut tr.pending_outbound); - tr.send_packet(&t.account(), events.len(), P::Packet::Turn(Box::new(P::Turn(events.clone()))))?; - for P::TurnEvent { oid, event } in events.into_iter() { - tr.outbound_event_bookkeeping(t, sturdy::Oid(oid.0), &event)?; - } - Ok(()) - } - - fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc) -> ActorResult { +impl Entity<()> for TunnelRefEntity { + fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc) { if let Err(e) = &**exit_status { let e = e.clone(); let mut g = self.relay_ref.lock(); let tr = g.as_mut().expect("initialized"); - tr.send_packet(&t.account(), 1, P::Packet::Error(Box::new(e)))?; + if let Err(f) = tr.send_packet(&t.account(), 1, P::Packet::Error(Box::new(e))) { + tracing::error!("Failed to send error packet: {:?}", f); + } } - Ok(()) } } diff --git a/syndicate/src/supervise.rs b/syndicate/src/supervise.rs index 8e4a430..bb930c4 100644 --- a/syndicate/src/supervise.rs +++ b/syndicate/src/supervise.rs @@ -72,17 +72,15 @@ impl Entity for Supervisor fn stop(&mut self, t: &mut Activation) -> ActorResult { let _entry = tracing::info_span!("supervisor", name = ?self.child_name).entered(); - let exit_status = - self.ac_ref.take().expect("valid supervisee ActorRef") - .exit_status() - .expect("supervisee to have terminated"); - tracing::debug!(?exit_status); - match exit_status { - Ok(()) if self.config.restart_policy == RestartPolicy::OnErrorOnly => { + match self.ac_ref.take().expect("valid supervisee ActorRef").exit_status() { + None => + tracing::debug!("Supervisor shut down; supervisee will exit soon"), + Some(Ok(())) if self.config.restart_policy == RestartPolicy::OnErrorOnly => { tracing::trace!("Not restarting: normal exit, restart_policy is OnErrorOnly"); t.set(&self.state, State::Complete); }, - _ => { + Some(exit_status) => { + tracing::debug!(?exit_status); tracing::trace!("Restarting: restart_policy is Always or exit was abnormal"); t.set(&self.state, if exit_status.is_ok() { State::Complete } else { State::Failed });