Rearrange Entity storage: they are now held in Refs

This commit is contained in:
Tony Garnock-Jones 2021-07-22 01:05:08 +02:00
parent aa1755be0f
commit 21a69618cf
8 changed files with 125 additions and 98 deletions

View File

@ -1,5 +1,6 @@
use criterion::{criterion_group, criterion_main, Criterion};
use std::any::Any;
use std::iter::FromIterator;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
@ -30,6 +31,9 @@ fn says(who: _Any, what: _Any) -> _Any {
struct ShutdownEntity;
impl Entity for ShutdownEntity {
fn as_any(&mut self) -> &mut dyn Any {
self
}
fn message(&mut self, t: &mut Activation, _m: _Any) -> ActorResult {
t.actor.shutdown();
Ok(())
@ -84,6 +88,9 @@ pub fn bench_pub(c: &mut Criterion) {
struct Receiver(Arc<AtomicU64>);
impl Entity for Receiver {
fn as_any(&mut self) -> &mut dyn Any {
self
}
fn message(&mut self, _t: &mut Activation, _m: _Any) -> ActorResult {
self.0.fetch_add(1, Ordering::Relaxed);
Ok(())

View File

@ -15,6 +15,7 @@ use preserves::value::IOValue;
use preserves::value::Map;
use preserves::value::NestedValue;
use std::any::Any;
use std::boxed::Box;
use std::collections::hash_map::HashMap;
use std::convert::TryInto;
@ -31,12 +32,13 @@ use tracing::Instrument;
pub use super::schemas::internal_protocol::_Any;
pub use super::schemas::internal_protocol::Handle;
pub use super::schemas::internal_protocol::Oid;
pub type ActorResult = Result<(), Error>;
pub type ActorHandle = tokio::task::JoinHandle<ActorResult>;
pub trait Entity: Send {
pub trait Entity: Send + std::marker::Sync {
fn as_any(&mut self) -> &mut dyn Any;
fn assert(&mut self, _t: &mut Activation, _a: _Any, _h: Handle) -> ActorResult {
Ok(())
}
@ -53,8 +55,8 @@ pub trait Entity: Send {
fn turn_end(&mut self, _t: &mut Activation) -> ActorResult {
Ok(())
}
fn exit_hook(&mut self, _t: &mut Activation, _exit_status: &ActorResult) -> BoxFuture<ActorResult> {
Box::pin(ready(Ok(())))
fn exit_hook(&mut self, _t: &mut Activation, _exit_status: &ActorResult) -> ActorResult {
Ok(())
}
}
@ -92,7 +94,6 @@ pub struct LoanedItem<T> {
#[derive(Debug)]
enum SystemMessage {
Release,
ReleaseOid(Oid),
Turn(LoanedItem<PendingEventQueue>),
Crash(Error),
}
@ -110,16 +111,46 @@ pub struct Actor {
rx: Option<UnboundedReceiver<SystemMessage>>,
mailbox_count: Arc<AtomicUsize>,
outbound_assertions: OutboundAssertions,
oid_map: Map<Oid, Box<dyn Entity + Send>>,
next_task_id: u64,
linked_tasks: Map<u64, CancellationToken>,
exit_hooks: Vec<Arc<Ref>>,
}
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ObjectAddress {
pub mailbox: Mailbox,
pub oid: Oid,
pub target: RwLock<Box<dyn Entity>>,
}
impl ObjectAddress {
pub fn oid(&self) -> usize {
std::ptr::addr_of!(*self) as usize
}
}
impl PartialEq for ObjectAddress {
fn eq(&self, other: &Self) -> bool {
self.oid() == other.oid()
}
}
impl Eq for ObjectAddress {}
impl std::hash::Hash for ObjectAddress {
fn hash<H>(&self, hash: &mut H) where H: std::hash::Hasher {
self.oid().hash(hash)
}
}
impl PartialOrd for ObjectAddress {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for ObjectAddress {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.oid().cmp(&other.oid())
}
}
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord)]
@ -135,7 +166,11 @@ static NEXT_DEBTOR_ID: AtomicU64 = AtomicU64::new(4);
preserves_schema::support::lazy_static! {
pub static ref INERT_REF: Arc<Ref> = {
struct InertEntity;
impl crate::actor::Entity for InertEntity {}
impl crate::actor::Entity for InertEntity {
fn as_any(&mut self) -> &mut dyn Any {
self
}
}
let mut ac = Actor::new();
let e = ac.create(InertEntity);
ac.boot(tracing::info_span!(parent: None, "INERT_REF"),
@ -256,24 +291,6 @@ impl<'activation> Activation<'activation> {
let _ = target.send(&self.debtor, turn);
}
}
fn with_oid<R,
Ff: FnOnce(&mut Self) -> R,
Fs: FnOnce(&mut Self, &mut Box<dyn Entity + Send>) -> R>(
&mut self,
oid: &Oid,
kf: Ff,
ks: Fs,
) -> R {
match self.actor.oid_map.remove_entry(&oid) {
None => kf(self),
Some((k, mut e)) => {
let result = ks(self, &mut e);
self.actor.oid_map.insert(k, e);
result
}
}
}
}
impl<'activation> Drop for Activation<'activation> {
@ -421,18 +438,20 @@ impl Actor {
rx: Some(rx),
mailbox_count: Arc::new(AtomicUsize::new(0)),
outbound_assertions: Map::new(),
oid_map: Map::new(),
next_task_id: 0,
linked_tasks: Map::new(),
exit_hooks: Vec::new(),
}
}
pub fn create_and_start<E: Entity + Send + 'static>(name: tracing::Span, e: E) -> Arc<Ref> {
pub fn create_and_start<E: Entity + Send + std::marker::Sync + 'static>(
name: tracing::Span,
e: E,
) -> Arc<Ref> {
Self::create_and_start_rec(name, e, |_, _, _| ())
}
pub fn create_and_start_rec<E: Entity + Send + 'static,
pub fn create_and_start_rec<E: Entity + Send + std::marker::Sync + 'static,
F: FnOnce(&mut Self, &mut E, &Arc<Ref>) -> ()>(
name: tracing::Span,
e: E,
@ -466,26 +485,24 @@ impl Actor {
()
}
pub fn create<E: Entity + Send + 'static>(&mut self, e: E) -> Arc<Ref> {
pub fn create<E: Entity + Send + std::marker::Sync + 'static>(&mut self, e: E) -> Arc<Ref> {
self.create_rec(e, |_, _, _| ())
}
pub fn create_rec<E: Entity + Send + 'static,
pub fn create_rec<E: Entity + Send + std::marker::Sync + 'static,
F: FnOnce(&mut Self, &mut E, &Arc<Ref>) -> ()>(
&mut self,
mut e: E,
e: E,
f: F,
) -> Arc<Ref> {
let oid = crate::next_oid();
let r = Arc::new(Ref {
addr: Arc::new(ObjectAddress {
mailbox: self.mailbox(),
oid: oid.clone(),
target: RwLock::new(Box::new(e)),
}),
attenuation: Vec::new(),
});
f(self, &mut e, &r);
self.oid_map.insert(oid, Box::new(e));
f(self, r.addr.target.write().expect("unpoisoned").as_any().downcast_mut().unwrap(), &r);
r
}
@ -501,16 +518,11 @@ impl Actor {
{
let mut t = Activation::new(&mut self, Debtor::new(crate::name!("shutdown")));
for r in std::mem::take(&mut t.actor.exit_hooks) {
match t.actor.oid_map.remove_entry(&r.addr.oid) {
None => (),
Some((k, mut e)) => {
if let Err(err) = e.exit_hook(&mut t, &result).await {
tracing::error!(err = debug(err),
r = debug(&r),
"error in exit hook");
}
t.actor.oid_map.insert(k, e);
}
let mut e = r.addr.target.write().expect("unpoisoned");
if let Err(err) = e.exit_hook(&mut t, &result) {
tracing::error!(err = debug(err),
r = debug(&r),
"error in exit hook");
}
}
}
@ -561,34 +573,30 @@ impl Actor {
tracing::trace!("SystemMessage::Release");
Ok(true)
}
SystemMessage::ReleaseOid(oid) => {
tracing::trace!("SystemMessage::ReleaseOid({:?})", &oid);
self.oid_map.remove(&oid);
Ok(false)
}
SystemMessage::Turn(mut loaned_item) => {
let mut events = std::mem::take(&mut loaned_item.item);
let mut t = Activation::new(self, Arc::clone(&loaned_item.debtor));
loop {
for (r, event) in events.into_iter() {
t.with_oid(&r.addr.oid, |_| Ok(()), |t, e| match event {
let mut e = r.addr.target.write().expect("unpoisoned");
match event {
Event::Assert(b) => {
let Assert { assertion: Assertion(assertion), handle } = *b;
e.assert(t, assertion, handle)
e.assert(&mut t, assertion, handle)?
}
Event::Retract(b) => {
let Retract { handle } = *b;
e.retract(t, handle)
e.retract(&mut t, handle)?
}
Event::Message(b) => {
let Message { body: Assertion(body) } = *b;
e.message(t, body)
e.message(&mut t, body)?
}
Event::Sync(b) => {
let Sync { peer } = *b;
e.sync(t, peer)
e.sync(&mut t, peer)?
}
})?;
}
}
events = std::mem::take(&mut t.immediate_self);
if events.is_empty() { break; }
@ -698,20 +706,13 @@ impl Ref {
impl std::fmt::Debug for Ref {
fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
if self.attenuation.is_empty() {
write!(f, "⌜{}:{}⌝", self.addr.mailbox.actor_id, self.addr.oid.0)
write!(f, "⌜{}:{:016x}⌝", self.addr.mailbox.actor_id, self.addr.oid())
} else {
write!(f, "⌜{}:{}\\{:?}⌝", self.addr.mailbox.actor_id, self.addr.oid.0, self.attenuation)
write!(f, "⌜{}:{:016x}\\{:?}⌝", self.addr.mailbox.actor_id, self.addr.oid(), self.attenuation)
}
}
}
impl Drop for ObjectAddress {
fn drop(&mut self) {
let _ = self.mailbox.tx.send(SystemMessage::ReleaseOid(self.oid.clone()));
()
}
}
impl Domain for Ref {}
impl std::convert::TryFrom<&IOValue> for Ref {

View File

@ -5,6 +5,7 @@ use preserves::value::Map;
use preserves::value::NestedValue;
use preserves::value::Value;
use std::any::Any;
use std::convert::TryFrom;
use std::future::ready;
use std::iter::FromIterator;
@ -158,9 +159,12 @@ async fn run_connection(
};
struct ExitListener;
impl Entity for ExitListener {
fn exit_hook(&mut self, _t: &mut Activation, exit_status: &ActorResult) -> BoxFuture<ActorResult> {
fn as_any(&mut self) -> &mut dyn Any {
self
}
fn exit_hook(&mut self, _t: &mut Activation, exit_status: &ActorResult) -> ActorResult {
tracing::info!(exit_status = debug(exit_status), "disconnect");
Box::pin(ready(Ok(())))
Ok(())
}
}
let exit_listener = t.actor.create(ExitListener);

View File

@ -5,6 +5,7 @@ use super::schemas::dataspace::_Any;
use preserves::value::Map;
use std::any::Any;
use std::convert::TryFrom;
#[derive(Debug)]
@ -68,6 +69,9 @@ impl Dataspace {
}
impl Entity for Dataspace {
fn as_any(&mut self) -> &mut dyn Any {
self
}
fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult {
// tracing::trace!(assertion = debug(&a), handle = debug(&h), "assert");

View File

@ -3,20 +3,21 @@ use crate::error::Error;
use preserves::value::Map;
use std::any::Any;
use std::sync::Arc;
pub type DuringRetractionHandler<T> = Box<dyn Send + FnOnce(&mut T, &mut Activation) -> ActorResult>;
pub type DuringRetractionHandler<T> = Box<dyn Send + Sync + FnOnce(&mut T, &mut Activation) -> ActorResult>;
pub struct During<T>(Map<Handle, DuringRetractionHandler<T>>);
pub type DuringResult<E> =
Result<Option<Box<dyn 'static + Send + FnOnce(&mut E, &mut Activation) -> ActorResult>>,
Result<Option<Box<dyn 'static + Send + Sync + FnOnce(&mut E, &mut Activation) -> ActorResult>>,
Error>;
pub struct DuringEntity<E, Fa, Fm>
where
E: 'static + Send,
Fa: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> DuringResult<E>,
Fm: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> ActorResult,
E: 'static + Send + Sync,
Fa: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> DuringResult<E>,
Fm: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> ActorResult,
{
state: E,
assertion_handler: Option<Fa>,
@ -29,7 +30,7 @@ impl<T> During<T> {
During(Map::new())
}
pub fn await_retraction<F: 'static + Send + FnOnce(&mut T, &mut Activation) -> ActorResult>(
pub fn await_retraction<F: 'static + Send + Sync + FnOnce(&mut T, &mut Activation) -> ActorResult>(
&mut self,
h: Handle,
f: F,
@ -49,16 +50,16 @@ pub fn entity<E>(
fn (&mut E, &mut Activation, _Any) -> DuringResult<E>,
fn (&mut E, &mut Activation, _Any) -> ActorResult>
where
E: 'static + Send,
E: 'static + Send + Sync,
{
DuringEntity::new(state, None, None)
}
impl<E, Fa, Fm> DuringEntity<E, Fa, Fm>
where
E: 'static + Send,
Fa: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> DuringResult<E>,
Fm: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> ActorResult,
E: 'static + Send + Sync,
Fa: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> DuringResult<E>,
Fm: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> ActorResult,
{
pub fn new(state: E, assertion_handler: Option<Fa>, message_handler: Option<Fm>) -> Self {
DuringEntity {
@ -71,7 +72,7 @@ where
pub fn on_asserted<Fa1>(self, assertion_handler: Fa1) -> DuringEntity<E, Fa1, Fm>
where
Fa1: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> DuringResult<E>,
Fa1: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> DuringResult<E>,
{
DuringEntity {
state: self.state,
@ -83,7 +84,7 @@ where
pub fn on_message<Fm1>(self, message_handler: Fm1) -> DuringEntity<E, Fa, Fm1>
where
Fm1: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> ActorResult,
Fm1: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> ActorResult,
{
DuringEntity {
state: self.state,
@ -107,10 +108,14 @@ where
impl<E, Fa, Fm> Entity for DuringEntity<E, Fa, Fm>
where
E: 'static + Send,
Fa: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> DuringResult<E>,
Fm: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> ActorResult,
E: 'static + Send + Sync,
Fa: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> DuringResult<E>,
Fm: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> ActorResult,
{
fn as_any(&mut self) -> &mut dyn Any {
self
}
fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult {
match &mut self.assertion_handler {
Some(handler) => match handler(&mut self.state, t, a)? {

View File

@ -3,7 +3,6 @@ pub use preserves::value;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use actor::Handle;
use actor::Oid;
pub mod actor;
pub mod bag;
@ -34,12 +33,6 @@ pub fn next_actor_id() -> ActorId {
NEXT_ACTOR_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed)
}
static NEXT_OID: AtomicU64 = AtomicU64::new(2);
pub fn next_oid() -> Oid {
Oid(value::signed_integer::SignedInteger::from(
NEXT_OID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed) as u128))
}
static NEXT_HANDLE: AtomicU64 = AtomicU64::new(3);
pub fn next_handle() -> Handle {
Handle(value::signed_integer::SignedInteger::from(

View File

@ -34,6 +34,7 @@ use preserves::value::signed_integer::SignedInteger;
use preserves_schema::support::Deserialize;
use preserves_schema::support::ParseError;
use std::any::Any;
use std::convert::TryFrom;
use std::io;
use std::pin::Pin;
@ -147,8 +148,8 @@ pub fn connect_stream<I, O, E, F>(
) where
I: 'static + Send + AsyncRead,
O: 'static + Send + AsyncWrite,
E: 'static + Send,
F: 'static + Send + FnMut(&mut E, &mut Activation, Arc<Ref>) -> during::DuringResult<E>
E: 'static + Send + std::marker::Sync,
F: 'static + Send + std::marker::Sync + FnMut(&mut E, &mut Activation, Arc<Ref>) -> during::DuringResult<E>
{
let i = Input::Bytes(Box::pin(i));
let o = Output::Bytes(Box::pin(o));
@ -263,6 +264,9 @@ impl TunnelRelay {
peer: Arc<Ref>,
}
impl Entity for SyncPeer {
fn as_any(&mut self) -> &mut dyn Any {
self
}
fn message(&mut self, t: &mut Activation, a: _Any) -> ActorResult {
if let Some(true) = a.value().as_boolean() {
t.message(&self.peer, _Any::new(true));
@ -526,6 +530,9 @@ pub async fn output_loop(
}
impl Entity for TunnelRelay {
fn as_any(&mut self) -> &mut dyn Any {
self
}
fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult {
if let Ok(m) = tunnel_relay::RelayProtocol::try_from(&m) {
match m {
@ -595,17 +602,19 @@ impl Entity for TunnelRelay {
Ok(())
}
fn exit_hook(&mut self, t: &mut Activation, exit_status: &ActorResult) -> BoxFuture<ActorResult> {
fn exit_hook(&mut self, t: &mut Activation, exit_status: &ActorResult) -> ActorResult {
if let Err(e) = exit_status {
let e = e.clone();
Box::pin(ready(self.send_packet(&t.debtor, 1, Packet::Error(Box::new(e)))))
} else {
Box::pin(ready(Ok(())))
self.send_packet(&t.debtor, 1, Packet::Error(Box::new(e)))?;
}
Ok(())
}
}
impl Entity for RelayEntity {
fn as_any(&mut self) -> &mut dyn Any {
self
}
fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult {
Ok(t.message(&self.relay_ref, &tunnel_relay::Output {
oid: self.oid.clone(),

View File

@ -2,12 +2,13 @@ use crate::actor::*;
use preserves::value::NestedValue;
use std::any::Any;
use std::sync::Arc;
struct Tracer(tracing::Span);
fn set_name_oid(_ac: &mut Actor, t: &mut Tracer, r: &Arc<Ref>) {
t.0.record("oid", &tracing::field::display(&r.addr.oid.0));
t.0.record("oid", &tracing::field::display(&r.addr.oid()));
}
pub fn tracer(ac: &mut Actor, name: tracing::Span) -> Arc<Ref> {
@ -19,6 +20,9 @@ pub fn tracer_top(name: tracing::Span) -> Arc<Ref> {
}
impl Entity for Tracer {
fn as_any(&mut self) -> &mut dyn Any {
self
}
fn assert(&mut self, _t: &mut Activation, a: _Any, h: Handle) -> ActorResult {
let _guard = self.0.enter();
tracing::trace!(a = debug(&a), h = debug(&h), "assert");