Compare commits

...

2 Commits

1 changed files with 51 additions and 32 deletions

View File

@ -324,6 +324,7 @@ pub struct RunningActor {
tx: UnboundedSender<SystemMessage>, tx: UnboundedSender<SystemMessage>,
mailbox: Weak<Mailbox>, mailbox: Weak<Mailbox>,
exit_hooks: Vec<Box<dyn Send + FnOnce(&mut Activation, &Arc<ActorResult>) -> ActorResult>>, exit_hooks: Vec<Box<dyn Send + FnOnce(&mut Activation, &Arc<ActorResult>) -> ActorResult>>,
cleanup_actions: CleanupActions,
facet_nodes: Map<FacetId, Facet>, facet_nodes: Map<FacetId, Facet>,
facet_children: Map<FacetId, Set<FacetId>>, facet_children: Map<FacetId, Set<FacetId>>,
root: FacetId, root: FacetId,
@ -353,7 +354,7 @@ pub struct Facet {
pub facet_id: FacetId, pub facet_id: FacetId,
/// The ID of the facet's parent facet, if any; if None, this facet is the `Actor`'s root facet. /// The ID of the facet's parent facet, if any; if None, this facet is the `Actor`'s root facet.
pub parent_facet_id: Option<FacetId>, pub parent_facet_id: Option<FacetId>,
cleanup_actions: CleanupActions, outbound_handles: Set<Handle>,
stop_actions: Vec<Action>, stop_actions: Vec<Action>,
linked_tasks: Map<u64, CancellationToken>, linked_tasks: Map<u64, CancellationToken>,
inert_check_preventers: Arc<AtomicU64>, inert_check_preventers: Arc<AtomicU64>,
@ -530,6 +531,9 @@ impl FacetRef {
tracing::error!(?err, "unexpected error from terminate_facet"); tracing::error!(?err, "unexpected error from terminate_facet");
panic!("Unexpected error result from terminate_facet"); panic!("Unexpected error result from terminate_facet");
} }
// TODO: The linked_tasks are being cancelled above ^ when their Facets drop.
// TODO: We don't want that: we want (? do we?) exit hooks to run before linked_tasks are cancelled.
// TODO: Example: send an error message in an exit_hook that is processed and delivered by a linked_task.
for action in std::mem::take(&mut t.state.exit_hooks) { for action in std::mem::take(&mut t.state.exit_hooks) {
if let Err(err) = action(&mut t, &exit_status) { if let Err(err) = action(&mut t, &exit_status) {
tracing::error!(?err, "error in exit hook"); tracing::error!(?err, "error in exit hook");
@ -601,10 +605,11 @@ impl<'activation> Activation<'activation> {
/// Returns the [`Handle`] for the new assertion. /// Returns the [`Handle`] for the new assertion.
pub fn assert<M: 'static + Send + std::fmt::Debug>(&mut self, r: &Arc<Ref<M>>, a: M) -> Handle { pub fn assert<M: 'static + Send + std::fmt::Debug>(&mut self, r: &Arc<Ref<M>>, a: M) -> Handle {
let handle = next_handle(); let handle = next_handle();
if let Some(f) = self.active_facet() { if let Some(f) = self.state.get_facet(self.facet.facet_id) {
tracing::trace!(?r, ?handle, ?a, "assert"); tracing::trace!(?r, ?handle, ?a, "assert");
f.insert_retract_cleanup_action(&r, handle); f.outbound_handles.insert(handle);
drop(f); drop(f);
self.state.insert_retract_cleanup_action(&r, handle);
{ {
let r = Arc::clone(r); let r = Arc::clone(r);
self.pending.queue_for(&r).push(Box::new( self.pending.queue_for(&r).push(Box::new(
@ -636,17 +641,21 @@ impl<'activation> Activation<'activation> {
let handle = next_handle(); let handle = next_handle();
if let Some(f) = self.active_facet() { if let Some(f) = self.active_facet() {
tracing::trace!(?r, ?handle, ?a, "assert_for_myself"); tracing::trace!(?r, ?handle, ?a, "assert_for_myself");
f.outbound_handles.insert(handle);
drop(f);
{ {
let r = Arc::clone(r); let r = Arc::clone(r);
f.cleanup_actions.insert( self.state.cleanup_actions.insert(
handle, handle,
CleanupAction::ForMyself(Box::new( CleanupAction::ForMyself(Box::new(
move |t| t.with_entity(&r, |t, e| { move |t| t.with_entity(&r, |t, e| {
tracing::trace!(?handle, "retracted"); tracing::trace!(?handle, "retracted");
if let Some(f) = t.active_facet() {
f.outbound_handles.remove(&handle);
}
e.retract(t, handle) e.retract(t, handle)
})))); }))));
} }
drop(f);
{ {
let r = Arc::clone(r); let r = Arc::clone(r);
self.pending.for_myself.push(Box::new( self.pending.for_myself.push(Box::new(
@ -663,16 +672,15 @@ impl<'activation> Activation<'activation> {
let entity_ref = t_other.create::<AnyValue, _>(StopOnRetract); let entity_ref = t_other.create::<AnyValue, _>(StopOnRetract);
let handle = next_handle(); let handle = next_handle();
tracing::trace!(?handle, ?entity_ref, "half_link"); tracing::trace!(?handle, ?entity_ref, "half_link");
self.active_facet().unwrap().insert_retract_cleanup_action(&entity_ref, handle); self.state.insert_retract_cleanup_action(&entity_ref, handle);
self.active_facet().unwrap().outbound_handles.insert(handle);
t_other.with_entity(&entity_ref, |t, e| e.assert(t, AnyValue::new(true), handle)).unwrap(); t_other.with_entity(&entity_ref, |t, e| e.assert(t, AnyValue::new(true), handle)).unwrap();
} }
/// Core API: retract a previously-established assertion. /// Core API: retract a previously-established assertion.
pub fn retract(&mut self, handle: Handle) { pub fn retract(&mut self, handle: Handle) {
if let Some(f) = self.active_facet() { if let Some(d) = self.state.cleanup_actions.remove(&handle) {
if let Some(d) = f.cleanup_actions.remove(&handle) { self.pending.execute_cleanup_action(d)
self.pending.execute_cleanup_action(d)
}
} }
} }
@ -988,10 +996,9 @@ impl<'activation> Activation<'activation> {
action(t)?; action(t)?;
} }
let parent_facet_id = f.parent_facet_id; let parent_facet_id = f.parent_facet_id;
// if !alive, the drop will happen at the end of this function, but we f.retract_outbound(t);
// need it to happen right here so that child-facet cleanup-actions are // ^ we need retraction to happen right here so that child-facet
// performed before parent-facet cleanup-actions. // cleanup-actions are performed before parent-facet cleanup-actions.
drop(f);
if let Some(p) = parent_facet_id { if let Some(p) = parent_facet_id {
if t.state.facet_exists_and_is_inert(p) { if t.state.facet_exists_and_is_inert(p) {
t._terminate_facet(p, true)?; t._terminate_facet(p, true)?;
@ -999,6 +1006,8 @@ impl<'activation> Activation<'activation> {
} else { } else {
t.state.shutdown(); t.state.shutdown();
} }
} else {
f.retract_outbound(t);
} }
Ok(()) Ok(())
}) })
@ -1192,6 +1201,7 @@ impl Actor {
tx, tx,
mailbox: Weak::new(), mailbox: Weak::new(),
exit_hooks: Vec::new(), exit_hooks: Vec::new(),
cleanup_actions: Map::new(),
facet_nodes: Map::new(), facet_nodes: Map::new(),
facet_children: Map::new(), facet_children: Map::new(),
root: root.facet_id, root: root.facet_id,
@ -1298,7 +1308,7 @@ impl Facet {
Facet { Facet {
facet_id: next_facet_id(), facet_id: next_facet_id(),
parent_facet_id, parent_facet_id,
cleanup_actions: Map::new(), outbound_handles: Set::new(),
stop_actions: Vec::new(), stop_actions: Vec::new(),
linked_tasks: Map::new(), linked_tasks: Map::new(),
inert_check_preventers: Arc::new(AtomicU64::new(0)), inert_check_preventers: Arc::new(AtomicU64::new(0)),
@ -1320,15 +1330,11 @@ impl Facet {
} }
} }
fn insert_retract_cleanup_action<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, handle: Handle) { fn retract_outbound(&mut self, t: &mut Activation) {
let r = Arc::clone(r); for handle in std::mem::take(&mut self.outbound_handles).into_iter() {
self.cleanup_actions.insert( tracing::trace!(h = ?handle, "retract on termination");
handle, t.retract(handle);
CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new( }
move |t| t.with_entity(&r, |t, e| {
tracing::trace!(?handle, "retracted");
e.retract(t, handle)
}))));
} }
} }
@ -1419,13 +1425,31 @@ impl RunningActor {
let no_kids = self.facet_children.get(&facet_id).map(|cs| cs.is_empty()).unwrap_or(true); let no_kids = self.facet_children.get(&facet_id).map(|cs| cs.is_empty()).unwrap_or(true);
if let Some(f) = self.get_facet(facet_id) { if let Some(f) = self.get_facet(facet_id) {
no_kids && no_kids &&
f.cleanup_actions.is_empty() && f.outbound_handles.is_empty() &&
f.linked_tasks.is_empty() && f.linked_tasks.is_empty() &&
f.inert_check_preventers.load(Ordering::Relaxed) == 0 f.inert_check_preventers.load(Ordering::Relaxed) == 0
} else { } else {
false false
} }
} }
fn insert_retract_cleanup_action<M: 'static + Send>(
&mut self,
r: &Arc<Ref<M>>,
handle: Handle,
) {
let r = Arc::clone(r);
self.cleanup_actions.insert(
handle,
CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new(
move |t| t.with_entity(&r, |t, e| {
tracing::trace!(?handle, "retracted");
if let Some(f) = t.active_facet() {
f.outbound_handles.remove(&handle);
}
e.retract(t, handle)
}))));
}
} }
impl Drop for Actor { impl Drop for Actor {
@ -1441,13 +1465,8 @@ impl Drop for Facet {
token.cancel(); token.cancel();
} }
let to_clear = std::mem::take(&mut self.cleanup_actions); if !self.outbound_handles.is_empty() {
{ panic!("Internal error: outbound_handles not empty at drop time");
let mut b = EventBuffer::new(Account::new(crate::name!("drop")));
for (_handle, r) in to_clear.into_iter() {
tracing::trace!(h = ?_handle, "retract on termination");
b.execute_cleanup_action(r);
}
} }
tracing::trace!(facet_id = ?self.facet_id, "Facet::drop"); tracing::trace!(facet_id = ?self.facet_id, "Facet::drop");