MAJOR REFACTORING OF CORE ASSERTION-TRACKING STRUCTURES. Little impact on API. Read on for details.

2022-02-01 15:22:30 Two problems.

 - If a stop action panics (in `_terminate_facet`), the Facet is dropped before its outbound
   handles are removed. With the code as it stands, this leaks assertions (!!). (A toy
   illustration of the hazard follows this list.)

 - The logic for removing an outbound handle seems to be running in the wrong facet context???
   (See `f.outbound_handles.remove(&handle)` in the cleanup actions.)
    - I think I need to remove the for_myself mechanism
    - and add some callbacks to run only on successful commit
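
A standalone toy model of the first problem's drop-order hazard (illustrative only: the
`Facet`, `terminate`, and handle types here are stand-ins, not the real syndicate-rs API):

    use std::collections::HashSet;

    struct Facet { outbound_handles: HashSet<u64> }

    fn terminate(facet: Facet, stop_action: impl FnOnce() -> Result<(), ()>) {
        // If the stop action fails (or panics), we return early and `facet` is
        // dropped with its handles never retracted: the assertions leak.
        if stop_action().is_err() { return; }
        for h in &facet.outbound_handles {
            println!("retract {h}"); // stand-in for the real retraction
        }
    }

    fn main() {
        let mut f = Facet { outbound_handles: HashSet::new() };
        f.outbound_handles.insert(42);
        terminate(f, || Err(())); // handle 42 is never retracted
    }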

2022-02-02 12:12:33 This is hard.

Here's the current implementation:

 - assert
    - inserts into outbound_handles of active facet
    - adds cleanup action describing how to do the retraction
    - enqueues the assert action, which
       - calls e.assert()

 - retract
    - looks up & removes the cleanup action, which
       - enqueues the retract action, which
          - removes from outbound_handles of the WRONG facet in the WRONG actor (see the toy
            model after this list)
          - calls e.retract()

 - _terminate_facet
    - uses outbound_handles to retract the facet's assertions
    - doesn't directly touch cleanup actions, relying on retract to do that
    - if one of a facet's stop actions panics, will drop the facet, leaking its assertions
    - actually, even if a stop action yields `Err`, it will drop the facet and leak assertions
    - yikes

 - facet drop
    - panics if outbound_handles is nonempty

 - actor cleanup
    - relies on facet tree to find assertions to retract
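
A tiny model of the bookkeeping invariant the retract path violates (hypothetical names, not
the real types): the cleanup table records, at assert time, which facet owns each handle, and
retraction must consult that record rather than whichever facet happens to be active when the
queued action finally runs.

    use std::collections::HashMap;

    type Handle = u64;
    type FacetId = u64;

    #[derive(Default)]
    struct ActorState {
        owner_of: HashMap<Handle, FacetId>,              // recorded at assert time
        outbound_handles: HashMap<FacetId, Vec<Handle>>, // per-facet handle sets
    }

    impl ActorState {
        fn assert(&mut self, active_facet: FacetId, handle: Handle) {
            self.owner_of.insert(handle, active_facet);
            self.outbound_handles.entry(active_facet).or_default().push(handle);
        }

        fn retract(&mut self, handle: Handle) {
            // Correct form: remove from the facet recorded at assert time, not
            // from the facet (or actor!) active at retraction time.
            if let Some(owner) = self.owner_of.remove(&handle) {
                if let Some(hs) = self.outbound_handles.get_mut(&owner) {
                    hs.retain(|h| *h != handle);
                }
            }
        }
    }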

Revised plan (a toy model of the new commit/rollback bookkeeping follows this list):

 - ✓ revise Activation/PendingEvents structures
    - rename `cleanup_actions` to `outbound_assertions`
    - remove `for_myself` queues and `final_actions`
    - add `pre_commit_actions`, `rollback_actions` and `commit_actions`

 - ✓ assert
    - as before
    - but on rollback, removes from `outbound_handles` (if the facet still exists) and
      `outbound_assertions` (always)
    - marks the new assertion as "established" on commit

 - ✓ retract
    - lookup in `outbound_assertions` by handle, using presence as indication it hasn't been
      scheduled in this turn
    - on rollback, put it back in `outbound_assertions` ONLY IF IT IS MARKED ESTABLISHED -
      otherwise it is a retraction of an `assert` that has *also* been rolled back in this turn
    - on commit, remove it from `outbound_handles`
    - enqueue the retract action, which just calls e.retract()

 - ✓ _terminate_facet
    - revised quite a bit now that we rely on `RunningActor::cleanup` to use
      `outbound_assertions` rather than the facet tree.
    - still drops Facets on panic, but this is now mostly harmless (reorders retractions a bit)
    - handles `Err` from a stop action more gracefully
    - slightly cleverer tracking of what needs doing based on a `TerminationDirection`
    - now ONLY applies to ORDERLY cleanup of the facet tree. Disorderly cleanup ignores the
      facet tree and just retracts the assertions willy-nilly.

 - ✓ facet drop
    - warn if outbound_handles is nonempty, but don't do anything about it

 - ✓ actor cleanup
    - doesn't use the facet tree at all.
    - cleanly shutting down is done elsewhere
    - uses the remaining entries in `outbound_assertions` (previously `cleanup_actions`) to
      deal with retractions for dropped facets as well as any other facets that haven't been
      cleanly shut down

 - ✓ activate
    - now has a panic_guard::PanicGuard RAII for conveying a crash to an actor in case the
      activation is happening from a linked task or another thread (this wasn't the case in the
      examples that provoked this work, though)
    - simplified
    - explicit commit/rollback decision

 - ✓ Actor::run
    - no longer uses the same path for crash-termination and success-termination
    - instead, for success-termination, takes a turn that calls Activation::stop_root
       - this cleans up the facet tree using _terminate_facet
       - when the turn ends, it notices that the root facet is gone and shuts down the actor
       - so in principle there will be nothing for actor cleanup to do
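
To make the commit/rollback rules above concrete, here's a minimal, self-contained model of
the bookkeeping (a sketch with hypothetical names, not the real Activation/PendingEvents
code):

    use std::collections::HashMap;

    type Handle = u64;

    struct Outbound { established: bool } // set once the asserting turn commits

    #[derive(Default)]
    struct TurnState {
        outbound_assertions: HashMap<Handle, Outbound>, // was `cleanup_actions`
        asserted_this_turn: Vec<Handle>,
        retracted_this_turn: Vec<(Handle, Outbound)>,
    }

    impl TurnState {
        fn assert(&mut self, h: Handle) {
            self.outbound_assertions.insert(h, Outbound { established: false });
            self.asserted_this_turn.push(h);
        }

        fn retract(&mut self, h: Handle) {
            // Presence in the map indicates the handle hasn't already been
            // scheduled for retraction in this turn.
            if let Some(o) = self.outbound_assertions.remove(&h) {
                self.retracted_this_turn.push((h, o));
            }
        }

        fn commit(&mut self) {
            for h in self.asserted_this_turn.drain(..) {
                if let Some(o) = self.outbound_assertions.get_mut(&h) {
                    o.established = true; // the new assertion survived its turn
                }
            }
            self.retracted_this_turn.clear(); // retractions become final
        }

        fn rollback(&mut self) {
            for h in self.asserted_this_turn.drain(..) {
                self.outbound_assertions.remove(&h); // un-assert
            }
            for (h, o) in self.retracted_this_turn.drain(..) {
                // Reinstate ONLY established assertions; otherwise this retract
                // paired with an assert that was itself rolled back just above.
                if o.established {
                    self.outbound_assertions.insert(h, o);
                }
            }
        }
    }

And a sketch of the RAII idea behind the panic guard (again a toy, not the real
`panic_guard::PanicGuard`): if the guard is dropped while unwinding, the crash is conveyed
to the actor; the normal path disarms it first.

    struct PanicGuard { armed: bool }

    impl PanicGuard {
        fn new() -> Self { PanicGuard { armed: true } }
        fn disarm(mut self) { self.armed = false; } // normal completion path
    }

    impl Drop for PanicGuard {
        fn drop(&mut self) {
            if self.armed && std::thread::panicking() {
                // Stand-in for delivering the failure to the actor from a
                // linked task or another thread.
                eprintln!("conveying panic to actor");
            }
        }
    }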

2022-02-04 13:52:34 This took days. :-(
Tony Garnock-Jones, 2022-02-04 13:59:37 +01:00
commit f88592282d (parent 98731ba968)
7 changed files with 542 additions and 472 deletions

@@ -1,9 +1,11 @@
 use std::sync::Arc;
+use std::sync::Mutex;
 use std::time::SystemTime;

 use structopt::StructOpt;

 use syndicate::actor::*;
+use syndicate::enclose;
 use syndicate::language;
 use syndicate::relay;
 use syndicate::schemas::dataspace::Observe;
@@ -109,22 +111,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let mut event_counter: u64 = 0;
     let mut rtt_ns_samples: Vec<u64> = vec![0; report_latency_every];
     let mut rtt_batch_count: usize = 0;
-    let mut current_reply = None;
-    let self_ref = t.create_inert();
-    self_ref.become_entity(
-        syndicate::entity(Arc::clone(&self_ref))
-            .on_message(move |self_ref, t, m: AnyValue| {
+    let current_reply = Arc::new(Mutex::new(None));
+    Cap::new(&t.create(
+        syndicate::entity(())
+            .on_message(move |(), t, m: AnyValue| {
                 match m.value().as_boolean() {
-                    Some(true) => {
+                    Some(_) => {
                         tracing::info!("{:?} turns, {:?} events in the last second",
                                        turn_counter,
                                        event_counter);
                         turn_counter = 0;
                         event_counter = 0;
                     }
-                    Some(false) => {
-                        current_reply = None;
-                    }
                     None => {
                         event_counter += 1;
                         let bindings = m.value().to_sequence()?;
@@ -136,9 +134,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                                 timestamp.clone(),
                                 padding.clone()));
                         } else {
-                            if let None = current_reply {
+                            let mut g = current_reply.lock().expect("unpoisoned");
+                            if let None = *g {
                                 turn_counter += 1;
-                                t.message_for_myself(&self_ref, AnyValue::new(false));
+                                t.pre_commit(enclose!((current_reply) move |_| {
+                                    *current_reply.lock().expect("unpoisoned") = None;
+                                    Ok(())
+                                }));
                                 let rtt_ns = now() - timestamp.value().to_u64()?;
                                 rtt_ns_samples[rtt_batch_count] = rtt_ns;
                                 rtt_batch_count += 1;
@@ -149,18 +151,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                                     rtt_batch_count = 0;
                                 }
-                                current_reply = Some(
-                                    simple_record2(&send_label,
-                                                   Value::from(now()).wrap(),
-                                                   padding.clone()));
+                                *g = Some(simple_record2(&send_label,
+                                                         Value::from(now()).wrap(),
+                                                         padding.clone()));
                             }
-                            ds.message(t, &(), current_reply.as_ref().expect("some reply"));
+                            ds.message(t, &(), g.as_ref().expect("some reply"));
                         }
                     }
                 }
                 Ok(())
-            }));
-    Cap::new(&self_ref)
+            })))
 };
 ds.assert(t, language(), &Observe {

@@ -19,9 +19,8 @@ use tungstenite::Message;
 struct ExitListener;

 impl Entity<()> for ExitListener {
-    fn exit_hook(&mut self, _t: &mut Activation, exit_status: &Arc<ActorResult>) -> ActorResult {
+    fn exit_hook(&mut self, _t: &mut Activation, exit_status: &Arc<ActorResult>) {
         tracing::info!(?exit_status, "disconnect");
-        Ok(())
     }
 }

@@ -1,6 +1,7 @@
 use preserves_schema::Codec;

 use std::sync::Arc;
+use std::sync::atomic::Ordering;

 use syndicate::actor::*;
 use syndicate::enclose;
@@ -27,9 +28,38 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: DebtReporter) -> ActorResult {
     ds.assert(t, language(), &lifecycle::started(&spec));
     ds.assert(t, language(), &lifecycle::ready(&spec));
     t.every(core::time::Duration::from_millis((spec.interval_seconds.0 * 1000.0) as u64), |_t| {
-        for (id, (name, debt)) in syndicate::actor::ACCOUNTS.read().iter() {
-            tracing::info!(id, ?name, debt = ?debt.load(std::sync::atomic::Ordering::Relaxed));
+        for (account_id, (name, debt)) in syndicate::actor::ACCOUNTS.read().iter() {
+            tracing::info!(account_id, ?name, debt = ?debt.load(Ordering::Relaxed));
         }
+        // let snapshot = syndicate::actor::ACTORS.read().clone();
+        // for (id, (name, ac_ref)) in snapshot.iter() {
+        //     if *id == _t.state.actor_id {
+        //         tracing::debug!("skipping report on the reporting actor, to avoid deadlock");
+        //         continue;
+        //     }
+        //     tracing::trace!(?id, "about to lock");
+        //     tracing::info_span!("actor", id, ?name).in_scope(|| match &*ac_ref.state.lock() {
+        //         ActorState::Terminated { exit_status } =>
+        //             tracing::info!(?exit_status, "terminated"),
+        //         ActorState::Running(state) => {
+        //             tracing::info!(field_count = ?state.fields.len(),
+        //                            outbound_assertion_count = ?state.outbound_assertions.len(),
+        //                            facet_count = ?state.facet_nodes.len());
+        //             tracing::info_span!("facets").in_scope(|| {
+        //                 for (facet_id, f) in state.facet_nodes.iter() {
+        //                     tracing::info!(
+        //                         ?facet_id,
+        //                         parent_id = ?f.parent_facet_id,
+        //                         outbound_handle_count = ?f.outbound_handles.len(),
+        //                         linked_task_count = ?f.linked_tasks.len(),
+        //                         inert_check_preventers = ?f.inert_check_preventers.load(Ordering::Relaxed));
+        //                 }
+        //             });
+        //         }
+        //     });
+        // }
         Ok(())
     })
 }

File diff suppressed because it is too large.

@@ -17,7 +17,7 @@ where
     Fa: 'static + Send + FnMut(&mut E, &mut Activation, M) -> DuringResult<E>,
     Fm: 'static + Send + FnMut(&mut E, &mut Activation, M) -> ActorResult,
     Fs: 'static + Send + FnMut(&mut E, &mut Activation) -> ActorResult,
-    Fx: 'static + Send + FnMut(&mut E, &mut Activation, &Arc<ActorResult>) -> ActorResult,
+    Fx: 'static + Send + FnMut(&mut E, &mut Activation, &Arc<ActorResult>),
 {
     state: E,
     assertion_handler: Option<Fa>,
@@ -54,7 +54,7 @@ pub fn entity<M: 'static + Send, E>(
     fn (&mut E, &mut Activation, M) -> DuringResult<E>,
     fn (&mut E, &mut Activation, M) -> ActorResult,
     fn (&mut E, &mut Activation) -> ActorResult,
-    fn (&mut E, &mut Activation, &Arc<ActorResult>) -> ActorResult>
+    fn (&mut E, &mut Activation, &Arc<ActorResult>)>
 where
     E: 'static + Send,
 {
@@ -68,7 +68,7 @@ where
     Fa: 'static + Send + FnMut(&mut E, &mut Activation, M) -> DuringResult<E>,
     Fm: 'static + Send + FnMut(&mut E, &mut Activation, M) -> ActorResult,
     Fs: 'static + Send + FnMut(&mut E, &mut Activation) -> ActorResult,
-    Fx: 'static + Send + FnMut(&mut E, &mut Activation, &Arc<ActorResult>) -> ActorResult,
+    Fx: 'static + Send + FnMut(&mut E, &mut Activation, &Arc<ActorResult>),
 {
     pub fn new(
         state: E,
@@ -154,7 +154,7 @@ where
     pub fn on_exit<Fx1>(self, exit_handler: Fx1) -> DuringEntity<M, E, Fa, Fm, Fs, Fx1>
     where
-        Fx1: 'static + Send + FnMut(&mut E, &mut Activation, &Arc<ActorResult>) -> ActorResult,
+        Fx1: 'static + Send + FnMut(&mut E, &mut Activation, &Arc<ActorResult>),
     {
         DuringEntity {
             state: self.state,
@@ -187,7 +187,7 @@ where
     Fa: 'static + Send + FnMut(&mut E, &mut Activation, AnyValue) -> DuringResult<E>,
     Fm: 'static + Send + FnMut(&mut E, &mut Activation, AnyValue) -> ActorResult,
     Fs: 'static + Send + FnMut(&mut E, &mut Activation) -> ActorResult,
-    Fx: 'static + Send + FnMut(&mut E, &mut Activation, &Arc<ActorResult>) -> ActorResult,
+    Fx: 'static + Send + FnMut(&mut E, &mut Activation, &Arc<ActorResult>),
 {
     pub fn create_cap(self, t: &mut Activation) -> Arc<Cap>
     {
@@ -202,7 +202,7 @@ where
     Fa: 'static + Send + FnMut(&mut E, &mut Activation, M) -> DuringResult<E>,
     Fm: 'static + Send + FnMut(&mut E, &mut Activation, M) -> ActorResult,
     Fs: 'static + Send + FnMut(&mut E, &mut Activation) -> ActorResult,
-    Fx: 'static + Send + FnMut(&mut E, &mut Activation, &Arc<ActorResult>) -> ActorResult,
+    Fx: 'static + Send + FnMut(&mut E, &mut Activation, &Arc<ActorResult>),
 {
     fn assert(&mut self, t: &mut Activation, a: M, h: Handle) -> ActorResult {
         match &mut self.assertion_handler {
@@ -232,10 +232,9 @@ where
         }
     }

-    fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc<ActorResult>) -> ActorResult {
-        match &mut self.exit_handler {
-            Some(handler) => handler(&mut self.state, t, exit_status),
-            None => Ok(()),
+    fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc<ActorResult>) {
+        if let Some(handler) = &mut self.exit_handler {
+            handler(&mut self.state, t, exit_status);
         }
     }
 }

@@ -88,9 +88,6 @@ pub enum Output {
 type TunnelRelayRef = Arc<Mutex<Option<TunnelRelay>>>;

-#[derive(Debug)]
-struct SendPendingTurn;
-
 // There are other kinds of relay. This one has exactly two participants connected to each other.
 pub struct TunnelRelay
 {
@@ -99,7 +96,6 @@ pub struct TunnelRelay
     outbound_assertions: Map<P::Handle, Vec<Arc<WireSymbol>>>,
     membranes: Membranes,
     pending_outbound: Vec<P::TurnEvent<AnyValue>>,
-    self_entity: Arc<Ref<SendPendingTurn>>,
     output: UnboundedSender<LoanedItem<Vec<u8>>>,
     output_text: bool,
 }
@@ -243,7 +239,6 @@ impl TunnelRelay {
                 next_export_oid: 0,
             },
             pending_outbound: Vec::new(),
-            self_entity: self_entity.clone(),
         };
         if let Some(ir) = initial_ref {
             tr.membranes.export_ref(ir).inc_ref();
@@ -404,8 +399,7 @@ impl TunnelRelay {
                     }
                 }
             }
-            t.commit();
-            Ok(())
+            t.commit()
         }
     }
 }
@@ -484,7 +478,19 @@ impl TunnelRelay {
     pub fn send_event(&mut self, t: &mut Activation, remote_oid: sturdy::Oid, event: P::Event<AnyValue>) -> ActorResult {
         if self.pending_outbound.is_empty() {
-            t.message_for_myself(&self.self_entity, SendPendingTurn);
+            let self_ref = Arc::clone(&self.self_ref);
+            t.pre_commit(move |t| {
+                let mut g = self_ref.lock();
+                let tr = g.as_mut().expect("initialized");
+                let events = std::mem::take(&mut tr.pending_outbound);
+                tr.send_packet(&t.account(),
+                               events.len(),
+                               P::Packet::Turn(Box::new(P::Turn(events.clone()))))?;
+                for P::TurnEvent { oid, event } in events.into_iter() {
+                    tr.outbound_event_bookkeeping(t, sturdy::Oid(oid.0), &event)?;
+                }
+                Ok(())
+            });
         }
         self.pending_outbound.push(P::TurnEvent { oid: P::Oid(remote_oid.0), event });
         Ok(())
@@ -708,26 +714,16 @@ async fn output_loop(
     }
 }

-impl Entity<SendPendingTurn> for TunnelRefEntity {
-    fn message(&mut self, t: &mut Activation, _m: SendPendingTurn) -> ActorResult {
-        let mut g = self.relay_ref.lock();
-        let tr = g.as_mut().expect("initialized");
-        let events = std::mem::take(&mut tr.pending_outbound);
-        tr.send_packet(&t.account(), events.len(), P::Packet::Turn(Box::new(P::Turn(events.clone()))))?;
-        for P::TurnEvent { oid, event } in events.into_iter() {
-            tr.outbound_event_bookkeeping(t, sturdy::Oid(oid.0), &event)?;
-        }
-        Ok(())
-    }
-
-    fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc<ActorResult>) -> ActorResult {
+impl Entity<()> for TunnelRefEntity {
+    fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc<ActorResult>) {
         if let Err(e) = &**exit_status {
             let e = e.clone();
             let mut g = self.relay_ref.lock();
             let tr = g.as_mut().expect("initialized");
-            tr.send_packet(&t.account(), 1, P::Packet::Error(Box::new(e)))?;
+            if let Err(f) = tr.send_packet(&t.account(), 1, P::Packet::Error(Box::new(e))) {
+                tracing::error!("Failed to send error packet: {:?}", f);
+            }
         }
-        Ok(())
     }
 }

@@ -72,17 +72,15 @@ impl Entity<StartNow> for Supervisor
     fn stop(&mut self, t: &mut Activation) -> ActorResult {
         let _entry = tracing::info_span!("supervisor", name = ?self.child_name).entered();
-        let exit_status =
-            self.ac_ref.take().expect("valid supervisee ActorRef")
-            .exit_status()
-            .expect("supervisee to have terminated");
-        tracing::debug!(?exit_status);
-        match exit_status {
-            Ok(()) if self.config.restart_policy == RestartPolicy::OnErrorOnly => {
+        match self.ac_ref.take().expect("valid supervisee ActorRef").exit_status() {
+            None =>
+                tracing::debug!("Supervisor shut down; supervisee will exit soon"),
+            Some(Ok(())) if self.config.restart_policy == RestartPolicy::OnErrorOnly => {
                 tracing::trace!("Not restarting: normal exit, restart_policy is OnErrorOnly");
                 t.set(&self.state, State::Complete);
             },
-            _ => {
+            Some(exit_status) => {
+                tracing::debug!(?exit_status);
                 tracing::trace!("Restarting: restart_policy is Always or exit was abnormal");
                 t.set(&self.state,
                       if exit_status.is_ok() { State::Complete } else { State::Failed });