Compare commits
2 Commits
9f3b9cfd59
...
a7cb035b45
Author | SHA1 | Date |
---|---|---|
Tony Garnock-Jones | a7cb035b45 | |
Tony Garnock-Jones | 2cb72cd020 |
|
@ -324,6 +324,7 @@ pub struct RunningActor {
|
||||||
tx: UnboundedSender<SystemMessage>,
|
tx: UnboundedSender<SystemMessage>,
|
||||||
mailbox: Weak<Mailbox>,
|
mailbox: Weak<Mailbox>,
|
||||||
exit_hooks: Vec<Box<dyn Send + FnOnce(&mut Activation, &Arc<ActorResult>) -> ActorResult>>,
|
exit_hooks: Vec<Box<dyn Send + FnOnce(&mut Activation, &Arc<ActorResult>) -> ActorResult>>,
|
||||||
|
cleanup_actions: CleanupActions,
|
||||||
facet_nodes: Map<FacetId, Facet>,
|
facet_nodes: Map<FacetId, Facet>,
|
||||||
facet_children: Map<FacetId, Set<FacetId>>,
|
facet_children: Map<FacetId, Set<FacetId>>,
|
||||||
root: FacetId,
|
root: FacetId,
|
||||||
|
@ -353,7 +354,7 @@ pub struct Facet {
|
||||||
pub facet_id: FacetId,
|
pub facet_id: FacetId,
|
||||||
/// The ID of the facet's parent facet, if any; if None, this facet is the `Actor`'s root facet.
|
/// The ID of the facet's parent facet, if any; if None, this facet is the `Actor`'s root facet.
|
||||||
pub parent_facet_id: Option<FacetId>,
|
pub parent_facet_id: Option<FacetId>,
|
||||||
cleanup_actions: CleanupActions,
|
outbound_handles: Set<Handle>,
|
||||||
stop_actions: Vec<Action>,
|
stop_actions: Vec<Action>,
|
||||||
linked_tasks: Map<u64, CancellationToken>,
|
linked_tasks: Map<u64, CancellationToken>,
|
||||||
inert_check_preventers: Arc<AtomicU64>,
|
inert_check_preventers: Arc<AtomicU64>,
|
||||||
|
@ -530,6 +531,9 @@ impl FacetRef {
|
||||||
tracing::error!(?err, "unexpected error from terminate_facet");
|
tracing::error!(?err, "unexpected error from terminate_facet");
|
||||||
panic!("Unexpected error result from terminate_facet");
|
panic!("Unexpected error result from terminate_facet");
|
||||||
}
|
}
|
||||||
|
// TODO: The linked_tasks are being cancelled above ^ when their Facets drop.
|
||||||
|
// TODO: We don't want that: we want (? do we?) exit hooks to run before linked_tasks are cancelled.
|
||||||
|
// TODO: Example: send an error message in an exit_hook that is processed and delivered by a linked_task.
|
||||||
for action in std::mem::take(&mut t.state.exit_hooks) {
|
for action in std::mem::take(&mut t.state.exit_hooks) {
|
||||||
if let Err(err) = action(&mut t, &exit_status) {
|
if let Err(err) = action(&mut t, &exit_status) {
|
||||||
tracing::error!(?err, "error in exit hook");
|
tracing::error!(?err, "error in exit hook");
|
||||||
|
@ -601,10 +605,11 @@ impl<'activation> Activation<'activation> {
|
||||||
/// Returns the [`Handle`] for the new assertion.
|
/// Returns the [`Handle`] for the new assertion.
|
||||||
pub fn assert<M: 'static + Send + std::fmt::Debug>(&mut self, r: &Arc<Ref<M>>, a: M) -> Handle {
|
pub fn assert<M: 'static + Send + std::fmt::Debug>(&mut self, r: &Arc<Ref<M>>, a: M) -> Handle {
|
||||||
let handle = next_handle();
|
let handle = next_handle();
|
||||||
if let Some(f) = self.active_facet() {
|
if let Some(f) = self.state.get_facet(self.facet.facet_id) {
|
||||||
tracing::trace!(?r, ?handle, ?a, "assert");
|
tracing::trace!(?r, ?handle, ?a, "assert");
|
||||||
f.insert_retract_cleanup_action(&r, handle);
|
f.outbound_handles.insert(handle);
|
||||||
drop(f);
|
drop(f);
|
||||||
|
self.state.insert_retract_cleanup_action(&r, handle);
|
||||||
{
|
{
|
||||||
let r = Arc::clone(r);
|
let r = Arc::clone(r);
|
||||||
self.pending.queue_for(&r).push(Box::new(
|
self.pending.queue_for(&r).push(Box::new(
|
||||||
|
@ -636,17 +641,21 @@ impl<'activation> Activation<'activation> {
|
||||||
let handle = next_handle();
|
let handle = next_handle();
|
||||||
if let Some(f) = self.active_facet() {
|
if let Some(f) = self.active_facet() {
|
||||||
tracing::trace!(?r, ?handle, ?a, "assert_for_myself");
|
tracing::trace!(?r, ?handle, ?a, "assert_for_myself");
|
||||||
|
f.outbound_handles.insert(handle);
|
||||||
|
drop(f);
|
||||||
{
|
{
|
||||||
let r = Arc::clone(r);
|
let r = Arc::clone(r);
|
||||||
f.cleanup_actions.insert(
|
self.state.cleanup_actions.insert(
|
||||||
handle,
|
handle,
|
||||||
CleanupAction::ForMyself(Box::new(
|
CleanupAction::ForMyself(Box::new(
|
||||||
move |t| t.with_entity(&r, |t, e| {
|
move |t| t.with_entity(&r, |t, e| {
|
||||||
tracing::trace!(?handle, "retracted");
|
tracing::trace!(?handle, "retracted");
|
||||||
|
if let Some(f) = t.active_facet() {
|
||||||
|
f.outbound_handles.remove(&handle);
|
||||||
|
}
|
||||||
e.retract(t, handle)
|
e.retract(t, handle)
|
||||||
}))));
|
}))));
|
||||||
}
|
}
|
||||||
drop(f);
|
|
||||||
{
|
{
|
||||||
let r = Arc::clone(r);
|
let r = Arc::clone(r);
|
||||||
self.pending.for_myself.push(Box::new(
|
self.pending.for_myself.push(Box::new(
|
||||||
|
@ -663,16 +672,15 @@ impl<'activation> Activation<'activation> {
|
||||||
let entity_ref = t_other.create::<AnyValue, _>(StopOnRetract);
|
let entity_ref = t_other.create::<AnyValue, _>(StopOnRetract);
|
||||||
let handle = next_handle();
|
let handle = next_handle();
|
||||||
tracing::trace!(?handle, ?entity_ref, "half_link");
|
tracing::trace!(?handle, ?entity_ref, "half_link");
|
||||||
self.active_facet().unwrap().insert_retract_cleanup_action(&entity_ref, handle);
|
self.state.insert_retract_cleanup_action(&entity_ref, handle);
|
||||||
|
self.active_facet().unwrap().outbound_handles.insert(handle);
|
||||||
t_other.with_entity(&entity_ref, |t, e| e.assert(t, AnyValue::new(true), handle)).unwrap();
|
t_other.with_entity(&entity_ref, |t, e| e.assert(t, AnyValue::new(true), handle)).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Core API: retract a previously-established assertion.
|
/// Core API: retract a previously-established assertion.
|
||||||
pub fn retract(&mut self, handle: Handle) {
|
pub fn retract(&mut self, handle: Handle) {
|
||||||
if let Some(f) = self.active_facet() {
|
if let Some(d) = self.state.cleanup_actions.remove(&handle) {
|
||||||
if let Some(d) = f.cleanup_actions.remove(&handle) {
|
self.pending.execute_cleanup_action(d)
|
||||||
self.pending.execute_cleanup_action(d)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -988,10 +996,9 @@ impl<'activation> Activation<'activation> {
|
||||||
action(t)?;
|
action(t)?;
|
||||||
}
|
}
|
||||||
let parent_facet_id = f.parent_facet_id;
|
let parent_facet_id = f.parent_facet_id;
|
||||||
// if !alive, the drop will happen at the end of this function, but we
|
f.retract_outbound(t);
|
||||||
// need it to happen right here so that child-facet cleanup-actions are
|
// ^ we need retraction to happen right here so that child-facet
|
||||||
// performed before parent-facet cleanup-actions.
|
// cleanup-actions are performed before parent-facet cleanup-actions.
|
||||||
drop(f);
|
|
||||||
if let Some(p) = parent_facet_id {
|
if let Some(p) = parent_facet_id {
|
||||||
if t.state.facet_exists_and_is_inert(p) {
|
if t.state.facet_exists_and_is_inert(p) {
|
||||||
t._terminate_facet(p, true)?;
|
t._terminate_facet(p, true)?;
|
||||||
|
@ -999,6 +1006,8 @@ impl<'activation> Activation<'activation> {
|
||||||
} else {
|
} else {
|
||||||
t.state.shutdown();
|
t.state.shutdown();
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
f.retract_outbound(t);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
|
@ -1192,6 +1201,7 @@ impl Actor {
|
||||||
tx,
|
tx,
|
||||||
mailbox: Weak::new(),
|
mailbox: Weak::new(),
|
||||||
exit_hooks: Vec::new(),
|
exit_hooks: Vec::new(),
|
||||||
|
cleanup_actions: Map::new(),
|
||||||
facet_nodes: Map::new(),
|
facet_nodes: Map::new(),
|
||||||
facet_children: Map::new(),
|
facet_children: Map::new(),
|
||||||
root: root.facet_id,
|
root: root.facet_id,
|
||||||
|
@ -1298,7 +1308,7 @@ impl Facet {
|
||||||
Facet {
|
Facet {
|
||||||
facet_id: next_facet_id(),
|
facet_id: next_facet_id(),
|
||||||
parent_facet_id,
|
parent_facet_id,
|
||||||
cleanup_actions: Map::new(),
|
outbound_handles: Set::new(),
|
||||||
stop_actions: Vec::new(),
|
stop_actions: Vec::new(),
|
||||||
linked_tasks: Map::new(),
|
linked_tasks: Map::new(),
|
||||||
inert_check_preventers: Arc::new(AtomicU64::new(0)),
|
inert_check_preventers: Arc::new(AtomicU64::new(0)),
|
||||||
|
@ -1320,15 +1330,11 @@ impl Facet {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn insert_retract_cleanup_action<M: 'static + Send>(&mut self, r: &Arc<Ref<M>>, handle: Handle) {
|
fn retract_outbound(&mut self, t: &mut Activation) {
|
||||||
let r = Arc::clone(r);
|
for handle in std::mem::take(&mut self.outbound_handles).into_iter() {
|
||||||
self.cleanup_actions.insert(
|
tracing::trace!(h = ?handle, "retract on termination");
|
||||||
handle,
|
t.retract(handle);
|
||||||
CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new(
|
}
|
||||||
move |t| t.with_entity(&r, |t, e| {
|
|
||||||
tracing::trace!(?handle, "retracted");
|
|
||||||
e.retract(t, handle)
|
|
||||||
}))));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1419,13 +1425,31 @@ impl RunningActor {
|
||||||
let no_kids = self.facet_children.get(&facet_id).map(|cs| cs.is_empty()).unwrap_or(true);
|
let no_kids = self.facet_children.get(&facet_id).map(|cs| cs.is_empty()).unwrap_or(true);
|
||||||
if let Some(f) = self.get_facet(facet_id) {
|
if let Some(f) = self.get_facet(facet_id) {
|
||||||
no_kids &&
|
no_kids &&
|
||||||
f.cleanup_actions.is_empty() &&
|
f.outbound_handles.is_empty() &&
|
||||||
f.linked_tasks.is_empty() &&
|
f.linked_tasks.is_empty() &&
|
||||||
f.inert_check_preventers.load(Ordering::Relaxed) == 0
|
f.inert_check_preventers.load(Ordering::Relaxed) == 0
|
||||||
} else {
|
} else {
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn insert_retract_cleanup_action<M: 'static + Send>(
|
||||||
|
&mut self,
|
||||||
|
r: &Arc<Ref<M>>,
|
||||||
|
handle: Handle,
|
||||||
|
) {
|
||||||
|
let r = Arc::clone(r);
|
||||||
|
self.cleanup_actions.insert(
|
||||||
|
handle,
|
||||||
|
CleanupAction::ForAnother(Arc::clone(&r.mailbox), Box::new(
|
||||||
|
move |t| t.with_entity(&r, |t, e| {
|
||||||
|
tracing::trace!(?handle, "retracted");
|
||||||
|
if let Some(f) = t.active_facet() {
|
||||||
|
f.outbound_handles.remove(&handle);
|
||||||
|
}
|
||||||
|
e.retract(t, handle)
|
||||||
|
}))));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for Actor {
|
impl Drop for Actor {
|
||||||
|
@ -1441,13 +1465,8 @@ impl Drop for Facet {
|
||||||
token.cancel();
|
token.cancel();
|
||||||
}
|
}
|
||||||
|
|
||||||
let to_clear = std::mem::take(&mut self.cleanup_actions);
|
if !self.outbound_handles.is_empty() {
|
||||||
{
|
panic!("Internal error: outbound_handles not empty at drop time");
|
||||||
let mut b = EventBuffer::new(Account::new(crate::name!("drop")));
|
|
||||||
for (_handle, r) in to_clear.into_iter() {
|
|
||||||
tracing::trace!(h = ?_handle, "retract on termination");
|
|
||||||
b.execute_cleanup_action(r);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::trace!(facet_id = ?self.facet_id, "Facet::drop");
|
tracing::trace!(facet_id = ?self.facet_id, "Facet::drop");
|
||||||
|
|
Loading…
Reference in New Issue