Compare commits
2 Commits
9f3b9cfd59
...
a7cb035b45
Author | SHA1 | Date |
---|---|---|
Tony Garnock-Jones | a7cb035b45 | |
Tony Garnock-Jones | 2cb72cd020 |
|
@ -324,6 +324,7 @@ pub struct RunningActor {
|
|||
tx: UnboundedSender<SystemMessage>,
|
||||
mailbox: Weak<Mailbox>,
|
||||
exit_hooks: Vec<Box<dyn Send + FnOnce(&mut Activation, &Arc<ActorResult>) -> ActorResult>>,
|
||||
cleanup_actions: CleanupActions,
|
||||
facet_nodes: Map<FacetId, Facet>,
|
||||
facet_children: Map<FacetId, Set<FacetId>>,
|
||||
root: FacetId,
|
||||
|
@ -353,7 +354,7 @@ pub struct Facet {
|
|||
pub facet_id: FacetId,
|
||||
/// The ID of the facet's parent facet, if any; if None, this facet is the `Actor`'s root facet.
|
||||
pub parent_facet_id: Option<FacetId>,
|
||||
cleanup_actions: CleanupActions,
|
||||
outbound_handles: Set<Handle>,
|
||||
stop_actions: Vec<Action>,
|
||||
linked_tasks: Map<u64, CancellationToken>,
|
||||
inert_check_preventers: Arc<AtomicU64>,
|
||||
|
@ -530,6 +531,9 @@ impl FacetRef {
|
|||
tracing::error!(?err, "unexpected error from terminate_facet");
|
||||
panic!("Unexpected error result from terminate_facet");
|
||||
}
|
||||
// TODO: The linked_tasks are being cancelled above ^ when their Facets drop.
|
||||
// TODO: We don't want that: we want (? do we?) exit hooks to run before linked_tasks are cancelled.
|
||||
// TODO: Example: send an error message in an exit_hook that is processed and delivered by a linked_task.
|
||||
for action in std::mem::take(&mut t.state.exit_hooks) {
|
||||
if let Err(err) = action(&mut t, &exit_status) {
|
||||
tracing::error!(?err, "error in exit hook");
|
||||
|
@ -601,10 +605,11 @@ impl<'activation> Activation<'activation> {
|
|||
/// Returns the [`Handle`] for the new assertion.
|
||||
pub fn assert<M: 'static + Send + std::fmt::Debug>(&mut self, r: &Arc<Ref<M>>, a: M) -> Handle {
|
||||
let handle = next_handle();
|
||||
if let Some(f) = self.active_facet() {
|
||||
if let Some(f) = self.state.get_facet(self.facet.facet_id) {
|
||||
tracing::trace!(?r, ?handle, ?a, "assert");
|
||||
f.insert_retract_cleanup_action(&r, handle);
|
||||
f.outbound_handles.insert(handle);
|
||||
drop(f);
|
||||
self.state.insert_retract_cleanup_action(&r, handle);
|
||||
{
|
||||
let r = Arc::clone(r);
|
||||
self.pending.queue_for(&r).push(Box::new(
|
||||
|
@ -636,17 +641,21 @@ impl<'activation> Activation<'activation> {
|
|||
let handle = next_handle();
|
||||
if let Some(f) = self.active_facet() {
|
||||
tracing::trace!(?r, ?handle, ?a, "assert_for_myself");
|
||||
f.outbound_handles.insert(handle);
|
||||
drop(f);
|
||||
{
|
||||
let r = Arc::clone(r);
|
||||
f.cleanup_actions.insert(
|
||||
self.state.cleanup_actions.insert(
|
||||
handle,
|
||||
CleanupAction::ForMyself(Box::new(
|
||||
move |t| t.with_entity(&r, |t, e| {
|
||||
tracing::trace!(?handle, "retracted");
|
||||
if let Some(f) = t.active_facet() {
|
||||
f.outbound_handles.remove(&handle);
|
||||
}
|
||||
e.retract(t, handle)
|
||||
}))));
|
||||
}
|
||||
drop(f);
|
||||
{
|
||||
let r = Arc::clone(r);
|
||||
self.pending.for_myself.push(Box::new(
|
||||
|
@ -663,16 +672,15 @@ impl<'activation> Activation<'activation> {
|
|||
let entity_ref = t_other.create::<AnyValue, _>(StopOnRetract);
|
||||
let handle = next_handle();
|
||||
tracing::trace!(?handle, ?entity_ref, "half_link");
|
||||
self.active_facet().unwrap().insert_retract_cleanup_action(&entity_ref, handle);
|
||||
self.state.insert_retract_cleanup_action(&entity_ref, handle);
|
||||
self.active_facet().unwrap().outbound_handles.insert(handle);
|
||||
t_other.with_entity(&entity_ref, |t, e| e.assert(t, AnyValue::new(true), handle)).unwrap();
|
||||
}
|
||||
|
||||
/// Core API: retract a previously-established assertion.
|
||||
pub fn retract(&mut self, handle: Handle) {
|
||||
if let Some(f) = self.active_facet() {
|
||||
if let Some(d) = f.cleanup_actions.remove(&handle) {
|
||||
self.pending.execute_cleanup_action(d)
|
||||
}
|
||||
if let Some(d) = self.state.cleanup_actions.remove(&handle) {
|
||||
self.pending.execute_cleanup_action(d)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -988,10 +996,9 @@ impl<'activation> Activation<'activation> {
|
|||
action(t)?;
|
||||
}
|
||||
let parent_facet_id = f.parent_facet_id;
|
||||
// if !alive, the drop will happen at the end of this function, but we
|
||||
// need it to happen right here so that child-facet cleanup-actions are
|
||||
// performed before parent-facet cleanup-actions.
|
||||
drop(f);
|
||||
f.retract_outbound(t);
|
||||
// ^ we need retraction to happen right here so that child-facet
|
||||
// cleanup-actions are performed before parent-facet cleanup-actions.
|
||||
if let Some(p) = parent_facet_id {
|
||||
if t.state.facet_exists_and_is_inert(p) {
|
||||
t._terminate_facet(p, true)?;
|
||||
|
@ -999,6 +1006,8 @@ impl<'activation> Activation<'activation> {
|
|||
} else {
|
||||
t.state.shutdown();
|
||||
}
|
||||
} else {
|
||||
f.retract_outbound(t);
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
|
@ -1192,6 +1201,7 @@ impl Actor {
|
|||
tx,
|
||||
mailbox: Weak::new(),
|
||||
exit_hooks: Vec::new(),
|
||||
cleanup_actions: Map::new(),
|
||||
facet_nodes: Map::new(),
|
||||
facet_children: Map::new(),
|
||||
root: root.facet_id,
|
||||
|
@ -1298,7 +1308,7 @@ impl Facet {
|
|||
Facet {
|
||||
facet_id: next_facet_id(),
|
||||
parent_facet_id,
|
||||
cleanup_actions: Map::new(),
|
||||
outbound_handles: Set::new(),
|
||||
stop_actions: Vec::new(),
|
||||
linked_tasks: Map::new(),
|
||||
inert_check_preventers: Arc::new(AtomicU64::new(0)),
|
||||
|
@ -1320,15 +1330,11 @@ impl Facet {
|
|||
}
|
||||
}
|
||||
|
||||
fn insert_retract_cleanup_action<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, handle: Handle) {
|
||||
let r = Arc::clone(r);
|
||||
self.cleanup_actions.insert(
|
||||
handle,
|
||||
CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new(
|
||||
move |t| t.with_entity(&r, |t, e| {
|
||||
tracing::trace!(?handle, "retracted");
|
||||
e.retract(t, handle)
|
||||
}))));
|
||||
fn retract_outbound(&mut self, t: &mut Activation) {
|
||||
for handle in std::mem::take(&mut self.outbound_handles).into_iter() {
|
||||
tracing::trace!(h = ?handle, "retract on termination");
|
||||
t.retract(handle);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1419,13 +1425,31 @@ impl RunningActor {
|
|||
let no_kids = self.facet_children.get(&facet_id).map(|cs| cs.is_empty()).unwrap_or(true);
|
||||
if let Some(f) = self.get_facet(facet_id) {
|
||||
no_kids &&
|
||||
f.cleanup_actions.is_empty() &&
|
||||
f.outbound_handles.is_empty() &&
|
||||
f.linked_tasks.is_empty() &&
|
||||
f.inert_check_preventers.load(Ordering::Relaxed) == 0
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fn insert_retract_cleanup_action<M: 'static + Send>(
|
||||
&mut self,
|
||||
r: &Arc<Ref<M>>,
|
||||
handle: Handle,
|
||||
) {
|
||||
let r = Arc::clone(r);
|
||||
self.cleanup_actions.insert(
|
||||
handle,
|
||||
CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new(
|
||||
move |t| t.with_entity(&r, |t, e| {
|
||||
tracing::trace!(?handle, "retracted");
|
||||
if let Some(f) = t.active_facet() {
|
||||
f.outbound_handles.remove(&handle);
|
||||
}
|
||||
e.retract(t, handle)
|
||||
}))));
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Actor {
|
||||
|
@ -1441,13 +1465,8 @@ impl Drop for Facet {
|
|||
token.cancel();
|
||||
}
|
||||
|
||||
let to_clear = std::mem::take(&mut self.cleanup_actions);
|
||||
{
|
||||
let mut b = EventBuffer::new(Account::new(crate::name!("drop")));
|
||||
for (_handle, r) in to_clear.into_iter() {
|
||||
tracing::trace!(h = ?_handle, "retract on termination");
|
||||
b.execute_cleanup_action(r);
|
||||
}
|
||||
if !self.outbound_handles.is_empty() {
|
||||
panic!("Internal error: outbound_handles not empty at drop time");
|
||||
}
|
||||
|
||||
tracing::trace!(facet_id = ?self.facet_id, "Facet::drop");
|
||||
|
|
Loading…
Reference in New Issue