From ed12c0883e2e64e53ff471e857de9e8b76cd4871 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 30 Sep 2021 13:32:41 +0200 Subject: [PATCH] Switch to parking_lot for another performance boost --- Cargo.lock | 44 +++++++ .../src/services/debt_reporter.rs | 2 +- syndicate/Cargo.toml | 1 + syndicate/src/actor.rs | 120 ++++++++---------- syndicate/src/relay.rs | 53 ++++---- 5 files changed, 126 insertions(+), 94 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 309bc4b..3673434 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -605,6 +605,15 @@ dependencies = [ "bytes", ] +[[package]] +name = "instant" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "716d3d89f35ac6a34fd0eed635395f4c3b76fa889338a4632e5231a8684216bd" +dependencies = [ + "cfg-if 1.0.0", +] + [[package]] name = "iovec" version = "0.1.4" @@ -666,6 +675,15 @@ version = "0.2.102" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2a5ac8f984bfcf3a823267e5fde638acc3325f6496633a5da6bb6eb2171e103" +[[package]] +name = "lock_api" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712a4d093c9976e24e7dbca41db895dabcbac38eb5f4045393d17a95bdfb1109" +dependencies = [ + "scopeguard", +] + [[package]] name = "log" version = "0.4.14" @@ -973,6 +991,31 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" +dependencies = [ + "cfg-if 1.0.0", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi 0.3.9", +] + [[package]] name = "percent-encoding" version = "2.1.0" @@ -1476,6 +1519,7 @@ dependencies = [ "hmac", "lazy_static", "openssl", + "parking_lot", "preserves", "preserves-schema", "sha2", diff --git a/syndicate-server/src/services/debt_reporter.rs b/syndicate-server/src/services/debt_reporter.rs index 735995f..b688a8d 100644 --- a/syndicate-server/src/services/debt_reporter.rs +++ b/syndicate-server/src/services/debt_reporter.rs @@ -25,7 +25,7 @@ fn run(t: &mut Activation, ds: Arc) -> ActorResult { let mut timer = tokio::time::interval(core::time::Duration::from_secs(1)); loop { timer.tick().await; - for (id, (name, debt)) in syndicate::actor::ACCOUNTS.read().unwrap().iter() { + for (id, (name, debt)) in syndicate::actor::ACCOUNTS.read().iter() { let _enter = name.enter(); tracing::info!(id, debt = ?debt.load(std::sync::atomic::Ordering::Relaxed)); } diff --git a/syndicate/Cargo.toml b/syndicate/Cargo.toml index 4187d12..91085d1 100644 --- a/syndicate/Cargo.toml +++ b/syndicate/Cargo.toml @@ -28,6 +28,7 @@ futures = "0.3" getrandom = "0.2" hmac = "0.11" lazy_static = "1.4" +parking_lot = "0.11" sha2 = "0.9" tracing = "0.1" diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index a2405f7..5e363e0 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -13,6 +13,9 @@ use super::rewrite::CaveatError; use super::rewrite::CheckedCaveat; use super::schemas::sturdy; +use parking_lot::Mutex; +use parking_lot::RwLock; + use preserves::value::ArcValue; use preserves::value::Domain; use preserves::value::IOValue; @@ -31,8 +34,6 @@ use std::convert::TryInto; use std::marker::PhantomData; use std::num::NonZeroU64; use std::sync::Arc; -use std::sync::Mutex; -use std::sync::RwLock; use std::sync::Weak; use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use std::time; @@ -576,49 +577,47 @@ impl FacetRef { ) -> ActorResult where F: FnOnce(&mut Activation) -> RunDisposition, { - match self.actor.state.lock() { - Err(_) => panicked_err(), - Ok(mut g) => match &mut *g { - ActorState::Terminated { exit_status } => - Err(error("Could not activate terminated actor", - encode_error((**exit_status).clone()))), - ActorState::Running(state) => { - tracing::trace!(actor_id=?self.actor.actor_id, "activate"); - let mut activation = Activation::make(self, account, state); - let f_result = f(&mut activation); - let result = match activation.restore_invariants(f_result) { - RunDisposition::Continue => Ok(()), - RunDisposition::Terminate(exit_status) => { - if exit_status.is_err() { - activation.clear(); - } - drop(activation); - let exit_status = Arc::new(exit_status); - let mut t = Activation::make(&self.actor.facet_ref(state.root), - Account::new(crate::name!("shutdown")), - state); - if let Err(err) = t._terminate_facet(t.state.root, exit_status.is_ok()) { - // This can only occur as the result of an internal error in this file's code. - tracing::error!(?err, "unexpected error from terminate_facet"); - panic!("Unexpected error result from terminate_facet"); - } - // TODO: The linked_tasks are being cancelled above ^ when their Facets drop. - // TODO: We don't want that: we want (? do we?) exit hooks to run before linked_tasks are cancelled. - // TODO: Example: send an error message in an exit_hook that is processed and delivered by a linked_task. - for action in std::mem::take(&mut t.state.exit_hooks) { - if let Err(err) = action(&mut t, &exit_status) { - tracing::error!(?err, "error in exit hook"); - } - } - *g = ActorState::Terminated { - exit_status: Arc::clone(&exit_status), - }; - (*exit_status).clone() + let mut g = self.actor.state.lock(); + match &mut *g { + ActorState::Terminated { exit_status } => + Err(error("Could not activate terminated actor", + encode_error((**exit_status).clone()))), + ActorState::Running(state) => { + tracing::trace!(actor_id=?self.actor.actor_id, "activate"); + let mut activation = Activation::make(self, account, state); + let f_result = f(&mut activation); + let result = match activation.restore_invariants(f_result) { + RunDisposition::Continue => Ok(()), + RunDisposition::Terminate(exit_status) => { + if exit_status.is_err() { + activation.clear(); } - }; - tracing::trace!(actor_id=?self.actor.actor_id, "deactivate"); - result - } + drop(activation); + let exit_status = Arc::new(exit_status); + let mut t = Activation::make(&self.actor.facet_ref(state.root), + Account::new(crate::name!("shutdown")), + state); + if let Err(err) = t._terminate_facet(t.state.root, exit_status.is_ok()) { + // This can only occur as the result of an internal error in this file's code. + tracing::error!(?err, "unexpected error from terminate_facet"); + panic!("Unexpected error result from terminate_facet"); + } + // TODO: The linked_tasks are being cancelled above ^ when their Facets drop. + // TODO: We don't want that: we want (? do we?) exit hooks to run before linked_tasks are cancelled. + // TODO: Example: send an error message in an exit_hook that is processed and delivered by a linked_task. + for action in std::mem::take(&mut t.state.exit_hooks) { + if let Err(err) = action(&mut t, &exit_status) { + tracing::error!(?err, "error in exit hook"); + } + } + *g = ActorState::Terminated { + exit_status: Arc::clone(&exit_status), + }; + (*exit_status).clone() + } + }; + tracing::trace!(actor_id=?self.actor.actor_id, "deactivate"); + result } } } @@ -1345,7 +1344,7 @@ impl Account { pub fn new(name: tracing::Span) -> Arc { let id = NEXT_ACCOUNT_ID.fetch_add(1, Ordering::Relaxed); let debt = Arc::new(AtomicI64::new(0)); - ACCOUNTS.write().unwrap().insert(id, (name, Arc::clone(&debt))); + ACCOUNTS.write().insert(id, (name, Arc::clone(&debt))); Arc::new(Account { id, debt, @@ -1388,7 +1387,7 @@ impl Account { impl Drop for Account { fn drop(&mut self) { - ACCOUNTS.write().unwrap().remove(&self.id); + ACCOUNTS.write().remove(&self.id); } } @@ -1552,7 +1551,7 @@ impl Actor { SystemMessage::ReleaseField(field_id) => { tracing::trace!(actor_id = ?self.ac_ref.actor_id, "SystemMessage::ReleaseField({})", field_id); - self.ac_ref.access(|s| if let ActorState::Running(ra) = s.unwrap() { + self.ac_ref.access(|s| if let ActorState::Running(ra) = s { ra.fields.remove(&field_id); }) } @@ -1611,19 +1610,12 @@ impl Facet { } } -fn panicked_err() -> ActorResult { - Err(error("Actor panicked", AnyValue::new(false))) -} - impl ActorRef { /// Uses an internal mutex to access the internal state: takes the /// lock, calls `f` with the internal state, releases the lock, /// and returns the result of `f`. - pub fn access) -> R>(&self, f: F) -> R { - match self.state.lock() { - Err(_) => f(None), - Ok(mut g) => f(Some(&mut *g)), - } + pub fn access R>(&self, f: F) -> R { + f(&mut *self.state.lock()) } /// Retrieves the exit status of the denoted actor. If it is still @@ -1631,12 +1623,10 @@ impl ActorRef { /// exited normally, or `Some(Err(_))` if it terminated /// abnormally. pub fn exit_status(&self) -> Option { - self.access(|s| s.map_or_else( - || Some(panicked_err()), - |state| match state { - ActorState::Running(_) => None, - ActorState::Terminated { exit_status } => Some((**exit_status).clone()), - })) + self.access(|state| match state { + ActorState::Running(_) => None, + ActorState::Terminated { exit_status } => Some((**exit_status).clone()), + }) } fn facet_ref(&self, facet_id: FacetId) -> FacetRef { @@ -1647,7 +1637,7 @@ impl ActorRef { } fn root_facet_id(&self) -> FacetId { - self.access(|s| match s.expect("Actor missing its state") { + self.access(|s| match s { ActorState::Terminated { .. } => panic!("Actor unexpectedly in terminated state"), ActorState::Running(ra) => ra.root, // what a lot of work to get this one number }) @@ -1796,7 +1786,7 @@ impl Ref { /// /// Panics if this `Ref` has already been given a behaviour. pub fn become_entity>(&self, e: E) { - let mut g = self.target.lock().expect("unpoisoned"); + let mut g = self.target.lock(); if g.is_some() { panic!("Double initialization of Ref"); } @@ -1804,7 +1794,7 @@ impl Ref { } fn internal_with_entity) -> R>(&self, f: F) -> R { - let mut g = self.target.lock().expect("unpoisoned"); + let mut g = self.target.lock(); f(g.as_mut().expect("initialized").as_mut()) } } diff --git a/syndicate/src/relay.rs b/syndicate/src/relay.rs index 9e04989..725757d 100644 --- a/syndicate/src/relay.rs +++ b/syndicate/src/relay.rs @@ -15,6 +15,8 @@ use futures::SinkExt; use futures::Stream; use futures::StreamExt; +use parking_lot::Mutex; + use preserves::error::Error as PreservesError; use preserves::error::is_eof_io_error; use preserves::value::BinarySource; @@ -37,7 +39,6 @@ use preserves_schema::ParseError; use std::io; use std::pin::Pin; use std::sync::Arc; -use std::sync::Mutex; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; @@ -244,7 +245,7 @@ impl TunnelRelay { let result = initial_oid.map( |io| Arc::clone(&tr.membranes.import_oid(t, &tr_ref, io).inc_ref().obj)); dump_membranes!(tr.membranes); - *tr_ref.lock().unwrap() = Some(tr); + *tr_ref.lock() = Some(tr); t.linked_task(crate::name!("writer"), output_loop(o, output_rx)); t.linked_task(crate::name!("reader"), input_loop(t.facet.clone(), i, tr_ref)); t.state.add_exit_hook(&self_entity); @@ -373,7 +374,7 @@ impl TunnelRelay { impl Entity for SyncPeer { fn message(&mut self, t: &mut Activation, _a: Synced) -> ActorResult { self.peer.message(t, &(), &AnyValue::new(true)); - let mut g = self.relay_ref.lock().expect("unpoisoned"); + let mut g = self.relay_ref.lock(); let tr = g.as_mut().expect("initialized"); tr.membranes.release(std::mem::take(&mut self.pins)); dump_membranes!(tr.membranes); @@ -609,7 +610,7 @@ async fn input_loop( match src.next().await { None => return Ok(LinkedTaskTermination::Normal), Some(bs) => facet.activate(Arc::clone(&account), |t| { - let mut g = relay.lock().expect("unpoisoned"); + let mut g = relay.lock(); let tr = g.as_mut().expect("initialized"); tr.handle_inbound_datagram(t, &bs?) })?, @@ -634,7 +635,7 @@ async fn input_loop( match n { 0 => return Ok(LinkedTaskTermination::Normal), _ => facet.activate(Arc::clone(&account), |t| { - let mut g = relay.lock().expect("unpoisoned"); + let mut g = relay.lock(); let tr = g.as_mut().expect("initialized"); tr.handle_inbound_stream(t, &mut buf) })?, @@ -667,7 +668,7 @@ async fn output_loop( impl Entity<()> for TunnelRefEntity { fn message(&mut self, t: &mut Activation, _m: ()) -> ActorResult { - let mut g = self.relay_ref.lock().expect("unpoisoned"); + let mut g = self.relay_ref.lock(); let tr = g.as_mut().expect("initialized"); let events = std::mem::take(&mut tr.pending_outbound); tr.send_packet(&t.account(), events.len(), P::Packet::Turn(Box::new(P::Turn(events.clone()))))?; @@ -680,7 +681,7 @@ impl Entity<()> for TunnelRefEntity { fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc) -> ActorResult { if let Err(e) = &**exit_status { let e = e.clone(); - let mut g = self.relay_ref.lock().expect("unpoisoned"); + let mut g = self.relay_ref.lock(); let tr = g.as_mut().expect("initialized"); tr.send_packet(&t.account(), 1, P::Packet::Error(Box::new(e)))?; } @@ -690,32 +691,28 @@ impl Entity<()> for TunnelRefEntity { impl Entity for RelayEntity { fn assert(&mut self, t: &mut Activation, a: AnyValue, h: Handle) -> ActorResult { - let mut g = self.relay_ref.lock().expect("unpoisoned"); - let tr = g.as_mut().expect("initialized"); - tr.send_event(t, self.oid.clone(), P::Event::Assert(Box::new(P::Assert { - assertion: P::Assertion(a), - handle: P::Handle(h.into()), - }))) + self.relay_ref.lock().as_mut().expect("initialized") + .send_event(t, self.oid.clone(), P::Event::Assert(Box::new(P::Assert { + assertion: P::Assertion(a), + handle: P::Handle(h.into()), + }))) } fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult { - let mut g = self.relay_ref.lock().expect("unpoisoned"); - let tr = g.as_mut().expect("initialized"); - tr.send_event(t, self.oid.clone(), P::Event::Retract(Box::new(P::Retract { - handle: P::Handle(h.into()), - }))) + self.relay_ref.lock().as_mut().expect("initialized") + .send_event(t, self.oid.clone(), P::Event::Retract(Box::new(P::Retract { + handle: P::Handle(h.into()), + }))) } fn message(&mut self, t: &mut Activation, m: AnyValue) -> ActorResult { - let mut g = self.relay_ref.lock().expect("unpoisoned"); - let tr = g.as_mut().expect("initialized"); - tr.send_event(t, self.oid.clone(), P::Event::Message(Box::new(P::Message { - body: P::Assertion(m) - }))) + self.relay_ref.lock().as_mut().expect("initialized") + .send_event(t, self.oid.clone(), P::Event::Message(Box::new(P::Message { + body: P::Assertion(m) + }))) } fn sync(&mut self, t: &mut Activation, peer: Arc>) -> ActorResult { - let mut g = self.relay_ref.lock().expect("unpoisoned"); - let tr = g.as_mut().expect("initialized"); - tr.send_event(t, self.oid.clone(), P::Event::Sync(Box::new(P::Sync { - peer: Cap::guard(Arc::new(()), peer) - }))) + self.relay_ref.lock().as_mut().expect("initialized") + .send_event(t, self.oid.clone(), P::Event::Sync(Box::new(P::Sync { + peer: Cap::guard(Arc::new(()), peer) + }))) } }