diff --git a/syndicate-server/examples/pingpong.rs b/syndicate-server/examples/pingpong.rs index a969043..b8bc18d 100644 --- a/syndicate-server/examples/pingpong.rs +++ b/syndicate-server/examples/pingpong.rs @@ -177,7 +177,7 @@ async fn main() -> ActorResult { })?; if let PingPongMode::Ping(c) = &config.mode { - let facet = t.facet.clone(); + let facet = t.facet_ref(); let turn_count = c.turn_count; let action_count = c.action_count; let account = Arc::clone(t.account()); diff --git a/syndicate-server/examples/producer.rs b/syndicate-server/examples/producer.rs index 61eacfc..3b8af19 100644 --- a/syndicate-server/examples/producer.rs +++ b/syndicate-server/examples/producer.rs @@ -28,7 +28,7 @@ async fn main() -> ActorResult { let (i, o) = TcpStream::connect("127.0.0.1:9001").await?.into_split(); Actor::top(None, |t| { relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| { - let facet = t.facet.clone(); + let facet = t.facet_ref(); let padding = AnyValue::new(&vec![0u8; config.bytes_padding][..]); let action_count = config.action_count; let account = Account::new(None, None); diff --git a/syndicate-server/examples/state-producer.rs b/syndicate-server/examples/state-producer.rs index b1f5439..45cc8cb 100644 --- a/syndicate-server/examples/state-producer.rs +++ b/syndicate-server/examples/state-producer.rs @@ -22,7 +22,7 @@ async fn main() -> ActorResult { let (i, o) = TcpStream::connect("127.0.0.1:9001").await?.into_split(); Actor::top(None, |t| { relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| { - let facet = t.facet.clone(); + let facet = t.facet_ref(); let account = Account::new(None, None); t.linked_task(Some(AnyValue::symbol("sender")), async move { let presence = rec![AnyValue::symbol("Present"), AnyValue::new(std::process::id())]; diff --git a/syndicate-server/src/gatekeeper.rs b/syndicate-server/src/gatekeeper.rs index 699889c..f56d74f 100644 --- a/syndicate-server/src/gatekeeper.rs +++ b/syndicate-server/src/gatekeeper.rs @@ -297,7 +297,7 @@ impl Entity for ResponderState { let transport = ResponderTransport { relay_input, c_recv }; let initiator_session = Arc::clone(&details.initiator_session); let relay_output_name = Some(AnyValue::symbol("relay_output")); - let transport_facet = t.facet.clone(); + let transport_facet = t.facet_ref(); t.linked_task(relay_output_name.clone(), async move { let account = Account::new(relay_output_name, trace_collector); let cause = TurnCause::external("relay_output"); diff --git a/syndicate-server/src/protocol.rs b/syndicate-server/src/protocol.rs index a392d10..739443b 100644 --- a/syndicate-server/src/protocol.rs +++ b/syndicate-server/src/protocol.rs @@ -35,7 +35,7 @@ pub fn run_io_relay( initial_ref: Arc, ) -> ActorResult { let exit_listener = t.create(ExitListener); - t.state.add_exit_hook(&exit_listener); + t.add_exit_hook(&exit_listener); relay::TunnelRelay::run(t, i, o, Some(initial_ref), None, false); Ok(()) } diff --git a/syndicate-server/src/services/config_watcher.rs b/syndicate-server/src/services/config_watcher.rs index ea50c26..fdf011b 100644 --- a/syndicate-server/src/services/config_watcher.rs +++ b/syndicate-server/src/services/config_watcher.rs @@ -184,7 +184,7 @@ fn run( let mut watcher = watcher(tx, Duration::from_millis(100)).map_err(convert_notify_error)?; watcher.watch(&env.path, RecursiveMode::Recursive).map_err(convert_notify_error)?; - let facet = t.facet.clone(); + let facet = t.facet_ref(); let trace_collector = t.trace_collector(); let span = tracing::Span::current(); thread::spawn(move || { diff --git a/syndicate-server/src/services/daemon.rs b/syndicate-server/src/services/daemon.rs index 6c0ae4c..b74c885 100644 --- a/syndicate-server/src/services/daemon.rs +++ b/syndicate-server/src/services/daemon.rs @@ -41,7 +41,7 @@ fn supervise_daemon( lifecycle::on_service_restart(t, &config_ds, &spec, enclose!( (config_ds, root_ds, spec) move |t| { tracing::info!(id = ?spec.id, "Terminating to restart"); - t.stop_facet_and_continue(t.facet.facet_id, Some( + t.stop_facet_and_continue(t.facet_id(), Some( enclose!((config_ds, root_ds, spec) move |t: &mut Activation| { supervise_daemon(t, config_ds, root_ds, spec) }))) @@ -176,7 +176,7 @@ impl DaemonInstance { fn handle_exit(self, t: &mut Activation, error_message: Option) -> ActorResult { let delay = std::time::Duration::from_millis(if let None = error_message { 200 } else { 1000 }); - t.stop_facet_and_continue(t.facet.facet_id, Some(move |t: &mut Activation| { + t.stop_facet_and_continue(t.facet_id(), Some(move |t: &mut Activation| { #[derive(Debug)] enum NextStep { SleepAndRestart, @@ -230,7 +230,7 @@ impl DaemonInstance { kind: &str ) -> ActorResult { t.facet(|t| { - let facet = t.facet.clone(); + let facet = t.facet_ref(); let log_ds = self.log_ds.clone(); let service = self.service.clone(); let kind = AnyValue::symbol(kind); @@ -290,7 +290,7 @@ impl DaemonInstance { let pid = child.id(); tracing::debug!(?pid, cmd = ?self.cmd, "started"); - let facet = t.facet.clone(); + let facet = t.facet_ref(); if let Some(r) = child.stderr.take() { self.log(t, pid, r, "stderr")?; @@ -401,7 +401,7 @@ fn run( Ok(config) => { tracing::info!(?config); let config = config.elaborate(); - let facet = t.facet.clone(); + let facet = t.facet_ref(); t.linked_task(Some(AnyValue::symbol("subprocess")), async move { let mut cmd = config.process.build_command().ok_or("Cannot start daemon process")?; diff --git a/syndicate-server/src/services/tcp_relay_listener.rs b/syndicate-server/src/services/tcp_relay_listener.rs index a39828b..96334cf 100644 --- a/syndicate-server/src/services/tcp_relay_listener.rs +++ b/syndicate-server/src/services/tcp_relay_listener.rs @@ -55,7 +55,7 @@ fn run(t: &mut Activation, ds: Arc, spec: TcpRelayListener) -> ActorResult }; let host = addr.host.clone(); let port = u16::try_from(&addr.port).map_err(|_| "Invalid TCP port number")?; - let facet = t.facet.clone(); + let facet = t.facet_ref(); let trace_collector = t.trace_collector(); t.linked_task(Some(AnyValue::symbol("listener")), async move { let listen_addr = format!("{}:{}", host, port); @@ -85,7 +85,7 @@ fn run(t: &mut Activation, ds: Arc, spec: TcpRelayListener) -> ActorResult &account, cause, enclose!((trace_collector, httpd) move |t| { t.spawn(name, move |t| { Ok(t.linked_task(None, { - let facet = t.facet.clone(); + let facet = t.facet_ref(); async move { detect_protocol(trace_collector, facet, diff --git a/syndicate-server/src/services/unix_relay_listener.rs b/syndicate-server/src/services/unix_relay_listener.rs index 2a71fa8..ddc0d14 100644 --- a/syndicate-server/src/services/unix_relay_listener.rs +++ b/syndicate-server/src/services/unix_relay_listener.rs @@ -39,7 +39,7 @@ pub fn on_demand(t: &mut Activation, ds: Arc) { fn run(t: &mut Activation, ds: Arc, spec: UnixRelayListener) -> ActorResult { lifecycle::terminate_on_service_restart(t, &ds, &spec); let path_str = spec.addr.path.clone(); - let facet = t.facet.clone(); + let facet = t.facet_ref(); let trace_collector = t.trace_collector(); t.linked_task(Some(AnyValue::symbol("listener")), async move { let listener = bind_unix_listener(&PathBuf::from(path_str)).await?; @@ -71,7 +71,7 @@ fn run(t: &mut Activation, ds: Arc, spec: UnixRelayListener) -> ActorResult &account, cause, enclose!((trace_collector) move |t| { t.spawn(name, |t| { Ok(t.linked_task(None, { - let facet = t.facet.clone(); + let facet = t.facet_ref(); async move { tracing::info!(protocol = %"unix"); let (i, o) = stream.into_split(); diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index b629249..8f3d481 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -63,7 +63,7 @@ pub type AnyValue = ArcValue>; pub type Name = Option; /// The type of process-unique actor IDs. -pub type ActorId = u64; +pub type ActorId = NonZeroU64; /// The type of process-unique facet IDs. pub type FacetId = NonZeroU64; @@ -230,7 +230,7 @@ pub trait Entity: Send { /// entity's animating [Actor] terminates. /// /// Programs register an entity's exit hook with - /// [RunningActor::add_exit_hook]. + /// [Activation::add_exit_hook]. /// /// The default implementation does nothing. fn exit_hook(&mut self, turn: &mut Activation, exit_status: &Arc) { @@ -265,40 +265,66 @@ pub type PendingEventQueue = Vec; /// The main API for programming Syndicated Actor objects. /// /// Through `Activation`s, programs can access the state of their -/// animating [`RunningActor`] and their active [`Facet`]. +/// animating actor and their active [`Facet`]. /// /// Usually, an `Activation` will be supplied to code that needs one; but when non-Actor code /// (such as a [linked task][crate::actor#linked-tasks]) needs to enter an Actor's execution /// context, use [`FacetRef::activate`] to construct one. /// /// Many actions that an entity can perform are methods directly on -/// `Activation`, but methods on the [`RunningActor`] and [`FacetRef`] -/// values contained in an `Activation` are also sometimes useful. +/// `Activation`, but methods on [`FacetRef`] are also sometimes useful. /// /// This is what other implementations call a "Turn", renamed here to /// avoid conflicts with [`crate::schemas::protocol::Turn`]. -pub struct Activation<'activation> { - /// A reference to the currently active [`Facet`] and the implementation-side state of its - /// [`Actor`]. - pub facet: FacetRef, - /// A reference to the current state of the active [`Actor`]. - pub state: &'activation mut RunningActor, - active_block: Option, - pending: EventBuffer, +pub struct Activation { + // Do not poke at exposed but doc(hidden) fields here! You will violate invariants if you do! + // They are exposed for debug/reflective purposes. + + //--------------------------------------------------------------------------- + // Fields related to an active turn for an actor. + + // INVARIANT: facet.is_none() any time the owning actor's Mutex is unlocked. + facet_id: Option, + account: Option>, + turn_description: Option, + trace_collector: Option, + pre_commit_actions: Vec, rollback_actions: Vec, commit_actions: Vec, -} - -struct EventBuffer { - pub source_actor_id: ActorId, - pub desc: Option, - pub trace_collector: Option, - pub account: Arc, // INVARIANT: At most one of single_queue and multiple_queues is non-None at any given time. single_queue: Option<(ActorId, UnboundedSender, PendingEventQueue)>, multiple_queues: Option, PendingEventQueue)>>, + + //--------------------------------------------------------------------------- + // Fields related to the actor itself, relevant even when it is suspended. + + actor_id: ActorId, + actor_ref: ActorRef, + + tx: UnboundedSender, + mailbox: Weak, + + exit_hooks: Vec)>>, + #[doc(hidden)] + pub outbound_assertions: OutboundAssertions, + #[doc(hidden)] + pub facet_nodes: Map, + #[doc(hidden)] + pub facet_children: Map>, + pub root: FacetId, // TODO rename to root_facet_id, or ideally make it a FacetRef called root_facet + + #[doc(hidden)] + dataflow: Option> +} + +pub struct DataflowState { + active_block: Option, + graph: Graph, + #[doc(hidden)] + pub fields: HashMap>, + blocks: HashMap, } /// An `Account` records a "debt" in terms of outstanding work items. @@ -360,7 +386,7 @@ pub struct ActorRef { pub actor_id: ActorId, // Not intended for ordinary use! You'll break a bunch of invariants if you do! - // Intended for reflective/debug use. + // Used internally, and made publicly accessible for reflective/debug use. #[doc(hidden)] pub state: Arc>, } @@ -375,8 +401,8 @@ pub struct FacetRef { /// The state of an actor: either `Running` or `Terminated`. pub enum ActorState { - /// A non-terminated actor has an associated [`RunningActor`] state record. - Running(RunningActor), + /// A non-terminated actor has an associated [`Activation`] state record. + Running(Activation), /// A terminated actor has an [`ActorResult`] as its `exit_status`. Terminated { /// The exit status of the actor: `Ok(())` for normal @@ -385,32 +411,10 @@ pub enum ActorState { }, } -/// State associated with each non-terminated [`Actor`]. -// Do not poke at exposed but doc(hidden) fields here! You will violate invariants if you do! -// They are exposed for debug/reflective purposes. -pub struct RunningActor { - /// The ID of the actor this state belongs to. - pub actor_id: ActorId, - tx: UnboundedSender, - mailbox: Weak, - dataflow: Graph, - #[doc(hidden)] - pub fields: HashMap>, - blocks: HashMap, - exit_hooks: Vec)>>, - #[doc(hidden)] - pub outbound_assertions: OutboundAssertions, - #[doc(hidden)] - pub facet_nodes: Map, - #[doc(hidden)] - pub facet_children: Map>, - pub root: FacetId, -} - /// The type of process-unique task IDs. pub type TaskId = u64; -/// Handle to a shared, mutable field (i.e. a *dataflow variable*) within a [`RunningActor`]. +/// Handle to a shared, mutable field (i.e. a *dataflow variable*) within a running actor. /// /// Use [`Activation::field`] to create fields, and use [`Activation::get`], /// [`::get_mut`][Activation::get_mut], and [`::set`][Activation::set] to read and write field @@ -538,7 +542,8 @@ const BUMP_AMOUNT: u8 = 10; static NEXT_ACTOR_ID: AtomicU64 = AtomicU64::new(1); #[doc(hidden)] pub fn next_actor_id() -> ActorId { - NEXT_ACTOR_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed) + ActorId::new(NEXT_ACTOR_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed)) + .expect("Internal error: Attempt to allocate ActorId of zero. Too many ActorIds allocated. Restart the process.") } static NEXT_FACET_ID: AtomicU64 = AtomicU64::new(2); @@ -658,38 +663,35 @@ impl FacetRef { let mut panic_guard = panic_guard::PanicGuard::new(state.tx.clone()); // let _entry = tracing::info_span!(parent: None, "actor", actor_id = ?self.actor.actor_id).entered(); - let mut activation = - Activation::make(self, - Arc::clone(account), - cause, - account.trace_collector.clone(), - state); - let f_result = f(&mut activation); - let maybe_exit_status = match f_result { - Ok(()) => - activation.commit().map_or_else( - |e| Some(Err(e)), - |_| { - // If we would otherwise continue, check the root facet: is it - // still alive? If not, then the whole actor should terminate now. - if let None = activation.state.get_facet(activation.state.root) { - tracing::trace!( - "terminating actor because root facet no longer exists"); - Some(Ok(())) - } else { - None - } - }), - Err(e) => { - activation.rollback(); - Some(Err(e)) - } - }; + let maybe_exit_status = state.take_turn( + self.facet_id, + Arc::clone(account), + cause, + account.trace_collector.clone(), + |t| match f(t) { + Ok(()) => + t.commit().map_or_else( + |e| Some(Err(e)), + |_| { + // If we would otherwise continue, check the root facet: is it + // still alive? If not, then the whole actor should terminate now. + if let None = t.get_facet(t.root) { + tracing::trace!( + "terminating actor because root facet no longer exists"); + Some(Ok(())) + } else { + None + } + }), + Err(e) => { + t.rollback(); + Some(Err(e)) + } + }); panic_guard.disarm(); drop(panic_guard); - drop(activation); match maybe_exit_status { None => true, @@ -707,33 +709,97 @@ impl FacetRef { } } -impl<'activation> Activation<'activation> { - fn make( - facet: &FacetRef, - account: Arc, - cause: Option, - trace_collector: Option, - state: &'activation mut RunningActor, - ) -> Self { - Activation { - facet: facet.clone(), - state, +impl DataflowState { + fn new() -> Self { + DataflowState { active_block: None, - pending: EventBuffer::new( - facet.actor.actor_id, - account, - cause.map(|c| trace::TurnDescription::new( - NEXT_ACTIVATION_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed), - c)), - trace_collector), - pre_commit_actions: Vec::new(), - rollback_actions: Vec::new(), - commit_actions: Vec::new(), + graph: Graph::new(), + fields: HashMap::new(), + blocks: HashMap::new(), } } + fn named_field( + &mut self, + name: &str, + initial_value: T, + tx: UnboundedSender, + ) -> Arc> { + let field_id = FieldId::new(NEXT_FIELD_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed)) + .expect("Internal error: Attempt to allocate FieldId of zero. Too many FieldIds allocated. Restart the process."); + self.fields.insert(field_id, Box::new(initial_value)); + Arc::new(Field { + name: name.to_owned(), + field_id, + tx, + phantom: PhantomData, + }) + } + + fn get(&mut self, field: &Field) -> &T { + if let Some(block) = self.active_block { + tracing::trace!(?field, ?block, action = "get", "observed"); + self.graph.record_observation(block, field.field_id); + } + let any = self.fields.get(&field.field_id) + .expect("Attempt to get() missing field: wrong actor?"); + any.downcast_ref().expect("Attempt to access field at incorrect type") + } + + fn get_mut(&mut self, field: &Field) -> &mut T { + { + // Overapproximation. + if let Some(block) = self.active_block { + tracing::trace!(?field, ?block, action = "get_mut", "observed"); + self.graph.record_observation(block, field.field_id); + } + tracing::trace!(?field, active_block = ?self.active_block, action = "get_mut", "damaged"); + self.graph.record_damage(field.field_id); + } + let any = self.fields.get_mut(&field.field_id) + .expect("Attempt to get_mut() missing field: wrong actor?"); + any.downcast_mut().expect("Attempt to access field at incorrect type") + } + + fn set(&mut self, field: &Field, value: T) { + tracing::trace!(?field, active_block = ?self.active_block, action = "set", "damaged"); + // Overapproximation in many cases, since the new value may not produce an + // observable difference (may be equal to the current value). + self.graph.record_damage(field.field_id); + let any = self.fields.get_mut(&field.field_id) + .expect("Attempt to set() missing field: wrong actor?"); + *any = Box::new(value); + } +} + +impl Activation { + fn take_turn R>( + &mut self, + facet_id: FacetId, + account: Arc, + cause: Option, + trace_collector: Option, + f: F, + ) -> R { + if self.facet_id.is_some() { + panic!("Invariant violated: nested turns detected"); + } + self.facet_id = Some(facet_id); + self.account = Some(account); + self.turn_description = cause.map(|c| trace::TurnDescription::new( + NEXT_ACTIVATION_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed), + c)); + self.trace_collector = trace_collector; + let result = f(self); + self.facet_id = None; + self.account = None; + self.turn_description = None; + self.trace_collector = None; + result + } + pub fn trace_collector(&self) -> Option { - self.pending.trace_collector.clone() + self.trace_collector.clone() } /// Constructs a new [`Account`] with the given `name`, inheriting @@ -746,7 +812,7 @@ impl<'activation> Activation<'activation> { where F: FnOnce(&mut Activation) -> ActorResult, { - if self.state.facet_nodes.contains_key(&facet_id) { + if self.facet_nodes.contains_key(&facet_id) { tracing::trace!(facet_id, "alive=true"); self._with_facet(facet_id, f) } else { @@ -755,15 +821,27 @@ impl<'activation> Activation<'activation> { } } + /// Retrieve the [`FacetId`] of the currently-active facet. + pub fn facet_id(&self) -> FacetId { + self.facet_id.expect("Attempt to access active facet ID outside turn") + } + + /// Retrieve a [`FacetRef`] for the currently-active facet. + pub fn facet_ref(&self) -> FacetRef { + FacetRef { + actor: self.actor_ref.clone(), + facet_id: self.facet_id(), + } + } + fn _with_facet(&mut self, facet_id: FacetId, f: F) -> ActorResult where F: FnOnce(&mut Activation) -> ActorResult, { - let old_facet_id = self.facet.facet_id; - self.facet.facet_id = facet_id; + let saved = self.facet_id.replace(facet_id); // let _entry = tracing::info_span!("facet", ?facet_id).entered(); let result = f(self); - self.facet.facet_id = old_facet_id; + self.facet_id = saved; result } @@ -775,13 +853,13 @@ impl<'activation> Activation<'activation> { } fn active_facet<'a>(&'a mut self) -> Option<&'a mut Facet> { - self.state.get_facet(self.facet.facet_id) + self.get_facet(self.facet_id()) } /// Retrieves the chain of facet IDs, in order, from the currently-active [`Facet`] up to /// and including the root facet of the active actor. Useful for debugging. pub fn facet_ids(&mut self) -> Vec { - if let Some(f) = self.state.get_facet_immut(self.facet.facet_id) { + if let Some(f) = self.get_facet_immut(self.facet_id()) { self.facet_ids_for(f) } else { Vec::new() @@ -794,7 +872,7 @@ impl<'activation> Activation<'activation> { let mut id = f.parent_facet_id; while let Some(parent_id) = id { ids.push(parent_id); - match self.state.get_facet_immut(parent_id) { + match self.get_facet_immut(parent_id) { None => break, Some(pf) => id = pf.parent_facet_id, } @@ -804,20 +882,41 @@ impl<'activation> Activation<'activation> { #[inline(always)] fn trace trace::ActionDescription>(&mut self, f: F) { - if self.pending.desc.is_some() { + if self.turn_description.is_some() { let a = f(self); - self.pending.desc.as_mut().unwrap().record(a); + self.turn_description.as_mut().unwrap().record(a); } } + #[inline(always)] + fn trace_targeted trace::TurnEvent>( + &mut self, + internal: bool, + r: &Arc>, + f: F, + ) -> Option { + self.turn_description.as_mut().map(|d| { + let event = trace::TargetedTurnEvent { + target: r.as_ref().into(), + detail: f(), + }; + d.record(if internal { + trace::ActionDescription::EnqueueInternal { event: Box::new(event.clone()) } + } else { + trace::ActionDescription::Enqueue { event: Box::new(event.clone()) } + }); + event + }) + } + fn insert_outbound_assertion( &mut self, r: &Arc>, handle: Handle, description: Option, ) -> bool { - let asserting_facet_id = self.facet.facet_id; - match self.state.get_facet(asserting_facet_id) { + let asserting_facet_id = self.facet_id(); + match self.get_facet(asserting_facet_id) { None => false, Some(f) => { f.outbound_handles.insert(handle); @@ -835,16 +934,16 @@ impl<'activation> Activation<'activation> { }))), }; - self.state.outbound_assertions.insert(handle, Arc::new(Mutex::new(Some(details)))); + self.outbound_assertions.insert(handle, Arc::new(Mutex::new(Some(details)))); self.on_rollback(move |t| { - if let Some(f) = t.state.get_facet(asserting_facet_id) { + if let Some(f) = t.get_facet(asserting_facet_id) { f.outbound_handles.remove(&handle); } - t.state.outbound_assertions.remove(&handle); + t.outbound_assertions.remove(&handle); }); self.on_commit(move |t| { - if let Some(oa_handle) = t.state.outbound_assertions.get_mut(&handle) { + if let Some(oa_handle) = t.outbound_assertions.get_mut(&handle) { oa_handle.lock().as_mut().expect("OutboundAssertion").established = true; } }); @@ -859,7 +958,7 @@ impl<'activation> Activation<'activation> { /// Returns the [`Handle`] for the new assertion. pub fn assert(&mut self, r: &Arc>, a: M) -> Handle { let handle = next_handle(); - if self.insert_outbound_assertion(r, handle, self.pending.desc.as_ref().map( + if self.insert_outbound_assertion(r, handle, self.turn_description.as_ref().map( enclose!((r) move |_| trace::TargetedTurnEvent { target: r.as_ref().into(), detail: trace::TurnEvent::Retract { @@ -869,11 +968,11 @@ impl<'activation> Activation<'activation> { { tracing::trace!(?r, ?handle, ?a, "assert"); let r = Arc::clone(r); - let description = self.pending.trace_targeted(false, &r, || trace::TurnEvent::Assert { + let description = self.trace_targeted(false, &r, || trace::TurnEvent::Assert { assertion: Box::new((&a).into()), handle: Box::new(protocol::Handle(handle.into())), }); - self.pending.queue_for(&r).push(( + self.queue_for(&r).push(( description, Box::new(move |t| t.with_entity(&r, |t, e| { tracing::trace!(?handle, ?a, "asserted"); @@ -884,14 +983,14 @@ impl<'activation> Activation<'activation> { } fn half_link(&mut self, t_other: &mut Activation) -> Handle { - let this_actor_id = self.state.actor_id; + let this_actor_id = self.actor_id; let entity_ref = t_other.create::(StopOnRetract); let handle = next_handle(); - assert!(self.insert_outbound_assertion(&entity_ref, handle, self.pending.desc.as_ref().map( + assert!(self.insert_outbound_assertion(&entity_ref, handle, self.turn_description.as_ref().map( enclose!((entity_ref) move |_| trace::TargetedTurnEvent { target: entity_ref.as_ref().into(), detail: trace::TurnEvent::BreakLink { - source: Box::new(trace::ActorId(AnyValue::new(this_actor_id))), + source: Box::new(this_actor_id.into()), handle: Box::new(protocol::Handle(handle.into())), }, })))); @@ -903,10 +1002,10 @@ impl<'activation> Activation<'activation> { /// Core API: retract a previously-established assertion. pub fn retract(&mut self, handle: Handle) { tracing::trace!(?handle, "retract"); - if let Some(oa_handle) = self.state.outbound_assertions.remove(&handle) { + if let Some(oa_handle) = self.outbound_assertions.remove(&handle) { self.on_rollback(enclose!((oa_handle) move |t| { if oa_handle.lock().as_mut().expect("OutboundAssertion").established { - t.state.outbound_assertions.insert(handle, oa_handle); + t.outbound_assertions.insert(handle, oa_handle); } })); @@ -917,7 +1016,7 @@ impl<'activation> Activation<'activation> { self.trace(|_| trace::ActionDescription::Enqueue { event: Box::new(desc.clone()) }); } - self.pending.queue_for_mailbox(&oa.peer).push(( + self.queue_for_mailbox(&oa.peer).push(( oa.retractor.0.clone(), Box::new(enclose!((oa_handle) move |remote_t| { let oa = oa_handle.lock().take().expect("Present OutboundAssertion"); @@ -926,7 +1025,7 @@ impl<'activation> Activation<'activation> { let asserting_facet_id = oa.asserting_facet_id; self.on_commit(move |t| { - if let Some(f) = t.state.get_facet(asserting_facet_id) { + if let Some(f) = t.get_facet(asserting_facet_id) { f.outbound_handles.remove(&handle); } }); @@ -953,10 +1052,10 @@ impl<'activation> Activation<'activation> { pub fn message(&mut self, r: &Arc>, m: M) { tracing::trace!(?r, ?m, "message"); let r = Arc::clone(r); - let description = self.pending.trace_targeted(false, &r, || trace::TurnEvent::Message { + let description = self.trace_targeted(false, &r, || trace::TurnEvent::Message { body: Box::new((&m).into()), }); - self.pending.queue_for(&r).push(( + self.queue_for(&r).push(( description, Box::new(move |t| t.with_entity(&r, |t, e| { tracing::trace!(?m, "delivered"); @@ -971,17 +1070,18 @@ impl<'activation> Activation<'activation> { /// the synchronisation request. pub fn sync(&mut self, r: &Arc>, peer: Arc>) { let r = Arc::clone(r); - let description = self.pending.trace_targeted(false, &r, || trace::TurnEvent::Sync { + let description = self.trace_targeted(false, &r, || trace::TurnEvent::Sync { peer: Box::new(peer.as_ref().into()), }); - self.pending.queue_for(&r).push(( + self.queue_for(&r).push(( description, Box::new(move |t| t.with_entity(&r, |t, e| e.sync(t, peer))))) } pub fn later ActorResult>(&mut self, action: F) { // TODO: properly describe this in traces - self.pending.queue_for_mailbox(&self.state.mailbox()).push(( + let mailbox = self.mailbox(); + self.queue_for_mailbox(&mailbox).push(( None, Box::new(action))); } @@ -992,7 +1092,7 @@ impl<'activation> Activation<'activation> { /// /// **Note.** If the actor crashes, stop actions will *not* be called. /// - /// Use [`RunningActor::add_exit_hook`] to install a callback that will be called at the + /// Use [`Activation::add_exit_hook`] to install a callback that will be called at the /// end of the lifetime of the *actor* rather than the facet. (Also, exit hooks are called /// no matter whether actor termination was normal or abnormal.) pub fn on_stop_notify(&mut self, r: &Arc>) { @@ -1006,7 +1106,7 @@ impl<'activation> Activation<'activation> { /// terminates cleanly, `action` will be called in the context of the facet's parent. See /// also notes against [`on_stop_notify`][Activation::on_stop_notify]. pub fn on_stop ActorResult>(&mut self, action: F) { - self.on_facet_stop(self.facet.facet_id, action) + self.on_facet_stop(self.facet_id(), action) } fn on_facet_stop ActorResult>( @@ -1014,14 +1114,14 @@ impl<'activation> Activation<'activation> { facet_id: FacetId, action: F, ) { - if let Some(f) = self.state.get_facet(facet_id) { + if let Some(f) = self.get_facet(facet_id) { f.stop_actions.push(Box::new(action)); } } /// Retrieve the [`Account`] against which actions are recorded. pub fn account(&self) -> &Arc { - &self.pending.account + self.account.as_ref().expect("Attempt to access active account outside turn") } /// Delivers all pending actions in this activation and resets it, ready for more. Succeeds @@ -1060,20 +1160,59 @@ impl<'activation> Activation<'activation> { } } tracing::trace!("Commit is final"); - std::mem::take(&mut self.rollback_actions); - // ^ drop 'em so they don't run next time we commit, if there is a next time - for ac in std::mem::take(&mut self.commit_actions) { - ac(self); + if !self.rollback_actions.is_empty() { + // just drop 'em so they don't run next time + std::mem::take(&mut self.rollback_actions); } - self.pending.commit(); + if !self.commit_actions.is_empty() { + for ac in std::mem::take(&mut self.commit_actions) { + ac(self); + } + } + + let mut causing_turn_id: Option = None; + if let Some(d) = self.turn_description.take() { + causing_turn_id = Some(d.id.clone()); + if let Some(c) = &self.trace_collector { + c.record(self.actor_id, trace::ActorActivation::Turn(Box::new(d))); + } + } + + // Deliberately ignore send errors here: they indicate that the recipient is no + // longer alive. But we don't care about that case, since we have to be robust to + // crashes anyway. (When we were printing errors we saw here, an example of the + // problems it caused was a relay output_loop that received EPIPE causing the relay + // to crash, just as it was receiving thousands of messages a second, leading to + // many, many log reports of failed send_actions from the following line.) + if let Some((_actor_id, tx, turn)) = std::mem::take(&mut self.single_queue) { + let desc = causing_turn_id.as_ref().map(|id| trace::TurnCause::Turn { id: Box::new(id.clone()) }); + let _ = send_actions(&tx, desc, &self.account(), turn); + } + if let Some(table) = std::mem::take(&mut self.multiple_queues) { + for (_actor_id, (tx, turn)) in table.into_iter() { + let desc = causing_turn_id.as_ref().map(|id| trace::TurnCause::Turn { id: Box::new(id.clone()) }); + let _ = send_actions(&tx, desc, &self.account(), turn); + } + } + tracing::trace!("Activation::commit complete"); Ok(()) } fn rollback(&mut self) { tracing::trace!("Activation::rollback"); - for ac in std::mem::take(&mut self.rollback_actions) { - ac(self) + if !self.pre_commit_actions.is_empty() { + // just drop 'em so they don't run next time + std::mem::take(&mut self.pre_commit_actions); + } + if !self.rollback_actions.is_empty() { + for ac in std::mem::take(&mut self.rollback_actions) { + ac(self) + } + } + if !self.commit_actions.is_empty() { + // just drop 'em so they don't run next time + std::mem::take(&mut self.commit_actions); } tracing::trace!("Activation::rollback complete"); } @@ -1094,8 +1233,8 @@ impl<'activation> Activation<'activation> { /// via [`become_entity`][Ref::become_entity]. pub fn create_inert(&mut self) -> Arc> { Arc::new(Ref { - mailbox: self.state.mailbox(), - facet_id: self.facet.facet_id, + mailbox: self.mailbox(), + facet_id: self.facet_id(), target: Mutex::new(None), }) } @@ -1109,12 +1248,12 @@ impl<'activation> Activation<'activation> { name: Name, boot: F, ) { - let mailbox = self.state.mailbox(); - let facet = self.facet.clone(); + let mailbox = self.mailbox(); + let facet = self.facet_ref(); let trace_collector = self.trace_collector(); if self.active_facet().is_some() { let task_id = NEXT_TASK_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed); - self.pending.trace(|| trace::ActionDescription::LinkedTaskStart { + self.trace(|_| trace::ActionDescription::LinkedTaskStart { task_name: Box::new(name.into()), id: Box::new(trace::TaskId(AnyValue::new(task_id))), }); @@ -1177,12 +1316,12 @@ impl<'activation> Activation<'activation> { a: F, ) { let account = Arc::clone(self.account()); - let desc = self.pending.desc.as_ref().map(|d| trace::TurnCause::Delay { + let desc = self.turn_description.as_ref().map(|d| trace::TurnCause::Delay { causing_turn: Box::new(d.id.clone()), amount: duration.as_secs_f64().into(), }); let instant = time::Instant::now() + duration; - let facet = self.facet.clone(); + let facet = self.facet_ref(); self.linked_task(Some(AnyValue::symbol("delay")), async move { tokio::time::sleep_until(instant.into()).await; facet.activate(&account, desc, a); @@ -1198,7 +1337,7 @@ impl<'activation> Activation<'activation> { mut a: F, ) -> ActorResult { let account = Arc::clone(self.account()); - let facet = self.facet.clone(); + let facet = self.facet_ref(); let desc = trace::TurnCause::PeriodicActivation { period: duration.as_secs_f64().into() }; self.linked_task(Some(AnyValue::symbol("periodic-activation")), async move { let mut timer = tokio::time::interval(duration); @@ -1293,13 +1432,13 @@ impl<'activation> Activation<'activation> { boot: F, link: bool, ) -> ActorRef { - let ac = Actor::new(Some(self.state.actor_id), self.trace_collector()); + let ac = Actor::new(Some(self.actor_id), self.trace_collector()); let ac_ref = ac.ac_ref.clone(); - self.pending.trace(|| trace::ActionDescription::Spawn { + self.trace(|_| trace::ActionDescription::Spawn { link, - id: Box::new(trace::ActorId(AnyValue::new(ac_ref.actor_id))), + id: Box::new(ac_ref.actor_id.into()), }); - let cause = self.pending.desc.as_ref().map( + let cause = self.turn_description.as_ref().map( |d| trace::TurnCause::Turn { id: Box::new(d.id.clone()) }); let ac = if link { ac.link(self) } else { ac }; self.on_commit(move |t| { @@ -1315,16 +1454,16 @@ impl<'activation> Activation<'activation> { &mut self, boot: F, ) -> Result { - let f = Facet::new(Some(self.facet.facet_id)); + let f = Facet::new(Some(self.facet_id())); self.trace(|t| trace::ActionDescription::FacetStart { - path: t.facet_ids_for(&f).iter().map(|i| trace::FacetId(AnyValue::new(u64::from(*i)))).collect(), + path: t.facet_ids_for(&f).iter().map(|i| (*i).into()).collect(), }); let facet_id = f.facet_id; - self.state.facet_nodes.insert(facet_id, f); - tracing::trace!(parent_id = ?self.facet.facet_id, + self.facet_nodes.insert(facet_id, f); + tracing::trace!(parent_id = ?self.facet_id, ?facet_id, - new_actor_facet_count = ?self.state.facet_nodes.len()); - self.state.facet_children.entry(self.facet.facet_id).or_default().insert(facet_id); + new_actor_facet_count = ?self.facet_nodes.len()); + self.facet_children.entry(self.facet_id()).or_default().insert(facet_id); self._with_facet(facet_id, move |t| { boot(t)?; t.stop_if_inert(); @@ -1385,7 +1524,7 @@ impl<'activation> Activation<'activation> { /// /// Equivalent to `self.stop_facet(self.facet.facet_id)`. pub fn stop(&mut self) { - self.stop_facet(self.facet.facet_id) + self.stop_facet(self.facet_id()) } /// Arranges for the active actor's root facet to be stopped cleanly when `self` commits; @@ -1393,15 +1532,15 @@ impl<'activation> Activation<'activation> { /// /// Equivalent to `self.stop_facet(self.state.root)`. pub fn stop_root(&mut self) { - self.stop_facet(self.state.root); + self.stop_facet(self.root); } fn stop_if_inert(&mut self) { - let facet_id = self.facet.facet_id; + let facet_id = self.facet_id(); // Registering a pre-commit hook lets this run after the dataflow graph has been repaired. self.pre_commit(move |t| { - tracing::trace!("Checking inertness of facet {} from facet {}", facet_id, t.facet.facet_id); - if t.state.facet_exists_and_is_inert(facet_id) { + tracing::trace!("Checking inertness of facet {} from facet {}", facet_id, t.facet_id()); + if t.facet_exists_and_is_inert(facet_id) { tracing::trace!(" - facet {} is inert, stopping it", facet_id); t._terminate_facet(facet_id, TerminationDirection::AtOrAboveStartingPoint, @@ -1419,20 +1558,19 @@ impl<'activation> Activation<'activation> { direction: TerminationDirection, reason: trace::FacetStopReason, ) -> ActorResult { - if let Some(mut f) = self.state.facet_nodes.remove(&facet_id) { + if let Some(mut f) = self.facet_nodes.remove(&facet_id) { let maybe_parent_id = f.parent_facet_id; self.trace(|t| trace::ActionDescription::FacetStop { - path: t.facet_ids_for(&f).iter().map( - |i| trace::FacetId(AnyValue::new(u64::from(*i)))).collect(), + path: t.facet_ids_for(&f).iter().map(|i| (*i).into()).collect(), reason: Box::new(reason), }); - tracing::trace!(remaining_actor_facet_count = ?self.state.facet_nodes.len(), + tracing::trace!(remaining_actor_facet_count = ?self.facet_nodes.len(), ?facet_id, ?direction, "stopping"); - if let Some(children) = self.state.facet_children.remove(&facet_id) { + if let Some(children) = self.facet_children.remove(&facet_id) { for child_id in children.into_iter() { self._terminate_facet(child_id, TerminationDirection::BelowStartingPoint, @@ -1442,7 +1580,7 @@ impl<'activation> Activation<'activation> { if let TerminationDirection::AtOrAboveStartingPoint = direction { if let Some(p) = maybe_parent_id { - self.state.facet_children.get_mut(&p).map(|children| children.remove(&facet_id)); + self.facet_children.get_mut(&p).map(|children| children.remove(&facet_id)); } } @@ -1461,7 +1599,7 @@ impl<'activation> Activation<'activation> { if let TerminationDirection::AtOrAboveStartingPoint = direction { match maybe_parent_id { Some(p) => { - if self.state.facet_exists_and_is_inert(p) { + if self.facet_exists_and_is_inert(p) { tracing::trace!("terminating parent {:?} of facet {:?}", p, facet_id); self._terminate_facet(p, TerminationDirection::AtOrAboveStartingPoint, @@ -1477,17 +1615,17 @@ impl<'activation> Activation<'activation> { Ok(()) } + fn dataflow_state_mut(&mut self) -> &mut DataflowState { + if self.dataflow.is_none() { + self.dataflow = Some(Box::new(DataflowState::new())); + } + self.dataflow.as_mut().unwrap() + } + /// Create a new named dataflow variable (field) within the active [`Actor`]. pub fn named_field(&mut self, name: &str, initial_value: T) -> Arc> { - let field_id = FieldId::new(NEXT_FIELD_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed)) - .expect("Internal error: Attempt to allocate FieldId of zero. Too many FieldIds allocated. Restart the process."); - self.state.fields.insert(field_id, Box::new(initial_value)); - Arc::new(Field { - name: name.to_owned(), - field_id, - tx: self.state.tx.clone(), - phantom: PhantomData, - }) + let tx = self.tx.clone(); + self.dataflow_state_mut().named_field(name, initial_value, tx) } /// Create a new anonymous dataflow variable (field) within the active [`Actor`]. @@ -1500,13 +1638,7 @@ impl<'activation> Activation<'activation> { /// *depending upon* the field. /// pub fn get(&mut self, field: &Field) -> &T { - if let Some(block) = self.active_block { - tracing::trace!(?field, ?block, action = "get", "observed"); - self.state.dataflow.record_observation(block, field.field_id); - } - let any = self.state.fields.get(&field.field_id) - .expect("Attempt to get() missing field: wrong actor?"); - any.downcast_ref().expect("Attempt to access field at incorrect type") + self.dataflow_state_mut().get(field) } /// Retrieve a mutable reference to the contents of a dataflow variable (field). As for @@ -1516,37 +1648,20 @@ impl<'activation> Activation<'activation> { /// reevaluation of dependent blocks. /// pub fn get_mut(&mut self, field: &Field) -> &mut T { - { - // Overapproximation. - if let Some(block) = self.active_block { - tracing::trace!(?field, ?block, action = "get_mut", "observed"); - self.state.dataflow.record_observation(block, field.field_id); - } - tracing::trace!(?field, active_block = ?self.active_block, action = "get_mut", "damaged"); - self.state.dataflow.record_damage(field.field_id); - } - let any = self.state.fields.get_mut(&field.field_id) - .expect("Attempt to get_mut() missing field: wrong actor?"); - any.downcast_mut().expect("Attempt to access field at incorrect type") + self.dataflow_state_mut().get_mut(field) } /// Overwrite the value of a dataflow variable (field). Marks the field as dirty, even if /// the new value is [`eq`][std::cmp::PartialEq::eq] to the value being overwritten. /// pub fn set(&mut self, field: &Field, value: T) { - tracing::trace!(?field, active_block = ?self.active_block, action = "set", "damaged"); - // Overapproximation in many cases, since the new value may not produce an - // observable difference (may be equal to the current value). - self.state.dataflow.record_damage(field.field_id); - let any = self.state.fields.get_mut(&field.field_id) - .expect("Attempt to set() missing field: wrong actor?"); - *any = Box::new(value); + self.dataflow_state_mut().set(field, value) } fn with_block(&mut self, block_id: BlockId, block: &mut Block) -> ActorResult { - let saved = self.active_block.replace(block_id); + let saved = self.dataflow_state_mut().active_block.replace(block_id); let result = block(self); - self.active_block = saved; + self.dataflow_state_mut().active_block = saved; result } @@ -1559,62 +1674,66 @@ impl<'activation> Activation<'activation> { .expect("Internal error: Attempt to allocate BlockId of zero. Too many BlockIds allocated. Restart the process."); let mut block: Block = Box::new(block); self.with_block(block_id, &mut block)?; - self.state.blocks.insert(block_id, (self.facet.facet_id, block)); + let facet_id = self.facet_id(); + self.dataflow_state_mut().blocks.insert(block_id, (facet_id, block)); Ok(()) } fn repair_dataflow(&mut self) -> Result { + if self.dataflow.is_none() { + return Ok(false); + } + let mut pass_number = 0; - while !self.state.dataflow.is_clean() { - pass_number += 1; - tracing::trace!(?pass_number, "repair_dataflow"); - let damaged_field_ids = self.state.dataflow.take_damaged_nodes(); - let mut executed_blocks = Set::new(); - for field_id in damaged_field_ids.into_iter() { - let block_ids = self.state.dataflow.take_observers_of(&field_id); - for block_id in block_ids.into_iter() { - if !executed_blocks.contains(&block_id) { - executed_blocks.insert(block_id); - if let Some((facet_id, mut block)) = self.state.blocks.remove(&block_id) { - let result = self.with_facet(facet_id, |t| t.with_block(block_id, &mut block)); - self.state.blocks.insert(block_id, (facet_id, block)); - result?; + loop { + let mut blocks_to_run = HashMap::new(); + + { + let df = self.dataflow.as_mut().unwrap(); + if df.graph.is_clean() { + break; + } + pass_number += 1; + tracing::trace!(?pass_number, "repair_dataflow"); + + let damaged_field_ids = df.graph.take_damaged_nodes(); + for field_id in damaged_field_ids.into_iter() { + let block_ids = df.graph.take_observers_of(&field_id); + for block_id in block_ids.into_iter() { + if let Some(entry) = df.blocks.remove(&block_id) { + blocks_to_run.insert(block_id, entry); } } } } + + let mut error = None; + let mut blocks_to_replace = Vec::with_capacity(blocks_to_run.len()); + for (block_id, (facet_id, mut block)) in blocks_to_run.into_iter() { + if error.is_none() { + if let Err(e) = self.with_facet(facet_id, |t| t.with_block(block_id, &mut block)) { + error = Some(e); + } + } + blocks_to_replace.push((block_id, (facet_id, block))); + } + + { + let df = self.dataflow.as_mut().unwrap(); + for (block_id, entry) in blocks_to_replace.into_iter() { + df.blocks.insert(block_id, entry); + } + } + + if let Some(e) = error { + Err(e)?; + } } if pass_number > 0 { tracing::trace!(passes = ?pass_number, "repair_dataflow complete"); } Ok(pass_number > 0) } -} - -impl<'activation> Drop for Activation<'activation> { - fn drop(&mut self) { - tracing::trace!("Activation::drop"); - self.rollback(); - tracing::trace!("Activation::drop complete"); - } -} - -impl EventBuffer { - fn new( - source_actor_id: ActorId, - account: Arc, - desc: Option, - trace_collector: Option, - ) -> Self { - EventBuffer { - source_actor_id, - desc, - trace_collector, - account, - single_queue: None, - multiple_queues: None, - } - } fn queue_for(&mut self, r: &Arc>) -> &mut PendingEventQueue { self.queue_for_mailbox(&r.mailbox) @@ -1643,58 +1762,116 @@ impl EventBuffer { .or_insert((mailbox.tx.clone(), Vec::with_capacity(3))).1 } - fn commit(&mut self) { - tracing::trace!("EventBuffer::commit"); - if let Some(d) = &mut self.desc { - if let Some(c) = &self.trace_collector { - c.record(self.source_actor_id, trace::ActorActivation::Turn(Box::new(d.take()))); - } - } + /// Retrieve the ID of the current actor. + pub fn actor_id(&self) -> ActorId { + self.actor_id + } - // Deliberately ignore send errors here: they indicate that the recipient is no - // longer alive. But we don't care about that case, since we have to be robust to - // crashes anyway. (When we were printing errors we saw here, an example of the - // problems it caused was a relay output_loop that received EPIPE causing the relay - // to crash, just as it was receiving thousands of messages a second, leading to - // many, many log reports of failed send_actions from the following line.) - if let Some((_actor_id, tx, turn)) = std::mem::take(&mut self.single_queue) { - let desc = self.desc.as_ref().map(|d| trace::TurnCause::Turn { id: Box::new(d.id.clone()) }); - let _ = send_actions(&tx, desc, &self.account, turn); - } - if let Some(table) = std::mem::take(&mut self.multiple_queues) { - for (_actor_id, (tx, turn)) in table.into_iter() { - let desc = self.desc.as_ref().map(|d| trace::TurnCause::Turn { id: Box::new(d.id.clone()) }); - let _ = send_actions(&tx, desc, &self.account, turn); + #[doc(hidden)] + pub fn actor_ref(&self) -> ActorRef { + self.actor_ref.clone() + } + + /// Requests a shutdown of the actor. The shutdown request is + /// handled by the actor's main loop, causing it to terminate with + /// exit status `Ok(())`. + pub fn shutdown(&self) { + let _ = self.tx.send(SystemMessage::Release); + } + + fn mailbox(&mut self) -> Arc { + match self.mailbox.upgrade() { + None => { + let new_mailbox = Arc::new(Mailbox { + actor_id: self.actor_id, + tx: self.tx.clone(), + }); + self.mailbox = Arc::downgrade(&new_mailbox); + new_mailbox } + Some(m) => m } } - #[inline(always)] - fn trace trace::ActionDescription>(&mut self, f: F) { - if let Some(d) = &mut self.desc { - d.record(f()); + /// Registers the entity `r` in the list of exit hooks for this actor. When the actor + /// terminates, `r`'s [`exit_hook`][Entity::exit_hook] will be called in the context of the + /// actor's root facet. + pub fn add_exit_hook(&mut self, r: &Arc>) { + let r = Arc::clone(r); + self.exit_hooks.push(Box::new( + move |t, exit_status| r.internal_with_entity(|e| e.exit_hook(t, &exit_status)))) + } + + fn get_facet(&mut self, facet_id: FacetId) -> Option<&mut Facet> { + self.facet_nodes.get_mut(&facet_id) + } + + fn get_facet_immut(&self, facet_id: FacetId) -> Option<&Facet> { + self.facet_nodes.get(&facet_id) + } + + /// See the definition of an [inert facet][Facet#inert-facets]. + fn facet_exists_and_is_inert(&mut self, facet_id: FacetId) -> bool { + let no_kids = self.facet_children.get(&facet_id).map(|cs| cs.is_empty()).unwrap_or(true); + if let Some(f) = self.get_facet(facet_id) { + // The only outbound handle the root facet of an actor may have is a link + // assertion, from [Activation::link]. This is not to be considered a "real" + // assertion for purposes of keeping the facet alive! + let maybe_parent_id = f.parent_facet_id.clone(); + let no_outbound_handles = f.outbound_handles.is_empty(); + let is_root_facet = maybe_parent_id.is_none(); + let no_linked_tasks = f.linked_tasks.is_empty(); + let no_inert_check_preventers = f.inert_check_preventers.load(Ordering::Relaxed) == 0; + let parent_facet_missing = maybe_parent_id.map_or(false, |p| self.get_facet_immut(p).is_none()); + tracing::trace!(?facet_id, ?no_kids, ?no_outbound_handles, ?is_root_facet, ?no_linked_tasks, ?no_inert_check_preventers, ?parent_facet_missing); + parent_facet_missing || (no_kids && (no_outbound_handles || is_root_facet) && no_linked_tasks && no_inert_check_preventers) + } else { + tracing::trace!(?facet_id, exists = ?false); + false } } - #[inline(always)] - fn trace_targeted trace::TurnEvent>( - &mut self, - internal: bool, - r: &Arc>, - f: F, - ) -> Option { - self.desc.as_mut().map(|d| { - let event = trace::TargetedTurnEvent { - target: r.as_ref().into(), - detail: f(), - }; - d.record(if internal { - trace::ActionDescription::EnqueueInternal { event: Box::new(event.clone()) } - } else { - trace::ActionDescription::Enqueue { event: Box::new(event.clone()) } + fn cleanup( + mut self, + exit_status: Arc, + trace_collector: Option, + ) { + match &*exit_status { + ExitStatus::Normal => assert!(self.get_facet(self.root).is_none()), + ExitStatus::Dropped | ExitStatus::Error(_) => (), + } + + let cause = Some(trace::TurnCause::Cleanup); + let account = Account::new(Some(AnyValue::symbol("cleanup")), trace_collector.clone()); + + self.take_turn( + self.root, + account, + cause, + trace_collector, + |t| { + // NB. In descending order so that we retract newer handles first. Since + // facet-tree-order retraction is children before parents, this isn't quite right - a + // parent facet could have made an assertion after some child made an assertion. But + // given that this is for *unclean shutdown*, maybe it's OK? + // + let handles_descending: Vec = t.outbound_assertions.keys().rev().cloned().collect(); + tracing::trace!(actor_id=?t.actor_id, + handles=?handles_descending, + "remaining to retract at cleanup time"); + for handle in handles_descending.into_iter() { + t.retract(handle); + } + + if let Err(err) = t.commit() { + // This can only happen through an internal error in this module + tracing::error!(?err, "Internal error during Activation::cleanup"); + } + + for action in std::mem::take(&mut t.exit_hooks) { + action(t, &exit_status); + } }); - event - }) } } @@ -1837,21 +2014,41 @@ impl Actor { let actor_id = next_actor_id(); let root = Facet::new(None); tracing::debug!(?actor_id, ?parent_actor_id, root_facet_id = ?root.facet_id, "Actor::new"); - let mut st = RunningActor { + let mut st = Activation { + facet_id: None, + account: None, + turn_description: None, + trace_collector: None, + + pre_commit_actions: Vec::new(), + rollback_actions: Vec::new(), + commit_actions: Vec::new(), + + single_queue: None, + multiple_queues: None, + actor_id, + actor_ref: ActorRef { + actor_id, + state: Arc::new(Mutex::new(ActorState::Terminated{ + exit_status: Arc::new(ExitStatus::Normal) + })), + }, + tx, mailbox: Weak::new(), - dataflow: Graph::new(), - fields: HashMap::new(), - blocks: HashMap::new(), + exit_hooks: Vec::new(), outbound_assertions: Map::new(), facet_nodes: Map::new(), facet_children: Map::new(), root: root.facet_id, + + dataflow: None, }; st.facet_nodes.insert(root.facet_id, root); - let ac_ref = ActorRef { actor_id, state: Arc::new(Mutex::new(ActorState::Running(st))) }; + let ac_ref = st.actor_ref.clone(); + *ac_ref.state.lock() = ActorState::Running(st); Actor { rx, trace_collector, ac_ref } } @@ -1871,11 +2068,11 @@ impl Actor { Ok(()) }); if is_alive { - let parent_actor = t_parent.state.actor_id; + let parent_actor = t_parent.actor_id; t_parent.trace(|_| trace::ActionDescription::Link { - parent_actor: Box::new(trace::ActorId(AnyValue::new(parent_actor))), + parent_actor: Box::new(parent_actor.into()), parent_to_child: Box::new(protocol::Handle(h_to_child.unwrap().into())), - child_actor: Box::new(trace::ActorId(AnyValue::new(self.ac_ref.actor_id))), + child_actor: Box::new(self.ac_ref.actor_id.into()), child_to_parent: Box::new(protocol::Handle(h_to_parent.unwrap().into())), }); self @@ -1951,7 +2148,7 @@ impl Actor { tracing::trace!(actor_id = ?self.ac_ref.actor_id, "SystemMessage::ReleaseField({})", field_id); self.ac_ref.access(|s| if let ActorState::Running(ra) = s { - ra.fields.remove(&field_id); + ra.dataflow_state_mut().fields.remove(&field_id); }) } SystemMessage::Turn(cause, mut loaned_item) => { @@ -1961,7 +2158,7 @@ impl Actor { &loaned_item.account, cause, |t| { for (maybe_desc, action) in actions.into_iter() { if let Some(desc) = maybe_desc { - t.pending.trace(|| trace::ActionDescription::Dequeue { + t.trace(|_| trace::ActionDescription::Dequeue { event: Box::new(desc), }); } @@ -2096,7 +2293,7 @@ impl ActorState { ActorState::Terminated { .. } => unreachable!(), ActorState::Running(state) => - state.cleanup(actor, Arc::clone(&exit_status), trace_collector.clone()), + state.cleanup(Arc::clone(&exit_status), trace_collector.clone()), } match &*exit_status { @@ -2116,109 +2313,6 @@ impl ActorState { } } -impl RunningActor { - /// Requests a shutdown of the actor. The shutdown request is - /// handled by the actor's main loop, causing it to terminate with - /// exit status `Ok(())`. - pub fn shutdown(&self) { - let _ = self.tx.send(SystemMessage::Release); - } - - fn mailbox(&mut self) -> Arc { - match self.mailbox.upgrade() { - None => { - let new_mailbox = Arc::new(Mailbox { - actor_id: self.actor_id, - tx: self.tx.clone(), - }); - self.mailbox = Arc::downgrade(&new_mailbox); - new_mailbox - } - Some(m) => m - } - } - - /// Registers the entity `r` in the list of exit hooks for this actor. When the actor - /// terminates, `r`'s [`exit_hook`][Entity::exit_hook] will be called in the context of the - /// actor's root facet. - pub fn add_exit_hook(&mut self, r: &Arc>) { - let r = Arc::clone(r); - self.exit_hooks.push(Box::new( - move |t, exit_status| r.internal_with_entity(|e| e.exit_hook(t, &exit_status)))) - } - - fn get_facet(&mut self, facet_id: FacetId) -> Option<&mut Facet> { - self.facet_nodes.get_mut(&facet_id) - } - - fn get_facet_immut(&self, facet_id: FacetId) -> Option<&Facet> { - self.facet_nodes.get(&facet_id) - } - - /// See the definition of an [inert facet][Facet#inert-facets]. - fn facet_exists_and_is_inert(&mut self, facet_id: FacetId) -> bool { - let no_kids = self.facet_children.get(&facet_id).map(|cs| cs.is_empty()).unwrap_or(true); - if let Some(f) = self.get_facet(facet_id) { - // The only outbound handle the root facet of an actor may have is a link - // assertion, from [Activation::link]. This is not to be considered a "real" - // assertion for purposes of keeping the facet alive! - let maybe_parent_id = f.parent_facet_id.clone(); - let no_outbound_handles = f.outbound_handles.is_empty(); - let is_root_facet = maybe_parent_id.is_none(); - let no_linked_tasks = f.linked_tasks.is_empty(); - let no_inert_check_preventers = f.inert_check_preventers.load(Ordering::Relaxed) == 0; - let parent_facet_missing = maybe_parent_id.map_or(false, |p| self.get_facet_immut(p).is_none()); - tracing::trace!(?facet_id, ?no_kids, ?no_outbound_handles, ?is_root_facet, ?no_linked_tasks, ?no_inert_check_preventers, ?parent_facet_missing); - parent_facet_missing || (no_kids && (no_outbound_handles || is_root_facet) && no_linked_tasks && no_inert_check_preventers) - } else { - tracing::trace!(?facet_id, exists = ?false); - false - } - } - - fn cleanup( - mut self, - ac_ref: &ActorRef, - exit_status: Arc, - trace_collector: Option, - ) { - match &*exit_status { - ExitStatus::Normal => assert!(self.get_facet(self.root).is_none()), - ExitStatus::Dropped | ExitStatus::Error(_) => (), - } - - let cause = Some(trace::TurnCause::Cleanup); - let account = Account::new(Some(AnyValue::symbol("cleanup")), trace_collector.clone()); - let mut t = Activation::make(&ac_ref.facet_ref(self.root), - account, - cause, - trace_collector, - &mut self); - - // NB. In descending order so that we retract newer handles first. Since - // facet-tree-order retraction is children before parents, this isn't quite right - a - // parent facet could have made an assertion after some child made an assertion. But - // given that this is for *unclean shutdown*, maybe it's OK? - // - let handles_descending: Vec = t.state.outbound_assertions.keys().rev().cloned().collect(); - tracing::trace!(actor_id=?t.state.actor_id, - handles=?handles_descending, - "remaining to retract at cleanup time"); - for handle in handles_descending.into_iter() { - t.retract(handle); - } - - if let Err(err) = t.commit() { - // This can only happen through an internal error in this module - tracing::error!(?err, "Internal error during RunningActor::cleanup"); - } - - for action in std::mem::take(&mut t.state.exit_hooks) { - action(&mut t, &exit_status); - } - } -} - impl std::fmt::Debug for Field { fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { write!(f, "#", self.name, self.field_id) diff --git a/syndicate/src/during.rs b/syndicate/src/during.rs index 8cc58f3..7c29eae 100644 --- a/syndicate/src/during.rs +++ b/syndicate/src/during.rs @@ -175,7 +175,7 @@ where t.on_stop_notify(&r); } if should_register_exit_hook { - t.state.add_exit_hook(&r); + t.add_exit_hook(&r); } r } diff --git a/syndicate/src/relay.rs b/syndicate/src/relay.rs index ea54f92..3147aeb 100644 --- a/syndicate/src/relay.rs +++ b/syndicate/src/relay.rs @@ -112,8 +112,8 @@ struct TunnelRefEntity { relay_ref: TunnelRelayRef, } -struct ActivatedMembranes<'a, 'activation, 'm> { - turn: &'a mut Activation<'activation>, +struct ActivatedMembranes<'a, 'm> { + turn: &'a mut Activation, tr_ref: &'m TunnelRelayRef, membranes: &'m mut Membranes, } @@ -234,7 +234,7 @@ impl TunnelRelay { t.linked_task(Some(AnyValue::symbol("writer")), output_loop(o, output_rx)); t.linked_task(Some(AnyValue::symbol("reader")), - input_loop(t.trace_collector(), t.facet.clone(), i, tr_ref)); + input_loop(t.trace_collector(), t.facet_ref(), i, tr_ref)); result } @@ -269,7 +269,7 @@ impl TunnelRelay { |io| Arc::clone(&tr.membranes.import_oid(t, &tr_ref, io).inc_ref().obj)); dump_membranes!(tr.membranes); *tr_ref.lock() = Some(tr); - t.state.add_exit_hook(&self_entity); + t.add_exit_hook(&self_entity); (result, tr_ref, output_rx) } @@ -604,7 +604,7 @@ impl Membranes { } } -impl<'a, 'activation, 'm> DomainDecode> for ActivatedMembranes<'a, 'activation, 'm> { +impl<'a, 'm> DomainDecode> for ActivatedMembranes<'a, 'm> { fn decode_embedded<'de, 'src, S: BinarySource<'de>>( &mut self, src: &'src mut S, diff --git a/syndicate/src/trace.rs b/syndicate/src/trace.rs index 87a8fb3..1174b46 100644 --- a/syndicate/src/trace.rs +++ b/syndicate/src/trace.rs @@ -13,6 +13,7 @@ use preserves_schema::Codec; use super::actor::{self, AnyValue, Ref, Cap}; use super::language; +use std::num::NonZeroU64; use std::sync::Arc; use std::time::SystemTime; @@ -27,8 +28,8 @@ pub struct TraceCollector { impl From<&Ref> for Target { fn from(v: &Ref) -> Target { Target { - actor: ActorId(AnyValue::new(v.mailbox.actor_id)), - facet: FacetId(AnyValue::new(u64::from(v.facet_id))), + actor: v.mailbox.actor_id.into(), + facet: v.facet_id.into(), oid: Oid(AnyValue::new(v.oid())), } } @@ -51,7 +52,7 @@ impl TraceCollector { let _ = self.tx.send(TraceEntry { timestamp: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) .expect("Time after Unix epoch").as_secs_f64().into(), - actor: ActorId(AnyValue::new(id)), + actor: id.into(), item: a, }); } @@ -159,3 +160,15 @@ impl From for Name { } } } + +impl From for ActorId { + fn from(v: NonZeroU64) -> Self { + ActorId(AnyValue::new(u64::from(v))) + } +} + +impl From for FacetId { + fn from(v: NonZeroU64) -> Self { + FacetId(AnyValue::new(u64::from(v))) + } +}