From 6c3f039026d448cdd5d3147f3dfba57d7ddda773 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 22 Jul 2021 10:07:49 +0200 Subject: [PATCH] Use u64 internally for assertion handles --- src/actor.rs | 10 ++-- src/bin/syndicate-server.rs | 4 +- src/lib.rs | 3 +- src/relay.rs | 93 +++++++++++++++++++------------------ src/skeleton.rs | 2 +- 5 files changed, 56 insertions(+), 56 deletions(-) diff --git a/src/actor.rs b/src/actor.rs index 03e1dfd..2ec78db 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -29,7 +29,7 @@ use tokio_util::sync::CancellationToken; use tracing::Instrument; pub use super::schemas::internal_protocol::_Any; -pub use super::schemas::internal_protocol::Handle; +pub type Handle = u64; pub type ActorResult = Result<(), Error>; pub type ActorHandle = tokio::task::JoinHandle; @@ -225,15 +225,13 @@ impl<'activation> Activation<'activation> { if let Some(assertion) = r.rewrite(a.into()) { { let r = Arc::clone(r); - let handle = handle.clone(); self.queue_for(&r).push(Box::new( move |t| r.with_entity(|e| e.assert(t, assertion, handle)))); } { let r = Arc::clone(r); - let handle = handle.clone(); self.actor.outbound_assertions.insert( - handle.clone(), + handle, Destination::Remote(Arc::clone(&r), Box::new( move |t| r.with_entity(|e| e.retract(t, handle))))); } @@ -247,15 +245,13 @@ impl<'activation> Activation<'activation> { if let Some(assertion) = r.rewrite(a.into()) { { let r = Arc::clone(r); - let handle = handle.clone(); self.immediate_self.push(Box::new( move |t| r.with_entity(|e| e.assert(t, assertion, handle)))); } { let r = Arc::clone(r); - let handle = handle.clone(); self.actor.outbound_assertions.insert( - handle.clone(), + handle, Destination::ImmediateSelf(Box::new( move |t| r.with_entity(|e| e.retract(t, handle))))); } diff --git a/src/bin/syndicate-server.rs b/src/bin/syndicate-server.rs index bb03611..f8cb1c2 100644 --- a/src/bin/syndicate-server.rs +++ b/src/bin/syndicate-server.rs @@ -217,7 +217,7 @@ fn handle_resolve(ds: &mut Arc, t: &mut Activation, a: _Any) -> DuringResul target = debug(&target), "sturdyref resolved"); let h = t.assert(observer, _Any::domain(target)); - Ok(Some(Box::new(|_observer, t| Ok(t.retract(h))))) + Ok(Some(Box::new(move |_observer, t| Ok(t.retract(h))))) } } }) @@ -245,7 +245,7 @@ fn handle_resolve(ds: &mut Arc, t: &mut Activation, a: _Any) -> DuringResul })), observer: handler, }); - Ok(Some(Box::new(|_ds, t| Ok(t.retract(oh))))) + Ok(Some(Box::new(move |_ds, t| Ok(t.retract(oh))))) } } } diff --git a/src/lib.rs b/src/lib.rs index 0bfa194..7daaec5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,8 +35,7 @@ pub fn next_actor_id() -> ActorId { static NEXT_HANDLE: AtomicU64 = AtomicU64::new(3); pub fn next_handle() -> Handle { - Handle(value::signed_integer::SignedInteger::from( - NEXT_HANDLE.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed) as u128)) + NEXT_HANDLE.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed) } static NEXT_MAILBOX_ID: AtomicU64 = AtomicU64::new(4); diff --git a/src/relay.rs b/src/relay.rs index 850b4c9..645275b 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -6,7 +6,7 @@ use crate::during; use crate::error::Error; use crate::error::error; use crate::schemas::gatekeeper; -use crate::schemas::internal_protocol::*; +use crate::schemas::internal_protocol as P; use crate::schemas::sturdy; use crate::schemas::tunnel_relay; @@ -81,10 +81,10 @@ pub struct TunnelRelay { self_ref: Arc, input_buffer: BytesMut, - inbound_assertions: Map>)>, - outbound_assertions: Map>>, + inbound_assertions: Map>)>, + outbound_assertions: Map>>, membranes: Membranes, - pending_outbound: Vec, + pending_outbound: Vec, output: UnboundedSender>>, } @@ -202,27 +202,27 @@ impl TunnelRelay { result } - fn handle_inbound_packet(&mut self, t: &mut Activation, p: Packet) -> ActorResult { + fn handle_inbound_packet(&mut self, t: &mut Activation, p: P::Packet) -> ActorResult { // tracing::trace!(packet = debug(&p), "-->"); match p { - Packet::Error(b) => { + P::Packet::Error(b) => { tracing::info!(message = debug(b.message.clone()), detail = debug(b.detail.clone()), "received Error from peer"); Err(*b) }, - Packet::Turn(b) => { + P::Packet::Turn(b) => { let t = &mut Activation::new(t.actor, Arc::clone(&t.debtor)); - let Turn(events) = *b; - for TurnEvent { oid, event } in events { + let P::Turn(events) = *b; + for P::TurnEvent { oid, event } in events { let target = match self.membranes.exported.oid_map.get(&sturdy::Oid(oid.0.clone())) { Some(ws) => &ws.obj, None => return Err(error("Cannot deliver event: nonexistent oid", - _Any::from(&TurnEvent { oid, event }))), + _Any::from(&P::TurnEvent { oid, event }))), }; match event { - Event::Assert(b) => { - let Assert { assertion: Assertion(a), handle: remote_handle } = *b; + P::Event::Assert(b) => { + let P::Assert { assertion: P::Assertion(a), handle: remote_handle } = *b; let mut imported = vec![]; let imported_membrane = &mut self.membranes.imported; a.foreach_embedded::<_, Error>(&mut |r| { @@ -233,8 +233,8 @@ impl TunnelRelay { return Err(error("Assertion with duplicate handle", _Any::new(false))); } } - Event::Retract(b) => { - let Retract { handle: remote_handle } = *b; + P::Event::Retract(b) => { + let P::Retract { handle: remote_handle } = *b; let (local_handle, imported) = match self.inbound_assertions.remove(&remote_handle) { None => return Err(error("Retraction of nonexistent handle", _Any::from(&remote_handle))), Some(wss) => wss, @@ -244,8 +244,8 @@ impl TunnelRelay { } t.retract(local_handle); } - Event::Message(b) => { - let Message { body: Assertion(a) } = *b; + P::Event::Message(b) => { + let P::Message { body: P::Assertion(a) } = *b; let imported_membrane = &mut self.membranes.imported; a.foreach_embedded(&mut |r| { let ws = imported_membrane.acquire(r); @@ -256,8 +256,8 @@ impl TunnelRelay { })?; t.message(target, a); } - Event::Sync(b) => { - let Sync { peer } = *b; + P::Event::Sync(b) => { + let P::Sync { peer } = *b; self.membranes.imported.acquire(&peer); struct SyncPeer { tr: Arc, @@ -290,25 +290,25 @@ impl TunnelRelay { } } - fn handle_outbound_event(&mut self, t: &mut Activation, mut event: Event) -> Result { - match &mut event { - Event::Assert(b) => { - let Assert { assertion: Assertion(a), handle } = &**b; + fn handle_outbound_event(&mut self, t: &mut Activation, event: P::Event) -> Result { + match &event { + P::Event::Assert(b) => { + let P::Assert { assertion: P::Assertion(a), handle } = &**b; let mut outbound = Vec::new(); a.foreach_embedded::<_, Error>( &mut |r| Ok(outbound.push(self.membranes.export_ref(Arc::clone(r), true))))?; self.outbound_assertions.insert(handle.clone(), outbound); } - Event::Retract(b) => { - let Retract { handle } = &**b; + P::Event::Retract(b) => { + let P::Retract { handle } = &**b; if let Some(outbound) = self.outbound_assertions.remove(handle) { for ws in outbound.into_iter() { self.membranes.exported.release(&ws); } } } - Event::Message(b) => { - let Message { body: Assertion(a) } = &**b; + P::Event::Message(b) => { + let P::Message { body: P::Assertion(a) } = &**b; a.foreach_embedded(&mut |r| { let ws = self.membranes.export_ref(Arc::clone(r), false); match ws.ref_count.load(Ordering::SeqCst) { @@ -317,18 +317,18 @@ impl TunnelRelay { } })?; }, - Event::Sync(_b) => panic!("TODO not yet implemented"), + P::Event::Sync(_b) => panic!("TODO not yet implemented"), } Ok(event) } - fn encode_packet(&mut self, p: Packet) -> Result, Error> { + fn encode_packet(&mut self, p: P::Packet) -> Result, Error> { let item = _Any::from(&p); // tracing::trace!(packet = debug(&item), "<--"); Ok(PackedWriter::encode::<_, _Any, _>(&mut self.membranes, &item)?) } - pub fn send_packet(&mut self, debtor: &Arc, cost: usize, p: Packet) -> ActorResult { + pub fn send_packet(&mut self, debtor: &Arc, cost: usize, p: P::Packet) -> ActorResult { let bs = self.encode_packet(p)?; let _ = self.output.send(LoanedItem::new(debtor, cost, bs)); Ok(()) @@ -367,7 +367,7 @@ impl Membranes { relay_ref: &Arc, src: &'src mut S, _read_annotations: bool, - ) -> io::Result<_Ptr> { + ) -> io::Result { let v: IOValue = PackedReader::new(src, NoEmbeddedDomainCodec).demand_next(false)?; match sturdy::WireRef::try_from(&v)? { sturdy::WireRef::Mine{ oid: b } => { @@ -403,21 +403,21 @@ struct ActivatedMembranes<'a, 'activation, 'm>(&'a mut Activation<'activation>, &'m Arc, &'m mut Membranes); -impl<'a, 'activation, 'm> DomainDecode<_Ptr> for ActivatedMembranes<'a, 'activation, 'm> { +impl<'a, 'activation, 'm> DomainDecode for ActivatedMembranes<'a, 'activation, 'm> { fn decode_embedded<'de, 'src, S: BinarySource<'de>>( &mut self, src: &'src mut S, read_annotations: bool, - ) -> io::Result<_Ptr> { + ) -> io::Result { self.2.decode_embedded(self.0, self.1, src, read_annotations) } } -impl DomainEncode<_Ptr> for Membranes { +impl DomainEncode for Membranes { fn encode_embedded( &mut self, w: &mut W, - d: &_Ptr, + d: &P::_Ptr, ) -> io::Result<()> { w.write(&mut NoEmbeddedDomainCodec, &_Any::from(&match self.exported.ref_map.get(d) { Some(ws) => sturdy::WireRef::Mine { @@ -547,7 +547,7 @@ impl Entity for TunnelRelay { let mut src = BytesBinarySource::new(&bs); let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes); let mut r = src.packed::<_, _Any, _>(&mut dec); - let item = Packet::deserialize(&mut r)?; + let item = P::Packet::deserialize(&mut r)?; self.handle_inbound_packet(t, item)?; } tunnel_relay::Input::Segment { bs } => { @@ -557,7 +557,7 @@ impl Entity for TunnelRelay { let mut src = BytesBinarySource::new(&self.input_buffer); let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes); let mut r = src.packed::<_, _Any, _>(&mut dec); - let e = match Packet::deserialize(&mut r) { + let e = match P::Packet::deserialize(&mut r) { Err(ParseError::Preserves(PreservesError::Io(e))) if is_eof_io_error(&e) => None, @@ -581,8 +581,8 @@ impl Entity for TunnelRelay { t.message_immediate_self( &self.self_ref, &tunnel_relay::RelayProtocol::Flush); } - let turn_event = TurnEvent { - oid: Oid(oid.0), + let turn_event = P::TurnEvent { + oid: P::Oid(oid.0), event: self.handle_outbound_event(t, event)?, }; self.pending_outbound.push(turn_event); @@ -598,7 +598,7 @@ impl Entity for TunnelRelay { } tunnel_relay::RelayProtocol::Flush => { let events = std::mem::take(&mut self.pending_outbound); - self.send_packet(&t.debtor, events.len(), Packet::Turn(Box::new(Turn(events))))? + self.send_packet(&t.debtor, events.len(), P::Packet::Turn(Box::new(P::Turn(events))))? } } } @@ -608,7 +608,7 @@ impl Entity for TunnelRelay { fn exit_hook(&mut self, t: &mut Activation, exit_status: &ActorResult) -> ActorResult { if let Err(e) = exit_status { let e = e.clone(); - self.send_packet(&t.debtor, 1, Packet::Error(Box::new(e)))?; + self.send_packet(&t.debtor, 1, P::Packet::Error(Box::new(e)))?; } Ok(()) } @@ -621,25 +621,30 @@ impl Entity for RelayEntity { fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult { Ok(t.message(&self.relay_ref, &tunnel_relay::Output { oid: self.oid.clone(), - event: Event::Assert(Box::new(Assert { assertion: Assertion(a), handle: h })), + event: P::Event::Assert(Box::new(P::Assert { + assertion: P::Assertion(a), + handle: P::Handle(h.into()), + })), })) } fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult { Ok(t.message(&self.relay_ref, &tunnel_relay::Output { oid: self.oid.clone(), - event: Event::Retract(Box::new(Retract { handle: h })), + event: P::Event::Retract(Box::new(P::Retract { + handle: P::Handle(h.into()), + })), })) } fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult { Ok(t.message(&self.relay_ref, &tunnel_relay::Output { oid: self.oid.clone(), - event: Event::Message(Box::new(Message { body: Assertion(m) })), + event: P::Event::Message(Box::new(P::Message { body: P::Assertion(m) })), })) } fn sync(&mut self, t: &mut Activation, peer: Arc) -> ActorResult { Ok(t.message(&self.relay_ref, &tunnel_relay::Output { oid: self.oid.clone(), - event: Event::Sync(Box::new(Sync { peer })), + event: P::Event::Sync(Box::new(P::Sync { peer })), })) } } diff --git a/src/skeleton.rs b/src/skeleton.rs index ea4f479..ba00b6c 100644 --- a/src/skeleton.rs +++ b/src/skeleton.rs @@ -8,8 +8,8 @@ use std::sync::Arc; use crate::actor::_Any; use crate::actor::Activation; +use crate::actor::Handle; use crate::actor::Ref; -use crate::schemas::internal_protocol::Handle; use crate::schemas::dataspace_patterns as ds; use crate::pattern::{self, PathStep, Path, Paths};