From a7cb035b45ef15b28a1a601653e8c226b266f5bf Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Tue, 7 Sep 2021 19:12:32 +0200 Subject: [PATCH] Make it possible to retract a handle from a non-current facet in the current actor --- syndicate/src/actor.rs | 80 +++++++++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 32 deletions(-) diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index eb1228b..d9b6f6c 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -324,6 +324,7 @@ pub struct RunningActor { tx: UnboundedSender, mailbox: Weak, exit_hooks: Vec) -> ActorResult>>, + cleanup_actions: CleanupActions, facet_nodes: Map, facet_children: Map>, root: FacetId, @@ -353,7 +354,7 @@ pub struct Facet { pub facet_id: FacetId, /// The ID of the facet's parent facet, if any; if None, this facet is the `Actor`'s root facet. pub parent_facet_id: Option, - cleanup_actions: CleanupActions, + outbound_handles: Set, stop_actions: Vec, linked_tasks: Map, inert_check_preventers: Arc, @@ -604,10 +605,11 @@ impl<'activation> Activation<'activation> { /// Returns the [`Handle`] for the new assertion. pub fn assert(&mut self, r: &Arc>, a: M) -> Handle { let handle = next_handle(); - if let Some(f) = self.active_facet() { + if let Some(f) = self.state.get_facet(self.facet.facet_id) { tracing::trace!(?r, ?handle, ?a, "assert"); - f.insert_retract_cleanup_action(&r, handle); + f.outbound_handles.insert(handle); drop(f); + self.state.insert_retract_cleanup_action(&r, handle); { let r = Arc::clone(r); self.pending.queue_for(&r).push(Box::new( @@ -639,17 +641,21 @@ impl<'activation> Activation<'activation> { let handle = next_handle(); if let Some(f) = self.active_facet() { tracing::trace!(?r, ?handle, ?a, "assert_for_myself"); + f.outbound_handles.insert(handle); + drop(f); { let r = Arc::clone(r); - f.cleanup_actions.insert( + self.state.cleanup_actions.insert( handle, CleanupAction::ForMyself(Box::new( move |t| t.with_entity(&r, |t, e| { tracing::trace!(?handle, "retracted"); + if let Some(f) = t.active_facet() { + f.outbound_handles.remove(&handle); + } e.retract(t, handle) })))); } - drop(f); { let r = Arc::clone(r); self.pending.for_myself.push(Box::new( @@ -666,16 +672,15 @@ impl<'activation> Activation<'activation> { let entity_ref = t_other.create::(StopOnRetract); let handle = next_handle(); tracing::trace!(?handle, ?entity_ref, "half_link"); - self.active_facet().unwrap().insert_retract_cleanup_action(&entity_ref, handle); + self.state.insert_retract_cleanup_action(&entity_ref, handle); + self.active_facet().unwrap().outbound_handles.insert(handle); t_other.with_entity(&entity_ref, |t, e| e.assert(t, AnyValue::new(true), handle)).unwrap(); } /// Core API: retract a previously-established assertion. pub fn retract(&mut self, handle: Handle) { - if let Some(f) = self.active_facet() { - if let Some(d) = f.cleanup_actions.remove(&handle) { - self.pending.execute_cleanup_action(d) - } + if let Some(d) = self.state.cleanup_actions.remove(&handle) { + self.pending.execute_cleanup_action(d) } } @@ -991,10 +996,9 @@ impl<'activation> Activation<'activation> { action(t)?; } let parent_facet_id = f.parent_facet_id; - // if !alive, the drop will happen at the end of this function, but we - // need it to happen right here so that child-facet cleanup-actions are - // performed before parent-facet cleanup-actions. - drop(f); + f.retract_outbound(t); + // ^ we need retraction to happen right here so that child-facet + // cleanup-actions are performed before parent-facet cleanup-actions. if let Some(p) = parent_facet_id { if t.state.facet_exists_and_is_inert(p) { t._terminate_facet(p, true)?; @@ -1002,6 +1006,8 @@ impl<'activation> Activation<'activation> { } else { t.state.shutdown(); } + } else { + f.retract_outbound(t); } Ok(()) }) @@ -1195,6 +1201,7 @@ impl Actor { tx, mailbox: Weak::new(), exit_hooks: Vec::new(), + cleanup_actions: Map::new(), facet_nodes: Map::new(), facet_children: Map::new(), root: root.facet_id, @@ -1301,7 +1308,7 @@ impl Facet { Facet { facet_id: next_facet_id(), parent_facet_id, - cleanup_actions: Map::new(), + outbound_handles: Set::new(), stop_actions: Vec::new(), linked_tasks: Map::new(), inert_check_preventers: Arc::new(AtomicU64::new(0)), @@ -1323,15 +1330,11 @@ impl Facet { } } - fn insert_retract_cleanup_action(&mut self, r: &Arc>, handle: Handle) { - let r = Arc::clone(r); - self.cleanup_actions.insert( - handle, - CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new( - move |t| t.with_entity(&r, |t, e| { - tracing::trace!(?handle, "retracted"); - e.retract(t, handle) - })))); + fn retract_outbound(&mut self, t: &mut Activation) { + for handle in std::mem::take(&mut self.outbound_handles).into_iter() { + tracing::trace!(h = ?handle, "retract on termination"); + t.retract(handle); + } } } @@ -1422,13 +1425,31 @@ impl RunningActor { let no_kids = self.facet_children.get(&facet_id).map(|cs| cs.is_empty()).unwrap_or(true); if let Some(f) = self.get_facet(facet_id) { no_kids && - f.cleanup_actions.is_empty() && + f.outbound_handles.is_empty() && f.linked_tasks.is_empty() && f.inert_check_preventers.load(Ordering::Relaxed) == 0 } else { false } } + + fn insert_retract_cleanup_action( + &mut self, + r: &Arc>, + handle: Handle, + ) { + let r = Arc::clone(r); + self.cleanup_actions.insert( + handle, + CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new( + move |t| t.with_entity(&r, |t, e| { + tracing::trace!(?handle, "retracted"); + if let Some(f) = t.active_facet() { + f.outbound_handles.remove(&handle); + } + e.retract(t, handle) + })))); + } } impl Drop for Actor { @@ -1444,13 +1465,8 @@ impl Drop for Facet { token.cancel(); } - let to_clear = std::mem::take(&mut self.cleanup_actions); - { - let mut b = EventBuffer::new(Account::new(crate::name!("drop"))); - for (_handle, r) in to_clear.into_iter() { - tracing::trace!(h = ?_handle, "retract on termination"); - b.execute_cleanup_action(r); - } + if !self.outbound_handles.is_empty() { + panic!("Internal error: outbound_handles not empty at drop time"); } tracing::trace!(facet_id = ?self.facet_id, "Facet::drop");