Compare commits

...

2 Commits

1 changed files with 51 additions and 32 deletions

View File

@ -324,6 +324,7 @@ pub struct RunningActor {
tx: UnboundedSender<SystemMessage>,
mailbox: Weak<Mailbox>,
exit_hooks: Vec<Box<dyn Send + FnOnce(&mut Activation, &Arc<ActorResult>) -> ActorResult>>,
cleanup_actions: CleanupActions,
facet_nodes: Map<FacetId, Facet>,
facet_children: Map<FacetId, Set<FacetId>>,
root: FacetId,
@ -353,7 +354,7 @@ pub struct Facet {
pub facet_id: FacetId,
/// The ID of the facet's parent facet, if any; if None, this facet is the `Actor`'s root facet.
pub parent_facet_id: Option<FacetId>,
cleanup_actions: CleanupActions,
outbound_handles: Set<Handle>,
stop_actions: Vec<Action>,
linked_tasks: Map<u64, CancellationToken>,
inert_check_preventers: Arc<AtomicU64>,
@ -530,6 +531,9 @@ impl FacetRef {
tracing::error!(?err, "unexpected error from terminate_facet");
panic!("Unexpected error result from terminate_facet");
}
// TODO: The linked_tasks are being cancelled above ^ when their Facets drop.
// TODO: We don't want that: we want (? do we?) exit hooks to run before linked_tasks are cancelled.
// TODO: Example: send an error message in an exit_hook that is processed and delivered by a linked_task.
for action in std::mem::take(&mut t.state.exit_hooks) {
if let Err(err) = action(&mut t, &exit_status) {
tracing::error!(?err, "error in exit hook");
@ -601,10 +605,11 @@ impl<'activation> Activation<'activation> {
/// Returns the [`Handle`] for the new assertion.
pub fn assert<M: 'static + Send + std::fmt::Debug>(&mut self, r: &Arc<Ref<M>>, a: M) -> Handle {
let handle = next_handle();
if let Some(f) = self.active_facet() {
if let Some(f) = self.state.get_facet(self.facet.facet_id) {
tracing::trace!(?r, ?handle, ?a, "assert");
f.insert_retract_cleanup_action(&r, handle);
f.outbound_handles.insert(handle);
drop(f);
self.state.insert_retract_cleanup_action(&r, handle);
{
let r = Arc::clone(r);
self.pending.queue_for(&r).push(Box::new(
@ -636,17 +641,21 @@ impl<'activation> Activation<'activation> {
let handle = next_handle();
if let Some(f) = self.active_facet() {
tracing::trace!(?r, ?handle, ?a, "assert_for_myself");
f.outbound_handles.insert(handle);
drop(f);
{
let r = Arc::clone(r);
f.cleanup_actions.insert(
self.state.cleanup_actions.insert(
handle,
CleanupAction::ForMyself(Box::new(
move |t| t.with_entity(&r, |t, e| {
tracing::trace!(?handle, "retracted");
if let Some(f) = t.active_facet() {
f.outbound_handles.remove(&handle);
}
e.retract(t, handle)
}))));
}
drop(f);
{
let r = Arc::clone(r);
self.pending.for_myself.push(Box::new(
@ -663,16 +672,15 @@ impl<'activation> Activation<'activation> {
let entity_ref = t_other.create::<AnyValue, _>(StopOnRetract);
let handle = next_handle();
tracing::trace!(?handle, ?entity_ref, "half_link");
self.active_facet().unwrap().insert_retract_cleanup_action(&entity_ref, handle);
self.state.insert_retract_cleanup_action(&entity_ref, handle);
self.active_facet().unwrap().outbound_handles.insert(handle);
t_other.with_entity(&entity_ref, |t, e| e.assert(t, AnyValue::new(true), handle)).unwrap();
}
/// Core API: retract a previously-established assertion.
pub fn retract(&mut self, handle: Handle) {
if let Some(f) = self.active_facet() {
if let Some(d) = f.cleanup_actions.remove(&handle) {
self.pending.execute_cleanup_action(d)
}
if let Some(d) = self.state.cleanup_actions.remove(&handle) {
self.pending.execute_cleanup_action(d)
}
}
@ -988,10 +996,9 @@ impl<'activation> Activation<'activation> {
action(t)?;
}
let parent_facet_id = f.parent_facet_id;
// if !alive, the drop will happen at the end of this function, but we
// need it to happen right here so that child-facet cleanup-actions are
// performed before parent-facet cleanup-actions.
drop(f);
f.retract_outbound(t);
// ^ we need retraction to happen right here so that child-facet
// cleanup-actions are performed before parent-facet cleanup-actions.
if let Some(p) = parent_facet_id {
if t.state.facet_exists_and_is_inert(p) {
t._terminate_facet(p, true)?;
@ -999,6 +1006,8 @@ impl<'activation> Activation<'activation> {
} else {
t.state.shutdown();
}
} else {
f.retract_outbound(t);
}
Ok(())
})
@ -1192,6 +1201,7 @@ impl Actor {
tx,
mailbox: Weak::new(),
exit_hooks: Vec::new(),
cleanup_actions: Map::new(),
facet_nodes: Map::new(),
facet_children: Map::new(),
root: root.facet_id,
@ -1298,7 +1308,7 @@ impl Facet {
Facet {
facet_id: next_facet_id(),
parent_facet_id,
cleanup_actions: Map::new(),
outbound_handles: Set::new(),
stop_actions: Vec::new(),
linked_tasks: Map::new(),
inert_check_preventers: Arc::new(AtomicU64::new(0)),
@ -1320,15 +1330,11 @@ impl Facet {
}
}
fn insert_retract_cleanup_action<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, handle: Handle) {
let r = Arc::clone(r);
self.cleanup_actions.insert(
handle,
CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new(
move |t| t.with_entity(&r, |t, e| {
tracing::trace!(?handle, "retracted");
e.retract(t, handle)
}))));
fn retract_outbound(&mut self, t: &mut Activation) {
for handle in std::mem::take(&mut self.outbound_handles).into_iter() {
tracing::trace!(h = ?handle, "retract on termination");
t.retract(handle);
}
}
}
@ -1419,13 +1425,31 @@ impl RunningActor {
let no_kids = self.facet_children.get(&facet_id).map(|cs| cs.is_empty()).unwrap_or(true);
if let Some(f) = self.get_facet(facet_id) {
no_kids &&
f.cleanup_actions.is_empty() &&
f.outbound_handles.is_empty() &&
f.linked_tasks.is_empty() &&
f.inert_check_preventers.load(Ordering::Relaxed) == 0
} else {
false
}
}
fn insert_retract_cleanup_action<M: 'static + Send>(
&mut self,
r: &Arc<Ref<M>>,
handle: Handle,
) {
let r = Arc::clone(r);
self.cleanup_actions.insert(
handle,
CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new(
move |t| t.with_entity(&r, |t, e| {
tracing::trace!(?handle, "retracted");
if let Some(f) = t.active_facet() {
f.outbound_handles.remove(&handle);
}
e.retract(t, handle)
}))));
}
}
impl Drop for Actor {
@ -1441,13 +1465,8 @@ impl Drop for Facet {
token.cancel();
}
let to_clear = std::mem::take(&mut self.cleanup_actions);
{
let mut b = EventBuffer::new(Account::new(crate::name!("drop")));
for (_handle, r) in to_clear.into_iter() {
tracing::trace!(h = ?_handle, "retract on termination");
b.execute_cleanup_action(r);
}
if !self.outbound_handles.is_empty() {
panic!("Internal error: outbound_handles not empty at drop time");
}
tracing::trace!(facet_id = ?self.facet_id, "Facet::drop");