diff --git a/Cargo.lock b/Cargo.lock index 564d07b..bb0174a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -446,6 +446,30 @@ dependencies = [ "itertools", ] +[[package]] +name = "crossbeam" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.3" @@ -470,6 +494,16 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.16" @@ -924,15 +958,6 @@ dependencies = [ "bytes", ] -[[package]] -name = "instant" -version = "0.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" -dependencies = [ - "cfg-if 1.0.0", -] - [[package]] name = "iovec" version = "0.1.4" @@ -1344,27 +1369,25 @@ dependencies = [ [[package]] name = "parking_lot" -version = "0.11.2" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ - "instant", "lock_api", "parking_lot_core", ] [[package]] name = "parking_lot_core" -version = "0.8.6" +version = "0.9.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" dependencies = [ "cfg-if 1.0.0", - "instant", "libc", - "redox_syscall 0.2.16", + "redox_syscall 0.4.1", "smallvec", - "winapi 0.3.9", + "windows-targets", ] [[package]] @@ -1599,15 +1622,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "redox_syscall" -version = "0.2.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" -dependencies = [ - "bitflags 1.3.2", -] - [[package]] name = "redox_syscall" version = "0.3.5" @@ -1930,6 +1944,7 @@ dependencies = [ "blake2", "bytes", "criterion", + "crossbeam", "futures", "getrandom 0.2.10", "hmac", @@ -1939,7 +1954,7 @@ dependencies = [ "preserves", "preserves-schema", "tokio", - "tokio-util", + "tokio-util 0.7.10", "tracing", "tracing-futures", "tracing-subscriber", @@ -1974,7 +1989,7 @@ dependencies = [ "tikv-jemallocator", "tokio", "tokio-tungstenite", - "tokio-util", + "tokio-util 0.6.10", "tracing", "tracing-futures", "tracing-subscriber", @@ -2146,6 +2161,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-util" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "tracing" version = "0.1.40" diff --git a/syndicate-macros/examples/ring.rs b/syndicate-macros/examples/ring.rs index ee879d1..9a5268e 100644 --- a/syndicate-macros/examples/ring.rs +++ b/syndicate-macros/examples/ring.rs @@ -111,6 +111,6 @@ async fn main() -> ActorResult { Ok(()) }); Ok(()) - }).await??; + }).await?; Ok(()) } diff --git a/syndicate-server/src/main.rs b/syndicate-server/src/main.rs index d8a514b..2027e71 100644 --- a/syndicate-server/src/main.rs +++ b/syndicate-server/src/main.rs @@ -213,7 +213,7 @@ async fn main() -> ActorResult { })); Ok(()) - }).await??; + }).await?; wait_for_all_actors_to_stop(std::time::Duration::from_secs(2)).await; diff --git a/syndicate-server/src/services/config_watcher.rs b/syndicate-server/src/services/config_watcher.rs index b0c4a3a..18125a1 100644 --- a/syndicate-server/src/services/config_watcher.rs +++ b/syndicate-server/src/services/config_watcher.rs @@ -187,7 +187,7 @@ fn run( let facet = t.facet.clone(); let trace_collector = t.trace_collector(); let span = tracing::Span::current(); - thread::spawn(move || { + tokio::task::spawn_blocking(move || { let _entry = span.enter(); let mut path_state: Map = Map::new(); diff --git a/syndicate/Cargo.toml b/syndicate/Cargo.toml index 166f25a..653d990 100644 --- a/syndicate/Cargo.toml +++ b/syndicate/Cargo.toml @@ -19,17 +19,18 @@ preserves-schema = "4.991" preserves = "4.991" preserves-schema = "4.991" -tokio = { version = "1.10", features = ["io-util", "macros", "rt", "rt-multi-thread", "time"] } -tokio-util = "0.6" -bytes = "1.0" +tokio = { version = "1.33", features = ["io-util", "macros", "rt", "rt-multi-thread", "time"] } +tokio-util = "0.7" +bytes = "1.5" futures = "0.3" blake2 = "0.10" +crossbeam = "0.8" getrandom = "0.2" hmac = "0.12" lazy_static = "1.4" -parking_lot = "0.11" +parking_lot = "0.12" tracing = "0.1" tracing-subscriber = "0.2" diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index 19c807f..3b5eca3 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -16,6 +16,9 @@ use super::schemas::protocol; use super::schemas::sturdy; use super::trace; +use crossbeam::atomic::AtomicCell; + +use futures::Future; use parking_lot::Mutex; use parking_lot::RwLock; @@ -43,7 +46,6 @@ use std::time; use tokio::select; use tokio::sync::Notify; -use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver}; use tokio_util::sync::CancellationToken; // use tracing::Instrument; @@ -91,10 +93,6 @@ pub type ActorResult = Result<(), ActorError>; /// Final exit status of an actor. pub type ExitStatus = Result<(), Error>; -/// The [`Actor::boot`] method returns an `ActorHandle`, representing -/// the actor's mainloop task. -pub type ActorHandle = tokio::task::JoinHandle; - /// The type of the "disarm" function returned from [`Activation::prevent_inert_check`]. pub type DisarmFn = Box; @@ -277,8 +275,8 @@ struct EventBuffer { pub account: Arc, // INVARIANT: At most one of single_queue and multiple_queues is non-None at any given time. - single_queue: Option<(ActorId, UnboundedSender, PendingEventQueue)>, - multiple_queues: Option, PendingEventQueue)>>, + single_queue: Option<(Arc, PendingEventQueue)>, + multiple_queues: Option, PendingEventQueue)>>, } /// An `Account` records a "debt" in terms of outstanding work items. @@ -310,32 +308,40 @@ pub struct LoanedItem { } enum SystemMessage { + None, Release, ReleaseField(FieldId), Turn(Option, LoanedItem), Crash(Error), } -/// The mechanism by which events are delivered to a given [`Actor`]. -pub struct Mailbox { - /// The ID of the actor this mailbox corresponds to. - pub actor_id: ActorId, - tx: UnboundedSender, +impl Default for SystemMessage { + fn default() -> Self { + SystemMessage::None + } } -/// Each actor owns an instance of this structure. -/// -/// It holds the receive-half of the actor's mailbox, plus a reference -/// to the actor's private state. -pub struct Actor { - rx: UnboundedReceiver, - trace_collector: Option, - ac_ref: ActorRef, +type Link = Arc>>>; + +struct WorkItem { + item: AtomicCell, + link: Link, +} + +unsafe impl Send for WorkItem {} +unsafe impl Sync for WorkItem {} + +/// The mechanism by which events are delivered to a given [`Actor`]. +pub struct Mailbox { + /// The actor this mailbox corresponds to. + pub actor: Actor, + tail: Link, + count: Arc, } /// A reference to an actor's private [`ActorState`]. #[derive(Clone)] -pub struct ActorRef { +pub struct Actor { /// The ID of the referenced actor. pub actor_id: ActorId, @@ -345,11 +351,11 @@ pub struct ActorRef { pub state: Arc>, } -/// A combination of an [`ActorRef`] with a [`FacetId`], acting as a capability to enter the +/// A combination of an [`Actor`] with a [`FacetId`], acting as a capability to enter the /// execution context of a facet from a linked task. #[derive(Clone)] pub struct FacetRef { - pub actor: ActorRef, + pub actor: Actor, pub facet_id: FacetId, } @@ -371,7 +377,10 @@ pub enum ActorState { pub struct RunningActor { /// The ID of the actor this state belongs to. pub actor_id: ActorId, - tx: UnboundedSender, + myself: Option, + head: Link, + tail: Link, + count: Arc, mailbox: Weak, dataflow: Graph, #[doc(hidden)] @@ -384,7 +393,9 @@ pub struct RunningActor { pub facet_nodes: Map, #[doc(hidden)] pub facet_children: Map>, - pub root: FacetId, + pub root: Option, + pub trace_collector: Option, + waiters: Vec, } /// The type of process-unique task IDs. @@ -400,7 +411,7 @@ pub type TaskId = u64; pub struct Field { pub name: String, pub field_id: FieldId, - tx: UnboundedSender, + mailbox: Arc, phantom: PhantomData, } @@ -558,7 +569,7 @@ preserves_schema::support::lazy_static! { pub static ref ACCOUNTS: RwLock)>> = Default::default(); #[doc(hidden)] - pub static ref ACTORS: RwLock> = Default::default(); + pub static ref ACTORS: RwLock> = Default::default(); } impl TryFrom<&AnyValue> for Synced { @@ -590,13 +601,37 @@ impl<'a> Unparse<&'a (), AnyValue> for Synced { } } +impl WorkItem { + fn new(item: SystemMessage) -> Self { + WorkItem { + item: AtomicCell::new(item), + link: Arc::new(AtomicCell::new(None)), + } + } +} + +impl Mailbox { + fn send(&self, item: SystemMessage) { + let w = Arc::new(WorkItem::new(item)); + self.tail.swap(Some(w.clone())).unwrap().link.store(Some(w)); + let old_count = self.count.fetch_add(1, Ordering::SeqCst); + tracing::debug!(?self, ?old_count, "sent!"); + if old_count == 0 { + let ac = self.actor.clone(); + tokio::spawn(async { + ac.do_work().await; + }); + } + } +} + mod panic_guard { use super::*; - pub struct PanicGuard(Option>); + pub struct PanicGuard(Option>); impl PanicGuard { - pub(super) fn new(tx: UnboundedSender) -> Self { + pub(super) fn new(mailbox: Arc) -> Self { tracing::trace!("Panic guard armed"); - PanicGuard(Some(tx)) + PanicGuard(Some(mailbox)) } pub fn disarm(&mut self) { tracing::trace!("Panic guard disarmed"); @@ -605,9 +640,9 @@ mod panic_guard { } impl Drop for PanicGuard { fn drop(&mut self) { - if let Some(tx) = &self.0 { + if let Some(mailbox) = &self.0 { tracing::trace!("Panic guard triggering"); - let _ = tx.send(SystemMessage::Crash( + mailbox.send(SystemMessage::Crash( error("Actor panicked during activation", AnyValue::new(false)))); } } @@ -634,69 +669,33 @@ impl FacetRef { match &mut *g { ActorState::Terminated { .. } => false, - ActorState::Running(state) => { - let mut panic_guard = panic_guard::PanicGuard::new(state.tx.clone()); - - // let _entry = tracing::info_span!(parent: None, "actor", actor_id = ?self.actor.actor_id).entered(); - let mut activation = - Activation::make(self, - Arc::clone(account), - cause, - account.trace_collector.clone(), - state); - let f_result = f(&mut activation); - - let maybe_exit_status = match f_result { - Ok(()) => - activation.commit().map_or_else( - |e| Some(Err(e)), - |_| { - // If we would otherwise continue, check the root facet: is it - // still alive? If not, then the whole actor should terminate now. - if let None = activation.state.get_facet(activation.state.root) { - tracing::trace!( - "terminating actor because root facet no longer exists"); - Some(Ok(())) - } else { - None - } - }), - Err(e) => { - activation.rollback(); - Some(Err(e)) - } - }; - - panic_guard.disarm(); - drop(panic_guard); - drop(activation); - - match maybe_exit_status { + ActorState::Running(state) => + match state.activate(account, cause, f) { None => true, Some(exit_status) => { - g.terminate(exit_status.map_err(|e| e.into()), &self.actor, &account.trace_collector); + g.terminate(exit_status, self.actor.actor_id, &account.trace_collector); false } - } - } + }, } } } impl<'activation> Activation<'activation> { fn make( - facet: &FacetRef, + facet: FacetRef, account: Arc, cause: Option, trace_collector: Option, state: &'activation mut RunningActor, ) -> Self { + let actor_id = facet.actor.actor_id; Activation { - facet: facet.clone(), + facet, state, active_block: None, pending: EventBuffer::new( - facet.actor.actor_id, + actor_id, account, cause.map(|c| trace::TurnDescription::new( NEXT_ACTIVATION_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed), @@ -1115,7 +1114,7 @@ impl<'activation> Activation<'activation> { } Err(e) => { tracing::error!(task_id, "linked task error: {}", e); - let _ = mailbox.tx.send(SystemMessage::Crash(e.clone())); + mailbox.send(SystemMessage::Crash(e.clone())); Err(e)? } } @@ -1225,7 +1224,7 @@ impl<'activation> Activation<'activation> { name: Name, link: bool, target: Box>, - ) -> (Option>>, ActorRef) { + ) -> (Option>>, Actor) { let ac_ref = self._spawn(name, |t| { let _ = t.prevent_inert_check(); Ok(()) @@ -1235,7 +1234,7 @@ impl<'activation> Activation<'activation> { ActorState::Terminated { .. } => None, ActorState::Running(state) => Some(Arc::new(Ref { mailbox: state.mailbox(), - facet_id: state.root, + facet_id: state.root.as_ref().unwrap().facet_id, target: Mutex::new(Some(target)), })), }; @@ -1248,7 +1247,7 @@ impl<'activation> Activation<'activation> { &mut self, name: Name, boot: F, - ) -> ActorRef { + ) -> Actor { self._spawn(name, boot, false) } @@ -1261,7 +1260,7 @@ impl<'activation> Activation<'activation> { &mut self, name: Name, boot: F, - ) -> ActorRef { + ) -> Actor { self._spawn(name, boot, true) } @@ -1270,20 +1269,23 @@ impl<'activation> Activation<'activation> { name: Name, boot: F, link: bool, - ) -> ActorRef { + ) -> Actor { let ac = Actor::new(Some(self.state.actor_id), self.trace_collector()); - let ac_ref = ac.ac_ref.clone(); + let actor_id = ac.actor_id; self.pending.trace(|| trace::ActionDescription::Spawn { link, - id: Box::new(trace::ActorId(AnyValue::new(ac_ref.actor_id))), + id: Box::new(trace::ActorId(AnyValue::new(actor_id))), }); let cause = self.pending.desc.as_ref().map( |d| trace::TurnCause::Turn { id: Box::new(d.id.clone()) }); let ac = if link { ac.link(self) } else { ac }; - self.on_commit(move |t| { - ac.boot(name, Arc::clone(t.account()), cause, boot); - }); - ac_ref + self.on_commit(enclose!((ac) move |t| { + let account = t.account().clone(); + tokio::spawn(async { + ac.boot(name, account, cause, boot); + }); + })); + ac } /// Create a new subfacet of the currently-active facet. Runs `boot` in the new facet's @@ -1371,7 +1373,7 @@ impl<'activation> Activation<'activation> { /// /// Equivalent to `self.stop_facet(self.state.root)`. pub fn stop_root(&mut self) { - self.stop_facet(self.state.root); + self.stop_facet(self.state.root.as_ref().unwrap().facet_id); } fn stop_if_inert(&mut self) { @@ -1463,7 +1465,7 @@ impl<'activation> Activation<'activation> { Arc::new(Field { name: name.to_owned(), field_id, - tx: self.state.tx.clone(), + mailbox: self.state.mailbox(), phantom: PhantomData, }) } @@ -1596,25 +1598,25 @@ impl EventBuffer { fn queue_for_mailbox(&mut self, mailbox: &Arc) -> &mut PendingEventQueue { if self.multiple_queues.is_some() { - return &mut self.multiple_queues.as_mut().unwrap().entry(mailbox.actor_id) - .or_insert((mailbox.tx.clone(), Vec::with_capacity(3))).1; + return &mut self.multiple_queues.as_mut().unwrap().entry(mailbox.actor.actor_id) + .or_insert((mailbox.clone(), Vec::with_capacity(3))).1; } if let None = self.single_queue { - self.single_queue = Some((mailbox.actor_id, mailbox.tx.clone(), Vec::with_capacity(3))); - return &mut self.single_queue.as_mut().unwrap().2; + self.single_queue = Some((mailbox.clone(), Vec::with_capacity(3))); + return &mut self.single_queue.as_mut().unwrap().1; } - if Some(mailbox.actor_id) == self.single_queue.as_ref().map(|e| e.0) { - return &mut self.single_queue.as_mut().unwrap().2; + if Some(mailbox.actor.actor_id) == self.single_queue.as_ref().map(|e| e.0.actor.actor_id) { + return &mut self.single_queue.as_mut().unwrap().1; } - let (aid, tx, q) = std::mem::take(&mut self.single_queue).unwrap(); + let (mb, q) = std::mem::take(&mut self.single_queue).unwrap(); let mut table = HashMap::new(); - table.insert(aid, (tx, q)); + table.insert(mb.actor.actor_id, (mb.clone(), q)); self.multiple_queues = Some(table); - return &mut self.multiple_queues.as_mut().unwrap().entry(mailbox.actor_id) - .or_insert((mailbox.tx.clone(), Vec::with_capacity(3))).1; + return &mut self.multiple_queues.as_mut().unwrap().entry(mailbox.actor.actor_id) + .or_insert((mailbox.clone(), Vec::with_capacity(3))).1; } fn commit(&mut self) { @@ -1631,14 +1633,14 @@ impl EventBuffer { // problems it caused was a relay output_loop that received EPIPE causing the relay // to crash, just as it was receiving thousands of messages a second, leading to // many, many log reports of failed send_actions from the following line.) - if let Some((_actor_id, tx, turn)) = std::mem::take(&mut self.single_queue) { + if let Some((mailbox, turn)) = std::mem::take(&mut self.single_queue) { let desc = self.desc.as_ref().map(|d| trace::TurnCause::Turn { id: Box::new(d.id.clone()) }); - let _ = send_actions(&tx, desc, &self.account, turn); + let _ = send_actions(mailbox, desc, &self.account, turn); } if let Some(table) = std::mem::take(&mut self.multiple_queues) { - for (_actor_id, (tx, turn)) in table.into_iter() { + for (_actor_id, (mailbox, turn)) in table.into_iter() { let desc = self.desc.as_ref().map(|d| trace::TurnCause::Turn { id: Box::new(d.id.clone()) }); - let _ = send_actions(&tx, desc, &self.account, turn); + let _ = send_actions(mailbox, desc, &self.account, turn); } } } @@ -1743,38 +1745,38 @@ impl Drop for LoanedItem { #[must_use] fn send_actions( - tx: &UnboundedSender, + mailbox: Arc, caused_by: Option, account: &Arc, t: PendingEventQueue, -) -> ActorResult { +) { let token_count = t.len(); - Ok(tx.send(SystemMessage::Turn(caused_by, LoanedItem::new(account, token_count, t))) - .map_err(|_| error("Target actor not running", AnyValue::new(false)))?) + tracing::debug!(?mailbox, ?token_count, "send_actions"); + mailbox.send(SystemMessage::Turn(caused_by, LoanedItem::new(account, token_count, t))) } impl std::fmt::Debug for Mailbox { fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { - write!(f, "#", self.actor_id) + write!(f, "#", self.actor.actor_id) } } impl std::hash::Hash for Mailbox { fn hash(&self, state: &mut H) { - self.actor_id.hash(state) + self.actor.actor_id.hash(state) } } impl Eq for Mailbox {} impl PartialEq for Mailbox { fn eq(&self, other: &Mailbox) -> bool { - self.actor_id == other.actor_id + self.actor.actor_id == other.actor.actor_id } } impl Ord for Mailbox { fn cmp(&self, other: &Mailbox) -> std::cmp::Ordering { - return self.actor_id.cmp(&other.actor_id) + return self.actor.actor_id.cmp(&other.actor.actor_id) } } @@ -1786,175 +1788,8 @@ impl PartialOrd for Mailbox { impl Drop for Mailbox { fn drop(&mut self) { - tracing::debug!("Last reference to mailbox of actor id {:?} was dropped", self.actor_id); - let _ = self.tx.send(SystemMessage::Release); - } -} - -impl Actor { - /// Create and start a new "top-level" actor: an actor not - /// causally related to another. This is the usual way to start a - /// Syndicate program. - pub fn top ActorResult>( - trace_collector: Option, - boot: F, - ) -> ActorHandle { - let ac = Actor::new(None, trace_collector.clone()); - let account = Account::new(None, trace_collector); - ac.boot(None, account, Some(trace::TurnCause::external("top-level actor")), boot) - } - - /// Create a new actor. It still needs to be [`boot`ed][Self::boot]. - pub fn new(parent_actor_id: Option, trace_collector: Option) -> Self { - let (tx, rx) = unbounded_channel(); - let actor_id = next_actor_id(); - let root = Facet::new(None); - tracing::debug!(?actor_id, ?parent_actor_id, root_facet_id = ?root.facet_id, "Actor::new"); - let mut st = RunningActor { - actor_id, - tx, - mailbox: Weak::new(), - dataflow: Graph::new(), - fields: HashMap::new(), - blocks: HashMap::new(), - exit_hooks: Vec::new(), - outbound_assertions: Map::new(), - facet_nodes: Map::new(), - facet_children: Map::new(), - root: root.facet_id, - }; - st.facet_nodes.insert(root.facet_id, root); - let ac_ref = ActorRef { actor_id, state: Arc::new(Mutex::new(ActorState::Running(st))) }; - Actor { rx, trace_collector, ac_ref } - } - - fn link(self, t_parent: &mut Activation) -> Self { - if t_parent.active_facet().is_none() { - panic!("No active facet when calling spawn_link"); - } - let account = Arc::clone(t_parent.account()); - let mut h_to_child = None; - let mut h_to_parent = None; - let is_alive = self.ac_ref.root_facet_ref().activate( - &account, - None, - |t_child| { - h_to_child = Some(t_parent.half_link(t_child)); - h_to_parent = Some(t_child.half_link(t_parent)); - Ok(()) - }); - if is_alive { - let parent_actor = t_parent.state.actor_id; - t_parent.trace(|_| trace::ActionDescription::Link { - parent_actor: Box::new(trace::ActorId(AnyValue::new(parent_actor))), - parent_to_child: Box::new(protocol::Handle(h_to_child.unwrap().into())), - child_actor: Box::new(trace::ActorId(AnyValue::new(self.ac_ref.actor_id))), - child_to_parent: Box::new(protocol::Handle(h_to_parent.unwrap().into())), - }); - self - } else { - panic!("spawn_link'd actor terminated before link could happen"); - } - } - - /// Start the actor's mainloop. Takes ownership of `self`. The - /// `name` is used as context for any log messages emitted by the - /// actor. The `boot` function is called in the actor's context, - /// and then the mainloop is entered. - pub fn boot ActorResult>( - mut self, - name: Name, - boot_account: Arc, - boot_cause: Option, - boot: F, - ) -> ActorHandle { - let actor_id = self.ac_ref.actor_id; - ACTORS.write().insert(actor_id, (name.clone(), self.ac_ref.clone())); - let trace_collector = boot_account.trace_collector.clone(); - if let Some(c) = &trace_collector { - c.record(actor_id, trace::ActorActivation::Start { - actor_name: Box::new(name.into()), - }); - } - tokio::spawn(async move { - tracing::trace!(?actor_id, "start"); - self.run(boot_account, boot_cause, move |t| { - t.facet(boot)?; - Ok(()) - }).await; - tracing::trace!(?actor_id, "stop"); - self.ac_ref.exit_status().expect("terminated") - }) - // }.instrument(tracing::info_span!(parent: None, "actor", ?actor_id).or_current())) - } - - async fn run ActorResult>( - &mut self, - boot_account: Arc, - boot_cause: Option, - boot: F, - ) -> () { - let root_facet_ref = self.ac_ref.root_facet_ref(); - - let terminate = |e: Error | { - assert!(!root_facet_ref.activate(&Account::new(None, None), None, |_| Err(e)?)); - }; - - if !root_facet_ref.activate(&boot_account, boot_cause, boot) { - return; - } - - 'mainloop: loop { - tracing::trace!(actor_id = ?self.ac_ref.actor_id, "mainloop top"); - match self.rx.recv().await { - None => { - terminate(error("Unexpected channel close", AnyValue::new(false))); - break 'mainloop; - } - Some(m) => match m { - SystemMessage::Release => { - tracing::trace!(actor_id = ?self.ac_ref.actor_id, "SystemMessage::Release"); - assert!(!root_facet_ref.activate(&Account::new(None, None), None, |t| { - t.stop_root(); - Ok(()) - })); - break 'mainloop; - } - SystemMessage::ReleaseField(field_id) => { - tracing::trace!(actor_id = ?self.ac_ref.actor_id, - "SystemMessage::ReleaseField({})", field_id); - self.ac_ref.access(|s| if let ActorState::Running(ra) = s { - ra.fields.remove(&field_id); - }) - } - SystemMessage::Turn(cause, mut loaned_item) => { - tracing::trace!(actor_id = ?self.ac_ref.actor_id, "SystemMessage::Turn"); - let actions = std::mem::take(&mut loaned_item.item); - if !root_facet_ref.activate( - &loaned_item.account, cause, |t| { - for (maybe_desc, action) in actions.into_iter() { - if let Some(desc) = maybe_desc { - t.pending.trace(|| trace::ActionDescription::Dequeue { - event: Box::new(desc), - }); - } - action(t)?; - } - Ok(()) - }) - { - break 'mainloop; - } - } - SystemMessage::Crash(e) => { - tracing::trace!(actor_id = ?self.ac_ref.actor_id, - "SystemMessage::Crash({:?})", &e); - terminate(e); - break 'mainloop; - } - } - } - } + tracing::debug!("Last reference to mailbox of actor id {:?} was dropped", self.actor.actor_id); + self.send(SystemMessage::Release); } } @@ -1986,7 +1821,171 @@ impl Facet { } } -impl ActorRef { +impl Future for Actor { + type Output = ExitStatus; + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { + self.access(|state| { + match state { + ActorState::Terminated { exit_status } => std::task::Poll::Ready(exit_status.as_ref().clone()), + ActorState::Running(st) => { + st.waiters.push(cx.waker().clone()); + std::task::Poll::Pending + } + } + }) + } +} + +impl Actor { + /// Create and start a new "top-level" actor: an actor not + /// causally related to another. This is the usual way to start a + /// Syndicate program. + pub fn top ActorResult>( + trace_collector: Option, + boot: F, + ) -> Self { + let ac = Actor::new(None, trace_collector.clone()); + let account = Account::new(None, trace_collector); + ac.boot(None, account, Some(trace::TurnCause::external("top-level actor")), boot) + } + + /// Create a new actor. It still needs to be [`boot`ed][Self::boot]. + pub fn new(parent_actor_id: Option, trace_collector: Option) -> Self { + let actor_id = next_actor_id(); + let root = Facet::new(None); + tracing::debug!(?actor_id, ?parent_actor_id, root_facet_id = ?root.facet_id, "Actor::new"); + let head_item = Arc::new(WorkItem::new(SystemMessage::None)); + let mut st = RunningActor { + actor_id, + myself: None, + head: head_item.link.clone(), + tail: Arc::new(AtomicCell::new(Some(head_item))), + count: Arc::new(AtomicU64::new(0)), + mailbox: Weak::new(), + dataflow: Graph::new(), + fields: HashMap::new(), + blocks: HashMap::new(), + exit_hooks: Vec::new(), + outbound_assertions: Map::new(), + facet_nodes: Map::new(), + facet_children: Map::new(), + root: None, + trace_collector, + waiters: Vec::new(), + }; + let root_id = root.facet_id; + st.facet_nodes.insert(root.facet_id, root); + let myself = Arc::new(Mutex::new(ActorState::Running(st))); + let mut g = myself.lock(); + if let ActorState::Running(state) = &mut *g { + state.myself = Some(Actor { + actor_id, + state: myself.clone(), + }); + state.root = Some(FacetRef { + actor: state.myself.as_ref().unwrap().clone(), + facet_id: root_id, + }); + } + drop(g); + Actor { actor_id, state: myself } + } + + fn link(self, t_parent: &mut Activation) -> Self { + if t_parent.active_facet().is_none() { + panic!("No active facet when calling spawn_link"); + } + let account = Arc::clone(t_parent.account()); + let mut h_to_child = None; + let mut h_to_parent = None; + let is_alive = self.root_facet_ref().activate( + &account, + None, + |t_child| { + h_to_child = Some(t_parent.half_link(t_child)); + h_to_parent = Some(t_child.half_link(t_parent)); + Ok(()) + }); + if is_alive { + let parent_actor = t_parent.state.actor_id; + t_parent.trace(|_| trace::ActionDescription::Link { + parent_actor: Box::new(trace::ActorId(AnyValue::new(parent_actor))), + parent_to_child: Box::new(protocol::Handle(h_to_child.unwrap().into())), + child_actor: Box::new(trace::ActorId(AnyValue::new(self.actor_id))), + child_to_parent: Box::new(protocol::Handle(h_to_parent.unwrap().into())), + }); + self + } else { + panic!("spawn_link'd actor terminated before link could happen"); + } + } + + /// Start the actor's mainloop. Takes ownership of `self`. The + /// `name` is used as context for any log messages emitted by the + /// actor. The `boot` function is called in the actor's context, + /// and then the mainloop is entered. + pub fn boot ActorResult>( + self, + name: Name, + boot_account: Arc, + boot_cause: Option, + boot: F, + ) -> Self { + let actor_id = self.actor_id; + ACTORS.write().insert(actor_id, (name.clone(), self.clone())); + let trace_collector = boot_account.trace_collector.clone(); + if let Some(c) = &trace_collector { + c.record(actor_id, trace::ActorActivation::Start { + actor_name: Box::new(name.into()), + }); + } + self.root_facet_ref().activate(&boot_account, boot_cause, move |t| { + tracing::trace!(?actor_id, "start"); + t.facet(boot)?; + Ok(()) + }); + self + } + + async fn do_work(self) -> () { + let mut g = self.state.lock(); + let mut exit_status = None; + match &mut *g { + ActorState::Terminated { .. } => (), + ActorState::Running(st) => { + let mut completed = 0; + tracing::debug!(?st.actor_id, "do_work"); + let mut batch = st.count.load(Ordering::SeqCst); + 'batch_loop: while batch > 0 { + tracing::trace!(?st.actor_id, ?batch, "batch starting"); + for _i in 0 .. batch { + let mut w = None; + tracing::trace!(?st.actor_id, "awaiting incoming"); + while w.is_none() { w = st.head.take(); } + tracing::trace!(?st.actor_id, "got incoming"); + let w = w.unwrap(); + st.head = w.link.clone(); + let item = w.item.take(); + exit_status = st.handle_system_message(item); + completed += 1; + if exit_status.is_some() { + tracing::trace!(?st.actor_id, "do_work early exit"); + break 'batch_loop; + } + } + tracing::trace!(?st.actor_id, ?batch, "batch complete"); + batch = st.count.fetch_sub(batch, Ordering::SeqCst) - batch; + } + tracing::debug!(?st.actor_id, ?completed, "do_work done"); + } + } + if let Some(e) = exit_status { + let tc = g.trace_collector(); + g.terminate(e, self.actor_id, &tc); + } + } + /// Uses an internal mutex to access the internal state: takes the /// lock, calls `f` with the internal state, releases the lock, /// and returns the result of `f`. @@ -2005,28 +2004,17 @@ impl ActorRef { }) } - fn facet_ref(&self, facet_id: FacetId) -> FacetRef { - FacetRef { - actor: self.clone(), - facet_id, - } - } - - fn root_facet_id(&self) -> FacetId { + fn root_facet_ref(&self) -> FacetRef { self.access(|s| match s { ActorState::Terminated { .. } => panic!("Actor unexpectedly in terminated state"), - ActorState::Running(ra) => ra.root, // what a lot of work to get this one number + ActorState::Running(ra) => ra.root.as_ref().unwrap().clone(), // what a lot of work to get this one value }) } - - fn root_facet_ref(&self) -> FacetRef { - self.facet_ref(self.root_facet_id()) - } } -impl std::fmt::Debug for ActorRef { +impl std::fmt::Debug for Actor { fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { - write!(f, "#", self.actor_id) + write!(f, "#", self.actor_id) } } @@ -2038,10 +2026,17 @@ impl ActorState { } } + fn trace_collector(&self) -> Option { + match self { + ActorState::Terminated { .. } => None, + ActorState::Running(RunningActor { trace_collector, .. }) => trace_collector.clone(), + } + } + fn terminate( &mut self, exit_status: ExitStatus, - actor: &ActorRef, + actor_id: ActorId, trace_collector: &Option, ) { if !self.is_running() { @@ -2055,16 +2050,16 @@ impl ActorState { ActorState::Terminated { .. } => unreachable!(), ActorState::Running(state) => - state.cleanup(actor, Arc::clone(&exit_status), trace_collector.clone()), + state.cleanup(Arc::clone(&exit_status), trace_collector.clone()), } match &*exit_status { - Ok(()) => tracing::trace!(actor_id=?actor.actor_id, "normal stop"), - Err(e) => tracing::error!(actor_id=?actor.actor_id, %e, "error stop"), + Ok(()) => tracing::trace!(?actor_id, "normal stop"), + Err(e) => tracing::error!(?actor_id, %e, "error stop"), } if let Some(c) = trace_collector { - c.record(actor.actor_id, trace::ActorActivation::Stop { + c.record(actor_id, trace::ActorActivation::Stop { status: Box::new(match &*exit_status { Ok(()) => trace::ExitStatus::Ok, Err(e) => trace::ExitStatus::Error(Box::new(e.clone())), @@ -2078,16 +2073,17 @@ impl RunningActor { /// Requests a shutdown of the actor. The shutdown request is /// handled by the actor's main loop, causing it to terminate with /// exit status `Ok(())`. - pub fn shutdown(&self) { - let _ = self.tx.send(SystemMessage::Release); + pub fn shutdown(&mut self) { + self.mailbox().send(SystemMessage::Release); } fn mailbox(&mut self) -> Arc { match self.mailbox.upgrade() { None => { let new_mailbox = Arc::new(Mailbox { - actor_id: self.actor_id, - tx: self.tx.clone(), + actor: self.myself.as_ref().unwrap().clone(), + tail: self.tail.clone(), + count: self.count.clone(), }); self.mailbox = Arc::downgrade(&new_mailbox); new_mailbox @@ -2134,19 +2130,108 @@ impl RunningActor { } } + fn handle_system_message(&mut self, m: SystemMessage) -> Option { + match m { + SystemMessage::None => { + tracing::trace!(actor_id = ?self.actor_id, "SystemMessage::None"); + None + } + SystemMessage::Release => { + tracing::trace!(actor_id = ?self.actor_id, "SystemMessage::Release"); + self.activate(&Account::new(None, None), None, |t| { + t.stop_root(); + Ok(()) + }) + } + SystemMessage::ReleaseField(field_id) => { + tracing::trace!(actor_id = ?self.actor_id, + "SystemMessage::ReleaseField({})", field_id); + self.fields.remove(&field_id); + None + } + SystemMessage::Turn(cause, mut loaned_item) => { + tracing::trace!(actor_id = ?self.actor_id, "SystemMessage::Turn"); + let actions = std::mem::take(&mut loaned_item.item); + self.activate( + &loaned_item.account, cause, |t| { + for (maybe_desc, action) in actions.into_iter() { + if let Some(desc) = maybe_desc { + t.pending.trace(|| trace::ActionDescription::Dequeue { + event: Box::new(desc), + }); + } + action(t)?; + } + Ok(()) + }) + } + SystemMessage::Crash(e) => { + tracing::trace!(actor_id = ?self.actor_id, + "SystemMessage::Crash({:?})", &e); + self.activate(&Account::new(None, None), None, |_| Err(e)?) + } + } + } + + fn activate ( + &mut self, + account: &Arc, + cause: Option, + f: F, + ) -> Option where + F: FnOnce(&mut Activation) -> ActorResult, + { + let mut panic_guard = panic_guard::PanicGuard::new(self.mailbox()); + + // let _entry = tracing::info_span!(parent: None, "actor", actor_id = ?self.actor.actor_id).entered(); + let mut activation = + Activation::make(self.root.as_ref().unwrap().clone(), + Arc::clone(account), + cause, + account.trace_collector.clone(), + self); + let root_facet_id = activation.facet.facet_id; + let f_result = f(&mut activation); + let maybe_exit_status = match f_result { + Ok(()) => + activation.commit().map_or_else( + |e| Some(Err(e)), + |_| { + // If we would otherwise continue, check the root facet: is it + // still alive? If not, then the whole actor should terminate now. + if let None = activation.state.get_facet(root_facet_id) { + tracing::trace!( + "terminating actor because root facet no longer exists"); + Some(Ok(())) + } else { + None + } + }), + Err(e) => { + activation.rollback(); + Some(Err(e)) + } + }; + + panic_guard.disarm(); + drop(panic_guard); + drop(activation); + + maybe_exit_status.map(|s| s.map_err(|e| e.into())) + } + fn cleanup( mut self, - ac_ref: &ActorRef, exit_status: Arc, trace_collector: Option, ) { if exit_status.is_ok() { - assert!(self.get_facet(self.root).is_none()); + assert!(self.get_facet(self.root.as_ref().unwrap().facet_id).is_none()); } let cause = Some(trace::TurnCause::Cleanup); let account = Account::new(Some(AnyValue::symbol("cleanup")), trace_collector.clone()); - let mut t = Activation::make(&ac_ref.facet_ref(self.root), + let mut t = Activation::make(self.root.as_ref().unwrap().clone(), account, cause, trace_collector, @@ -2173,6 +2258,12 @@ impl RunningActor { for action in std::mem::take(&mut t.state.exit_hooks) { action(&mut t, &exit_status); } + + drop(t); + + for w in self.waiters.into_iter() { + w.wake() + } } } @@ -2191,25 +2282,23 @@ impl PartialEq for Field { impl Drop for Field { fn drop(&mut self) { - let _ = self.tx.send(SystemMessage::ReleaseField(self.field_id)); - () + self.mailbox.send(SystemMessage::ReleaseField(self.field_id)); } } -impl Drop for Actor { - fn drop(&mut self) { - self.rx.close(); - ACTORS.write().remove(&self.ac_ref.actor_id); - // let _scope = tracing::info_span!(parent: None, "actor", actor_id = ?self.ac_ref.actor_id).entered(); - let mut g = self.ac_ref.state.lock(); - if g.is_running() { - g.terminate(Err(error("Force-terminated by Actor::drop", AnyValue::new(false))), - &self.ac_ref, - &self.trace_collector); - } - tracing::debug!("Actor::drop"); - } -} +// impl Drop for Actor { +// fn drop(&mut self) { +// tracing::debug!(?self.actor_id, "Eeek"); +// ACTORS.write().remove(&self.actor_id); +// // let _scope = tracing::info_span!(parent: None, "actor", actor_id = ?self.ac_ref.actor_id).entered(); +// let mut g = self.state.lock(); +// if g.is_running() { +// let tc = g.trace_collector(); +// g.terminate(Err(error("Force-terminated by Actor::drop", AnyValue::new(false))), self, &tc); +// } +// tracing::debug!("Actor::drop"); +// } +// } impl Drop for Facet { fn drop(&mut self) { @@ -2259,7 +2348,7 @@ impl Ref { } pub fn debug_str(&self) -> String { - format!("{}/{}:{:016x}", self.mailbox.actor_id, self.facet_id, self.oid()) + format!("{}/{}:{:016x}", self.mailbox.actor.actor_id, self.facet_id, self.oid()) } } @@ -2385,7 +2474,7 @@ impl Cap { self.underlying.debug_str() } else { format!("{}/{}:{:016x}\\{:?}", - self.underlying.mailbox.actor_id, + self.underlying.mailbox.actor.actor_id, self.underlying.facet_id, self.underlying.oid(), self.attenuation) @@ -2476,8 +2565,8 @@ pub async fn wait_for_all_actors_to_stop(wait_time: time::Duration) { tracing::warn!("Some actors remain after {:?}:", wait_time); for (name, actor) in remaining.into_values() { tracing::warn!(?name, ?actor.actor_id, "actor still running, requesting shutdown"); - let g = actor.state.lock(); - if let ActorState::Running(state) = &*g { + let mut g = actor.state.lock(); + if let ActorState::Running(state) = &mut *g { state.shutdown(); } } diff --git a/syndicate/src/supervise.rs b/syndicate/src/supervise.rs index bb930c4..5422b84 100644 --- a/syndicate/src/supervise.rs +++ b/syndicate/src/supervise.rs @@ -40,7 +40,7 @@ pub struct Supervisor { boot_fn: Boot, restarts: VecDeque, state: Arc>, - ac_ref: Option, + ac_ref: Option, } impl Default for SupervisorConfiguration { diff --git a/syndicate/src/trace.rs b/syndicate/src/trace.rs index 87a8fb3..1214d5e 100644 --- a/syndicate/src/trace.rs +++ b/syndicate/src/trace.rs @@ -27,7 +27,7 @@ pub struct TraceCollector { impl From<&Ref> for Target { fn from(v: &Ref) -> Target { Target { - actor: ActorId(AnyValue::new(v.mailbox.actor_id)), + actor: ActorId(AnyValue::new(v.mailbox.actor.actor_id)), facet: FacetId(AnyValue::new(u64::from(v.facet_id))), oid: Oid(AnyValue::new(v.oid())), }