diff --git a/syndicate/src/relay.rs b/syndicate/src/relay.rs index e31f7e7..e757f45 100644 --- a/syndicate/src/relay.rs +++ b/syndicate/src/relay.rs @@ -6,7 +6,7 @@ use crate::during; use crate::error::Error; use crate::error::error; use crate::schemas::gatekeeper; -use crate::schemas::internal_protocol as P; +use crate::schemas::external_protocol as P; use crate::schemas::sturdy; use futures::Sink; @@ -24,10 +24,9 @@ use preserves::value::IOValue; use preserves::value::Map; use preserves::value::NestedValue; use preserves::value::NoEmbeddedDomainCodec; -use preserves::value::PackedReader; use preserves::value::PackedWriter; -use preserves::value::Reader; use preserves::value::TextWriter; +use preserves::value::Value; use preserves::value::ViaCodec; use preserves::value::Writer; use preserves::value::signed_integer::SignedInteger; @@ -35,7 +34,6 @@ use preserves::value::signed_integer::SignedInteger; use preserves_schema::support::Deserialize; use preserves_schema::support::ParseError; -use std::convert::TryFrom; use std::io; use std::pin::Pin; use std::sync::Arc; @@ -50,23 +48,34 @@ use tokio::io::AsyncWriteExt; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver}; +#[derive(Debug, Clone, Copy)] +enum WireSymbolSide { + Imported, + Exported, +} + struct WireSymbol { oid: sturdy::Oid, obj: Arc, ref_count: AtomicUsize, + side: WireSymbolSide, } struct Membrane { + side: WireSymbolSide, oid_map: Map>, ref_map: Map, Arc>, } +#[derive(Debug)] struct Membranes { exported: Membrane, imported: Membrane, next_export_oid: usize, } +struct WireRefCodec; + pub enum Input { Packets(Pin, Error>> + Send>>), Bytes(Pin>), @@ -101,21 +110,46 @@ struct TunnelRefEntity { relay_ref: TunnelRelayRef, } +type Pins<'a> = &'a mut Vec>; + //--------------------------------------------------------------------------- impl WireSymbol { - fn acquire(&self) { + #[inline] + fn inc_ref<'a>(self: &'a Arc, pins: Pins) -> &'a Arc { self.ref_count.fetch_add(1, Ordering::SeqCst); + pins.push(Arc::clone(&self)); + tracing::trace!(?self, "acquire"); + self } - fn release(&self) -> bool { - self.ref_count.fetch_sub(1, Ordering::SeqCst) == 1 + #[inline] + fn dec_ref(&self) -> bool { + let old_count = self.ref_count.fetch_sub(1, Ordering::SeqCst); + tracing::trace!(?self, "release"); + old_count == 1 + } + + #[inline] + fn current_ref_count(&self) -> usize { + self.ref_count.load(Ordering::SeqCst) + } +} + +impl std::fmt::Debug for WireSymbol { + fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { + write!(f, "#", + self.side, + self.oid.0, + self.obj, + self.current_ref_count()) } } impl Membrane { - fn new() -> Self { + fn new(side: WireSymbolSide) -> Self { Membrane { + side, oid_map: Map::new(), ref_map: Map::new(), } @@ -126,24 +160,83 @@ impl Membrane { oid: oid.clone(), obj: Arc::clone(&obj), ref_count: AtomicUsize::new(0), + side: self.side, }); self.oid_map.insert(oid, Arc::clone(&ws)); self.ref_map.insert(obj, Arc::clone(&ws)); ws } +} - fn acquire(&mut self, r: &Arc) -> Arc { - let ws = self.ref_map.get(r).expect("WireSymbol must be present at acquire() time"); - ws.acquire(); - Arc::clone(ws) +impl std::fmt::Debug for Membrane { + fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { + f.debug_struct("Membrane") + .field("side", &self.side) + .field("refs", &self.oid_map.values()) + .finish() + } +} + +impl Membranes { + fn export_ref(&mut self, obj: Arc) -> Arc { + let ws = match self.exported.ref_map.get(&obj) { + None => { + let oid = sturdy::Oid(SignedInteger::from(self.next_export_oid as u128)); + self.next_export_oid += 1; + self.exported.insert(oid, obj) + } + Some(ws) => Arc::clone(ws) + }; + ws } - fn release(&mut self, ws: &Arc) { - if ws.release() { - self.oid_map.remove(&ws.oid); - self.ref_map.remove(&ws.obj); + fn import_oid( + &mut self, + t: &mut Activation, + relay_ref: &TunnelRelayRef, + oid: sturdy::Oid, + ) -> Arc { + let obj = t.create(RelayEntity { relay_ref: Arc::clone(relay_ref), oid: oid.clone() }); + self.imported.insert(oid, Cap::new(&obj)) + } + + fn membrane(&mut self, side: WireSymbolSide) -> &mut Membrane { + match side { + WireSymbolSide::Imported => &mut self.imported, + WireSymbolSide::Exported => &mut self.exported, } } + + #[inline] + fn release>>(&mut self, wss: I) { + for ws in wss { + if ws.dec_ref() { + let membrane = self.membrane(ws.side); + membrane.oid_map.remove(&ws.oid); + membrane.ref_map.remove(&ws.obj); + } + } + } +} + +impl DomainEncode for WireRefCodec { + fn encode_embedded( + &mut self, + w: &mut W, + d: &P::_Ptr, + ) -> io::Result<()> { + w.write(&mut NoEmbeddedDomainCodec, &IOValue::from(&**d)) + } +} + +impl DomainDecode for WireRefCodec { + fn decode_embedded<'de, 'src, S: BinarySource<'de>>( + &mut self, + src: &'src mut S, + _read_annotations: bool, + ) -> io::Result { + Ok(Arc::new(P::_Dom::deserialize(&mut src.packed(NoEmbeddedDomainCodec))?)) + } } pub fn connect_stream( @@ -172,6 +265,9 @@ pub fn connect_stream( }); } +// macro_rules! dump_membranes { ($e:expr) => { tracing::trace!("membranes: {:#?}", $e); } } +macro_rules! dump_membranes { ($e:expr) => { (); } } + impl TunnelRelay { pub fn run( t: &mut Activation, @@ -192,18 +288,19 @@ impl TunnelRelay { inbound_assertions: Map::new(), outbound_assertions: Map::new(), membranes: Membranes { - exported: Membrane::new(), - imported: Membrane::new(), + exported: Membrane::new(WireSymbolSide::Exported), + imported: Membrane::new(WireSymbolSide::Imported), next_export_oid: 0, }, pending_outbound: Vec::new(), self_entity: self_entity.clone(), }; if let Some(ir) = initial_ref { - tr.membranes.export_ref(ir, true); + tr.membranes.export_ref(ir).inc_ref(&mut vec![]); } let result = initial_oid.map( |io| Arc::clone(&tr.membranes.import_oid(t, &tr_ref, io).obj)); + dump_membranes!(tr.membranes); *tr_ref.lock().unwrap() = Some(tr); t.linked_task(crate::name!("writer"), output_loop(o, output_rx)); t.linked_task(crate::name!("reader"), input_loop(t.facet.clone(), i, tr_ref)); @@ -211,19 +308,18 @@ impl TunnelRelay { result } - fn deserialize_one(&mut self, t: &mut Activation, bs: &[u8]) -> (Result, usize) { + fn deserialize_one(&mut self, bs: &[u8]) -> (Result, usize) { let mut src = BytesBinarySource::new(&bs); - let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes); match src.peek() { Ok(v) => if v >= 128 { self.output_text = false; - let mut r = src.packed::<_, AnyValue, _>(&mut dec); + let mut r = src.packed(WireRefCodec); let res = P::Packet::deserialize(&mut r); (res, r.source.index) } else { self.output_text = true; - let mut dec = ViaCodec::new(dec); - let mut r = src.text::<_, AnyValue, _>(&mut dec); + let mut dec = ViaCodec::new(WireRefCodec); + let mut r = src.text::<_, P::_Any, _>(&mut dec); let res = P::Packet::deserialize(&mut r); (res, r.source.index) }, @@ -232,17 +328,18 @@ impl TunnelRelay { } fn handle_inbound_datagram(&mut self, t: &mut Activation, bs: &[u8]) -> ActorResult { - let item = self.deserialize_one(t, bs).0?; + let item = self.deserialize_one(bs).0?; self.handle_inbound_packet(t, item) } fn handle_inbound_stream(&mut self, t: &mut Activation, buf: &mut BytesMut) -> ActorResult { loop { - let (result, count) = self.deserialize_one(t, buf); + let (result, count) = self.deserialize_one(buf); match result { Err(ParseError::Preserves(PreservesError::Io(e))) if is_eof_io_error(&e) => return Ok(()), - Err(e) => return Err(e)?, + Err(e) => + return Err(e)?, Ok(item) => { buf.advance(count); self.handle_inbound_packet(t, item)?; @@ -251,82 +348,160 @@ impl TunnelRelay { } } + fn import_wire_ref(&mut self, t: &mut Activation, d: &Arc, pins: Pins) -> io::Result> { + match &**d { + sturdy::WireRef::Mine { oid: b } => { + let oid = &**b; + let ws = match self.membranes.imported.oid_map.get(&oid) { + Some(ws) => Arc::clone(ws), + None => self.membranes.import_oid(t, &self.self_ref, oid.clone()), + }; + Ok(Arc::clone(&ws.inc_ref(pins).obj)) + } + sturdy::WireRef::Yours { oid: b, attenuation } => { + let oid = &**b; + match self.membranes.exported.oid_map.get(&oid) { + Some(ws) => { + ws.inc_ref(pins); + if attenuation.is_empty() { + Ok(Arc::clone(&ws.obj)) + } else { + ws.obj.attenuate(&sturdy::Attenuation(attenuation.clone())) + .map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidInput, + format!("Invalid capability attenuation: {:?}", e)) + }) + } + } + None => Ok(Cap::new(&t.inert_entity())), + } + } + } + } + + #[inline] + fn import>(&mut self, t: &mut Activation, v: V, pins: Pins) -> io::Result { + v.into().copy_via(&mut |d| Ok(Value::Embedded(self.import_wire_ref(t, d, pins)?))) + } + + #[inline] + fn wire_symbol_for_imported_oid(&mut self, oid: &sturdy::Oid) -> &Arc { + self.membranes.imported.oid_map.get(oid).expect("imported oid entry to exist for RelayEntity") + } + + fn export_cap(&mut self, d: &Arc, pins: Pins) -> io::Result> { + Ok(Arc::new(match self.membranes.exported.ref_map.get(d) { + Some(ws) => sturdy::WireRef::Mine { + oid: Box::new(ws.inc_ref(pins).oid.clone()), + }, + None => match self.membranes.imported.ref_map.get(d) { + Some(ws) => { + if d.attenuation.is_empty() { + sturdy::WireRef::Yours { + oid: Box::new(ws.inc_ref(pins).oid.clone()), + attenuation: vec![], + } + } else { + // We may trust the peer to enforce attenuation on our behalf, in + // which case we can return sturdy::WireRef::Yours with an attenuation + // attached here, but for now we don't. + sturdy::WireRef::Mine { + oid: Box::new(self.membranes.export_ref(Arc::clone(d)).inc_ref(pins).oid.clone()), + } + } + } + None => + sturdy::WireRef::Mine { + oid: Box::new(self.membranes.export_ref(Arc::clone(d)).inc_ref(pins).oid.clone()), + }, + } + })) + } + + #[inline] + fn export>(&mut self, v: V, pins: Pins) -> io::Result { + v.into().copy_via(&mut |d| Ok(Value::Embedded(self.export_cap(d, pins)?))) + } + fn handle_inbound_packet(&mut self, t: &mut Activation, p: P::Packet) -> ActorResult { - // tracing::trace!(packet = ?p, "-->"); + tracing::trace!(packet = ?p, "-->"); match p { P::Packet::Error(b) => { tracing::info!(message = ?b.message.clone(), detail = ?b.detail.clone(), "received Error from peer"); - Err(*b) + let P::Error { message, detail } = *b; + Err(error(&message, self.import(t, detail, &mut vec![])?)) }, P::Packet::Turn(b) => { let P::Turn(events) = *b; for P::TurnEvent { oid, event } in events { let target = match self.membranes.exported.oid_map.get(&sturdy::Oid(oid.0.clone())) { - Some(ws) => &ws.obj, - None => return Err(error("Cannot deliver event: nonexistent oid", - AnyValue::from(&P::TurnEvent { oid, event }))), + Some(ws) => Arc::clone(ws), + None => return Err( + error("Cannot deliver event: nonexistent oid", + self.import(t, &P::TurnEvent { oid, event }, &mut vec![])?)), }; match event { P::Event::Assert(b) => { let P::Assert { assertion: P::Assertion(a), handle: remote_handle } = *b; - let mut imported = vec![]; - let imported_membrane = &mut self.membranes.imported; - a.foreach_embedded::<_, Error>(&mut |r| { - Ok(imported.push(imported_membrane.acquire(r))) - })?; - if let Some(local_handle) = target.assert(t, a) { - if let Some(_) = self.inbound_assertions.insert(remote_handle, (local_handle, imported)) { - return Err(error("Assertion with duplicate handle", AnyValue::new(false))); + let mut pins = vec![]; + target.inc_ref(&mut pins); + let a = self.import(t, a, &mut pins)?; + dump_membranes!(self.membranes); + if let Some(local_handle) = target.obj.assert(t, a) { + if let Some(_) = self.inbound_assertions.insert(remote_handle, (local_handle, pins)) { + return Err(error("Assertion with duplicate handle", + AnyValue::new(false))); } } } P::Event::Retract(b) => { let P::Retract { handle: remote_handle } = *b; - let (local_handle, imported) = match self.inbound_assertions.remove(&remote_handle) { - None => return Err(error("Retraction of nonexistent handle", AnyValue::from(&remote_handle))), + let (local_handle, pins) = match self.inbound_assertions.remove(&remote_handle) { + None => return Err(error("Retraction of nonexistent handle", + self.import(t, &remote_handle, &mut vec![])?)), Some(wss) => wss, }; - for ws in imported.into_iter() { - self.membranes.imported.release(&ws); - } + self.membranes.release(pins); + dump_membranes!(self.membranes); t.retract(local_handle); } P::Event::Message(b) => { let P::Message { body: P::Assertion(a) } = *b; - let imported_membrane = &mut self.membranes.imported; - a.foreach_embedded(&mut |r| { - let ws = imported_membrane.acquire(r); - match ws.ref_count.load(Ordering::SeqCst) { - 1 => Err(error("Cannot receive transient reference", AnyValue::new(false))), - _ => Ok(()) - } - })?; - target.message(t, a); + let mut pins = vec![]; + let a = self.import(t, a, &mut pins)?; + ensure_no_transient_references(&pins)?; + target.obj.message(t, a); + self.membranes.release(pins); + dump_membranes!(self.membranes); } P::Event::Sync(b) => { let P::Sync { peer } = *b; - self.membranes.imported.acquire(&peer); + let mut pins = vec![]; + target.inc_ref(&mut pins); + let peer = self.import_wire_ref(t, &peer, &mut pins)?; + dump_membranes!(self.membranes); struct SyncPeer { relay_ref: TunnelRelayRef, peer: Arc, + pins: Vec>, } impl Entity for SyncPeer { fn message(&mut self, t: &mut Activation, _a: Synced) -> ActorResult { self.peer.message(t, AnyValue::new(true)); let mut g = self.relay_ref.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); - if let Some(ws) = tr.membranes.imported.ref_map.get(&self.peer) { - let ws = Arc::clone(ws); // cloned to release the borrow to permit the release - tr.membranes.imported.release(&ws); - } + tr.membranes.release(std::mem::take(&mut self.pins)); + dump_membranes!(tr.membranes); Ok(()) } } let k = t.create(SyncPeer { relay_ref: Arc::clone(&self.self_ref), peer: Arc::clone(&peer), + pins, }); t.sync(&peer.underlying, k); } @@ -338,47 +513,15 @@ impl TunnelRelay { } } - fn handle_outbound_event(&mut self, t: &mut Activation, event: P::Event) -> Result { - match &event { - P::Event::Assert(b) => { - let P::Assert { assertion: P::Assertion(a), handle } = &**b; - let mut outbound = Vec::new(); - a.foreach_embedded::<_, Error>( - &mut |r| Ok(outbound.push(self.membranes.export_ref(Arc::clone(r), true))))?; - self.outbound_assertions.insert(handle.clone(), outbound); - } - P::Event::Retract(b) => { - let P::Retract { handle } = &**b; - if let Some(outbound) = self.outbound_assertions.remove(handle) { - for ws in outbound.into_iter() { - self.membranes.exported.release(&ws); - } - } - } - P::Event::Message(b) => { - let P::Message { body: P::Assertion(a) } = &**b; - a.foreach_embedded(&mut |r| { - let ws = self.membranes.export_ref(Arc::clone(r), false); - match ws.ref_count.load(Ordering::SeqCst) { - 0 => Err(error("Cannot send transient reference", AnyValue::new(false))), - _ => Ok(()) - } - })?; - }, - P::Event::Sync(_b) => panic!("TODO not yet implemented"), - } - Ok(event) - } - fn encode_packet(&mut self, p: P::Packet) -> Result, Error> { - let item = AnyValue::from(&p); - // tracing::trace!(packet = ?item, "<--"); + let item = P::_Any::from(&p); + tracing::trace!(packet = ?item, "<--"); if self.output_text { - let mut s = TextWriter::encode::<_, AnyValue, _>(&mut self.membranes, &item)?; + let mut s = TextWriter::encode(&mut WireRefCodec, &item)?; s.push('\n'); Ok(s.into_bytes()) } else { - Ok(PackedWriter::encode::<_, AnyValue, _>(&mut self.membranes, &item)?) + Ok(PackedWriter::encode(&mut WireRefCodec, &item)?) } } @@ -394,126 +537,13 @@ impl TunnelRelay { } let turn_event = P::TurnEvent { oid: P::Oid(oid.0), - event: self.handle_outbound_event(t, event)?, + event, }; self.pending_outbound.push(turn_event); Ok(()) } } -impl Membranes { - fn export_ref(&mut self, obj: Arc, and_acquire: bool) -> Arc { - let ws = match self.exported.ref_map.get(&obj) { - None => { - let oid = sturdy::Oid(SignedInteger::from(self.next_export_oid as u128)); - self.next_export_oid += 1; - self.exported.insert(oid, obj) - } - Some(ws) => Arc::clone(ws) - }; - if and_acquire { - ws.acquire(); - } - ws - } - - fn import_oid( - &mut self, - t: &mut Activation, - relay_ref: &TunnelRelayRef, - oid: sturdy::Oid, - ) -> Arc { - let obj = t.create(RelayEntity { relay_ref: Arc::clone(relay_ref), oid: oid.clone() }); - self.imported.insert(oid, Cap::new(&obj)) - } - - fn decode_embedded<'de, 'src, S: BinarySource<'de>>( - &mut self, - t: &mut Activation, - relay_ref: &TunnelRelayRef, - src: &'src mut S, - _read_annotations: bool, - ) -> io::Result { - let v: IOValue = PackedReader::new(src, NoEmbeddedDomainCodec).demand_next(false)?; - match sturdy::WireRef::try_from(&v)? { - sturdy::WireRef::Mine{ oid: b } => { - let oid = *b; - match self.imported.oid_map.get(&oid) { - Some(ws) => Ok(Arc::clone(&ws.obj)), - None => Ok(Arc::clone(&self.import_oid(t, relay_ref, oid).obj)), - } - } - sturdy::WireRef::Yours { oid: b, attenuation } => { - let oid = *b; - match self.exported.oid_map.get(&oid) { - Some(ws) => { - if attenuation.is_empty() { - Ok(Arc::clone(&ws.obj)) - } else { - Ok(ws.obj.attenuate(&sturdy::Attenuation(attenuation)) - .map_err(|e| { - io::Error::new( - io::ErrorKind::InvalidInput, - format!("Invalid capability attenuation: {:?}", e)) - })?) - } - } - None => Ok(Cap::new(&t.inert_entity())), - } - } - } - } -} - -struct ActivatedMembranes<'a, 'activation, 'm>(&'a mut Activation<'activation>, - &'m TunnelRelayRef, - &'m mut Membranes); - -impl<'a, 'activation, 'm> DomainDecode for ActivatedMembranes<'a, 'activation, 'm> { - fn decode_embedded<'de, 'src, S: BinarySource<'de>>( - &mut self, - src: &'src mut S, - read_annotations: bool, - ) -> io::Result { - self.2.decode_embedded(self.0, self.1, src, read_annotations) - } -} - -impl DomainEncode for Membranes { - fn encode_embedded( - &mut self, - w: &mut W, - d: &P::_Ptr, - ) -> io::Result<()> { - w.write(&mut NoEmbeddedDomainCodec, &AnyValue::from(&match self.exported.ref_map.get(d) { - Some(ws) => sturdy::WireRef::Mine { - oid: Box::new(ws.oid.clone()), - }, - None => match self.imported.ref_map.get(d) { - Some(ws) => { - if d.attenuation.is_empty() { - sturdy::WireRef::Yours { - oid: Box::new(ws.oid.clone()), - attenuation: vec![], - } - } else { - // We may trust the peer to enforce attenuation on our behalf, in - // which case we can return sturdy::WireRef::Yours with an attenuation - // attached here, but for now we don't. - sturdy::WireRef::Mine { - oid: Box::new(self.export_ref(Arc::clone(d), false).oid.clone()), - } - } - } - None => - sturdy::WireRef::Mine { - oid: Box::new(self.export_ref(Arc::clone(d), false).oid.clone()), - }, - } - })) - } -} - async fn input_loop( facet: FacetRef, i: Input, @@ -593,43 +623,100 @@ impl Entity<()> for TunnelRefEntity { fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc) -> ActorResult { if let Err(e) = &**exit_status { - let e = e.clone(); let mut g = self.relay_ref.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); + let crate::schemas::internal_protocol::Error { message, detail } = e; + let e = P::Error { + message: message.clone(), + detail: tr.export(detail.clone(), &mut vec![])?, + }; tr.send_packet(&t.account(), 1, P::Packet::Error(Box::new(e)))?; } Ok(()) } } -impl Entity for RelayEntity { - fn assert(&mut self, t: &mut Activation, a: AnyValue, h: Handle) -> ActorResult { - let mut g = self.relay_ref.lock().expect("unpoisoned"); - let tr = g.as_mut().expect("initialized"); - tr.send_event(t, self.oid.clone(), P::Event::Assert(Box::new(P::Assert { - assertion: P::Assertion(a), - handle: P::Handle(h.into()), - }))) +#[inline] +fn ensure_no_transient_references(pins: &Vec>) -> ActorResult { + for ws in pins.iter() { + if ws.current_ref_count() == 1 { + return Err(error("Cannot receive transient reference", AnyValue::new(false))); + } } - fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult { + Ok(()) +} + +impl RelayEntity { + fn with_tunnel_relay ActorResult>( + &mut self, + f: F, + ) -> ActorResult { let mut g = self.relay_ref.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); - tr.send_event(t, self.oid.clone(), P::Event::Retract(Box::new(P::Retract { - handle: P::Handle(h.into()), - }))) - } - fn message(&mut self, t: &mut Activation, m: AnyValue) -> ActorResult { - let mut g = self.relay_ref.lock().expect("unpoisoned"); - let tr = g.as_mut().expect("initialized"); - tr.send_event(t, self.oid.clone(), P::Event::Message(Box::new(P::Message { - body: P::Assertion(m) - }))) - } - fn sync(&mut self, t: &mut Activation, peer: Arc>) -> ActorResult { - let mut g = self.relay_ref.lock().expect("unpoisoned"); - let tr = g.as_mut().expect("initialized"); - tr.send_event(t, self.oid.clone(), P::Event::Sync(Box::new(P::Sync { - peer: Cap::guard(&peer) - }))) + f(tr) + } +} + +impl Entity for RelayEntity { + fn assert(&mut self, t: &mut Activation, a: AnyValue, h: Handle) -> ActorResult { + let oid = self.oid.clone(); + self.with_tunnel_relay(|tr| { + let handle = P::Handle(h.into()); + + let mut pins = vec![]; + tr.wire_symbol_for_imported_oid(&oid).inc_ref(&mut pins); + let a = tr.export(a, &mut pins)?; + tr.outbound_assertions.insert(handle.clone(), pins); + dump_membranes!(tr.membranes); + + tr.send_event(t, oid, P::Event::Assert(Box::new(P::Assert { + assertion: P::Assertion(a), + handle, + }))) + }) + } + fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult { + let oid = self.oid.clone(); + self.with_tunnel_relay(|tr| { + let handle = P::Handle(h.into()); + + if let Some(outbound) = tr.outbound_assertions.remove(&handle) { + tr.membranes.release(outbound); + } + dump_membranes!(tr.membranes); + + tr.send_event(t, oid, P::Event::Retract(Box::new(P::Retract { + handle, + }))) + }) + } + fn message(&mut self, t: &mut Activation, m: AnyValue) -> ActorResult { + let oid = self.oid.clone(); + self.with_tunnel_relay(|tr| { + let mut pins = vec![]; + let m = tr.export(m, &mut pins)?; + ensure_no_transient_references(&pins)?; + + tr.send_event(t, oid, P::Event::Message(Box::new(P::Message { + body: P::Assertion(m) + })))?; + tr.membranes.release(pins); + dump_membranes!(tr.membranes); + Ok(()) + }) + } + fn sync(&mut self, t: &mut Activation, peer: Arc>) -> ActorResult { + todo!("TODO not yet implemented"); + + // let oid = self.oid.clone(); + // self.with_tunnel_relay(|tr| { + // ... + // tr.wire_symbol_for_imported_oid(&oid).inc_ref(&mut pins); etc. etc. + // + // tr.send_event(t, oid, P::Event::Sync(Box::new(P::Sync { + // peer: Cap::guard(&peer) + // }))) + // dump_membranes!(tr.membranes); etc. etc. + // }) } }