From 21a69618cfd0940048a95e2d48f1e93d416db3c3 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 22 Jul 2021 01:05:08 +0200 Subject: [PATCH] Rearrange Entity storage: they are now held in Refs --- benches/bench_dataspace.rs | 7 ++ src/actor.rs | 135 ++++++++++++++++++------------------ src/bin/syndicate-server.rs | 8 ++- src/dataspace.rs | 4 ++ src/during.rs | 35 ++++++---- src/lib.rs | 7 -- src/relay.rs | 21 ++++-- src/tracer.rs | 6 +- 8 files changed, 125 insertions(+), 98 deletions(-) diff --git a/benches/bench_dataspace.rs b/benches/bench_dataspace.rs index f2379ed..9231cdf 100644 --- a/benches/bench_dataspace.rs +++ b/benches/bench_dataspace.rs @@ -1,5 +1,6 @@ use criterion::{criterion_group, criterion_main, Criterion}; +use std::any::Any; use std::iter::FromIterator; use std::sync::Arc; use std::sync::atomic::AtomicU64; @@ -30,6 +31,9 @@ fn says(who: _Any, what: _Any) -> _Any { struct ShutdownEntity; impl Entity for ShutdownEntity { + fn as_any(&mut self) -> &mut dyn Any { + self + } fn message(&mut self, t: &mut Activation, _m: _Any) -> ActorResult { t.actor.shutdown(); Ok(()) @@ -84,6 +88,9 @@ pub fn bench_pub(c: &mut Criterion) { struct Receiver(Arc); impl Entity for Receiver { + fn as_any(&mut self) -> &mut dyn Any { + self + } fn message(&mut self, _t: &mut Activation, _m: _Any) -> ActorResult { self.0.fetch_add(1, Ordering::Relaxed); Ok(()) diff --git a/src/actor.rs b/src/actor.rs index 8870e50..38ca4c7 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -15,6 +15,7 @@ use preserves::value::IOValue; use preserves::value::Map; use preserves::value::NestedValue; +use std::any::Any; use std::boxed::Box; use std::collections::hash_map::HashMap; use std::convert::TryInto; @@ -31,12 +32,13 @@ use tracing::Instrument; pub use super::schemas::internal_protocol::_Any; pub use super::schemas::internal_protocol::Handle; -pub use super::schemas::internal_protocol::Oid; pub type ActorResult = Result<(), Error>; pub type ActorHandle = tokio::task::JoinHandle; -pub trait Entity: Send { +pub trait Entity: Send + std::marker::Sync { + fn as_any(&mut self) -> &mut dyn Any; + fn assert(&mut self, _t: &mut Activation, _a: _Any, _h: Handle) -> ActorResult { Ok(()) } @@ -53,8 +55,8 @@ pub trait Entity: Send { fn turn_end(&mut self, _t: &mut Activation) -> ActorResult { Ok(()) } - fn exit_hook(&mut self, _t: &mut Activation, _exit_status: &ActorResult) -> BoxFuture { - Box::pin(ready(Ok(()))) + fn exit_hook(&mut self, _t: &mut Activation, _exit_status: &ActorResult) -> ActorResult { + Ok(()) } } @@ -92,7 +94,6 @@ pub struct LoanedItem { #[derive(Debug)] enum SystemMessage { Release, - ReleaseOid(Oid), Turn(LoanedItem), Crash(Error), } @@ -110,16 +111,46 @@ pub struct Actor { rx: Option>, mailbox_count: Arc, outbound_assertions: OutboundAssertions, - oid_map: Map>, next_task_id: u64, linked_tasks: Map, exit_hooks: Vec>, } -#[derive(PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct ObjectAddress { pub mailbox: Mailbox, - pub oid: Oid, + pub target: RwLock>, +} + +impl ObjectAddress { + pub fn oid(&self) -> usize { + std::ptr::addr_of!(*self) as usize + } +} + +impl PartialEq for ObjectAddress { + fn eq(&self, other: &Self) -> bool { + self.oid() == other.oid() + } +} + +impl Eq for ObjectAddress {} + +impl std::hash::Hash for ObjectAddress { + fn hash(&self, hash: &mut H) where H: std::hash::Hasher { + self.oid().hash(hash) + } +} + +impl PartialOrd for ObjectAddress { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ObjectAddress { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.oid().cmp(&other.oid()) + } } #[derive(PartialEq, Eq, Hash, PartialOrd, Ord)] @@ -135,7 +166,11 @@ static NEXT_DEBTOR_ID: AtomicU64 = AtomicU64::new(4); preserves_schema::support::lazy_static! { pub static ref INERT_REF: Arc = { struct InertEntity; - impl crate::actor::Entity for InertEntity {} + impl crate::actor::Entity for InertEntity { + fn as_any(&mut self) -> &mut dyn Any { + self + } + } let mut ac = Actor::new(); let e = ac.create(InertEntity); ac.boot(tracing::info_span!(parent: None, "INERT_REF"), @@ -256,24 +291,6 @@ impl<'activation> Activation<'activation> { let _ = target.send(&self.debtor, turn); } } - - fn with_oid R, - Fs: FnOnce(&mut Self, &mut Box) -> R>( - &mut self, - oid: &Oid, - kf: Ff, - ks: Fs, - ) -> R { - match self.actor.oid_map.remove_entry(&oid) { - None => kf(self), - Some((k, mut e)) => { - let result = ks(self, &mut e); - self.actor.oid_map.insert(k, e); - result - } - } - } } impl<'activation> Drop for Activation<'activation> { @@ -421,18 +438,20 @@ impl Actor { rx: Some(rx), mailbox_count: Arc::new(AtomicUsize::new(0)), outbound_assertions: Map::new(), - oid_map: Map::new(), next_task_id: 0, linked_tasks: Map::new(), exit_hooks: Vec::new(), } } - pub fn create_and_start(name: tracing::Span, e: E) -> Arc { + pub fn create_and_start( + name: tracing::Span, + e: E, + ) -> Arc { Self::create_and_start_rec(name, e, |_, _, _| ()) } - pub fn create_and_start_rec) -> ()>( name: tracing::Span, e: E, @@ -466,26 +485,24 @@ impl Actor { () } - pub fn create(&mut self, e: E) -> Arc { + pub fn create(&mut self, e: E) -> Arc { self.create_rec(e, |_, _, _| ()) } - pub fn create_rec) -> ()>( &mut self, - mut e: E, + e: E, f: F, ) -> Arc { - let oid = crate::next_oid(); let r = Arc::new(Ref { addr: Arc::new(ObjectAddress { mailbox: self.mailbox(), - oid: oid.clone(), + target: RwLock::new(Box::new(e)), }), attenuation: Vec::new(), }); - f(self, &mut e, &r); - self.oid_map.insert(oid, Box::new(e)); + f(self, r.addr.target.write().expect("unpoisoned").as_any().downcast_mut().unwrap(), &r); r } @@ -501,16 +518,11 @@ impl Actor { { let mut t = Activation::new(&mut self, Debtor::new(crate::name!("shutdown"))); for r in std::mem::take(&mut t.actor.exit_hooks) { - match t.actor.oid_map.remove_entry(&r.addr.oid) { - None => (), - Some((k, mut e)) => { - if let Err(err) = e.exit_hook(&mut t, &result).await { - tracing::error!(err = debug(err), - r = debug(&r), - "error in exit hook"); - } - t.actor.oid_map.insert(k, e); - } + let mut e = r.addr.target.write().expect("unpoisoned"); + if let Err(err) = e.exit_hook(&mut t, &result) { + tracing::error!(err = debug(err), + r = debug(&r), + "error in exit hook"); } } } @@ -561,34 +573,30 @@ impl Actor { tracing::trace!("SystemMessage::Release"); Ok(true) } - SystemMessage::ReleaseOid(oid) => { - tracing::trace!("SystemMessage::ReleaseOid({:?})", &oid); - self.oid_map.remove(&oid); - Ok(false) - } SystemMessage::Turn(mut loaned_item) => { let mut events = std::mem::take(&mut loaned_item.item); let mut t = Activation::new(self, Arc::clone(&loaned_item.debtor)); loop { for (r, event) in events.into_iter() { - t.with_oid(&r.addr.oid, |_| Ok(()), |t, e| match event { + let mut e = r.addr.target.write().expect("unpoisoned"); + match event { Event::Assert(b) => { let Assert { assertion: Assertion(assertion), handle } = *b; - e.assert(t, assertion, handle) + e.assert(&mut t, assertion, handle)? } Event::Retract(b) => { let Retract { handle } = *b; - e.retract(t, handle) + e.retract(&mut t, handle)? } Event::Message(b) => { let Message { body: Assertion(body) } = *b; - e.message(t, body) + e.message(&mut t, body)? } Event::Sync(b) => { let Sync { peer } = *b; - e.sync(t, peer) + e.sync(&mut t, peer)? } - })?; + } } events = std::mem::take(&mut t.immediate_self); if events.is_empty() { break; } @@ -698,20 +706,13 @@ impl Ref { impl std::fmt::Debug for Ref { fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { if self.attenuation.is_empty() { - write!(f, "⌜{}:{}⌝", self.addr.mailbox.actor_id, self.addr.oid.0) + write!(f, "⌜{}:{:016x}⌝", self.addr.mailbox.actor_id, self.addr.oid()) } else { - write!(f, "⌜{}:{}\\{:?}⌝", self.addr.mailbox.actor_id, self.addr.oid.0, self.attenuation) + write!(f, "⌜{}:{:016x}\\{:?}⌝", self.addr.mailbox.actor_id, self.addr.oid(), self.attenuation) } } } -impl Drop for ObjectAddress { - fn drop(&mut self) { - let _ = self.mailbox.tx.send(SystemMessage::ReleaseOid(self.oid.clone())); - () - } -} - impl Domain for Ref {} impl std::convert::TryFrom<&IOValue> for Ref { diff --git a/src/bin/syndicate-server.rs b/src/bin/syndicate-server.rs index 9f13eed..bb03611 100644 --- a/src/bin/syndicate-server.rs +++ b/src/bin/syndicate-server.rs @@ -5,6 +5,7 @@ use preserves::value::Map; use preserves::value::NestedValue; use preserves::value::Value; +use std::any::Any; use std::convert::TryFrom; use std::future::ready; use std::iter::FromIterator; @@ -158,9 +159,12 @@ async fn run_connection( }; struct ExitListener; impl Entity for ExitListener { - fn exit_hook(&mut self, _t: &mut Activation, exit_status: &ActorResult) -> BoxFuture { + fn as_any(&mut self) -> &mut dyn Any { + self + } + fn exit_hook(&mut self, _t: &mut Activation, exit_status: &ActorResult) -> ActorResult { tracing::info!(exit_status = debug(exit_status), "disconnect"); - Box::pin(ready(Ok(()))) + Ok(()) } } let exit_listener = t.actor.create(ExitListener); diff --git a/src/dataspace.rs b/src/dataspace.rs index feec6af..d04ffeb 100644 --- a/src/dataspace.rs +++ b/src/dataspace.rs @@ -5,6 +5,7 @@ use super::schemas::dataspace::_Any; use preserves::value::Map; +use std::any::Any; use std::convert::TryFrom; #[derive(Debug)] @@ -68,6 +69,9 @@ impl Dataspace { } impl Entity for Dataspace { + fn as_any(&mut self) -> &mut dyn Any { + self + } fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult { // tracing::trace!(assertion = debug(&a), handle = debug(&h), "assert"); diff --git a/src/during.rs b/src/during.rs index 493bbf1..db98466 100644 --- a/src/during.rs +++ b/src/during.rs @@ -3,20 +3,21 @@ use crate::error::Error; use preserves::value::Map; +use std::any::Any; use std::sync::Arc; -pub type DuringRetractionHandler = Box ActorResult>; +pub type DuringRetractionHandler = Box ActorResult>; pub struct During(Map>); pub type DuringResult = - Result ActorResult>>, + Result ActorResult>>, Error>; pub struct DuringEntity where - E: 'static + Send, - Fa: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> DuringResult, - Fm: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> ActorResult, + E: 'static + Send + Sync, + Fa: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> DuringResult, + Fm: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> ActorResult, { state: E, assertion_handler: Option, @@ -29,7 +30,7 @@ impl During { During(Map::new()) } - pub fn await_retraction ActorResult>( + pub fn await_retraction ActorResult>( &mut self, h: Handle, f: F, @@ -49,16 +50,16 @@ pub fn entity( fn (&mut E, &mut Activation, _Any) -> DuringResult, fn (&mut E, &mut Activation, _Any) -> ActorResult> where - E: 'static + Send, + E: 'static + Send + Sync, { DuringEntity::new(state, None, None) } impl DuringEntity where - E: 'static + Send, - Fa: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> DuringResult, - Fm: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> ActorResult, + E: 'static + Send + Sync, + Fa: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> DuringResult, + Fm: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> ActorResult, { pub fn new(state: E, assertion_handler: Option, message_handler: Option) -> Self { DuringEntity { @@ -71,7 +72,7 @@ where pub fn on_asserted(self, assertion_handler: Fa1) -> DuringEntity where - Fa1: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> DuringResult, + Fa1: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> DuringResult, { DuringEntity { state: self.state, @@ -83,7 +84,7 @@ where pub fn on_message(self, message_handler: Fm1) -> DuringEntity where - Fm1: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> ActorResult, + Fm1: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> ActorResult, { DuringEntity { state: self.state, @@ -107,10 +108,14 @@ where impl Entity for DuringEntity where - E: 'static + Send, - Fa: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> DuringResult, - Fm: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> ActorResult, + E: 'static + Send + Sync, + Fa: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> DuringResult, + Fm: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> ActorResult, { + fn as_any(&mut self) -> &mut dyn Any { + self + } + fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult { match &mut self.assertion_handler { Some(handler) => match handler(&mut self.state, t, a)? { diff --git a/src/lib.rs b/src/lib.rs index f2dda45..0bfa194 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,7 +3,6 @@ pub use preserves::value; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use actor::Handle; -use actor::Oid; pub mod actor; pub mod bag; @@ -34,12 +33,6 @@ pub fn next_actor_id() -> ActorId { NEXT_ACTOR_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed) } -static NEXT_OID: AtomicU64 = AtomicU64::new(2); -pub fn next_oid() -> Oid { - Oid(value::signed_integer::SignedInteger::from( - NEXT_OID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed) as u128)) -} - static NEXT_HANDLE: AtomicU64 = AtomicU64::new(3); pub fn next_handle() -> Handle { Handle(value::signed_integer::SignedInteger::from( diff --git a/src/relay.rs b/src/relay.rs index 52dc3d2..79c8eb7 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -34,6 +34,7 @@ use preserves::value::signed_integer::SignedInteger; use preserves_schema::support::Deserialize; use preserves_schema::support::ParseError; +use std::any::Any; use std::convert::TryFrom; use std::io; use std::pin::Pin; @@ -147,8 +148,8 @@ pub fn connect_stream( ) where I: 'static + Send + AsyncRead, O: 'static + Send + AsyncWrite, - E: 'static + Send, - F: 'static + Send + FnMut(&mut E, &mut Activation, Arc) -> during::DuringResult + E: 'static + Send + std::marker::Sync, + F: 'static + Send + std::marker::Sync + FnMut(&mut E, &mut Activation, Arc) -> during::DuringResult { let i = Input::Bytes(Box::pin(i)); let o = Output::Bytes(Box::pin(o)); @@ -263,6 +264,9 @@ impl TunnelRelay { peer: Arc, } impl Entity for SyncPeer { + fn as_any(&mut self) -> &mut dyn Any { + self + } fn message(&mut self, t: &mut Activation, a: _Any) -> ActorResult { if let Some(true) = a.value().as_boolean() { t.message(&self.peer, _Any::new(true)); @@ -526,6 +530,9 @@ pub async fn output_loop( } impl Entity for TunnelRelay { + fn as_any(&mut self) -> &mut dyn Any { + self + } fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult { if let Ok(m) = tunnel_relay::RelayProtocol::try_from(&m) { match m { @@ -595,17 +602,19 @@ impl Entity for TunnelRelay { Ok(()) } - fn exit_hook(&mut self, t: &mut Activation, exit_status: &ActorResult) -> BoxFuture { + fn exit_hook(&mut self, t: &mut Activation, exit_status: &ActorResult) -> ActorResult { if let Err(e) = exit_status { let e = e.clone(); - Box::pin(ready(self.send_packet(&t.debtor, 1, Packet::Error(Box::new(e))))) - } else { - Box::pin(ready(Ok(()))) + self.send_packet(&t.debtor, 1, Packet::Error(Box::new(e)))?; } + Ok(()) } } impl Entity for RelayEntity { + fn as_any(&mut self) -> &mut dyn Any { + self + } fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult { Ok(t.message(&self.relay_ref, &tunnel_relay::Output { oid: self.oid.clone(), diff --git a/src/tracer.rs b/src/tracer.rs index 6088988..5434b4a 100644 --- a/src/tracer.rs +++ b/src/tracer.rs @@ -2,12 +2,13 @@ use crate::actor::*; use preserves::value::NestedValue; +use std::any::Any; use std::sync::Arc; struct Tracer(tracing::Span); fn set_name_oid(_ac: &mut Actor, t: &mut Tracer, r: &Arc) { - t.0.record("oid", &tracing::field::display(&r.addr.oid.0)); + t.0.record("oid", &tracing::field::display(&r.addr.oid())); } pub fn tracer(ac: &mut Actor, name: tracing::Span) -> Arc { @@ -19,6 +20,9 @@ pub fn tracer_top(name: tracing::Span) -> Arc { } impl Entity for Tracer { + fn as_any(&mut self) -> &mut dyn Any { + self + } fn assert(&mut self, _t: &mut Activation, a: _Any, h: Handle) -> ActorResult { let _guard = self.0.enter(); tracing::trace!(a = debug(&a), h = debug(&h), "assert");