wait_for_all_actors_to_stop

This commit is contained in:
Tony Garnock-Jones 2021-10-08 16:37:26 +02:00
parent baf98d6c54
commit 4713005997
2 changed files with 43 additions and 0 deletions

View File

@ -198,5 +198,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}).await??;
wait_for_all_actors_to_stop(std::time::Duration::from_secs(2)).await;
Ok(())
}

View File

@ -2005,6 +2005,47 @@ impl<F: Send + FnMut(&mut Activation) -> ActorResult> Entity<Synced> for F {
}
}
async fn wait_loop(wait_time: time::Duration) {
let deadline = time::Instant::now() + wait_time;
while time::Instant::now() < deadline {
let remaining_count = ACTORS.read().len();
if remaining_count == 0 {
break;
}
tracing::debug!("Waiting for {} remaining actors to stop", remaining_count);
tokio::time::sleep(time::Duration::from_millis(100)).await;
}
}
pub async fn wait_for_all_actors_to_stop(wait_time: time::Duration) {
wait_loop(wait_time).await;
let remaining = ACTORS.read().clone();
if remaining.len() > 0 {
tracing::warn!("Some actors remain after {:?}:", wait_time);
for (name, actor) in remaining.into_values() {
let _entry = name.enter();
tracing::warn!(?actor, "still running, requesting shutdown");
let g = actor.state.lock();
if let ActorState::Running(state) = &*g {
state.shutdown();
}
}
wait_loop(wait_time).await;
let remaining = ACTORS.read().clone();
if remaining.len() > 0 {
tracing::error!("Some actors failed to stop after being explicitly shut down:");
for (name, actor) in remaining.into_values() {
let _entry = name.enter();
tracing::error!(?actor, "failed to stop");
}
} else {
tracing::debug!("All remaining actors have stopped.");
}
} else {
tracing::debug!("All remaining actors have stopped.");
}
}
/// A convenient Syndicate-enhanced variation on
/// [`tracing::info_span`].
///