Introduce "AnyValue", a better name for "internal_protocol::_Any"

This commit is contained in:
Tony Garnock-Jones 2021-08-11 17:16:01 -04:00
parent ca85f27fbc
commit 5e5ee0bbdd
10 changed files with 85 additions and 84 deletions

View File

@ -19,7 +19,7 @@ use tokio::runtime::Runtime;
use tracing::Level; use tracing::Level;
#[inline] #[inline]
fn says(who: _Any, what: _Any) -> _Any { fn says(who: AnyValue, what: AnyValue) -> AnyValue {
let mut r = Value::simple_record("Says", 2); let mut r = Value::simple_record("Says", 2);
r.fields_vec_mut().push(who); r.fields_vec_mut().push(who);
r.fields_vec_mut().push(what); r.fields_vec_mut().push(what);
@ -28,8 +28,8 @@ fn says(who: _Any, what: _Any) -> _Any {
struct ShutdownEntity; struct ShutdownEntity;
impl Entity<_Any> for ShutdownEntity { impl Entity<AnyValue> for ShutdownEntity {
fn message(&mut self, t: &mut Activation, _m: _Any) -> ActorResult { fn message(&mut self, t: &mut Activation, _m: AnyValue) -> ActorResult {
t.state.shutdown(); t.state.shutdown();
Ok(()) Ok(())
} }
@ -61,12 +61,12 @@ pub fn bench_pub(c: &mut Criterion) {
let ds = Arc::clone(&ds); let ds = Arc::clone(&ds);
external_event(&Arc::clone(&ds.mailbox), &debtor, Box::new( external_event(&Arc::clone(&ds.mailbox), &debtor, Box::new(
move |t| ds.with_entity( move |t| ds.with_entity(
|e| e.message(t, says(_Any::new("bench_pub"), |e| e.message(t, says(AnyValue::new("bench_pub"),
Value::ByteString(vec![]).wrap())))))? Value::ByteString(vec![]).wrap())))))?
} }
external_event(&Arc::clone(&shutdown.mailbox), &debtor, Box::new( external_event(&Arc::clone(&shutdown.mailbox), &debtor, Box::new(
move |t| shutdown.with_entity( move |t| shutdown.with_entity(
|e| e.message(t, _Any::new(true)))))?; |e| e.message(t, AnyValue::new(true)))))?;
Ok(()) Ok(())
}); });
Ok(()) Ok(())
@ -89,8 +89,8 @@ pub fn bench_pub(c: &mut Criterion) {
let turn_count = Arc::clone(&turn_count); let turn_count = Arc::clone(&turn_count);
Actor::new().boot(syndicate::name!("consumer"), move |t| { Actor::new().boot(syndicate::name!("consumer"), move |t| {
struct Receiver(Arc<AtomicU64>); struct Receiver(Arc<AtomicU64>);
impl Entity<_Any> for Receiver { impl Entity<AnyValue> for Receiver {
fn message(&mut self, _t: &mut Activation, _m: _Any) -> ActorResult { fn message(&mut self, _t: &mut Activation, _m: AnyValue) -> ActorResult {
self.0.fetch_add(1, Ordering::Relaxed); self.0.fetch_add(1, Ordering::Relaxed);
Ok(()) Ok(())
} }
@ -107,7 +107,7 @@ pub fn bench_pub(c: &mut Criterion) {
}), }),
members: Map::from_iter(vec![ members: Map::from_iter(vec![
(0.into(), p::Pattern::DLit(Box::new(p::DLit { (0.into(), p::Pattern::DLit(Box::new(p::DLit {
value: _Any::new("bench_pub"), value: AnyValue::new("bench_pub"),
}))), }))),
(1.into(), p::Pattern::DBind(Box::new(p::DBind { (1.into(), p::Pattern::DBind(Box::new(p::DBind {
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)), pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)),
@ -119,7 +119,7 @@ pub fn bench_pub(c: &mut Criterion) {
ds.assert(t, &Observe { ds.assert(t, &Observe {
pattern: p::Pattern::DBind(Box::new(p::DBind { pattern: p::Pattern::DBind(Box::new(p::DBind {
pattern: p::Pattern::DLit(Box::new(p::DLit { pattern: p::Pattern::DLit(Box::new(p::DLit {
value: _Any::new(true), value: AnyValue::new(true),
})), })),
})), })),
observer: shutdown, observer: shutdown,
@ -130,14 +130,14 @@ pub fn bench_pub(c: &mut Criterion) {
let ds = Arc::clone(&ds); let ds = Arc::clone(&ds);
external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new( external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new(
move |t| ds.underlying.with_entity( move |t| ds.underlying.with_entity(
|e| e.message(t, says(_Any::new("bench_pub"), |e| e.message(t, says(AnyValue::new("bench_pub"),
Value::ByteString(vec![]).wrap())))))? Value::ByteString(vec![]).wrap())))))?
} }
{ {
let ds = Arc::clone(&ds); let ds = Arc::clone(&ds);
external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new( external_event(&Arc::clone(&ds.underlying.mailbox), &debtor, Box::new(
move |t| ds.underlying.with_entity( move |t| ds.underlying.with_entity(
|e| e.message(t, _Any::new(true)))))?; |e| e.message(t, AnyValue::new(true)))))?;
} }
Ok(()) Ok(())
}); });

View File

@ -36,7 +36,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Activation::for_actor(&ac, boot_debtor, |t| { Activation::for_actor(&ac, boot_debtor, |t| {
relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| { relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| {
let consumer = syndicate::entity(0) let consumer = syndicate::entity(0)
.on_message(|message_count, _t, m: _Any| { .on_message(|message_count, _t, m: AnyValue| {
if m.value().is_boolean() { if m.value().is_boolean() {
tracing::info!("{:?} messages in the last second", message_count); tracing::info!("{:?} messages in the last second", message_count);
*message_count = 0; *message_count = 0;
@ -72,7 +72,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
external_event(&Arc::clone(&consumer.underlying.mailbox), external_event(&Arc::clone(&consumer.underlying.mailbox),
&Debtor::new(syndicate::name!("debtor")), &Debtor::new(syndicate::name!("debtor")),
Box::new(move |t| consumer.underlying.with_entity( Box::new(move |t| consumer.underlying.with_entity(
|e| e.message(t, _Any::new(true)))))?; |e| e.message(t, AnyValue::new(true)))))?;
} }
}); });
Ok(None) Ok(None)

View File

@ -52,7 +52,7 @@ fn now() -> u64 {
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).expect("time after epoch").as_nanos() as u64 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).expect("time after epoch").as_nanos() as u64
} }
fn simple_record2(label: &str, v1: _Any, v2: _Any) -> _Any { fn simple_record2(label: &str, v1: AnyValue, v2: AnyValue) -> AnyValue {
let mut r = Value::simple_record(label, 2); let mut r = Value::simple_record(label, 2);
r.fields_vec_mut().push(v1); r.fields_vec_mut().push(v1);
r.fields_vec_mut().push(v2); r.fields_vec_mut().push(v2);
@ -121,7 +121,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let self_ref = t.state.create_inert(); let self_ref = t.state.create_inert();
self_ref.become_entity( self_ref.become_entity(
syndicate::entity(Arc::clone(&self_ref)) syndicate::entity(Arc::clone(&self_ref))
.on_message(move |self_ref, t, m: _Any| { .on_message(move |self_ref, t, m: AnyValue| {
match m.value().as_boolean() { match m.value().as_boolean() {
Some(true) => { Some(true) => {
tracing::info!("{:?} turns, {:?} events in the last second", tracing::info!("{:?} turns, {:?} events in the last second",
@ -146,7 +146,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
} else { } else {
if let None = current_reply { if let None = current_reply {
turn_counter += 1; turn_counter += 1;
t.message_for_myself(&self_ref, _Any::new(false)); t.message_for_myself(&self_ref, AnyValue::new(false));
let rtt_ns = now() - timestamp.value().to_u64()?; let rtt_ns = now() - timestamp.value().to_u64()?;
rtt_ns_samples[rtt_batch_count] = rtt_ns; rtt_ns_samples[rtt_batch_count] = rtt_ns;
rtt_batch_count += 1; rtt_batch_count += 1;
@ -197,7 +197,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
external_event(&Arc::clone(&consumer.underlying.mailbox), external_event(&Arc::clone(&consumer.underlying.mailbox),
&Debtor::new(syndicate::name!("debtor")), &Debtor::new(syndicate::name!("debtor")),
Box::new(move |t| consumer.underlying.with_entity( Box::new(move |t| consumer.underlying.with_entity(
|e| e.message(t, _Any::new(true)))))?; |e| e.message(t, AnyValue::new(true)))))?;
} }
}); });
@ -206,7 +206,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let action_count = c.action_count; let action_count = c.action_count;
let debtor = Arc::clone(t.debtor()); let debtor = Arc::clone(t.debtor());
t.state.linked_task(syndicate::name!("boot-ping"), async move { t.state.linked_task(syndicate::name!("boot-ping"), async move {
let padding: _Any = Value::ByteString(vec![0; bytes_padding]).wrap(); let padding: AnyValue = Value::ByteString(vec![0; bytes_padding]).wrap();
for _ in 0..turn_count { for _ in 0..turn_count {
let mut events: PendingEventQueue = vec![]; let mut events: PendingEventQueue = vec![];
let current_rec = simple_record2(send_label, let current_rec = simple_record2(send_label,

View File

@ -22,7 +22,7 @@ pub struct Config {
} }
#[inline] #[inline]
fn says(who: _Any, what: _Any) -> _Any { fn says(who: AnyValue, what: AnyValue) -> AnyValue {
let mut r = Value::simple_record("Says", 2); let mut r = Value::simple_record("Says", 2);
r.fields_vec_mut().push(who); r.fields_vec_mut().push(who);
r.fields_vec_mut().push(what); r.fields_vec_mut().push(what);
@ -42,7 +42,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split(); let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
Activation::for_actor(&ac, boot_debtor, |t| { Activation::for_actor(&ac, boot_debtor, |t| {
relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| {
let padding: _Any = Value::ByteString(vec![0; config.bytes_padding]).wrap(); let padding: AnyValue = Value::ByteString(vec![0; config.bytes_padding]).wrap();
let action_count = config.action_count; let action_count = config.action_count;
let debtor = Debtor::new(syndicate::name!("debtor")); let debtor = Debtor::new(syndicate::name!("debtor"));
t.state.linked_task(syndicate::name!("sender"), async move { t.state.linked_task(syndicate::name!("sender"), async move {

View File

@ -90,7 +90,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
external_event(&Arc::clone(&consumer.underlying.mailbox), external_event(&Arc::clone(&consumer.underlying.mailbox),
&Debtor::new(syndicate::name!("debtor")), &Debtor::new(syndicate::name!("debtor")),
Box::new(move |t| consumer.underlying.with_entity( Box::new(move |t| consumer.underlying.with_entity(
|e| e.message(t, _Any::new(true)))))?; |e| e.message(t, AnyValue::new(true)))))?;
} }
}); });
Ok(None) Ok(None)

View File

@ -29,7 +29,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| { relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| {
let debtor = Debtor::new(syndicate::name!("debtor")); let debtor = Debtor::new(syndicate::name!("debtor"));
t.state.linked_task(syndicate::name!("sender"), async move { t.state.linked_task(syndicate::name!("sender"), async move {
let presence: _Any = Value::simple_record1( let presence: AnyValue = Value::simple_record1(
"Present", "Present",
Value::from(std::process::id()).wrap()).wrap(); Value::from(std::process::id()).wrap()).wrap();
let handle = syndicate::next_handle(); let handle = syndicate::next_handle();

View File

@ -29,7 +29,8 @@ use tokio_util::sync::CancellationToken;
use tracing::Instrument; use tracing::Instrument;
pub use super::schemas::internal_protocol::_Any; pub type AnyValue = super::schemas::internal_protocol::_Any;
pub type Handle = u64; pub type Handle = u64;
pub type ActorResult = Result<(), Error>; pub type ActorResult = Result<(), Error>;
@ -145,14 +146,14 @@ pub struct Ref<M> {
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] #[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Cap { pub struct Cap {
pub underlying: Arc<Ref<_Any>>, pub underlying: Arc<Ref<AnyValue>>,
pub attenuation: Vec<CheckedCaveat>, pub attenuation: Vec<CheckedCaveat>,
} }
pub struct Guard<M> pub struct Guard<M>
where where
for<'a> &'a M: Into<_Any>, for<'a> &'a M: Into<AnyValue>,
for<'a> M: TryFrom<&'a _Any>, for<'a> M: TryFrom<&'a AnyValue>,
{ {
underlying: Arc<Ref<M>> underlying: Arc<Ref<M>>
} }
@ -190,9 +191,9 @@ pub fn start_debt_reporter() {
}); });
} }
impl TryFrom<&_Any> for Synced { impl TryFrom<&AnyValue> for Synced {
type Error = ParseError; type Error = ParseError;
fn try_from(value: &_Any) -> Result<Self, Self::Error> { fn try_from(value: &AnyValue) -> Result<Self, Self::Error> {
if let Some(true) = value.value().as_boolean() { if let Some(true) = value.value().as_boolean() {
Ok(Synced) Ok(Synced)
} else { } else {
@ -201,9 +202,9 @@ impl TryFrom<&_Any> for Synced {
} }
} }
impl From<&Synced> for _Any { impl From<&Synced> for AnyValue {
fn from(_value: &Synced) -> Self { fn from(_value: &Synced) -> Self {
_Any::new(true) AnyValue::new(true)
} }
} }
@ -449,7 +450,7 @@ fn send_actions(
) -> ActorResult { ) -> ActorResult {
let token_count = t.len(); let token_count = t.len();
tx.send(SystemMessage::Turn(LoanedItem::new(debtor, token_count, t))) tx.send(SystemMessage::Turn(LoanedItem::new(debtor, token_count, t)))
.map_err(|_| error("Target actor not running", _Any::new(false))) .map_err(|_| error("Target actor not running", AnyValue::new(false)))
} }
impl std::fmt::Debug for Mailbox { impl std::fmt::Debug for Mailbox {
@ -566,7 +567,7 @@ impl Actor {
loop { loop {
match self.rx.recv().await { match self.rx.recv().await {
None => { None => {
return self.terminate(Err(error("Unexpected channel close", _Any::new(false)))); return self.terminate(Err(error("Unexpected channel close", AnyValue::new(false))));
} }
Some(m) => match m { Some(m) => match m {
SystemMessage::Release => { SystemMessage::Release => {
@ -597,7 +598,7 @@ impl Actor {
} }
fn panicked_err() -> Option<ActorResult> { fn panicked_err() -> Option<ActorResult> {
Some(Err(error("Actor panicked", _Any::new(false)))) Some(Err(error("Actor panicked", AnyValue::new(false))))
} }
impl ActorRef { impl ActorRef {
@ -801,8 +802,8 @@ impl<M> std::fmt::Debug for Ref<M> {
impl Cap { impl Cap {
pub fn guard<M: 'static + Send>(underlying: &Arc<Ref<M>>) -> Arc<Self> pub fn guard<M: 'static + Send>(underlying: &Arc<Ref<M>>) -> Arc<Self>
where where
for<'a> &'a M: Into<_Any>, for<'a> &'a M: Into<AnyValue>,
for<'a> M: TryFrom<&'a _Any>, for<'a> M: TryFrom<&'a AnyValue>,
{ {
Self::new(&Arc::new(Ref { Self::new(&Arc::new(Ref {
mailbox: Arc::clone(&underlying.mailbox), mailbox: Arc::clone(&underlying.mailbox),
@ -810,7 +811,7 @@ impl Cap {
})) }))
} }
pub fn new(underlying: &Arc<Ref<_Any>>) -> Arc<Self> { pub fn new(underlying: &Arc<Ref<AnyValue>>) -> Arc<Self> {
Arc::new(Cap { Arc::new(Cap {
underlying: Arc::clone(underlying), underlying: Arc::clone(underlying),
attenuation: Vec::new(), attenuation: Vec::new(),
@ -823,7 +824,7 @@ impl Cap {
Ok(Arc::new(r)) Ok(Arc::new(r))
} }
pub fn rewrite(&self, mut a: _Any) -> Option<_Any> { pub fn rewrite(&self, mut a: AnyValue) -> Option<AnyValue> {
for c in &self.attenuation { for c in &self.attenuation {
match c.rewrite(&a) { match c.rewrite(&a) {
Some(v) => a = v, Some(v) => a = v,
@ -833,11 +834,11 @@ impl Cap {
Some(a) Some(a)
} }
pub fn assert<M: Into<_Any>>(&self, t: &mut Activation, m: M) -> Option<Handle> { pub fn assert<M: Into<AnyValue>>(&self, t: &mut Activation, m: M) -> Option<Handle> {
self.rewrite(m.into()).map(|m| t.assert(&self.underlying, m)) self.rewrite(m.into()).map(|m| t.assert(&self.underlying, m))
} }
pub fn message<M: Into<_Any>>(&self, t: &mut Activation, m: M) { pub fn message<M: Into<AnyValue>>(&self, t: &mut Activation, m: M) {
if let Some(m) = self.rewrite(m.into()) { if let Some(m) = self.rewrite(m.into()) {
t.message(&self.underlying, m) t.message(&self.underlying, m)
} }
@ -872,12 +873,12 @@ impl std::convert::From<&Cap> for IOValue {
} }
} }
impl<M> Entity<_Any> for Guard<M> impl<M> Entity<AnyValue> for Guard<M>
where where
for<'a> &'a M: Into<_Any>, for<'a> &'a M: Into<AnyValue>,
for<'a> M: TryFrom<&'a _Any>, for<'a> M: TryFrom<&'a AnyValue>,
{ {
fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult { fn assert(&mut self, t: &mut Activation, a: AnyValue, h: Handle) -> ActorResult {
match M::try_from(&a) { match M::try_from(&a) {
Ok(a) => self.underlying.with_entity(|e| e.assert(t, a, h)), Ok(a) => self.underlying.with_entity(|e| e.assert(t, a, h)),
Err(_) => Ok(()), Err(_) => Ok(()),
@ -886,7 +887,7 @@ where
fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult { fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult {
self.underlying.with_entity(|e| e.retract(t, h)) self.underlying.with_entity(|e| e.retract(t, h))
} }
fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult { fn message(&mut self, t: &mut Activation, m: AnyValue) -> ActorResult {
match M::try_from(&m) { match M::try_from(&m) {
Ok(m) => self.underlying.with_entity(|e| e.message(t, m)), Ok(m) => self.underlying.with_entity(|e| e.message(t, m)),
Err(_) => Ok(()), Err(_) => Ok(()),

View File

@ -103,11 +103,11 @@ where
} }
} }
impl<E, Fa, Fm> DuringEntity<_Any, E, Fa, Fm> impl<E, Fa, Fm> DuringEntity<AnyValue, E, Fa, Fm>
where where
E: 'static + Send, E: 'static + Send,
Fa: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> DuringResult<E>, Fa: 'static + Send + FnMut(&mut E, &mut Activation, AnyValue) -> DuringResult<E>,
Fm: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> ActorResult, Fm: 'static + Send + FnMut(&mut E, &mut Activation, AnyValue) -> ActorResult,
{ {
pub fn create_cap(self, ac: &mut RunningActor) -> Arc<Cap> pub fn create_cap(self, ac: &mut RunningActor) -> Arc<Cap>
{ {

View File

@ -162,7 +162,7 @@ pub fn connect_stream<I, O, E, F>(
let i = Input::Bytes(Box::pin(i)); let i = Input::Bytes(Box::pin(i));
let o = Output::Bytes(Box::pin(o)); let o = Output::Bytes(Box::pin(o));
let gatekeeper = TunnelRelay::run(t, i, o, None, Some(sturdy::Oid(0.into()))).unwrap(); let gatekeeper = TunnelRelay::run(t, i, o, None, Some(sturdy::Oid(0.into()))).unwrap();
let main_entity = t.state.create(during::entity(initial_state).on_asserted(move |state, t, a: _Any| { let main_entity = t.state.create(during::entity(initial_state).on_asserted(move |state, t, a: AnyValue| {
let denotation = a.value().to_embedded()?; let denotation = a.value().to_embedded()?;
f(state, t, Arc::clone(denotation)) f(state, t, Arc::clone(denotation))
})); }));
@ -217,13 +217,13 @@ impl TunnelRelay {
match src.peek() { match src.peek() {
Ok(v) => if v >= 128 { Ok(v) => if v >= 128 {
self.output_text = false; self.output_text = false;
let mut r = src.packed::<_, _Any, _>(&mut dec); let mut r = src.packed::<_, AnyValue, _>(&mut dec);
let res = P::Packet::deserialize(&mut r); let res = P::Packet::deserialize(&mut r);
(res, r.source.index) (res, r.source.index)
} else { } else {
self.output_text = true; self.output_text = true;
let mut dec = ViaCodec::new(dec); let mut dec = ViaCodec::new(dec);
let mut r = src.text::<_, _Any, _>(&mut dec); let mut r = src.text::<_, AnyValue, _>(&mut dec);
let res = P::Packet::deserialize(&mut r); let res = P::Packet::deserialize(&mut r);
(res, r.source.index) (res, r.source.index)
}, },
@ -266,7 +266,7 @@ impl TunnelRelay {
let target = match self.membranes.exported.oid_map.get(&sturdy::Oid(oid.0.clone())) { let target = match self.membranes.exported.oid_map.get(&sturdy::Oid(oid.0.clone())) {
Some(ws) => &ws.obj, Some(ws) => &ws.obj,
None => return Err(error("Cannot deliver event: nonexistent oid", None => return Err(error("Cannot deliver event: nonexistent oid",
_Any::from(&P::TurnEvent { oid, event }))), AnyValue::from(&P::TurnEvent { oid, event }))),
}; };
match event { match event {
P::Event::Assert(b) => { P::Event::Assert(b) => {
@ -278,14 +278,14 @@ impl TunnelRelay {
})?; })?;
if let Some(local_handle) = target.assert(t, a) { if let Some(local_handle) = target.assert(t, a) {
if let Some(_) = self.inbound_assertions.insert(remote_handle, (local_handle, imported)) { if let Some(_) = self.inbound_assertions.insert(remote_handle, (local_handle, imported)) {
return Err(error("Assertion with duplicate handle", _Any::new(false))); return Err(error("Assertion with duplicate handle", AnyValue::new(false)));
} }
} }
} }
P::Event::Retract(b) => { P::Event::Retract(b) => {
let P::Retract { handle: remote_handle } = *b; let P::Retract { handle: remote_handle } = *b;
let (local_handle, imported) = match self.inbound_assertions.remove(&remote_handle) { let (local_handle, imported) = match self.inbound_assertions.remove(&remote_handle) {
None => return Err(error("Retraction of nonexistent handle", _Any::from(&remote_handle))), None => return Err(error("Retraction of nonexistent handle", AnyValue::from(&remote_handle))),
Some(wss) => wss, Some(wss) => wss,
}; };
for ws in imported.into_iter() { for ws in imported.into_iter() {
@ -299,7 +299,7 @@ impl TunnelRelay {
a.foreach_embedded(&mut |r| { a.foreach_embedded(&mut |r| {
let ws = imported_membrane.acquire(r); let ws = imported_membrane.acquire(r);
match ws.ref_count.load(Ordering::SeqCst) { match ws.ref_count.load(Ordering::SeqCst) {
1 => Err(error("Cannot receive transient reference", _Any::new(false))), 1 => Err(error("Cannot receive transient reference", AnyValue::new(false))),
_ => Ok(()) _ => Ok(())
} }
})?; })?;
@ -314,7 +314,7 @@ impl TunnelRelay {
} }
impl Entity<Synced> for SyncPeer { impl Entity<Synced> for SyncPeer {
fn message(&mut self, t: &mut Activation, _a: Synced) -> ActorResult { fn message(&mut self, t: &mut Activation, _a: Synced) -> ActorResult {
self.peer.message(t, _Any::new(true)); self.peer.message(t, AnyValue::new(true));
let mut g = self.relay_ref.lock().expect("unpoisoned"); let mut g = self.relay_ref.lock().expect("unpoisoned");
let tr = g.as_mut().expect("initialized"); let tr = g.as_mut().expect("initialized");
if let Some(ws) = tr.membranes.imported.ref_map.get(&self.peer) { if let Some(ws) = tr.membranes.imported.ref_map.get(&self.peer) {
@ -360,7 +360,7 @@ impl TunnelRelay {
a.foreach_embedded(&mut |r| { a.foreach_embedded(&mut |r| {
let ws = self.membranes.export_ref(Arc::clone(r), false); let ws = self.membranes.export_ref(Arc::clone(r), false);
match ws.ref_count.load(Ordering::SeqCst) { match ws.ref_count.load(Ordering::SeqCst) {
0 => Err(error("Cannot send transient reference", _Any::new(false))), 0 => Err(error("Cannot send transient reference", AnyValue::new(false))),
_ => Ok(()) _ => Ok(())
} }
})?; })?;
@ -371,14 +371,14 @@ impl TunnelRelay {
} }
fn encode_packet(&mut self, p: P::Packet) -> Result<Vec<u8>, Error> { fn encode_packet(&mut self, p: P::Packet) -> Result<Vec<u8>, Error> {
let item = _Any::from(&p); let item = AnyValue::from(&p);
// tracing::trace!(packet = debug(&item), "<--"); // tracing::trace!(packet = debug(&item), "<--");
if self.output_text { if self.output_text {
let mut s = TextWriter::encode::<_, _Any, _>(&mut self.membranes, &item)?; let mut s = TextWriter::encode::<_, AnyValue, _>(&mut self.membranes, &item)?;
s.push('\n'); s.push('\n');
Ok(s.into_bytes()) Ok(s.into_bytes())
} else { } else {
Ok(PackedWriter::encode::<_, _Any, _>(&mut self.membranes, &item)?) Ok(PackedWriter::encode::<_, AnyValue, _>(&mut self.membranes, &item)?)
} }
} }
@ -485,7 +485,7 @@ impl DomainEncode<P::_Ptr> for Membranes {
w: &mut W, w: &mut W,
d: &P::_Ptr, d: &P::_Ptr,
) -> io::Result<()> { ) -> io::Result<()> {
w.write(&mut NoEmbeddedDomainCodec, &_Any::from(&match self.exported.ref_map.get(d) { w.write(&mut NoEmbeddedDomainCodec, &AnyValue::from(&match self.exported.ref_map.get(d) {
Some(ws) => sturdy::WireRef::Mine { Some(ws) => sturdy::WireRef::Mine {
oid: Box::new(ws.oid.clone()), oid: Box::new(ws.oid.clone()),
}, },
@ -608,8 +608,8 @@ impl Entity<()> for TunnelRefEntity {
} }
} }
impl Entity<_Any> for RelayEntity { impl Entity<AnyValue> for RelayEntity {
fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult { fn assert(&mut self, t: &mut Activation, a: AnyValue, h: Handle) -> ActorResult {
let mut g = self.relay_ref.lock().expect("unpoisoned"); let mut g = self.relay_ref.lock().expect("unpoisoned");
let tr = g.as_mut().expect("initialized"); let tr = g.as_mut().expect("initialized");
tr.send_event(t, self.oid.clone(), P::Event::Assert(Box::new(P::Assert { tr.send_event(t, self.oid.clone(), P::Event::Assert(Box::new(P::Assert {
@ -624,7 +624,7 @@ impl Entity<_Any> for RelayEntity {
handle: P::Handle(h.into()), handle: P::Handle(h.into()),
}))) })))
} }
fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult { fn message(&mut self, t: &mut Activation, m: AnyValue) -> ActorResult {
let mut g = self.relay_ref.lock().expect("unpoisoned"); let mut g = self.relay_ref.lock().expect("unpoisoned");
let tr = g.as_mut().expect("initialized"); let tr = g.as_mut().expect("initialized");
tr.send_event(t, self.oid.clone(), P::Event::Message(Box::new(P::Message { tr.send_event(t, self.oid.clone(), P::Event::Message(Box::new(P::Message {

View File

@ -6,7 +6,7 @@ use std::convert::TryFrom;
use std::convert::TryInto; use std::convert::TryInto;
use std::sync::Arc; use std::sync::Arc;
use crate::actor::_Any; use crate::actor::AnyValue;
use crate::actor::Activation; use crate::actor::Activation;
use crate::actor::Handle; use crate::actor::Handle;
use crate::actor::Cap; use crate::actor::Cap;
@ -15,18 +15,18 @@ use crate::pattern::{self, PathStep, Path, Paths};
type Bag<A> = bag::BTreeBag<A>; type Bag<A> = bag::BTreeBag<A>;
type Captures = _Any; type Captures = AnyValue;
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub enum Guard { pub enum Guard {
Rec(_Any, usize), Rec(AnyValue, usize),
Seq(usize), Seq(usize),
Map, Map,
} }
#[derive(Debug)] #[derive(Debug)]
pub struct Index { pub struct Index {
all_assertions: Bag<_Any>, all_assertions: Bag<AnyValue>,
observer_count: usize, observer_count: usize,
root: Node, root: Node,
} }
@ -39,7 +39,7 @@ struct Node {
#[derive(Debug)] #[derive(Debug)]
struct Continuation { struct Continuation {
cached_assertions: Set<_Any>, cached_assertions: Set<AnyValue>,
leaf_map: Map<Paths, Map<Captures, Leaf>>, leaf_map: Map<Paths, Map<Captures, Leaf>>,
} }
@ -51,7 +51,7 @@ struct Selector {
#[derive(Debug)] #[derive(Debug)]
struct Leaf { // aka Topic struct Leaf { // aka Topic
cached_assertions: Set<_Any>, cached_assertions: Set<AnyValue>,
endpoints_map: Map<Paths, Endpoints>, endpoints_map: Map<Paths, Endpoints>,
} }
@ -94,7 +94,7 @@ impl Index {
self.observer_count -= 1; self.observer_count -= 1;
} }
pub fn insert(&mut self, t: &mut Activation, outer_value: &_Any) { pub fn insert(&mut self, t: &mut Activation, outer_value: &AnyValue) {
let net = self.all_assertions.change(outer_value.clone(), 1); let net = self.all_assertions.change(outer_value.clone(), 1);
match net { match net {
bag::Net::AbsentToPresent => { bag::Net::AbsentToPresent => {
@ -119,7 +119,7 @@ impl Index {
} }
} }
pub fn remove(&mut self, t: &mut Activation, outer_value: &_Any) { pub fn remove(&mut self, t: &mut Activation, outer_value: &AnyValue) {
let net = self.all_assertions.change(outer_value.clone(), -1); let net = self.all_assertions.change(outer_value.clone(), -1);
match net { match net {
bag::Net::PresentToAbsent => { bag::Net::PresentToAbsent => {
@ -144,7 +144,7 @@ impl Index {
} }
} }
pub fn send(&mut self, t: &mut Activation, outer_value: &_Any, delivery_count: &mut usize) { pub fn send(&mut self, t: &mut Activation, outer_value: &AnyValue, delivery_count: &mut usize) {
Modification::new( Modification::new(
false, false,
&outer_value, &outer_value,
@ -256,24 +256,24 @@ impl<'a, T> Stack<'a, T> {
} }
struct Modification<'op, FCont, FLeaf, FEndpoints> struct Modification<'op, FCont, FLeaf, FEndpoints>
where FCont: FnMut(&mut Continuation, &_Any) -> (), where FCont: FnMut(&mut Continuation, &AnyValue) -> (),
FLeaf: FnMut(&mut Leaf, &_Any) -> (), FLeaf: FnMut(&mut Leaf, &AnyValue) -> (),
FEndpoints: FnMut(&mut Endpoints, Captures) -> () FEndpoints: FnMut(&mut Endpoints, Captures) -> ()
{ {
create_leaf_if_absent: bool, create_leaf_if_absent: bool,
outer_value: &'op _Any, outer_value: &'op AnyValue,
m_cont: FCont, m_cont: FCont,
m_leaf: FLeaf, m_leaf: FLeaf,
m_endpoints: FEndpoints, m_endpoints: FEndpoints,
} }
impl<'op, FCont, FLeaf, FEndpoints> Modification<'op, FCont, FLeaf, FEndpoints> impl<'op, FCont, FLeaf, FEndpoints> Modification<'op, FCont, FLeaf, FEndpoints>
where FCont: FnMut(&mut Continuation, &_Any) -> (), where FCont: FnMut(&mut Continuation, &AnyValue) -> (),
FLeaf: FnMut(&mut Leaf, &_Any) -> (), FLeaf: FnMut(&mut Leaf, &AnyValue) -> (),
FEndpoints: FnMut(&mut Endpoints, Captures) -> () FEndpoints: FnMut(&mut Endpoints, Captures) -> ()
{ {
fn new(create_leaf_if_absent: bool, fn new(create_leaf_if_absent: bool,
outer_value: &'op _Any, outer_value: &'op AnyValue,
m_cont: FCont, m_cont: FCont,
m_leaf: FLeaf, m_leaf: FLeaf,
m_endpoints: FEndpoints, m_endpoints: FEndpoints,
@ -291,7 +291,7 @@ where FCont: FnMut(&mut Continuation, &_Any) -> (),
self.node(n, &Stack::Item(&Value::from(vec![self.outer_value.clone()]).wrap(), &Stack::Empty)) self.node(n, &Stack::Item(&Value::from(vec![self.outer_value.clone()]).wrap(), &Stack::Empty))
} }
fn node(&mut self, n: &mut Node, term_stack: &Stack<&_Any>) { fn node(&mut self, n: &mut Node, term_stack: &Stack<&AnyValue>) {
self.continuation(&mut n.continuation); self.continuation(&mut n.continuation);
for (selector, table) in &mut n.edges { for (selector, table) in &mut n.edges {
let mut next_stack = term_stack; let mut next_stack = term_stack;
@ -338,7 +338,7 @@ where FCont: FnMut(&mut Continuation, &_Any) -> (),
} }
} }
fn class_of(v: &_Any) -> Option<Guard> { fn class_of(v: &AnyValue) -> Option<Guard> {
match v.value() { match v.value() {
Value::Sequence(vs) => Some(Guard::Seq(vs.len())), Value::Sequence(vs) => Some(Guard::Seq(vs.len())),
Value::Record(r) => Some(Guard::Rec(r.label().clone(), r.arity())), Value::Record(r) => Some(Guard::Rec(r.label().clone(), r.arity())),
@ -347,7 +347,7 @@ fn class_of(v: &_Any) -> Option<Guard> {
} }
} }
fn project_path<'a>(v: &'a _Any, p: &Path) -> Option<&'a _Any> { fn project_path<'a>(v: &'a AnyValue, p: &Path) -> Option<&'a AnyValue> {
let mut v = v; let mut v = v;
for i in p { for i in p {
match step(v, i) { match step(v, i) {
@ -358,7 +358,7 @@ fn project_path<'a>(v: &'a _Any, p: &Path) -> Option<&'a _Any> {
Some(v) Some(v)
} }
fn project_paths<'a>(v: &'a _Any, ps: &Paths) -> Option<Captures> { fn project_paths<'a>(v: &'a AnyValue, ps: &Paths) -> Option<Captures> {
let mut vs = Vec::new(); let mut vs = Vec::new();
for p in ps { for p in ps {
match project_path(v, p) { match project_path(v, p) {
@ -369,7 +369,7 @@ fn project_paths<'a>(v: &'a _Any, ps: &Paths) -> Option<Captures> {
Some(Captures::new(vs)) Some(Captures::new(vs))
} }
fn step<'a>(v: &'a _Any, s: &PathStep) -> Option<&'a _Any> { fn step<'a>(v: &'a AnyValue, s: &PathStep) -> Option<&'a AnyValue> {
match (v.value(), s) { match (v.value(), s) {
(Value::Sequence(vs), PathStep::Index(i)) => (Value::Sequence(vs), PathStep::Index(i)) =>
if *i < vs.len() { Some(&vs[*i]) } else { None }, if *i < vs.len() { Some(&vs[*i]) } else { None },
@ -383,7 +383,7 @@ fn step<'a>(v: &'a _Any, s: &PathStep) -> Option<&'a _Any> {
} }
impl Continuation { impl Continuation {
fn new(cached_assertions: Set<_Any>) -> Self { fn new(cached_assertions: Set<AnyValue>) -> Self {
Continuation { cached_assertions, leaf_map: Map::new() } Continuation { cached_assertions, leaf_map: Map::new() }
} }