diff --git a/benches/bench_dataspace.rs b/benches/bench_dataspace.rs index d4ea516..201e054 100644 --- a/benches/bench_dataspace.rs +++ b/benches/bench_dataspace.rs @@ -1,6 +1,5 @@ use criterion::{criterion_group, criterion_main, Criterion}; -use std::any::Any; use std::iter::FromIterator; use std::sync::Arc; use std::sync::atomic::AtomicU64; @@ -29,10 +28,7 @@ fn says(who: _Any, what: _Any) -> _Any { struct ShutdownEntity; -impl Entity for ShutdownEntity { - fn as_any(&mut self) -> &mut dyn Any { - self - } +impl Entity<_Any> for ShutdownEntity { fn message(&mut self, t: &mut Activation, _m: _Any) -> ActorResult { t.actor.shutdown(); Ok(()) @@ -63,12 +59,12 @@ pub fn bench_pub(c: &mut Criterion) { ac.linked_task(syndicate::name!("sender"), async move { for _ in 0..iters { let ds = Arc::clone(&ds); - external_event(&Arc::clone(&ds), &debtor, Box::new( + external_event(&Arc::clone(&ds.mailbox), &debtor, Box::new( move |t| ds.with_entity( |e| e.message(t, says(_Any::new("bench_pub"), Value::ByteString(vec![]).wrap())))))? } - external_event(&Arc::clone(&shutdown), &debtor, Box::new( + external_event(&Arc::clone(&shutdown.mailbox), &debtor, Box::new( move |t| shutdown.with_entity( |e| e.message(t, _Any::new(true)))))?; Ok(()) @@ -83,14 +79,11 @@ pub fn bench_pub(c: &mut Criterion) { b.iter_custom(|iters| { let start = Instant::now(); rt.block_on(async move { - let ds = Actor::create_and_start(syndicate::name!("dataspace"), Dataspace::new()); + let ds = Cap::new(&Actor::create_and_start(syndicate::name!("dataspace"), Dataspace::new())); let turn_count = Arc::new(AtomicU64::new(0)); struct Receiver(Arc); - impl Entity for Receiver { - fn as_any(&mut self) -> &mut dyn Any { - self - } + impl Entity<_Any> for Receiver { fn message(&mut self, _t: &mut Activation, _m: _Any) -> ActorResult { self.0.fetch_add(1, Ordering::Relaxed); Ok(()) @@ -98,13 +91,13 @@ pub fn bench_pub(c: &mut Criterion) { } let mut ac = Actor::new(); - let shutdown = ac.create(ShutdownEntity); - let receiver = ac.create(Receiver(Arc::clone(&turn_count))); + let shutdown = Cap::new(&ac.create(ShutdownEntity)); + let receiver = Cap::new(&ac.create(Receiver(Arc::clone(&turn_count)))); { let iters = iters.clone(); ac.boot(syndicate::name!("dataspace"), move |t| Box::pin(async move { - t.assert(&ds, &Observe { + ds.assert(t, &Observe { pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { ctor: Box::new(p::CRec { label: Value::symbol("Says").wrap(), @@ -122,7 +115,7 @@ pub fn bench_pub(c: &mut Criterion) { })), observer: receiver, }); - t.assert(&ds, &Observe { + ds.assert(t, &Observe { pattern: p::Pattern::DBind(Box::new(p::DBind { name: "shutdownTrigger".to_owned(), pattern: p::Pattern::DLit(Box::new(p::DLit { @@ -135,15 +128,15 @@ pub fn bench_pub(c: &mut Criterion) { t.actor.linked_task(syndicate::name!("sender"), async move { for _ in 0..iters { let ds = Arc::clone(&ds); - external_event(&Arc::clone(&ds), &debtor, Box::new( - move |t| ds.with_entity( + external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new( + move |t| ds.underlying.with_entity( |e| e.message(t, says(_Any::new("bench_pub"), Value::ByteString(vec![]).wrap())))))? } { let ds = Arc::clone(&ds); - external_event(&Arc::clone(&ds), &debtor, Box::new( - move |t| ds.with_entity( + external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new( + move |t| ds.underlying.with_entity( |e| e.message(t, _Any::new(true)))))?; } Ok(()) diff --git a/build.rs b/build.rs index 2737139..ac855f4 100644 --- a/build.rs +++ b/build.rs @@ -11,8 +11,7 @@ fn main() -> Result<(), Error> { let mut c = CompilerConfig::new(gen_dir, "crate::schemas".to_owned()); c.module_aliases.insert(vec!["EntityRef".to_owned()], "crate::actor".to_owned()); - let inputs = expand_inputs(&vec!["protocols/schema-bundle.bin".to_owned(), - "local-protocols/schema-bundle.bin".to_owned()])?; + let inputs = expand_inputs(&vec!["protocols/schema-bundle.bin".to_owned()])?; c.load_schemas_and_bundles(&inputs)?; compile(&c) } diff --git a/examples/consumer.rs b/examples/consumer.rs index b8bc27e..b1eb352 100644 --- a/examples/consumer.rs +++ b/examples/consumer.rs @@ -32,7 +32,7 @@ async fn main() -> Result<(), Box> { let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| { let consumer = syndicate::entity(0) - .on_message(|message_count, _t, m| { + .on_message(|message_count, _t, m: _Any| { if m.value().is_boolean() { tracing::info!("{:?} messages in the last second", message_count); *message_count = 0; @@ -41,9 +41,9 @@ async fn main() -> Result<(), Box> { } Ok(()) }) - .create(t.actor); + .create_cap(t.actor); - t.assert(&ds, &Observe { + ds.assert(t, &Observe { pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { ctor: Box::new(p::CRec { label: Value::symbol("Says").wrap(), @@ -68,9 +68,9 @@ async fn main() -> Result<(), Box> { loop { stats_timer.tick().await; let consumer = Arc::clone(&consumer); - external_event(&Arc::clone(&consumer), + external_event(&Arc::clone(&consumer.underlying.mailbox), &Debtor::new(syndicate::name!("debtor")), - Box::new(move |t| consumer.with_entity( + Box::new(move |t| consumer.underlying.with_entity( |e| e.message(t, _Any::new(true)))))?; } }); diff --git a/examples/pingpong.rs b/examples/pingpong.rs index 618aefe..1aa8035 100644 --- a/examples/pingpong.rs +++ b/examples/pingpong.rs @@ -114,58 +114,60 @@ async fn main() -> Result<(), Box> { let mut rtt_ns_samples: Vec = vec![0; report_latency_every]; let mut rtt_batch_count: usize = 0; let mut current_reply = None; - syndicate::entity(Arc::clone(&*INERT_REF)) - .on_message(move |self_ref, t, m| { - match m.value().as_boolean() { - Some(true) => { - tracing::info!("{:?} turns, {:?} events in the last second", - turn_counter, - event_counter); - turn_counter = 0; - event_counter = 0; - } - Some(false) => { - current_reply = None; - } - None => { - event_counter += 1; - let bindings = m.value().to_sequence()?; - let timestamp = &bindings[0]; - let padding = &bindings[1]; + let self_ref = t.actor.create_inert(); + self_ref.become_entity( + syndicate::entity(Arc::clone(&self_ref)) + .on_message(move |self_ref, t, m: _Any| { + match m.value().as_boolean() { + Some(true) => { + tracing::info!("{:?} turns, {:?} events in the last second", + turn_counter, + event_counter); + turn_counter = 0; + event_counter = 0; + } + Some(false) => { + current_reply = None; + } + None => { + event_counter += 1; + let bindings = m.value().to_sequence()?; + let timestamp = &bindings[0]; + let padding = &bindings[1]; - if should_echo || (report_latency_every == 0) { - t.message(&ds, simple_record2(&send_label, - timestamp.clone(), - padding.clone())); - } else { - if let None = current_reply { - turn_counter += 1; - t.message_immediate_self(&self_ref, _Any::new(false)); - let rtt_ns = now() - timestamp.value().to_u64()?; - rtt_ns_samples[rtt_batch_count] = rtt_ns; - rtt_batch_count += 1; + if should_echo || (report_latency_every == 0) { + ds.message(t, simple_record2(&send_label, + timestamp.clone(), + padding.clone())); + } else { + if let None = current_reply { + turn_counter += 1; + t.message_immediate_self(&self_ref, _Any::new(false)); + let rtt_ns = now() - timestamp.value().to_u64()?; + rtt_ns_samples[rtt_batch_count] = rtt_ns; + rtt_batch_count += 1; - if rtt_batch_count == report_latency_every { - rtt_ns_samples.sort(); - report_latencies(&rtt_ns_samples); - rtt_batch_count = 0; + if rtt_batch_count == report_latency_every { + rtt_ns_samples.sort(); + report_latencies(&rtt_ns_samples); + rtt_batch_count = 0; + } + + current_reply = Some( + simple_record2(&send_label, + Value::from(now()).wrap(), + padding.clone())); } - - current_reply = Some( - simple_record2(&send_label, - Value::from(now()).wrap(), - padding.clone())); + ds.message(t, current_reply.as_ref().expect("some reply").clone()); } - t.message(&ds, current_reply.as_ref().expect("some reply").clone()); } } - } - Ok(()) - }) - .create_rec(t.actor, |_ac, self_ref, e_ref| *self_ref = Arc::clone(e_ref)) + Ok(()) + })); + Cap::new(&self_ref) }; - t.assert(&ds, &Observe { + ds.assert(t, &Observe { pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { ctor: Box::new(p::CRec { label: Value::symbol(recv_label).wrap(), @@ -190,9 +192,9 @@ async fn main() -> Result<(), Box> { loop { stats_timer.tick().await; let consumer = Arc::clone(&consumer); - external_event(&Arc::clone(&consumer), + external_event(&Arc::clone(&consumer.underlying.mailbox), &Debtor::new(syndicate::name!("debtor")), - Box::new(move |t| consumer.with_entity( + Box::new(move |t| consumer.underlying.with_entity( |e| e.message(t, _Any::new(true)))))?; } }); @@ -211,10 +213,10 @@ async fn main() -> Result<(), Box> { for _ in 0..action_count { let ds = Arc::clone(&ds); let current_rec = current_rec.clone(); - events.push(Box::new(move |t| ds.with_entity( + events.push(Box::new(move |t| ds.underlying.with_entity( |e| e.message(t, current_rec)))); } - external_events(&ds, &debtor, events)? + external_events(&ds.underlying.mailbox, &debtor, events)? } Ok(()) }); diff --git a/examples/producer.rs b/examples/producer.rs index 524f42b..2f6ffa5 100644 --- a/examples/producer.rs +++ b/examples/producer.rs @@ -48,10 +48,10 @@ async fn main() -> Result<(), Box> { for _ in 0..action_count { let ds = Arc::clone(&ds); let padding = padding.clone(); - events.push(Box::new(move |t| ds.with_entity( + events.push(Box::new(move |t| ds.underlying.with_entity( |e| e.message(t, says(Value::from("producer").wrap(), padding))))); } - external_events(&ds, &debtor, events)?; + external_events(&ds.underlying.mailbox, &debtor, events)?; } }); Ok(None) diff --git a/examples/state-consumer.rs b/examples/state-consumer.rs index 5bb77a7..5dd0787 100644 --- a/examples/state-consumer.rs +++ b/examples/state-consumer.rs @@ -60,10 +60,10 @@ async fn main() -> Result<(), Box> { s.arrival_counter = 0; s.departure_counter = 0; Ok(()) - }).create(t.actor) + }).create_cap(t.actor) }; - t.assert(&ds, &Observe { + ds.assert(t, &Observe { pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { ctor: Box::new(p::CRec { label: Value::symbol("Present").wrap(), @@ -84,9 +84,9 @@ async fn main() -> Result<(), Box> { loop { stats_timer.tick().await; let consumer = Arc::clone(&consumer); - external_event(&Arc::clone(&consumer), + external_event(&Arc::clone(&consumer.underlying.mailbox), &Debtor::new(syndicate::name!("debtor")), - Box::new(move |t| consumer.with_entity( + Box::new(move |t| consumer.underlying.with_entity( |e| e.message(t, _Any::new(true)))))?; } }); diff --git a/examples/state-producer.rs b/examples/state-producer.rs index 00abd16..4f50f89 100644 --- a/examples/state-producer.rs +++ b/examples/state-producer.rs @@ -33,14 +33,14 @@ async fn main() -> Result<(), Box> { let ds = Arc::clone(&ds); let presence = presence.clone(); let handle = handle.clone(); - external_event(&Arc::clone(&ds), &debtor, Box::new( - move |t| ds.with_entity(|e| e.assert(t, presence, handle)))) + external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new( + move |t| ds.underlying.with_entity(|e| e.assert(t, presence, handle)))) }; let retract_e = || { let ds = Arc::clone(&ds); let handle = handle.clone(); - external_event(&Arc::clone(&ds), &debtor, Box::new( - move |t| ds.with_entity(|e| e.retract(t, handle)))) + external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new( + move |t| ds.underlying.with_entity(|e| e.retract(t, handle)))) }; assert_e()?; loop { diff --git a/local-protocols/Makefile b/local-protocols/Makefile deleted file mode 100644 index 1efe0bd..0000000 --- a/local-protocols/Makefile +++ /dev/null @@ -1,8 +0,0 @@ -all: schema-bundle.bin - -clean: - rm -f schema-bundle.bin - -schema-bundle.bin: schemas/*.prs - preserves-schemac schemas/*.prs > $@.tmp - mv $@.tmp $@ diff --git a/local-protocols/schema-bundle.bin b/local-protocols/schema-bundle.bin deleted file mode 100644 index 46c4d91..0000000 --- a/local-protocols/schema-bundle.bin +++ /dev/null @@ -1,3 +0,0 @@ -“³bundle·µ³ tunnelRelay„“³schema·³version‘³ definitions·³Input“³orµµ±eof“³lit³eof„„µ±packet“³rec“³lit³packet„“³tupleµ“³named³bs“³atom³ -ByteString„„„„„„µ±segment“³rec“³lit³segment„“³tupleµ“³named³bs“³atom³ -ByteString„„„„„„„„³Output“³rec“³lit³event„“³tupleµ“³named³oid“³refµ³sturdy„³Oid„„“³named³event“³refµ³internalProtocol„³Event„„„„„³SyncGc“³rec“³lit³sync-gc„“³tupleµ“³named³peer“³embedded“³lit„„„„„„³ RelayProtocol“³orµµ±Input“³refµ„³Input„„µ±Output“³refµ„³Output„„µ±SyncGc“³refµ„³SyncGc„„µ±flush“³lit³flush„„„„„³ embeddedType“³refµ³ EntityRef„³Ref„„„„„ \ No newline at end of file diff --git a/local-protocols/schemas/tunnelRelay.prs b/local-protocols/schemas/tunnelRelay.prs deleted file mode 100644 index d7d834d..0000000 --- a/local-protocols/schemas/tunnelRelay.prs +++ /dev/null @@ -1,8 +0,0 @@ -version 1 . -embeddedType EntityRef.Ref . - -RelayProtocol = Input / Output / SyncGc / =flush . - -Input = =eof / / . -Output = . -SyncGc = . diff --git a/src/actor.rs b/src/actor.rs index 2ec78db..df83c20 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -13,14 +13,16 @@ use preserves::value::Domain; use preserves::value::IOValue; use preserves::value::Map; use preserves::value::NestedValue; +use preserves_schema::support::ParseError; -use std::any::Any; use std::boxed::Box; use std::collections::hash_map::HashMap; +use std::convert::TryFrom; use std::convert::TryInto; use std::sync::Arc; use std::sync::RwLock; -use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering}; +use std::sync::Weak; +use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use tokio::select; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver}; @@ -34,33 +36,33 @@ pub type Handle = u64; pub type ActorResult = Result<(), Error>; pub type ActorHandle = tokio::task::JoinHandle; -pub trait Entity: Send + std::marker::Sync { - fn as_any(&mut self) -> &mut dyn Any; +pub struct Synced; - fn assert(&mut self, _t: &mut Activation, _a: _Any, _h: Handle) -> ActorResult { +pub trait Entity: Send + Sync { + fn assert(&mut self, _t: &mut Activation, _a: M, _h: Handle) -> ActorResult { Ok(()) } fn retract(&mut self, _t: &mut Activation, _h: Handle) -> ActorResult { Ok(()) } - fn message(&mut self, _t: &mut Activation, _m: _Any) -> ActorResult { + fn message(&mut self, _t: &mut Activation, _m: M) -> ActorResult { Ok(()) } - fn sync(&mut self, t: &mut Activation, peer: Arc) -> ActorResult { - t.message(&peer, _Any::new(true)); + fn sync(&mut self, t: &mut Activation, peer: Arc>) -> ActorResult { + t.message(&peer, Synced); Ok(()) } fn turn_end(&mut self, _t: &mut Activation) -> ActorResult { Ok(()) } - fn exit_hook(&mut self, _t: &mut Activation, _exit_status: &ActorResult) -> ActorResult { + fn exit_hook(&mut self, _t: &mut Activation, _exit_status: &Arc) -> ActorResult { Ok(()) } } enum Destination { ImmediateSelf(Action), - Remote(Arc, Action), + Remote(Arc, Action), } type OutboundAssertions = Map; @@ -98,84 +100,45 @@ enum SystemMessage { pub struct Mailbox { pub actor_id: ActorId, - pub mailbox_id: u64, tx: UnboundedSender, - mailbox_count: Arc, } pub struct Actor { actor_id: ActorId, tx: UnboundedSender, + mailbox: Weak, rx: Option>, - mailbox_count: Arc, outbound_assertions: OutboundAssertions, next_task_id: u64, linked_tasks: Map, - exit_hooks: Vec>, + exit_hooks: Vec, + exit_status: Option>, } -pub struct ObjectAddress { - pub mailbox: Mailbox, - pub target: RwLock>, +pub struct Ref { + pub mailbox: Arc, + pub target: RwLock>>>, } -impl ObjectAddress { - pub fn oid(&self) -> usize { - std::ptr::addr_of!(*self) as usize - } -} - -impl PartialEq for ObjectAddress { - fn eq(&self, other: &Self) -> bool { - self.oid() == other.oid() - } -} - -impl Eq for ObjectAddress {} - -impl std::hash::Hash for ObjectAddress { - fn hash(&self, hash: &mut H) where H: std::hash::Hasher { - self.oid().hash(hash) - } -} - -impl PartialOrd for ObjectAddress { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for ObjectAddress { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.oid().cmp(&other.oid()) - } -} - -#[derive(PartialEq, Eq, Hash, PartialOrd, Ord)] -pub struct Ref { - pub addr: Arc, +#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct Cap { + pub underlying: Arc>, pub attenuation: Vec, } +pub struct Guard +where + for<'a> &'a M: Into<_Any>, + for<'a> M: TryFrom<&'a _Any>, +{ + underlying: Arc> +} + //--------------------------------------------------------------------------- static NEXT_DEBTOR_ID: AtomicU64 = AtomicU64::new(4); preserves_schema::support::lazy_static! { - pub static ref INERT_REF: Arc = { - struct InertEntity; - impl crate::actor::Entity for InertEntity { - fn as_any(&mut self) -> &mut dyn Any { - self - } - } - let mut ac = Actor::new(); - let e = ac.create(InertEntity); - ac.boot(tracing::info_span!(parent: None, "INERT_REF"), - |t| Box::pin(ready(Ok(t.actor.shutdown())))); - e - }; - pub static ref SYNDICATE_CREDIT: i64 = { let credit = std::env::var("SYNDICATE_CREDIT").unwrap_or("100".to_owned()) @@ -204,6 +167,23 @@ pub fn start_debt_reporter() { })); } +impl TryFrom<&_Any> for Synced { + type Error = ParseError; + fn try_from(value: &_Any) -> Result { + if let Some(true) = value.value().as_boolean() { + Ok(Synced) + } else { + Err(ParseError::conformance_error("Synced")) + } + } +} + +impl From<&Synced> for _Any { + fn from(_value: &Synced) -> Self { + _Any::new(true) + } +} + impl<'activation> Activation<'activation> { pub fn new(actor: &'activation mut Actor, debtor: Arc) -> Self { Activation { @@ -214,47 +194,43 @@ impl<'activation> Activation<'activation> { } } - fn immediate_oid(&self, r: &Arc) { - if r.addr.mailbox.actor_id != self.actor.actor_id { + fn immediate_oid(&self, r: &Arc>) { + if r.mailbox.actor_id != self.actor.actor_id { panic!("Cannot use immediate_self to send to remote peers"); } } - pub fn assert(&mut self, r: &Arc, a: M) -> Handle where M: Into<_Any> { + pub fn assert(&mut self, r: &Arc>, a: M) -> Handle { let handle = crate::next_handle(); - if let Some(assertion) = r.rewrite(a.into()) { - { - let r = Arc::clone(r); - self.queue_for(&r).push(Box::new( - move |t| r.with_entity(|e| e.assert(t, assertion, handle)))); - } - { - let r = Arc::clone(r); - self.actor.outbound_assertions.insert( - handle, - Destination::Remote(Arc::clone(&r), Box::new( - move |t| r.with_entity(|e| e.retract(t, handle))))); - } + { + let r = Arc::clone(r); + self.queue_for(&r).push(Box::new( + move |t| r.with_entity(|e| e.assert(t, a, handle)))); + } + { + let r = Arc::clone(r); + self.actor.outbound_assertions.insert( + handle, + Destination::Remote(Arc::clone(&r.mailbox), Box::new( + move |t| r.with_entity(|e| e.retract(t, handle))))); } handle } - pub fn assert_immediate_self(&mut self, r: &Arc, a: M) -> Handle where M: Into<_Any> { + pub fn assert_immediate_self(&mut self, r: &Arc>, a: M) -> Handle { self.immediate_oid(r); let handle = crate::next_handle(); - if let Some(assertion) = r.rewrite(a.into()) { - { - let r = Arc::clone(r); - self.immediate_self.push(Box::new( - move |t| r.with_entity(|e| e.assert(t, assertion, handle)))); - } - { - let r = Arc::clone(r); - self.actor.outbound_assertions.insert( - handle, - Destination::ImmediateSelf(Box::new( - move |t| r.with_entity(|e| e.retract(t, handle))))); - } + { + let r = Arc::clone(r); + self.immediate_self.push(Box::new( + move |t| r.with_entity(|e| e.assert(t, a, handle)))); + } + { + let r = Arc::clone(r); + self.actor.outbound_assertions.insert( + handle, + Destination::ImmediateSelf(Box::new( + move |t| r.with_entity(|e| e.retract(t, handle))))); } handle } @@ -267,39 +243,39 @@ impl<'activation> Activation<'activation> { fn retract_known_ref(&mut self, d: Destination) { match d { - Destination::Remote(r, action) => - self.queue_for(&r).push(action), + Destination::Remote(mailbox, action) => + self.queue_for_mailbox(&mailbox).push(action), Destination::ImmediateSelf(action) => self.immediate_self.push(action), } } - pub fn message(&mut self, r: &Arc, m: M) where M: Into<_Any> { - if let Some(body) = r.rewrite(m.into()) { - let r = Arc::clone(r); - self.queue_for(&r).push(Box::new( - move |t| r.with_entity(|e| e.message(t, body)))) - } + pub fn message(&mut self, r: &Arc>, m: M) { + let r = Arc::clone(r); + self.queue_for(&r).push(Box::new( + move |t| r.with_entity(|e| e.message(t, m)))) } - pub fn message_immediate_self(&mut self, r: &Arc, m: M) where M: Into<_Any> { + pub fn message_immediate_self(&mut self, r: &Arc>, m: M) { self.immediate_oid(r); - if let Some(body) = r.rewrite(m.into()) { - let r = Arc::clone(r); - self.immediate_self.push(Box::new( - move |t| r.with_entity(|e| e.message(t, body)))) - } + let r = Arc::clone(r); + self.immediate_self.push(Box::new( + move |t| r.with_entity(|e| e.message(t, m)))) } - pub fn sync(&mut self, r: &Arc, peer: Arc) { + pub fn sync(&mut self, r: &Arc>, peer: Arc>) { let r = Arc::clone(r); self.queue_for(&r).push(Box::new( move |t| r.with_entity(|e| e.sync(t, peer)))) } - fn queue_for(&mut self, r: &Arc) -> &mut PendingEventQueue { - &mut self.queues.entry(r.addr.mailbox.actor_id) - .or_insert((r.addr.mailbox.tx.clone(), Vec::new())).1 + fn queue_for(&mut self, r: &Arc>) -> &mut PendingEventQueue { + self.queue_for_mailbox(&r.mailbox) + } + + fn queue_for_mailbox(&mut self, mailbox: &Arc) -> &mut PendingEventQueue { + &mut self.queues.entry(mailbox.actor_id) + .or_insert((mailbox.tx.clone(), Vec::new())).1 } fn deliver(&mut self) { @@ -389,26 +365,26 @@ fn send_actions( impl std::fmt::Debug for Mailbox { fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { - write!(f, "#", self.actor_id, self.mailbox_id) + write!(f, "#", self.actor_id) } } impl std::hash::Hash for Mailbox { fn hash(&self, state: &mut H) { - self.mailbox_id.hash(state) + self.actor_id.hash(state) } } impl Eq for Mailbox {} impl PartialEq for Mailbox { fn eq(&self, other: &Mailbox) -> bool { - self.mailbox_id == other.mailbox_id + self.actor_id == other.actor_id } } impl Ord for Mailbox { fn cmp(&self, other: &Mailbox) -> std::cmp::Ordering { - return self.mailbox_id.cmp(&other.mailbox_id) + return self.actor_id.cmp(&other.actor_id) } } @@ -418,33 +394,10 @@ impl PartialOrd for Mailbox { } } -impl Clone for Mailbox { - fn clone(&self) -> Self { - let Mailbox { actor_id, tx, mailbox_count, .. } = self; - let _old_refcount = mailbox_count.fetch_add(1, Ordering::SeqCst); - let new_mailbox = Mailbox { - actor_id: *actor_id, - mailbox_id: crate::next_mailbox_id(), - tx: tx.clone(), - mailbox_count: Arc::clone(mailbox_count), - }; - // tracing::trace!(old_mailbox = debug(&self), - // new_mailbox = debug(&new_mailbox), - // new_mailbox_refcount = debug(_old_refcount + 1)); - new_mailbox - } -} - impl Drop for Mailbox { fn drop(&mut self) { - let old_mailbox_refcount = self.mailbox_count.fetch_sub(1, Ordering::SeqCst); - let new_mailbox_refcount = old_mailbox_refcount - 1; - // tracing::trace!(mailbox = debug(&self), - // new_mailbox_refcount); - if new_mailbox_refcount == 0 { - let _ = self.tx.send(SystemMessage::Release); - () - } + let _ = self.tx.send(SystemMessage::Release); + () } } @@ -457,29 +410,27 @@ impl Actor { actor_id, tx, rx: Some(rx), - mailbox_count: Arc::new(AtomicUsize::new(0)), + mailbox: Weak::new(), outbound_assertions: Map::new(), next_task_id: 0, linked_tasks: Map::new(), exit_hooks: Vec::new(), + exit_status: None, } } - pub fn create_and_start( + pub fn create_and_start + Send + Sync + 'static>( name: tracing::Span, e: E, - ) -> Arc { - Self::create_and_start_rec(name, e, |_, _, _| ()) + ) -> Arc> { + let r = Self::create_and_start_inert(name); + r.become_entity(e); + r } - pub fn create_and_start_rec) -> ()>( - name: tracing::Span, - e: E, - f: F, - ) -> Arc { + pub fn create_and_start_inert(name: tracing::Span) -> Arc> { let mut ac = Self::new(); - let r = ac.create_rec(e, f); + let r = ac.create_inert(); ac.start(name); r } @@ -488,17 +439,18 @@ impl Actor { self.actor_id } - fn mailbox(&mut self) -> Mailbox { - let _old_refcount = self.mailbox_count.fetch_add(1, Ordering::SeqCst); - let new_mailbox = Mailbox { - actor_id: self.actor_id, - mailbox_id: crate::next_mailbox_id(), - tx: self.tx.clone(), - mailbox_count: Arc::clone(&self.mailbox_count), - }; - // tracing::trace!(new_mailbox = debug(&new_mailbox), - // new_mailbox_refcount = debug(_old_refcount + 1)); - new_mailbox + fn mailbox(&mut self) -> Arc { + match self.mailbox.upgrade() { + None => { + let new_mailbox = Arc::new(Mailbox { + actor_id: self.actor_id, + tx: self.tx.clone(), + }); + self.mailbox = Arc::downgrade(&new_mailbox); + new_mailbox + } + Some(m) => m + } } pub fn shutdown(&mut self) { @@ -506,25 +458,17 @@ impl Actor { () } - pub fn create(&mut self, e: E) -> Arc { - self.create_rec(e, |_, _, _| ()) + pub fn create + Send + Sync + 'static>(&mut self, e: E) -> Arc> { + let r = self.create_inert(); + r.become_entity(e); + r } - pub fn create_rec) -> ()>( - &mut self, - e: E, - f: F, - ) -> Arc { - let r = Arc::new(Ref { - addr: Arc::new(ObjectAddress { - mailbox: self.mailbox(), - target: RwLock::new(Box::new(e)), - }), - attenuation: Vec::new(), - }); - f(self, r.addr.target.write().expect("unpoisoned").as_any().downcast_mut().unwrap(), &r); - r + pub fn create_inert(&mut self) -> Arc> { + Arc::new(Ref { + mailbox: self.mailbox(), + target: RwLock::new(None), + }) } pub fn boot FnOnce(&'a mut Activation) -> BoxFuture<'a, ActorResult>>( @@ -536,13 +480,12 @@ impl Actor { tokio::spawn(async move { tracing::trace!("start"); let result = self.run(boot).await; + self.exit_status = Some(Arc::new(result.clone())); { let mut t = Activation::new(&mut self, Debtor::new(crate::name!("shutdown"))); - for r in std::mem::take(&mut t.actor.exit_hooks) { - if let Err(err) = r.with_entity(|e| e.exit_hook(&mut t, &result)) { - tracing::error!(err = debug(err), - r = debug(&r), - "error in exit hook"); + for action in std::mem::take(&mut t.actor.exit_hooks) { + if let Err(err) = action(&mut t) { + tracing::error!(err = debug(err), "error in exit hook"); } } } @@ -583,8 +526,12 @@ impl Actor { } } - pub fn add_exit_hook(&mut self, r: &Arc) { - self.exit_hooks.push(Arc::clone(r)) + pub fn add_exit_hook(&mut self, r: &Arc>) { + let r = Arc::clone(r); + self.exit_hooks.push(Box::new(move |t| { + let exit_status = Arc::clone(t.actor.exit_status.as_ref().expect("exited")); + r.with_entity(|e| e.exit_hook(t, &exit_status)) + })) } async fn handle(&mut self, m: SystemMessage) -> Result { @@ -673,25 +620,89 @@ impl Drop for Actor { } #[must_use] -pub fn external_event(r: &Arc, debtor: &Arc, action: Action) -> ActorResult { - send_actions(&r.addr.mailbox.tx, debtor, vec![action]) +pub fn external_event(mailbox: &Arc, debtor: &Arc, action: Action) -> ActorResult { + send_actions(&mailbox.tx, debtor, vec![action]) } #[must_use] -pub fn external_events(r: &Arc, debtor: &Arc, events: PendingEventQueue) -> ActorResult { - send_actions(&r.addr.mailbox.tx, debtor, events) +pub fn external_events(mailbox: &Arc, debtor: &Arc, events: PendingEventQueue) -> ActorResult { + send_actions(&mailbox.tx, debtor, events) } -impl Ref { - pub fn with_entity R>(&self, f: F) -> R { - f(&mut **self.addr.target.write().expect("unpoisoned")) +impl Ref { + pub fn become_entity>(&self, e: E) { + let mut g = self.target.write().expect("unpoisoned"); + if g.is_some() { + panic!("Double initialization of Ref"); + } + *g = Some(Box::new(e)); + } + + pub fn with_entity) -> R>(&self, f: F) -> R { + let mut g = self.target.write().expect("unpoisoned"); + f(g.as_mut().expect("initialized").as_mut()) + } +} + +impl Ref { + pub fn oid(&self) -> usize { + std::ptr::addr_of!(*self) as usize + } +} + +impl PartialEq for Ref { + fn eq(&self, other: &Self) -> bool { + self.oid() == other.oid() + } +} + +impl Eq for Ref {} + +impl std::hash::Hash for Ref { + fn hash(&self, hash: &mut H) where H: std::hash::Hasher { + self.oid().hash(hash) + } +} + +impl PartialOrd for Ref { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for Ref { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.oid().cmp(&other.oid()) + } +} + +impl std::fmt::Debug for Ref { + fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { + write!(f, "āŒœ{}:{:016x}āŒ", self.mailbox.actor_id, self.oid()) + } +} + +impl Cap { + pub fn guard(underlying: &Arc>) -> Arc + where + for<'a> &'a M: Into<_Any>, + for<'a> M: TryFrom<&'a _Any>, + { + Self::new(&Arc::new(Ref { + mailbox: Arc::clone(&underlying.mailbox), + target: RwLock::new(Some(Box::new(Guard { underlying: underlying.clone() }))), + })) + } + + pub fn new(underlying: &Arc>) -> Arc { + Arc::new(Cap { + underlying: Arc::clone(underlying), + attenuation: Vec::new(), + }) } pub fn attenuate(&self, attenuation: &sturdy::Attenuation) -> Result, CaveatError> { - let mut r = Ref { - addr: Arc::clone(&self.addr), - attenuation: self.attenuation.clone(), - }; + let mut r = Cap { attenuation: self.attenuation.clone(), .. self.clone() }; r.attenuation.extend(attenuation.check()?); Ok(Arc::new(r)) } @@ -705,33 +716,77 @@ impl Ref { } Some(a) } -} -impl std::fmt::Debug for Ref { - fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { - if self.attenuation.is_empty() { - write!(f, "āŒœ{}:{:016x}āŒ", self.addr.mailbox.actor_id, self.addr.oid()) - } else { - write!(f, "āŒœ{}:{:016x}\\{:?}āŒ", self.addr.mailbox.actor_id, self.addr.oid(), self.attenuation) + pub fn assert>(&self, t: &mut Activation, m: M) -> Option { + self.rewrite(m.into()).map(|m| t.assert(&self.underlying, m)) + } + + pub fn message>(&self, t: &mut Activation, m: M) { + if let Some(m) = self.rewrite(m.into()) { + t.message(&self.underlying, m) } } } -impl Domain for Ref {} - -impl std::convert::TryFrom<&IOValue> for Ref { - type Error = preserves_schema::support::ParseError; - fn try_from(_v: &IOValue) -> Result { - panic!("Attempted to serialize Ref via IOValue"); +impl std::fmt::Debug for Cap { + fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { + if self.attenuation.is_empty() { + self.underlying.fmt(f) + } else { + write!(f, "āŒœ{}:{:016x}\\{:?}āŒ", + self.underlying.mailbox.actor_id, + self.underlying.oid(), + self.attenuation) + } } } -impl std::convert::From<&Ref> for IOValue { - fn from(_v: &Ref) -> IOValue { +impl Domain for Cap {} + +impl std::convert::TryFrom<&IOValue> for Cap { + type Error = preserves_schema::support::ParseError; + fn try_from(_v: &IOValue) -> Result { + panic!("Attempted to serialize Cap via IOValue"); + } +} + +impl std::convert::From<&Cap> for IOValue { + fn from(_v: &Cap) -> IOValue { panic!("Attempted to deserialize Ref via IOValue"); } } +impl Entity<_Any> for Guard +where + for<'a> &'a M: Into<_Any>, + for<'a> M: TryFrom<&'a _Any>, +{ + fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult { + match M::try_from(&a) { + Ok(a) => self.underlying.with_entity(|e| e.assert(t, a, h)), + Err(_) => Ok(()), + } + } + fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult { + self.underlying.with_entity(|e| e.retract(t, h)) + } + fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult { + match M::try_from(&m) { + Ok(m) => self.underlying.with_entity(|e| e.message(t, m)), + Err(_) => Ok(()), + } + } + fn sync(&mut self, t: &mut Activation, peer: Arc>) -> ActorResult { + self.underlying.with_entity(|e| e.sync(t, peer)) + } + fn turn_end(&mut self, t: &mut Activation) -> ActorResult { + self.underlying.with_entity(|e| e.turn_end(t)) + } + fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc) -> ActorResult { + self.underlying.with_entity(|e| e.exit_hook(t, exit_status)) + } +} + #[macro_export] macro_rules! name { () => {tracing::info_span!(actor_id = tracing::field::Empty, diff --git a/src/bin/syndicate-server.rs b/src/bin/syndicate-server.rs index f8cb1c2..ee39720 100644 --- a/src/bin/syndicate-server.rs +++ b/src/bin/syndicate-server.rs @@ -5,8 +5,6 @@ use preserves::value::Map; use preserves::value::NestedValue; use preserves::value::Value; -use std::any::Any; -use std::convert::TryFrom; use std::future::ready; use std::iter::FromIterator; use std::sync::Arc; @@ -21,6 +19,7 @@ use syndicate::error::error; use syndicate::config; use syndicate::relay; use syndicate::schemas::internal_protocol::_Any; +use syndicate::schemas::gatekeeper; use syndicate::sturdy; use tokio::net::TcpListener; @@ -72,10 +71,10 @@ async fn main() -> Result<(), Box> { tracing::trace!("startup"); - let ds = Actor::create_and_start(syndicate::name!("dataspace"), Dataspace::new()); - let gateway = Actor::create_and_start( + let ds = Cap::new(&Actor::create_and_start(syndicate::name!("dataspace"), Dataspace::new())); + let gateway = Cap::guard(&Actor::create_and_start( syndicate::name!("gateway"), - syndicate::entity(Arc::clone(&ds)).on_asserted(handle_resolve)); + syndicate::entity(Arc::clone(&ds)).on_asserted(handle_resolve))); { let ds = Arc::clone(&ds); Actor::new().boot(syndicate::name!("rootcap"), |t| Box::pin(async move { @@ -84,7 +83,7 @@ async fn main() -> Result<(), Box> { let sr = sturdy::SturdyRef::mint(_Any::new("syndicate"), &key); tracing::info!(rootcap = debug(&_Any::from(&sr))); tracing::info!(rootcap = display(sr.to_hex())); - t.assert(&ds, &gatekeeper::Bind { oid: sr.oid.clone(), key, target: ds.clone() }); + ds.assert(t, &gatekeeper::Bind { oid: sr.oid.clone(), key, target: ds.clone() }); Ok(()) })); } @@ -131,7 +130,7 @@ fn extract_binary_packets( async fn run_connection( t: &mut Activation<'_>, stream: TcpStream, - gateway: Arc, + gateway: Arc, addr: std::net::SocketAddr, config: Arc, ) -> ActorResult { @@ -158,11 +157,8 @@ async fn run_connection( _ => unreachable!() }; struct ExitListener; - impl Entity for ExitListener { - fn as_any(&mut self) -> &mut dyn Any { - self - } - fn exit_hook(&mut self, _t: &mut Activation, exit_status: &ActorResult) -> ActorResult { + impl Entity<()> for ExitListener { + fn exit_hook(&mut self, _t: &mut Activation, exit_status: &Arc) -> ActorResult { tracing::info!(exit_status = debug(exit_status), "disconnect"); Ok(()) } @@ -174,7 +170,7 @@ async fn run_connection( } async fn run_listener( - gateway: Arc, + gateway: Arc, port: u16, config: Arc, ) -> ActorResult { @@ -193,59 +189,65 @@ async fn run_listener( //--------------------------------------------------------------------------- -fn handle_resolve(ds: &mut Arc, t: &mut Activation, a: _Any) -> DuringResult> { +fn handle_resolve( + ds: &mut Arc, + t: &mut Activation, + a: gatekeeper::Resolve, +) -> DuringResult> { use syndicate::schemas::dataspace; use syndicate::schemas::dataspace_patterns as p; - use syndicate::schemas::gatekeeper; - match gatekeeper::Resolve::try_from(&a) { - Err(_) => Ok(None), - Ok(gatekeeper::Resolve { sturdyref, observer }) => { - let queried_oid = sturdyref.oid.clone(); - let handler = syndicate::entity(observer) - .on_asserted(move |observer, t, a| { - let bindings = a.value().to_sequence()?; - let key = bindings[0].value().to_bytestring()?; - let unattenuated_target = bindings[1].value().to_embedded()?; - match sturdyref.validate_and_attenuate(key, unattenuated_target) { - Err(e) => { - tracing::warn!(sturdyref = debug(&_Any::from(&sturdyref)), - "sturdyref failed validation: {}", e); - Ok(None) - }, - Ok(target) => { - tracing::trace!(sturdyref = debug(&_Any::from(&sturdyref)), - target = debug(&target), - "sturdyref resolved"); - let h = t.assert(observer, _Any::domain(target)); - Ok(Some(Box::new(move |_observer, t| Ok(t.retract(h))))) - } + + let gatekeeper::Resolve { sturdyref, observer } = a; + let queried_oid = sturdyref.oid.clone(); + let handler = syndicate::entity(observer) + .on_asserted(move |observer, t, a: _Any| { + let bindings = a.value().to_sequence()?; + let key = bindings[0].value().to_bytestring()?; + let unattenuated_target = bindings[1].value().to_embedded()?; + match sturdyref.validate_and_attenuate(key, unattenuated_target) { + Err(e) => { + tracing::warn!(sturdyref = debug(&_Any::from(&sturdyref)), + "sturdyref failed validation: {}", e); + Ok(None) + }, + Ok(target) => { + tracing::trace!(sturdyref = debug(&_Any::from(&sturdyref)), + target = debug(&target), + "sturdyref resolved"); + if let Some(h) = observer.assert(t, _Any::domain(target)) { + Ok(Some(Box::new(move |_observer, t| Ok(t.retract(h))))) + } else { + Ok(None) } - }) - .create(t.actor); - let oh = t.assert(ds, &dataspace::Observe { - // TODO: codegen plugin to generate pattern constructors - pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { - ctor: Box::new(p::CRec { - label: Value::symbol("bind").wrap(), - arity: 3.into(), - }), - members: Map::from_iter(vec![ - (0.into(), p::Pattern::DLit(Box::new(p::DLit { - value: queried_oid, - }))), - (1.into(), p::Pattern::DBind(Box::new(p::DBind { - name: "key".to_owned(), - pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)), - }))), - (2.into(), p::Pattern::DBind(Box::new(p::DBind { - name: "target".to_owned(), - pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)), - }))), - ].into_iter()) - })), - observer: handler, - }); - Ok(Some(Box::new(move |_ds, t| Ok(t.retract(oh))))) - } + } + } + }) + .create_cap(t.actor); + if let Some(oh) = ds.assert(t, &dataspace::Observe { + // TODO: codegen plugin to generate pattern constructors + pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { + ctor: Box::new(p::CRec { + label: Value::symbol("bind").wrap(), + arity: 3.into(), + }), + members: Map::from_iter(vec![ + (0.into(), p::Pattern::DLit(Box::new(p::DLit { + value: queried_oid, + }))), + (1.into(), p::Pattern::DBind(Box::new(p::DBind { + name: "key".to_owned(), + pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)), + }))), + (2.into(), p::Pattern::DBind(Box::new(p::DBind { + name: "target".to_owned(), + pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)), + }))), + ].into_iter()) + })), + observer: handler, + }) { + Ok(Some(Box::new(move |_ds, t| Ok(t.retract(oh))))) + } else { + Ok(None) } } diff --git a/src/dataspace.rs b/src/dataspace.rs index d04ffeb..e0fb195 100644 --- a/src/dataspace.rs +++ b/src/dataspace.rs @@ -5,7 +5,6 @@ use super::schemas::dataspace::_Any; use preserves::value::Map; -use std::any::Any; use std::convert::TryFrom; #[derive(Debug)] @@ -68,10 +67,7 @@ impl Dataspace { } } -impl Entity for Dataspace { - fn as_any(&mut self) -> &mut dyn Any { - self - } +impl Entity<_Any> for Dataspace { fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult { // tracing::trace!(assertion = debug(&a), handle = debug(&h), "assert"); diff --git a/src/during.rs b/src/during.rs index db98466..fa3d9b6 100644 --- a/src/during.rs +++ b/src/during.rs @@ -3,26 +3,25 @@ use crate::error::Error; use preserves::value::Map; -use std::any::Any; use std::sync::Arc; +use std::marker::PhantomData; -pub type DuringRetractionHandler = Box ActorResult>; pub struct During(Map>); +pub type DuringRetractionHandler = Box ActorResult>; +pub type DuringResult = Result>, Error>; -pub type DuringResult = - Result ActorResult>>, - Error>; - -pub struct DuringEntity +pub struct DuringEntity where + M: 'static + Send + Sync, E: 'static + Send + Sync, - Fa: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> DuringResult, - Fm: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> ActorResult, + Fa: 'static + Send + Sync + FnMut(&mut E, &mut Activation, M) -> DuringResult, + Fm: 'static + Send + Sync + FnMut(&mut E, &mut Activation, M) -> ActorResult, { state: E, assertion_handler: Option, message_handler: Option, during: During, + phantom: PhantomData, } impl During { @@ -44,22 +43,24 @@ impl During { } } -pub fn entity( +pub fn entity( state: E -) -> DuringEntity DuringResult, - fn (&mut E, &mut Activation, _Any) -> ActorResult> +) -> DuringEntity DuringResult, + fn (&mut E, &mut Activation, M) -> ActorResult> where E: 'static + Send + Sync, { DuringEntity::new(state, None, None) } -impl DuringEntity +impl DuringEntity where + M: 'static + Send + Sync, E: 'static + Send + Sync, - Fa: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> DuringResult, - Fm: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> ActorResult, + Fa: 'static + Send + Sync + FnMut(&mut E, &mut Activation, M) -> DuringResult, + Fm: 'static + Send + Sync + FnMut(&mut E, &mut Activation, M) -> ActorResult, { pub fn new(state: E, assertion_handler: Option, message_handler: Option) -> Self { DuringEntity { @@ -67,56 +68,61 @@ where assertion_handler, message_handler, during: During::new(), + phantom: PhantomData, } } - pub fn on_asserted(self, assertion_handler: Fa1) -> DuringEntity + pub fn on_asserted(self, assertion_handler: Fa1) -> DuringEntity where - Fa1: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> DuringResult, + Fa1: 'static + Send + Sync + FnMut(&mut E, &mut Activation, M) -> DuringResult, { DuringEntity { state: self.state, assertion_handler: Some(assertion_handler), message_handler: self.message_handler, during: self.during, + phantom: PhantomData, } } - pub fn on_message(self, message_handler: Fm1) -> DuringEntity + pub fn on_message(self, message_handler: Fm1) -> DuringEntity where - Fm1: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> ActorResult, + Fm1: 'static + Send + Sync + FnMut(&mut E, &mut Activation, M) -> ActorResult, { DuringEntity { state: self.state, assertion_handler: self.assertion_handler, message_handler: Some(message_handler), during: self.during, + phantom: PhantomData, } } - pub fn create(self, ac: &mut Actor) -> Arc { + pub fn create(self, ac: &mut Actor) -> Arc> { ac.create(self) } - - pub fn create_rec(self, ac: &mut Actor, f: F) -> Arc - where - F: FnOnce(&mut Actor, &mut E, &Arc) -> () - { - ac.create_rec(self, |ac, e, e_ref| f(ac, &mut e.state, e_ref)) - } } -impl Entity for DuringEntity +impl DuringEntity<_Any, E, Fa, Fm> where E: 'static + Send + Sync, Fa: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> DuringResult, Fm: 'static + Send + Sync + FnMut(&mut E, &mut Activation, _Any) -> ActorResult, { - fn as_any(&mut self) -> &mut dyn Any { - self + pub fn create_cap(self, ac: &mut Actor) -> Arc + { + Cap::new(&self.create(ac)) } +} - fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult { +impl Entity for DuringEntity +where + M: Send + Sync, + E: 'static + Send + Sync, + Fa: 'static + Send + Sync + FnMut(&mut E, &mut Activation, M) -> DuringResult, + Fm: 'static + Send + Sync + FnMut(&mut E, &mut Activation, M) -> ActorResult, +{ + fn assert(&mut self, t: &mut Activation, a: M, h: Handle) -> ActorResult { match &mut self.assertion_handler { Some(handler) => match handler(&mut self.state, t, a)? { Some(f) => self.during.await_retraction(h, f), @@ -130,7 +136,7 @@ where self.during.retract(h)(&mut self.state, t) } - fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult { + fn message(&mut self, t: &mut Activation, m: M) -> ActorResult { match &mut self.message_handler { Some(handler) => handler(&mut self.state, t, m), None => Ok(()), diff --git a/src/error.rs b/src/error.rs index 9fd098c..b7c954d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -16,7 +16,7 @@ impl std::fmt::Display for Error { pub fn error(message: &str, detail: Detail) -> Error where _Any: From { Error { message: message.to_owned(), - detail: detail.into(), + detail: _Any::from(detail), } } diff --git a/src/lib.rs b/src/lib.rs index 7daaec5..6039c53 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,8 +37,3 @@ static NEXT_HANDLE: AtomicU64 = AtomicU64::new(3); pub fn next_handle() -> Handle { NEXT_HANDLE.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed) } - -static NEXT_MAILBOX_ID: AtomicU64 = AtomicU64::new(4); -pub fn next_mailbox_id() -> u64 { - NEXT_MAILBOX_ID.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed) -} diff --git a/src/relay.rs b/src/relay.rs index 645275b..df7d4b7 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -8,7 +8,6 @@ use crate::error::error; use crate::schemas::gatekeeper; use crate::schemas::internal_protocol as P; use crate::schemas::sturdy; -use crate::schemas::tunnel_relay; use futures::Sink; use futures::SinkExt; @@ -34,7 +33,6 @@ use preserves::value::signed_integer::SignedInteger; use preserves_schema::support::Deserialize; use preserves_schema::support::ParseError; -use std::any::Any; use std::convert::TryFrom; use std::io; use std::pin::Pin; @@ -49,15 +47,28 @@ use tokio::io::AsyncWriteExt; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver}; +enum RelayInput { + Eof, + Packet(Vec), + Segment(Vec), +} + +enum RelayProtocol { + Input(RelayInput), + Output(sturdy::Oid, P::Event), + SyncGc(Arc), + Flush, +} + struct WireSymbol { oid: sturdy::Oid, - obj: Arc, + obj: Arc, ref_count: AtomicUsize, } struct Membrane { oid_map: Map>, - ref_map: Map, Arc>, + ref_map: Map, Arc>, } struct Membranes { @@ -76,10 +87,12 @@ pub enum Output { Bytes(Pin>), } +type TunnelRelayRef = Arc>; + // There are other kinds of relay. This one has exactly two participants connected to each other. pub struct TunnelRelay { - self_ref: Arc, + self_ref: TunnelRelayRef, input_buffer: BytesMut, inbound_assertions: Map>)>, outbound_assertions: Map>>, @@ -89,7 +102,7 @@ pub struct TunnelRelay } struct RelayEntity { - relay_ref: Arc, + relay_ref: TunnelRelayRef, oid: sturdy::Oid, } @@ -113,7 +126,7 @@ impl Membrane { } } - fn insert(&mut self, oid: sturdy::Oid, obj: Arc) -> Arc { + fn insert(&mut self, oid: sturdy::Oid, obj: Arc) -> Arc { let ws = Arc::new(WireSymbol { oid: oid.clone(), obj: Arc::clone(&obj), @@ -124,7 +137,7 @@ impl Membrane { ws } - fn acquire(&mut self, r: &Arc) -> Arc { + fn acquire(&mut self, r: &Arc) -> Arc { let ws = self.ref_map.get(r).expect("WireSymbol must be present at acquire() time"); ws.acquire(); Arc::clone(ws) @@ -149,18 +162,18 @@ pub fn connect_stream( I: 'static + Send + AsyncRead, O: 'static + Send + AsyncWrite, E: 'static + Send + std::marker::Sync, - F: 'static + Send + std::marker::Sync + FnMut(&mut E, &mut Activation, Arc) -> during::DuringResult + F: 'static + Send + std::marker::Sync + FnMut(&mut E, &mut Activation, Arc) -> during::DuringResult { let i = Input::Bytes(Box::pin(i)); let o = Output::Bytes(Box::pin(o)); let gatekeeper = TunnelRelay::run(t, i, o, None, Some(sturdy::Oid(0.into()))).unwrap(); - let main_entity = t.actor.create(during::entity(initial_state).on_asserted(move |state, t, a| { + let main_entity = t.actor.create(during::entity(initial_state).on_asserted(move |state, t, a: _Any| { let denotation = a.value().to_embedded()?; f(state, t, Arc::clone(denotation)) })); - t.assert(&gatekeeper, &gatekeeper::Resolve { + gatekeeper.assert(t, &gatekeeper::Resolve { sturdyref, - observer: main_entity, + observer: Cap::new(&main_entity), }); } @@ -169,12 +182,12 @@ impl TunnelRelay { t: &mut Activation, i: Input, o: Output, - initial_ref: Option>, + initial_ref: Option>, initial_oid: Option, - ) -> Option> { + ) -> Option> { let (output_tx, output_rx) = unbounded_channel(); let mut tr = TunnelRelay { - self_ref: Arc::clone(&*INERT_REF), /* placeholder */ + self_ref: t.actor.create_inert(), input_buffer: BytesMut::with_capacity(1024), output: output_tx, inbound_assertions: Map::new(), @@ -189,13 +202,10 @@ impl TunnelRelay { if let Some(ir) = initial_ref { tr.membranes.export_ref(ir, true); } - let mut result = None; - let tr_ref = t.actor.create_rec(tr, |ac, tr, tr_ref| { - tr.self_ref = Arc::clone(tr_ref); - if let Some(io) = initial_oid { - result = Some(Arc::clone(&tr.membranes.import_oid(ac, tr_ref, io).obj)); - } - }); + let result = initial_oid.map( + |io| Arc::clone(&tr.membranes.import_oid(t.actor, &tr.self_ref, io).obj)); + let tr_ref = Arc::clone(&tr.self_ref); + tr_ref.become_entity(tr); t.actor.add_exit_hook(&tr_ref); t.actor.linked_task(crate::name!("writer"), output_loop(o, output_rx)); t.actor.linked_task(crate::name!("reader"), input_loop(i, tr_ref)); @@ -228,9 +238,10 @@ impl TunnelRelay { a.foreach_embedded::<_, Error>(&mut |r| { Ok(imported.push(imported_membrane.acquire(r))) })?; - let local_handle = t.assert(target, a); - if let Some(_) = self.inbound_assertions.insert(remote_handle, (local_handle, imported)) { - return Err(error("Assertion with duplicate handle", _Any::new(false))); + if let Some(local_handle) = target.assert(t, a) { + if let Some(_) = self.inbound_assertions.insert(remote_handle, (local_handle, imported)) { + return Err(error("Assertion with duplicate handle", _Any::new(false))); + } } } P::Event::Retract(b) => { @@ -254,26 +265,20 @@ impl TunnelRelay { _ => Ok(()) } })?; - t.message(target, a); + target.message(t, a); } P::Event::Sync(b) => { let P::Sync { peer } = *b; self.membranes.imported.acquire(&peer); struct SyncPeer { - tr: Arc, - peer: Arc, + tr: TunnelRelayRef, + peer: Arc, } - impl Entity for SyncPeer { - fn as_any(&mut self) -> &mut dyn Any { - self - } - fn message(&mut self, t: &mut Activation, a: _Any) -> ActorResult { - if let Some(true) = a.value().as_boolean() { - t.message(&self.peer, _Any::new(true)); - t.message(&self.tr, &tunnel_relay::SyncGc { - peer: Arc::clone(&self.peer) - }); - } + impl Entity for SyncPeer { + fn message(&mut self, t: &mut Activation, _a: Synced) -> ActorResult { + self.peer.message(t, _Any::new(true)); + t.message(&self.tr, RelayProtocol::SyncGc( + Arc::clone(&self.peer))); Ok(()) } } @@ -281,7 +286,7 @@ impl TunnelRelay { tr: Arc::clone(&self.self_ref), peer: Arc::clone(&peer), }); - t.sync(&peer, k); + t.sync(&peer.underlying, k); } } } @@ -336,7 +341,7 @@ impl TunnelRelay { } impl Membranes { - fn export_ref(&mut self, obj: Arc, and_acquire: bool) -> Arc { + fn export_ref(&mut self, obj: Arc, and_acquire: bool) -> Arc { let ws = match self.exported.ref_map.get(&obj) { None => { let oid = sturdy::Oid(SignedInteger::from(self.next_export_oid as u128)); @@ -354,17 +359,17 @@ impl Membranes { fn import_oid( &mut self, ac: &mut Actor, - relay_ref: &Arc, + relay_ref: &TunnelRelayRef, oid: sturdy::Oid, ) -> Arc { let obj = ac.create(RelayEntity { relay_ref: Arc::clone(relay_ref), oid: oid.clone() }); - self.imported.insert(oid, obj) + self.imported.insert(oid, Cap::new(&obj)) } fn decode_embedded<'de, 'src, S: BinarySource<'de>>( &mut self, t: &mut Activation, - relay_ref: &Arc, + relay_ref: &TunnelRelayRef, src: &'src mut S, _read_annotations: bool, ) -> io::Result { @@ -392,7 +397,7 @@ impl Membranes { })?) } } - None => Ok(Arc::clone(&*INERT_REF)), + None => Ok(Cap::new(&t.actor.create_inert())), } } } @@ -400,7 +405,7 @@ impl Membranes { } struct ActivatedMembranes<'a, 'activation, 'm>(&'a mut Activation<'activation>, - &'m Arc, + &'m TunnelRelayRef, &'m mut Membranes); impl<'a, 'activation, 'm> DomainDecode for ActivatedMembranes<'a, 'activation, 'm> { @@ -448,17 +453,20 @@ impl DomainEncode for Membranes { } } -pub async fn input_loop( +async fn input_loop( i: Input, - relay: Arc, + relay: TunnelRelayRef, ) -> ActorResult { #[must_use] - async fn s>(relay: &Arc, debtor: &Arc, m: M) -> ActorResult { + async fn s( + relay: &TunnelRelayRef, + debtor: &Arc, + m: RelayInput, + ) -> ActorResult { debtor.ensure_clear_funds().await; - let m = m.into(); let relay = Arc::clone(relay); - external_event(&Arc::clone(&relay), debtor, Box::new( - move |t| relay.with_entity(|e| e.message(t, m)))) + external_event(&Arc::clone(&relay.mailbox), debtor, Box::new( + move |t| relay.with_entity(|e| e.message(t, RelayProtocol::Input(m))))) } let debtor = Debtor::new(crate::name!("input-loop")); @@ -468,11 +476,11 @@ pub async fn input_loop( loop { match src.next().await { None => { - s(&relay, &debtor, &tunnel_relay::Input::Eof).await?; + s(&relay, &debtor, RelayInput::Eof).await?; return Ok(()); } Some(bs) => { - s(&relay, &debtor, &tunnel_relay::Input::Packet { bs: bs? }).await?; + s(&relay, &debtor, RelayInput::Packet(bs?)).await?; } } } @@ -486,7 +494,7 @@ pub async fn input_loop( Ok(n) => n, Err(e) => if e.kind() == io::ErrorKind::ConnectionReset { - s(&relay, &debtor, &tunnel_relay::Input::Eof).await?; + s(&relay, &debtor, RelayInput::Eof).await?; return Ok(()); } else { return Err(e)?; @@ -494,14 +502,14 @@ pub async fn input_loop( }; match n { 0 => { - s(&relay, &debtor, &tunnel_relay::Input::Eof).await?; + s(&relay, &debtor, RelayInput::Eof).await?; return Ok(()); } _ => { while buf.has_remaining() { let bs = buf.chunk(); let n = bs.len(); - s(&relay, &debtor, &tunnel_relay::Input::Segment { bs: bs.to_vec() }).await?; + s(&relay, &debtor, RelayInput::Segment(bs.to_vec())).await?; buf.advance(n); } } @@ -511,7 +519,7 @@ pub async fn input_loop( } } -pub async fn output_loop( +async fn output_loop( mut o: Output, mut output_rx: UnboundedReceiver>>, ) -> ActorResult { @@ -532,81 +540,69 @@ pub async fn output_loop( } } -impl Entity for TunnelRelay { - fn as_any(&mut self) -> &mut dyn Any { - self - } - fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult { - if let Ok(m) = tunnel_relay::RelayProtocol::try_from(&m) { - match m { - tunnel_relay::RelayProtocol::Input(b) => match *b { - tunnel_relay::Input::Eof => { - t.actor.shutdown(); - } - tunnel_relay::Input::Packet { bs } => { - let mut src = BytesBinarySource::new(&bs); +impl Entity for TunnelRelay { + fn message(&mut self, t: &mut Activation, m: RelayProtocol) -> ActorResult { + match m { + RelayProtocol::Input(RelayInput::Eof) => { + t.actor.shutdown(); + } + RelayProtocol::Input(RelayInput::Packet(bs)) => { + let mut src = BytesBinarySource::new(&bs); + let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes); + let mut r = src.packed::<_, _Any, _>(&mut dec); + let item = P::Packet::deserialize(&mut r)?; + self.handle_inbound_packet(t, item)?; + } + RelayProtocol::Input(RelayInput::Segment(bs)) => { + self.input_buffer.extend_from_slice(&bs); + loop { + let (e, count) = { + let mut src = BytesBinarySource::new(&self.input_buffer); let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes); let mut r = src.packed::<_, _Any, _>(&mut dec); - let item = P::Packet::deserialize(&mut r)?; - self.handle_inbound_packet(t, item)?; - } - tunnel_relay::Input::Segment { bs } => { - self.input_buffer.extend_from_slice(&bs); - loop { - let (e, count) = { - let mut src = BytesBinarySource::new(&self.input_buffer); - let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes); - let mut r = src.packed::<_, _Any, _>(&mut dec); - let e = match P::Packet::deserialize(&mut r) { - Err(ParseError::Preserves(PreservesError::Io(e))) - if is_eof_io_error(&e) => - None, - result => Some(result?), - }; - (e, r.source.index) - }; - match e { - None => break, - Some(item) => { - self.input_buffer.advance(count); - self.handle_inbound_packet(t, item)?; - } - } - } - } - } - tunnel_relay::RelayProtocol::Output(b) => match *b { - tunnel_relay::Output { oid, event } => { - if self.pending_outbound.is_empty() { - t.message_immediate_self( - &self.self_ref, &tunnel_relay::RelayProtocol::Flush); - } - let turn_event = P::TurnEvent { - oid: P::Oid(oid.0), - event: self.handle_outbound_event(t, event)?, + let e = match P::Packet::deserialize(&mut r) { + Err(ParseError::Preserves(PreservesError::Io(e))) + if is_eof_io_error(&e) => + None, + result => Some(result?), }; - self.pending_outbound.push(turn_event); - } - } - tunnel_relay::RelayProtocol::SyncGc(b) => match *b { - tunnel_relay::SyncGc { peer } => { - if let Some(ws) = self.membranes.imported.ref_map.get(&peer) { - let ws = Arc::clone(ws); // cloned to release the borrow to permit the release - self.membranes.imported.release(&ws); + (e, r.source.index) + }; + match e { + None => break, + Some(item) => { + self.input_buffer.advance(count); + self.handle_inbound_packet(t, item)?; } } } - tunnel_relay::RelayProtocol::Flush => { - let events = std::mem::take(&mut self.pending_outbound); - self.send_packet(&t.debtor, events.len(), P::Packet::Turn(Box::new(P::Turn(events))))? + } + RelayProtocol::Output(oid, event) => { + if self.pending_outbound.is_empty() { + t.message_immediate_self(&self.self_ref, RelayProtocol::Flush); } + let turn_event = P::TurnEvent { + oid: P::Oid(oid.0), + event: self.handle_outbound_event(t, event)?, + }; + self.pending_outbound.push(turn_event); + } + RelayProtocol::SyncGc(peer) => { + if let Some(ws) = self.membranes.imported.ref_map.get(&peer) { + let ws = Arc::clone(ws); // cloned to release the borrow to permit the release + self.membranes.imported.release(&ws); + } + } + RelayProtocol::Flush => { + let events = std::mem::take(&mut self.pending_outbound); + self.send_packet(&t.debtor, events.len(), P::Packet::Turn(Box::new(P::Turn(events))))? } } Ok(()) } - fn exit_hook(&mut self, t: &mut Activation, exit_status: &ActorResult) -> ActorResult { - if let Err(e) = exit_status { + fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc) -> ActorResult { + if let Err(e) = &**exit_status { let e = e.clone(); self.send_packet(&t.debtor, 1, P::Packet::Error(Box::new(e)))?; } @@ -614,37 +610,30 @@ impl Entity for TunnelRelay { } } -impl Entity for RelayEntity { - fn as_any(&mut self) -> &mut dyn Any { - self - } +impl Entity<_Any> for RelayEntity { fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult { - Ok(t.message(&self.relay_ref, &tunnel_relay::Output { - oid: self.oid.clone(), - event: P::Event::Assert(Box::new(P::Assert { + Ok(t.message(&self.relay_ref, RelayProtocol::Output( + self.oid.clone(), + P::Event::Assert(Box::new(P::Assert { assertion: P::Assertion(a), handle: P::Handle(h.into()), - })), - })) + }))))) } fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult { - Ok(t.message(&self.relay_ref, &tunnel_relay::Output { - oid: self.oid.clone(), - event: P::Event::Retract(Box::new(P::Retract { + Ok(t.message(&self.relay_ref, RelayProtocol::Output( + self.oid.clone(), + P::Event::Retract(Box::new(P::Retract { handle: P::Handle(h.into()), - })), - })) + }))))) } fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult { - Ok(t.message(&self.relay_ref, &tunnel_relay::Output { - oid: self.oid.clone(), - event: P::Event::Message(Box::new(P::Message { body: P::Assertion(m) })), - })) + Ok(t.message(&self.relay_ref, RelayProtocol::Output( + self.oid.clone(), + P::Event::Message(Box::new(P::Message { body: P::Assertion(m) }))))) } - fn sync(&mut self, t: &mut Activation, peer: Arc) -> ActorResult { - Ok(t.message(&self.relay_ref, &tunnel_relay::Output { - oid: self.oid.clone(), - event: P::Event::Sync(Box::new(P::Sync { peer })), - })) + fn sync(&mut self, t: &mut Activation, peer: Arc>) -> ActorResult { + Ok(t.message(&self.relay_ref, RelayProtocol::Output( + self.oid.clone(), + P::Event::Sync(Box::new(P::Sync { peer: Cap::guard(&peer) }))))) } } diff --git a/src/skeleton.rs b/src/skeleton.rs index ba00b6c..0df8074 100644 --- a/src/skeleton.rs +++ b/src/skeleton.rs @@ -1,6 +1,6 @@ use super::bag; -use preserves::value::{Map, Set, Value, NestedValue}; +use preserves::value::{Map, NestedValue, Set, Value}; use std::collections::btree_map::Entry; use std::convert::TryFrom; use std::convert::TryInto; @@ -9,7 +9,7 @@ use std::sync::Arc; use crate::actor::_Any; use crate::actor::Activation; use crate::actor::Handle; -use crate::actor::Ref; +use crate::actor::Cap; use crate::schemas::dataspace_patterns as ds; use crate::pattern::{self, PathStep, Path, Paths}; @@ -58,7 +58,7 @@ struct Leaf { // aka Topic #[derive(Debug)] struct Endpoints { cached_captures: Bag, - endpoints: Map, Map>, + endpoints: Map, Map>, } //--------------------------------------------------------------------------- @@ -76,7 +76,7 @@ impl Index { &mut self, t: &mut Activation, pat: &ds::Pattern, - observer: &Arc, + observer: &Arc, ) { let analysis = pattern::PatternAnalysis::new(pat); self.root.extend(pat).add_observer(t, &analysis, observer); @@ -87,7 +87,7 @@ impl Index { &mut self, t: &mut Activation, pat: ds::Pattern, - observer: &Arc, + observer: &Arc, ) { let analysis = pattern::PatternAnalysis::new(&pat); self.root.extend(&pat).remove_observer(t, analysis, observer); @@ -106,7 +106,9 @@ impl Index { |es, cs| { if es.cached_captures.change(cs.clone(), 1) == bag::Net::AbsentToPresent { for (observer, capture_map) in &mut es.endpoints { - capture_map.insert(cs.clone(), t.assert(observer, cs.clone())); + if let Some(h) = observer.assert(t, cs.clone()) { + capture_map.insert(cs.clone(), h); + } } } }) @@ -151,7 +153,7 @@ impl Index { |es, cs| { *delivery_count += es.endpoints.len(); for observer in es.endpoints.keys() { - t.message(observer, cs.clone()); + observer.message(t, cs.clone()); } }).perform(&mut self.root); } @@ -389,7 +391,7 @@ impl Continuation { &mut self, t: &mut Activation, analysis: &pattern::PatternAnalysis, - observer: &Arc, + observer: &Arc, ) { let cached_assertions = &self.cached_assertions; let const_val_map = @@ -418,7 +420,9 @@ impl Continuation { }); let mut capture_map = Map::new(); for cs in endpoints.cached_captures.keys() { - capture_map.insert(cs.clone(), t.assert(observer, cs.clone())); + if let Some(h) = observer.assert(t, cs.clone()) { + capture_map.insert(cs.clone(), h); + } } endpoints.endpoints.insert(observer.clone(), capture_map); } @@ -427,7 +431,7 @@ impl Continuation { &mut self, t: &mut Activation, analysis: pattern::PatternAnalysis, - observer: &Arc, + observer: &Arc, ) { if let Entry::Occupied(mut const_val_map_entry) = self.leaf_map.entry(analysis.const_paths) diff --git a/src/tracer.rs b/src/tracer.rs index 5434b4a..b0ae3f4 100644 --- a/src/tracer.rs +++ b/src/tracer.rs @@ -1,29 +1,32 @@ use crate::actor::*; -use preserves::value::NestedValue; - -use std::any::Any; +use std::fmt::Debug; use std::sync::Arc; struct Tracer(tracing::Span); -fn set_name_oid(_ac: &mut Actor, t: &mut Tracer, r: &Arc) { - t.0.record("oid", &tracing::field::display(&r.addr.oid())); +fn set_name_oid(t: &mut Tracer, r: &Arc>) { + t.0.record("oid", &tracing::field::display(&r.oid())); } -pub fn tracer(ac: &mut Actor, name: tracing::Span) -> Arc { - ac.create_rec(Tracer(name), set_name_oid) +pub fn tracer(ac: &mut Actor, name: tracing::Span) -> Arc> { + let mut e = Tracer(name); + let r = ac.create_inert(); + set_name_oid(&mut e, &r); + r.become_entity(e); + r } -pub fn tracer_top(name: tracing::Span) -> Arc { - Actor::create_and_start_rec(crate::name!(parent: None, "tracer"), Tracer(name), set_name_oid) +pub fn tracer_top(name: tracing::Span) -> Arc> { + let mut e = Tracer(name); + let r = Actor::create_and_start_inert(crate::name!(parent: None, "tracer")); + set_name_oid(&mut e, &r); + r.become_entity(e); + r } -impl Entity for Tracer { - fn as_any(&mut self) -> &mut dyn Any { - self - } - fn assert(&mut self, _t: &mut Activation, a: _Any, h: Handle) -> ActorResult { +impl Entity for Tracer { + fn assert(&mut self, _t: &mut Activation, a: M, h: Handle) -> ActorResult { let _guard = self.0.enter(); tracing::trace!(a = debug(&a), h = debug(&h), "assert"); Ok(()) @@ -33,15 +36,15 @@ impl Entity for Tracer { tracing::trace!(h = debug(&h), "retract"); Ok(()) } - fn message(&mut self, _t: &mut Activation, m: _Any) -> ActorResult { + fn message(&mut self, _t: &mut Activation, m: M) -> ActorResult { let _guard = self.0.enter(); tracing::trace!(m = debug(&m), "message"); Ok(()) } - fn sync(&mut self, t: &mut Activation, peer: Arc) -> ActorResult { + fn sync(&mut self, t: &mut Activation, peer: Arc>) -> ActorResult { let _guard = self.0.enter(); tracing::trace!(peer = debug(&peer), "sync"); - t.message(&peer, _Any::new(true)); + t.message(&peer, Synced); Ok(()) } }