diff --git a/syndicate-server/src/main.rs b/syndicate-server/src/main.rs index 8530d5d..29a71b8 100644 --- a/syndicate-server/src/main.rs +++ b/syndicate-server/src/main.rs @@ -198,5 +198,7 @@ async fn main() -> Result<(), Box> { Ok(()) }).await??; + wait_for_all_actors_to_stop(std::time::Duration::from_secs(2)).await; + Ok(()) } diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index 0802879..00cfaab 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -2005,6 +2005,47 @@ impl ActorResult> Entity for F { } } +async fn wait_loop(wait_time: time::Duration) { + let deadline = time::Instant::now() + wait_time; + while time::Instant::now() < deadline { + let remaining_count = ACTORS.read().len(); + if remaining_count == 0 { + break; + } + tracing::debug!("Waiting for {} remaining actors to stop", remaining_count); + tokio::time::sleep(time::Duration::from_millis(100)).await; + } +} + +pub async fn wait_for_all_actors_to_stop(wait_time: time::Duration) { + wait_loop(wait_time).await; + let remaining = ACTORS.read().clone(); + if remaining.len() > 0 { + tracing::warn!("Some actors remain after {:?}:", wait_time); + for (name, actor) in remaining.into_values() { + let _entry = name.enter(); + tracing::warn!(?actor, "still running, requesting shutdown"); + let g = actor.state.lock(); + if let ActorState::Running(state) = &*g { + state.shutdown(); + } + } + wait_loop(wait_time).await; + let remaining = ACTORS.read().clone(); + if remaining.len() > 0 { + tracing::error!("Some actors failed to stop after being explicitly shut down:"); + for (name, actor) in remaining.into_values() { + let _entry = name.enter(); + tracing::error!(?actor, "failed to stop"); + } + } else { + tracing::debug!("All remaining actors have stopped."); + } + } else { + tracing::debug!("All remaining actors have stopped."); + } +} + /// A convenient Syndicate-enhanced variation on /// [`tracing::info_span`]. ///