Use u64 internally for assertion handles

This commit is contained in:
Tony Garnock-Jones 2021-07-22 10:07:49 +02:00
parent 4a69d5573f
commit 6c3f039026
5 changed files with 56 additions and 56 deletions

View File

@ -29,7 +29,7 @@ use tokio_util::sync::CancellationToken;
use tracing::Instrument;
pub use super::schemas::internal_protocol::_Any;
pub use super::schemas::internal_protocol::Handle;
pub type Handle = u64;
pub type ActorResult = Result<(), Error>;
pub type ActorHandle = tokio::task::JoinHandle<ActorResult>;
@ -225,15 +225,13 @@ impl<'activation> Activation<'activation> {
if let Some(assertion) = r.rewrite(a.into()) {
{
let r = Arc::clone(r);
let handle = handle.clone();
self.queue_for(&r).push(Box::new(
move |t| r.with_entity(|e| e.assert(t, assertion, handle))));
}
{
let r = Arc::clone(r);
let handle = handle.clone();
self.actor.outbound_assertions.insert(
handle.clone(),
handle,
Destination::Remote(Arc::clone(&r), Box::new(
move |t| r.with_entity(|e| e.retract(t, handle)))));
}
@ -247,15 +245,13 @@ impl<'activation> Activation<'activation> {
if let Some(assertion) = r.rewrite(a.into()) {
{
let r = Arc::clone(r);
let handle = handle.clone();
self.immediate_self.push(Box::new(
move |t| r.with_entity(|e| e.assert(t, assertion, handle))));
}
{
let r = Arc::clone(r);
let handle = handle.clone();
self.actor.outbound_assertions.insert(
handle.clone(),
handle,
Destination::ImmediateSelf(Box::new(
move |t| r.with_entity(|e| e.retract(t, handle)))));
}

View File

@ -217,7 +217,7 @@ fn handle_resolve(ds: &mut Arc<Ref>, t: &mut Activation, a: _Any) -> DuringResul
target = debug(&target),
"sturdyref resolved");
let h = t.assert(observer, _Any::domain(target));
Ok(Some(Box::new(|_observer, t| Ok(t.retract(h)))))
Ok(Some(Box::new(move |_observer, t| Ok(t.retract(h)))))
}
}
})
@ -245,7 +245,7 @@ fn handle_resolve(ds: &mut Arc<Ref>, t: &mut Activation, a: _Any) -> DuringResul
})),
observer: handler,
});
Ok(Some(Box::new(|_ds, t| Ok(t.retract(oh)))))
Ok(Some(Box::new(move |_ds, t| Ok(t.retract(oh)))))
}
}
}

View File

@ -35,8 +35,7 @@ pub fn next_actor_id() -> ActorId {
static NEXT_HANDLE: AtomicU64 = AtomicU64::new(3);
pub fn next_handle() -> Handle {
Handle(value::signed_integer::SignedInteger::from(
NEXT_HANDLE.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed) as u128))
NEXT_HANDLE.fetch_add(BUMP_AMOUNT.into(), Ordering::Relaxed)
}
static NEXT_MAILBOX_ID: AtomicU64 = AtomicU64::new(4);

View File

@ -6,7 +6,7 @@ use crate::during;
use crate::error::Error;
use crate::error::error;
use crate::schemas::gatekeeper;
use crate::schemas::internal_protocol::*;
use crate::schemas::internal_protocol as P;
use crate::schemas::sturdy;
use crate::schemas::tunnel_relay;
@ -81,10 +81,10 @@ pub struct TunnelRelay
{
self_ref: Arc<Ref>,
input_buffer: BytesMut,
inbound_assertions: Map</* remote */ Handle, (/* local */ Handle, Vec<Arc<WireSymbol>>)>,
outbound_assertions: Map<Handle, Vec<Arc<WireSymbol>>>,
inbound_assertions: Map</* remote */ P::Handle, (/* local */ Handle, Vec<Arc<WireSymbol>>)>,
outbound_assertions: Map<P::Handle, Vec<Arc<WireSymbol>>>,
membranes: Membranes,
pending_outbound: Vec<TurnEvent>,
pending_outbound: Vec<P::TurnEvent>,
output: UnboundedSender<LoanedItem<Vec<u8>>>,
}
@ -202,27 +202,27 @@ impl TunnelRelay {
result
}
fn handle_inbound_packet(&mut self, t: &mut Activation, p: Packet) -> ActorResult {
fn handle_inbound_packet(&mut self, t: &mut Activation, p: P::Packet) -> ActorResult {
// tracing::trace!(packet = debug(&p), "-->");
match p {
Packet::Error(b) => {
P::Packet::Error(b) => {
tracing::info!(message = debug(b.message.clone()),
detail = debug(b.detail.clone()),
"received Error from peer");
Err(*b)
},
Packet::Turn(b) => {
P::Packet::Turn(b) => {
let t = &mut Activation::new(t.actor, Arc::clone(&t.debtor));
let Turn(events) = *b;
for TurnEvent { oid, event } in events {
let P::Turn(events) = *b;
for P::TurnEvent { oid, event } in events {
let target = match self.membranes.exported.oid_map.get(&sturdy::Oid(oid.0.clone())) {
Some(ws) => &ws.obj,
None => return Err(error("Cannot deliver event: nonexistent oid",
_Any::from(&TurnEvent { oid, event }))),
_Any::from(&P::TurnEvent { oid, event }))),
};
match event {
Event::Assert(b) => {
let Assert { assertion: Assertion(a), handle: remote_handle } = *b;
P::Event::Assert(b) => {
let P::Assert { assertion: P::Assertion(a), handle: remote_handle } = *b;
let mut imported = vec![];
let imported_membrane = &mut self.membranes.imported;
a.foreach_embedded::<_, Error>(&mut |r| {
@ -233,8 +233,8 @@ impl TunnelRelay {
return Err(error("Assertion with duplicate handle", _Any::new(false)));
}
}
Event::Retract(b) => {
let Retract { handle: remote_handle } = *b;
P::Event::Retract(b) => {
let P::Retract { handle: remote_handle } = *b;
let (local_handle, imported) = match self.inbound_assertions.remove(&remote_handle) {
None => return Err(error("Retraction of nonexistent handle", _Any::from(&remote_handle))),
Some(wss) => wss,
@ -244,8 +244,8 @@ impl TunnelRelay {
}
t.retract(local_handle);
}
Event::Message(b) => {
let Message { body: Assertion(a) } = *b;
P::Event::Message(b) => {
let P::Message { body: P::Assertion(a) } = *b;
let imported_membrane = &mut self.membranes.imported;
a.foreach_embedded(&mut |r| {
let ws = imported_membrane.acquire(r);
@ -256,8 +256,8 @@ impl TunnelRelay {
})?;
t.message(target, a);
}
Event::Sync(b) => {
let Sync { peer } = *b;
P::Event::Sync(b) => {
let P::Sync { peer } = *b;
self.membranes.imported.acquire(&peer);
struct SyncPeer {
tr: Arc<Ref>,
@ -290,25 +290,25 @@ impl TunnelRelay {
}
}
fn handle_outbound_event(&mut self, t: &mut Activation, mut event: Event) -> Result<Event, Error> {
match &mut event {
Event::Assert(b) => {
let Assert { assertion: Assertion(a), handle } = &**b;
fn handle_outbound_event(&mut self, t: &mut Activation, event: P::Event) -> Result<P::Event, Error> {
match &event {
P::Event::Assert(b) => {
let P::Assert { assertion: P::Assertion(a), handle } = &**b;
let mut outbound = Vec::new();
a.foreach_embedded::<_, Error>(
&mut |r| Ok(outbound.push(self.membranes.export_ref(Arc::clone(r), true))))?;
self.outbound_assertions.insert(handle.clone(), outbound);
}
Event::Retract(b) => {
let Retract { handle } = &**b;
P::Event::Retract(b) => {
let P::Retract { handle } = &**b;
if let Some(outbound) = self.outbound_assertions.remove(handle) {
for ws in outbound.into_iter() {
self.membranes.exported.release(&ws);
}
}
}
Event::Message(b) => {
let Message { body: Assertion(a) } = &**b;
P::Event::Message(b) => {
let P::Message { body: P::Assertion(a) } = &**b;
a.foreach_embedded(&mut |r| {
let ws = self.membranes.export_ref(Arc::clone(r), false);
match ws.ref_count.load(Ordering::SeqCst) {
@ -317,18 +317,18 @@ impl TunnelRelay {
}
})?;
},
Event::Sync(_b) => panic!("TODO not yet implemented"),
P::Event::Sync(_b) => panic!("TODO not yet implemented"),
}
Ok(event)
}
fn encode_packet(&mut self, p: Packet) -> Result<Vec<u8>, Error> {
fn encode_packet(&mut self, p: P::Packet) -> Result<Vec<u8>, Error> {
let item = _Any::from(&p);
// tracing::trace!(packet = debug(&item), "<--");
Ok(PackedWriter::encode::<_, _Any, _>(&mut self.membranes, &item)?)
}
pub fn send_packet(&mut self, debtor: &Arc<Debtor>, cost: usize, p: Packet) -> ActorResult {
pub fn send_packet(&mut self, debtor: &Arc<Debtor>, cost: usize, p: P::Packet) -> ActorResult {
let bs = self.encode_packet(p)?;
let _ = self.output.send(LoanedItem::new(debtor, cost, bs));
Ok(())
@ -367,7 +367,7 @@ impl Membranes {
relay_ref: &Arc<Ref>,
src: &'src mut S,
_read_annotations: bool,
) -> io::Result<_Ptr> {
) -> io::Result<P::_Ptr> {
let v: IOValue = PackedReader::new(src, NoEmbeddedDomainCodec).demand_next(false)?;
match sturdy::WireRef::try_from(&v)? {
sturdy::WireRef::Mine{ oid: b } => {
@ -403,21 +403,21 @@ struct ActivatedMembranes<'a, 'activation, 'm>(&'a mut Activation<'activation>,
&'m Arc<Ref>,
&'m mut Membranes);
impl<'a, 'activation, 'm> DomainDecode<_Ptr> for ActivatedMembranes<'a, 'activation, 'm> {
impl<'a, 'activation, 'm> DomainDecode<P::_Ptr> for ActivatedMembranes<'a, 'activation, 'm> {
fn decode_embedded<'de, 'src, S: BinarySource<'de>>(
&mut self,
src: &'src mut S,
read_annotations: bool,
) -> io::Result<_Ptr> {
) -> io::Result<P::_Ptr> {
self.2.decode_embedded(self.0, self.1, src, read_annotations)
}
}
impl DomainEncode<_Ptr> for Membranes {
impl DomainEncode<P::_Ptr> for Membranes {
fn encode_embedded<W: Writer>(
&mut self,
w: &mut W,
d: &_Ptr,
d: &P::_Ptr,
) -> io::Result<()> {
w.write(&mut NoEmbeddedDomainCodec, &_Any::from(&match self.exported.ref_map.get(d) {
Some(ws) => sturdy::WireRef::Mine {
@ -547,7 +547,7 @@ impl Entity for TunnelRelay {
let mut src = BytesBinarySource::new(&bs);
let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes);
let mut r = src.packed::<_, _Any, _>(&mut dec);
let item = Packet::deserialize(&mut r)?;
let item = P::Packet::deserialize(&mut r)?;
self.handle_inbound_packet(t, item)?;
}
tunnel_relay::Input::Segment { bs } => {
@ -557,7 +557,7 @@ impl Entity for TunnelRelay {
let mut src = BytesBinarySource::new(&self.input_buffer);
let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes);
let mut r = src.packed::<_, _Any, _>(&mut dec);
let e = match Packet::deserialize(&mut r) {
let e = match P::Packet::deserialize(&mut r) {
Err(ParseError::Preserves(PreservesError::Io(e)))
if is_eof_io_error(&e) =>
None,
@ -581,8 +581,8 @@ impl Entity for TunnelRelay {
t.message_immediate_self(
&self.self_ref, &tunnel_relay::RelayProtocol::Flush);
}
let turn_event = TurnEvent {
oid: Oid(oid.0),
let turn_event = P::TurnEvent {
oid: P::Oid(oid.0),
event: self.handle_outbound_event(t, event)?,
};
self.pending_outbound.push(turn_event);
@ -598,7 +598,7 @@ impl Entity for TunnelRelay {
}
tunnel_relay::RelayProtocol::Flush => {
let events = std::mem::take(&mut self.pending_outbound);
self.send_packet(&t.debtor, events.len(), Packet::Turn(Box::new(Turn(events))))?
self.send_packet(&t.debtor, events.len(), P::Packet::Turn(Box::new(P::Turn(events))))?
}
}
}
@ -608,7 +608,7 @@ impl Entity for TunnelRelay {
fn exit_hook(&mut self, t: &mut Activation, exit_status: &ActorResult) -> ActorResult {
if let Err(e) = exit_status {
let e = e.clone();
self.send_packet(&t.debtor, 1, Packet::Error(Box::new(e)))?;
self.send_packet(&t.debtor, 1, P::Packet::Error(Box::new(e)))?;
}
Ok(())
}
@ -621,25 +621,30 @@ impl Entity for RelayEntity {
fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult {
Ok(t.message(&self.relay_ref, &tunnel_relay::Output {
oid: self.oid.clone(),
event: Event::Assert(Box::new(Assert { assertion: Assertion(a), handle: h })),
event: P::Event::Assert(Box::new(P::Assert {
assertion: P::Assertion(a),
handle: P::Handle(h.into()),
})),
}))
}
fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult {
Ok(t.message(&self.relay_ref, &tunnel_relay::Output {
oid: self.oid.clone(),
event: Event::Retract(Box::new(Retract { handle: h })),
event: P::Event::Retract(Box::new(P::Retract {
handle: P::Handle(h.into()),
})),
}))
}
fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult {
Ok(t.message(&self.relay_ref, &tunnel_relay::Output {
oid: self.oid.clone(),
event: Event::Message(Box::new(Message { body: Assertion(m) })),
event: P::Event::Message(Box::new(P::Message { body: P::Assertion(m) })),
}))
}
fn sync(&mut self, t: &mut Activation, peer: Arc<Ref>) -> ActorResult {
Ok(t.message(&self.relay_ref, &tunnel_relay::Output {
oid: self.oid.clone(),
event: Event::Sync(Box::new(Sync { peer })),
event: P::Event::Sync(Box::new(P::Sync { peer })),
}))
}
}

View File

@ -8,8 +8,8 @@ use std::sync::Arc;
use crate::actor::_Any;
use crate::actor::Activation;
use crate::actor::Handle;
use crate::actor::Ref;
use crate::schemas::internal_protocol::Handle;
use crate::schemas::dataspace_patterns as ds;
use crate::pattern::{self, PathStep, Path, Paths};