Switch to parking_lot for another performance boost

This commit is contained in:
Tony Garnock-Jones 2021-09-30 13:32:41 +02:00
parent c252975a16
commit ed12c0883e
5 changed files with 126 additions and 94 deletions

44
Cargo.lock generated
View File

@ -605,6 +605,15 @@ dependencies = [
"bytes", "bytes",
] ]
[[package]]
name = "instant"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "716d3d89f35ac6a34fd0eed635395f4c3b76fa889338a4632e5231a8684216bd"
dependencies = [
"cfg-if 1.0.0",
]
[[package]] [[package]]
name = "iovec" name = "iovec"
version = "0.1.4" version = "0.1.4"
@ -666,6 +675,15 @@ version = "0.2.102"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2a5ac8f984bfcf3a823267e5fde638acc3325f6496633a5da6bb6eb2171e103" checksum = "a2a5ac8f984bfcf3a823267e5fde638acc3325f6496633a5da6bb6eb2171e103"
[[package]]
name = "lock_api"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712a4d093c9976e24e7dbca41db895dabcbac38eb5f4045393d17a95bdfb1109"
dependencies = [
"scopeguard",
]
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.14" version = "0.4.14"
@ -973,6 +991,31 @@ dependencies = [
"vcpkg", "vcpkg",
] ]
[[package]]
name = "parking_lot"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216"
dependencies = [
"cfg-if 1.0.0",
"instant",
"libc",
"redox_syscall",
"smallvec",
"winapi 0.3.9",
]
[[package]] [[package]]
name = "percent-encoding" name = "percent-encoding"
version = "2.1.0" version = "2.1.0"
@ -1476,6 +1519,7 @@ dependencies = [
"hmac", "hmac",
"lazy_static", "lazy_static",
"openssl", "openssl",
"parking_lot",
"preserves", "preserves",
"preserves-schema", "preserves-schema",
"sha2", "sha2",

View File

@ -25,7 +25,7 @@ fn run(t: &mut Activation, ds: Arc<Cap>) -> ActorResult {
let mut timer = tokio::time::interval(core::time::Duration::from_secs(1)); let mut timer = tokio::time::interval(core::time::Duration::from_secs(1));
loop { loop {
timer.tick().await; timer.tick().await;
for (id, (name, debt)) in syndicate::actor::ACCOUNTS.read().unwrap().iter() { for (id, (name, debt)) in syndicate::actor::ACCOUNTS.read().iter() {
let _enter = name.enter(); let _enter = name.enter();
tracing::info!(id, debt = ?debt.load(std::sync::atomic::Ordering::Relaxed)); tracing::info!(id, debt = ?debt.load(std::sync::atomic::Ordering::Relaxed));
} }

View File

@ -28,6 +28,7 @@ futures = "0.3"
getrandom = "0.2" getrandom = "0.2"
hmac = "0.11" hmac = "0.11"
lazy_static = "1.4" lazy_static = "1.4"
parking_lot = "0.11"
sha2 = "0.9" sha2 = "0.9"
tracing = "0.1" tracing = "0.1"

View File

@ -13,6 +13,9 @@ use super::rewrite::CaveatError;
use super::rewrite::CheckedCaveat; use super::rewrite::CheckedCaveat;
use super::schemas::sturdy; use super::schemas::sturdy;
use parking_lot::Mutex;
use parking_lot::RwLock;
use preserves::value::ArcValue; use preserves::value::ArcValue;
use preserves::value::Domain; use preserves::value::Domain;
use preserves::value::IOValue; use preserves::value::IOValue;
@ -31,8 +34,6 @@ use std::convert::TryInto;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::num::NonZeroU64; use std::num::NonZeroU64;
use std::sync::Arc; use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;
use std::sync::Weak; use std::sync::Weak;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::time; use std::time;
@ -576,9 +577,8 @@ impl FacetRef {
) -> ActorResult where ) -> ActorResult where
F: FnOnce(&mut Activation) -> RunDisposition, F: FnOnce(&mut Activation) -> RunDisposition,
{ {
match self.actor.state.lock() { let mut g = self.actor.state.lock();
Err(_) => panicked_err(), match &mut *g {
Ok(mut g) => match &mut *g {
ActorState::Terminated { exit_status } => ActorState::Terminated { exit_status } =>
Err(error("Could not activate terminated actor", Err(error("Could not activate terminated actor",
encode_error((**exit_status).clone()))), encode_error((**exit_status).clone()))),
@ -621,7 +621,6 @@ impl FacetRef {
} }
} }
} }
}
} }
impl<'activation> Activation<'activation> { impl<'activation> Activation<'activation> {
@ -1345,7 +1344,7 @@ impl Account {
pub fn new(name: tracing::Span) -> Arc<Self> { pub fn new(name: tracing::Span) -> Arc<Self> {
let id = NEXT_ACCOUNT_ID.fetch_add(1, Ordering::Relaxed); let id = NEXT_ACCOUNT_ID.fetch_add(1, Ordering::Relaxed);
let debt = Arc::new(AtomicI64::new(0)); let debt = Arc::new(AtomicI64::new(0));
ACCOUNTS.write().unwrap().insert(id, (name, Arc::clone(&debt))); ACCOUNTS.write().insert(id, (name, Arc::clone(&debt)));
Arc::new(Account { Arc::new(Account {
id, id,
debt, debt,
@ -1388,7 +1387,7 @@ impl Account {
impl Drop for Account { impl Drop for Account {
fn drop(&mut self) { fn drop(&mut self) {
ACCOUNTS.write().unwrap().remove(&self.id); ACCOUNTS.write().remove(&self.id);
} }
} }
@ -1552,7 +1551,7 @@ impl Actor {
SystemMessage::ReleaseField(field_id) => { SystemMessage::ReleaseField(field_id) => {
tracing::trace!(actor_id = ?self.ac_ref.actor_id, tracing::trace!(actor_id = ?self.ac_ref.actor_id,
"SystemMessage::ReleaseField({})", field_id); "SystemMessage::ReleaseField({})", field_id);
self.ac_ref.access(|s| if let ActorState::Running(ra) = s.unwrap() { self.ac_ref.access(|s| if let ActorState::Running(ra) = s {
ra.fields.remove(&field_id); ra.fields.remove(&field_id);
}) })
} }
@ -1611,19 +1610,12 @@ impl Facet {
} }
} }
fn panicked_err() -> ActorResult {
Err(error("Actor panicked", AnyValue::new(false)))
}
impl ActorRef { impl ActorRef {
/// Uses an internal mutex to access the internal state: takes the /// Uses an internal mutex to access the internal state: takes the
/// lock, calls `f` with the internal state, releases the lock, /// lock, calls `f` with the internal state, releases the lock,
/// and returns the result of `f`. /// and returns the result of `f`.
pub fn access<R, F: FnOnce(Option<&mut ActorState>) -> R>(&self, f: F) -> R { pub fn access<R, F: FnOnce(&mut ActorState) -> R>(&self, f: F) -> R {
match self.state.lock() { f(&mut *self.state.lock())
Err(_) => f(None),
Ok(mut g) => f(Some(&mut *g)),
}
} }
/// Retrieves the exit status of the denoted actor. If it is still /// Retrieves the exit status of the denoted actor. If it is still
@ -1631,12 +1623,10 @@ impl ActorRef {
/// exited normally, or `Some(Err(_))` if it terminated /// exited normally, or `Some(Err(_))` if it terminated
/// abnormally. /// abnormally.
pub fn exit_status(&self) -> Option<ActorResult> { pub fn exit_status(&self) -> Option<ActorResult> {
self.access(|s| s.map_or_else( self.access(|state| match state {
|| Some(panicked_err()),
|state| match state {
ActorState::Running(_) => None, ActorState::Running(_) => None,
ActorState::Terminated { exit_status } => Some((**exit_status).clone()), ActorState::Terminated { exit_status } => Some((**exit_status).clone()),
})) })
} }
fn facet_ref(&self, facet_id: FacetId) -> FacetRef { fn facet_ref(&self, facet_id: FacetId) -> FacetRef {
@ -1647,7 +1637,7 @@ impl ActorRef {
} }
fn root_facet_id(&self) -> FacetId { fn root_facet_id(&self) -> FacetId {
self.access(|s| match s.expect("Actor missing its state") { self.access(|s| match s {
ActorState::Terminated { .. } => panic!("Actor unexpectedly in terminated state"), ActorState::Terminated { .. } => panic!("Actor unexpectedly in terminated state"),
ActorState::Running(ra) => ra.root, // what a lot of work to get this one number ActorState::Running(ra) => ra.root, // what a lot of work to get this one number
}) })
@ -1796,7 +1786,7 @@ impl<M> Ref<M> {
/// ///
/// Panics if this `Ref` has already been given a behaviour. /// Panics if this `Ref` has already been given a behaviour.
pub fn become_entity<E: 'static + Entity<M>>(&self, e: E) { pub fn become_entity<E: 'static + Entity<M>>(&self, e: E) {
let mut g = self.target.lock().expect("unpoisoned"); let mut g = self.target.lock();
if g.is_some() { if g.is_some() {
panic!("Double initialization of Ref"); panic!("Double initialization of Ref");
} }
@ -1804,7 +1794,7 @@ impl<M> Ref<M> {
} }
fn internal_with_entity<R, F: FnOnce(&mut dyn Entity<M>) -> R>(&self, f: F) -> R { fn internal_with_entity<R, F: FnOnce(&mut dyn Entity<M>) -> R>(&self, f: F) -> R {
let mut g = self.target.lock().expect("unpoisoned"); let mut g = self.target.lock();
f(g.as_mut().expect("initialized").as_mut()) f(g.as_mut().expect("initialized").as_mut())
} }
} }

View File

@ -15,6 +15,8 @@ use futures::SinkExt;
use futures::Stream; use futures::Stream;
use futures::StreamExt; use futures::StreamExt;
use parking_lot::Mutex;
use preserves::error::Error as PreservesError; use preserves::error::Error as PreservesError;
use preserves::error::is_eof_io_error; use preserves::error::is_eof_io_error;
use preserves::value::BinarySource; use preserves::value::BinarySource;
@ -37,7 +39,6 @@ use preserves_schema::ParseError;
use std::io; use std::io;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
@ -244,7 +245,7 @@ impl TunnelRelay {
let result = initial_oid.map( let result = initial_oid.map(
|io| Arc::clone(&tr.membranes.import_oid(t, &tr_ref, io).inc_ref().obj)); |io| Arc::clone(&tr.membranes.import_oid(t, &tr_ref, io).inc_ref().obj));
dump_membranes!(tr.membranes); dump_membranes!(tr.membranes);
*tr_ref.lock().unwrap() = Some(tr); *tr_ref.lock() = Some(tr);
t.linked_task(crate::name!("writer"), output_loop(o, output_rx)); t.linked_task(crate::name!("writer"), output_loop(o, output_rx));
t.linked_task(crate::name!("reader"), input_loop(t.facet.clone(), i, tr_ref)); t.linked_task(crate::name!("reader"), input_loop(t.facet.clone(), i, tr_ref));
t.state.add_exit_hook(&self_entity); t.state.add_exit_hook(&self_entity);
@ -373,7 +374,7 @@ impl TunnelRelay {
impl Entity<Synced> for SyncPeer { impl Entity<Synced> for SyncPeer {
fn message(&mut self, t: &mut Activation, _a: Synced) -> ActorResult { fn message(&mut self, t: &mut Activation, _a: Synced) -> ActorResult {
self.peer.message(t, &(), &AnyValue::new(true)); self.peer.message(t, &(), &AnyValue::new(true));
let mut g = self.relay_ref.lock().expect("unpoisoned"); let mut g = self.relay_ref.lock();
let tr = g.as_mut().expect("initialized"); let tr = g.as_mut().expect("initialized");
tr.membranes.release(std::mem::take(&mut self.pins)); tr.membranes.release(std::mem::take(&mut self.pins));
dump_membranes!(tr.membranes); dump_membranes!(tr.membranes);
@ -609,7 +610,7 @@ async fn input_loop(
match src.next().await { match src.next().await {
None => return Ok(LinkedTaskTermination::Normal), None => return Ok(LinkedTaskTermination::Normal),
Some(bs) => facet.activate(Arc::clone(&account), |t| { Some(bs) => facet.activate(Arc::clone(&account), |t| {
let mut g = relay.lock().expect("unpoisoned"); let mut g = relay.lock();
let tr = g.as_mut().expect("initialized"); let tr = g.as_mut().expect("initialized");
tr.handle_inbound_datagram(t, &bs?) tr.handle_inbound_datagram(t, &bs?)
})?, })?,
@ -634,7 +635,7 @@ async fn input_loop(
match n { match n {
0 => return Ok(LinkedTaskTermination::Normal), 0 => return Ok(LinkedTaskTermination::Normal),
_ => facet.activate(Arc::clone(&account), |t| { _ => facet.activate(Arc::clone(&account), |t| {
let mut g = relay.lock().expect("unpoisoned"); let mut g = relay.lock();
let tr = g.as_mut().expect("initialized"); let tr = g.as_mut().expect("initialized");
tr.handle_inbound_stream(t, &mut buf) tr.handle_inbound_stream(t, &mut buf)
})?, })?,
@ -667,7 +668,7 @@ async fn output_loop(
impl Entity<()> for TunnelRefEntity { impl Entity<()> for TunnelRefEntity {
fn message(&mut self, t: &mut Activation, _m: ()) -> ActorResult { fn message(&mut self, t: &mut Activation, _m: ()) -> ActorResult {
let mut g = self.relay_ref.lock().expect("unpoisoned"); let mut g = self.relay_ref.lock();
let tr = g.as_mut().expect("initialized"); let tr = g.as_mut().expect("initialized");
let events = std::mem::take(&mut tr.pending_outbound); let events = std::mem::take(&mut tr.pending_outbound);
tr.send_packet(&t.account(), events.len(), P::Packet::Turn(Box::new(P::Turn(events.clone()))))?; tr.send_packet(&t.account(), events.len(), P::Packet::Turn(Box::new(P::Turn(events.clone()))))?;
@ -680,7 +681,7 @@ impl Entity<()> for TunnelRefEntity {
fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc<ActorResult>) -> ActorResult { fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc<ActorResult>) -> ActorResult {
if let Err(e) = &**exit_status { if let Err(e) = &**exit_status {
let e = e.clone(); let e = e.clone();
let mut g = self.relay_ref.lock().expect("unpoisoned"); let mut g = self.relay_ref.lock();
let tr = g.as_mut().expect("initialized"); let tr = g.as_mut().expect("initialized");
tr.send_packet(&t.account(), 1, P::Packet::Error(Box::new(e)))?; tr.send_packet(&t.account(), 1, P::Packet::Error(Box::new(e)))?;
} }
@ -690,31 +691,27 @@ impl Entity<()> for TunnelRefEntity {
impl Entity<AnyValue> for RelayEntity { impl Entity<AnyValue> for RelayEntity {
fn assert(&mut self, t: &mut Activation, a: AnyValue, h: Handle) -> ActorResult { fn assert(&mut self, t: &mut Activation, a: AnyValue, h: Handle) -> ActorResult {
let mut g = self.relay_ref.lock().expect("unpoisoned"); self.relay_ref.lock().as_mut().expect("initialized")
let tr = g.as_mut().expect("initialized"); .send_event(t, self.oid.clone(), P::Event::Assert(Box::new(P::Assert {
tr.send_event(t, self.oid.clone(), P::Event::Assert(Box::new(P::Assert {
assertion: P::Assertion(a), assertion: P::Assertion(a),
handle: P::Handle(h.into()), handle: P::Handle(h.into()),
}))) })))
} }
fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult { fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult {
let mut g = self.relay_ref.lock().expect("unpoisoned"); self.relay_ref.lock().as_mut().expect("initialized")
let tr = g.as_mut().expect("initialized"); .send_event(t, self.oid.clone(), P::Event::Retract(Box::new(P::Retract {
tr.send_event(t, self.oid.clone(), P::Event::Retract(Box::new(P::Retract {
handle: P::Handle(h.into()), handle: P::Handle(h.into()),
}))) })))
} }
fn message(&mut self, t: &mut Activation, m: AnyValue) -> ActorResult { fn message(&mut self, t: &mut Activation, m: AnyValue) -> ActorResult {
let mut g = self.relay_ref.lock().expect("unpoisoned"); self.relay_ref.lock().as_mut().expect("initialized")
let tr = g.as_mut().expect("initialized"); .send_event(t, self.oid.clone(), P::Event::Message(Box::new(P::Message {
tr.send_event(t, self.oid.clone(), P::Event::Message(Box::new(P::Message {
body: P::Assertion(m) body: P::Assertion(m)
}))) })))
} }
fn sync(&mut self, t: &mut Activation, peer: Arc<Ref<Synced>>) -> ActorResult { fn sync(&mut self, t: &mut Activation, peer: Arc<Ref<Synced>>) -> ActorResult {
let mut g = self.relay_ref.lock().expect("unpoisoned"); self.relay_ref.lock().as_mut().expect("initialized")
let tr = g.as_mut().expect("initialized"); .send_event(t, self.oid.clone(), P::Event::Sync(Box::new(P::Sync {
tr.send_event(t, self.oid.clone(), P::Event::Sync(Box::new(P::Sync {
peer: Cap::guard(Arc::new(()), peer) peer: Cap::guard(Arc::new(()), peer)
}))) })))
} }