From 2d179d1e4675a6a6ba0e3ef4996e1cf926a6bf8d Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 10 Jan 2022 12:52:29 +0100 Subject: [PATCH] Avoid racy approaches to actor-termination. They're still there: you can use turn.state.shutdown(), which enqueues a message for eventual actor shutdown. But it's better to use turn.stop_root(), which terminates the actor's root facet within the current turn, ensuring that the actor's exit_status is definitely set by the time the turn has committed. This is necessary to avoid a racy panic in supervision: before this change, an asynchronous SystemMessage::Release was sent when the last facet of an actor was stopped. Depending on load (!), any retractions resulting from the shutdown would be delivered before the Release arrived at the stopping actor. The supervision logic expected exit_status to be definitely set by the time release() fired, which wasn't always true. Now that in-turn shutdown has been implemented, this is a reliable invariant. A knock-on change is the need to remove enqueue_for_myself_at_commit(), replacing it with a use of pending.for_myself.push(). The old enqueue_for_myself_at_commit approach could lead to lost actions as follows: A: start linked task T, which spawns a new tokio coroutine T: activate some facet in A and terminate A's root facet T: at this point, A transitions to "not running" A: spawn B, enqueuing a call to B's boot() A: commit turn. Deliveries for others go out as usual, but those for A will be discarded since A is "not running". This means that the call to B's boot() goes missing. Using pending.for_myself.push() instead assures that B's boot will always run at the end of A's turn, without regard for whether A is in some terminated state. I think that this kind of race could have happened before, but something about switching away from shutdown() seems to trigger it somewhat reliably. --- syndicate/src/actor.rs | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index b071a12..8eeb39f 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -1001,11 +1001,6 @@ impl<'activation> Activation<'activation> { }); } - fn enqueue_for_myself_at_commit(&mut self, action: Action) { - let mailbox = self.state.mailbox(); - self.pending.queue_for_mailbox(&mailbox).push(action); - } - /// Schedule the creation of a new actor when the Activation commits. pub fn spawn ActorResult>( &mut self, @@ -1014,7 +1009,7 @@ impl<'activation> Activation<'activation> { ) -> ActorRef { let ac = Actor::new(Some(self.state.actor_id)); let ac_ref = ac.ac_ref.clone(); - self.enqueue_for_myself_at_commit(Box::new(move |_| { + self.pending.for_myself.push(Box::new(move |_| { ac.boot(name, boot); Ok(()) })); @@ -1034,7 +1029,7 @@ impl<'activation> Activation<'activation> { let ac = Actor::new(Some(self.state.actor_id)); let ac_ref = ac.ac_ref.clone(); let facet_id = self.facet.facet_id; - self.enqueue_for_myself_at_commit(Box::new(move |t| { + self.pending.for_myself.push(Box::new(move |t| { t.with_facet(true, facet_id, move |t| { ac.link(t).boot(name, boot); Ok(()) @@ -1128,7 +1123,7 @@ impl<'activation> Activation<'activation> { fn stop_if_inert(&mut self) { let facet_id = self.facet.facet_id; - self.enqueue_for_myself_at_commit(Box::new(move |t| { + self.pending.for_myself.push(Box::new(move |t| { tracing::trace!("Checking inertness of facet {} from facet {}", facet_id, t.facet.facet_id); if t.state.facet_exists_and_is_inert(facet_id) { tracing::trace!(" - facet {} is inert, stopping it", facet_id); @@ -1174,8 +1169,7 @@ impl<'activation> Activation<'activation> { tracing::trace!("not terminating parent {:?} of facet {:?}", p, facet_id); } } else { - tracing::trace!("terminating actor of root facet {:?}", facet_id); - t.state.shutdown(); + tracing::trace!("no parent of root facet {:?} to terminate", facet_id); } } else { f.retract_outbound(t); @@ -1312,14 +1306,25 @@ impl<'activation> Activation<'activation> { } fn restore_invariants(&mut self, d: RunDisposition) -> RunDisposition { - match d { + let d = match d { RunDisposition::Continue => self._restore_invariants().into(), RunDisposition::Terminate(Ok(())) => RunDisposition::Terminate(self._restore_invariants()), RunDisposition::Terminate(Err(_)) => d, + }; + + // If we would otherwise continue, check the root facet: is it still alive? + // If not, then the whole actor should terminate now. + if let RunDisposition::Continue = d { + if let None = self.state.get_facet(self.state.root) { + tracing::trace!("terminating actor because root facet no longer exists"); + return RunDisposition::Terminate(Ok(())); + } } + + d } }