Avoid racy approaches to actor-termination.

They're still there: you can use turn.state.shutdown(), which enqueues
a message for eventual actor shutdown. But it's better to use
turn.stop_root(), which terminates the actor's root facet within the
current turn, ensuring that the actor's exit_status is definitely set
by the time the turn has committed.

This is necessary to avoid a racy panic in supervision: before this
change, an asynchronous SystemMessage::Release was sent when the last
facet of an actor was stopped. Depending on load (!), any retractions
resulting from the shutdown would be delivered before the Release
arrived at the stopping actor. The supervision logic expected
exit_status to be definitely set by the time release() fired, which
wasn't always true. Now that in-turn shutdown has been implemented,
this is a reliable invariant.

A knock-on change is the need to remove
enqueue_for_myself_at_commit(), replacing it with a use of
pending.for_myself.push(). The old enqueue_for_myself_at_commit
approach could lead to lost actions as follows:

    A: start linked task T, which spawns a new tokio coroutine
            T: activate some facet in A and terminate A's root facet
            T: at this point, A transitions to "not running"
    A: spawn B, enqueuing a call to B's boot()
    A: commit turn. Deliveries for others go out as usual,
       but those for A will be discarded since A is "not running".
       This means that the call to B's boot() goes missing.

Using pending.for_myself.push() instead assures that B's boot will
always run at the end of A's turn, without regard for whether A is in
some terminated state.

I think that this kind of race could have happened before, but
something about switching away from shutdown() seems to trigger it
somewhat reliably.
This commit is contained in:
Tony Garnock-Jones 2022-01-10 12:52:29 +01:00
parent e06e5fef10
commit 2d179d1e46
1 changed files with 16 additions and 11 deletions

View File

@ -1001,11 +1001,6 @@ impl<'activation> Activation<'activation> {
});
}
fn enqueue_for_myself_at_commit(&mut self, action: Action) {
let mailbox = self.state.mailbox();
self.pending.queue_for_mailbox(&mailbox).push(action);
}
/// Schedule the creation of a new actor when the Activation commits.
pub fn spawn<F: 'static + Send + FnOnce(&mut Activation) -> ActorResult>(
&mut self,
@ -1014,7 +1009,7 @@ impl<'activation> Activation<'activation> {
) -> ActorRef {
let ac = Actor::new(Some(self.state.actor_id));
let ac_ref = ac.ac_ref.clone();
self.enqueue_for_myself_at_commit(Box::new(move |_| {
self.pending.for_myself.push(Box::new(move |_| {
ac.boot(name, boot);
Ok(())
}));
@ -1034,7 +1029,7 @@ impl<'activation> Activation<'activation> {
let ac = Actor::new(Some(self.state.actor_id));
let ac_ref = ac.ac_ref.clone();
let facet_id = self.facet.facet_id;
self.enqueue_for_myself_at_commit(Box::new(move |t| {
self.pending.for_myself.push(Box::new(move |t| {
t.with_facet(true, facet_id, move |t| {
ac.link(t).boot(name, boot);
Ok(())
@ -1128,7 +1123,7 @@ impl<'activation> Activation<'activation> {
fn stop_if_inert(&mut self) {
let facet_id = self.facet.facet_id;
self.enqueue_for_myself_at_commit(Box::new(move |t| {
self.pending.for_myself.push(Box::new(move |t| {
tracing::trace!("Checking inertness of facet {} from facet {}", facet_id, t.facet.facet_id);
if t.state.facet_exists_and_is_inert(facet_id) {
tracing::trace!(" - facet {} is inert, stopping it", facet_id);
@ -1174,8 +1169,7 @@ impl<'activation> Activation<'activation> {
tracing::trace!("not terminating parent {:?} of facet {:?}", p, facet_id);
}
} else {
tracing::trace!("terminating actor of root facet {:?}", facet_id);
t.state.shutdown();
tracing::trace!("no parent of root facet {:?} to terminate", facet_id);
}
} else {
f.retract_outbound(t);
@ -1312,14 +1306,25 @@ impl<'activation> Activation<'activation> {
}
fn restore_invariants(&mut self, d: RunDisposition) -> RunDisposition {
match d {
let d = match d {
RunDisposition::Continue =>
self._restore_invariants().into(),
RunDisposition::Terminate(Ok(())) =>
RunDisposition::Terminate(self._restore_invariants()),
RunDisposition::Terminate(Err(_)) =>
d,
};
// If we would otherwise continue, check the root facet: is it still alive?
// If not, then the whole actor should terminate now.
if let RunDisposition::Continue = d {
if let None = self.state.get_facet(self.state.root) {
tracing::trace!("terminating actor because root facet no longer exists");
return RunDisposition::Terminate(Ok(()));
}
}
d
}
}