Repair reference-counting across membranes.
This commit is contained in:
parent
7aa67adfbf
commit
b5b1a6883c
|
@ -46,17 +46,26 @@ use tokio::io::AsyncWriteExt;
|
|||
|
||||
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver};
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum WireSymbolSide {
|
||||
Imported,
|
||||
Exported,
|
||||
}
|
||||
|
||||
struct WireSymbol {
|
||||
oid: sturdy::Oid,
|
||||
obj: Arc<Cap>,
|
||||
ref_count: AtomicUsize,
|
||||
side: WireSymbolSide,
|
||||
}
|
||||
|
||||
struct Membrane {
|
||||
side: WireSymbolSide,
|
||||
oid_map: Map<sturdy::Oid, Arc<WireSymbol>>,
|
||||
ref_map: Map<Arc<Cap>, Arc<WireSymbol>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Membranes {
|
||||
exported: Membrane,
|
||||
imported: Membrane,
|
||||
|
@ -97,21 +106,49 @@ struct TunnelRefEntity {
|
|||
relay_ref: TunnelRelayRef,
|
||||
}
|
||||
|
||||
struct ActivatedMembranes<'a, 'activation, 'm> {
|
||||
turn: &'a mut Activation<'activation>,
|
||||
tr_ref: &'m TunnelRelayRef,
|
||||
membranes: &'m mut Membranes,
|
||||
}
|
||||
|
||||
//---------------------------------------------------------------------------
|
||||
|
||||
impl WireSymbol {
|
||||
fn acquire(&self) {
|
||||
#[inline]
|
||||
fn inc_ref<'a>(self: &'a Arc<Self>) -> &'a Arc<Self> {
|
||||
self.ref_count.fetch_add(1, Ordering::SeqCst);
|
||||
tracing::trace!(?self, "after inc_ref");
|
||||
self
|
||||
}
|
||||
|
||||
fn release(&self) -> bool {
|
||||
self.ref_count.fetch_sub(1, Ordering::SeqCst) == 1
|
||||
#[inline]
|
||||
fn dec_ref(&self) -> bool {
|
||||
let old_count = self.ref_count.fetch_sub(1, Ordering::SeqCst);
|
||||
tracing::trace!(?self, "after dec_ref");
|
||||
old_count == 1
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn current_ref_count(&self) -> usize {
|
||||
self.ref_count.load(Ordering::SeqCst)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for WireSymbol {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
|
||||
write!(f, "#<WireSymbol oid={:?}:{} obj={:?} ref_count={}>",
|
||||
self.side,
|
||||
self.oid.0,
|
||||
self.obj,
|
||||
self.current_ref_count())
|
||||
}
|
||||
}
|
||||
|
||||
impl Membrane {
|
||||
fn new() -> Self {
|
||||
fn new(side: WireSymbolSide) -> Self {
|
||||
Membrane {
|
||||
side,
|
||||
oid_map: Map::new(),
|
||||
ref_map: Map::new(),
|
||||
}
|
||||
|
@ -122,23 +159,15 @@ impl Membrane {
|
|||
oid: oid.clone(),
|
||||
obj: Arc::clone(&obj),
|
||||
ref_count: AtomicUsize::new(0),
|
||||
side: self.side,
|
||||
});
|
||||
self.oid_map.insert(oid, Arc::clone(&ws));
|
||||
self.ref_map.insert(obj, Arc::clone(&ws));
|
||||
ws
|
||||
}
|
||||
|
||||
fn acquire(&mut self, r: &Arc<Cap>) -> Arc<WireSymbol> {
|
||||
let ws = self.ref_map.get(r).expect("WireSymbol must be present at acquire() time");
|
||||
ws.acquire();
|
||||
Arc::clone(ws)
|
||||
}
|
||||
|
||||
fn release(&mut self, ws: &Arc<WireSymbol>) {
|
||||
if ws.release() {
|
||||
self.oid_map.remove(&ws.oid);
|
||||
self.ref_map.remove(&ws.obj);
|
||||
}
|
||||
fn insert_inert_entity(&mut self, t: &mut Activation, oid: sturdy::Oid) -> Arc<WireSymbol> {
|
||||
self.insert(oid, Cap::new(&t.inert_entity()))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -168,6 +197,18 @@ pub fn connect_stream<I, O, E, F>(
|
|||
});
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for Membrane {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
|
||||
f.debug_struct("Membrane")
|
||||
.field("side", &self.side)
|
||||
.field("refs", &self.oid_map.values())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! dump_membranes { ($e:expr) => { tracing::trace!("membranes: {:#?}", $e); } }
|
||||
// macro_rules! dump_membranes { ($e:expr) => { (); } }
|
||||
|
||||
impl TunnelRelay {
|
||||
pub fn run(
|
||||
t: &mut Activation,
|
||||
|
@ -188,18 +229,19 @@ impl TunnelRelay {
|
|||
inbound_assertions: Map::new(),
|
||||
outbound_assertions: Map::new(),
|
||||
membranes: Membranes {
|
||||
exported: Membrane::new(),
|
||||
imported: Membrane::new(),
|
||||
exported: Membrane::new(WireSymbolSide::Exported),
|
||||
imported: Membrane::new(WireSymbolSide::Imported),
|
||||
next_export_oid: 0,
|
||||
},
|
||||
pending_outbound: Vec::new(),
|
||||
self_entity: self_entity.clone(),
|
||||
};
|
||||
if let Some(ir) = initial_ref {
|
||||
tr.membranes.export_ref(ir, true);
|
||||
tr.membranes.export_ref(ir).inc_ref();
|
||||
}
|
||||
let result = initial_oid.map(
|
||||
|io| Arc::clone(&tr.membranes.import_oid(t, &tr_ref, io).obj));
|
||||
|io| Arc::clone(&tr.membranes.import_oid(t, &tr_ref, io).inc_ref().obj));
|
||||
dump_membranes!(tr.membranes);
|
||||
*tr_ref.lock().unwrap() = Some(tr);
|
||||
t.linked_task(crate::name!("writer"), output_loop(o, output_rx));
|
||||
t.linked_task(crate::name!("reader"), input_loop(t.facet.clone(), i, tr_ref));
|
||||
|
@ -209,7 +251,11 @@ impl TunnelRelay {
|
|||
|
||||
fn deserialize_one(&mut self, t: &mut Activation, bs: &[u8]) -> (Result<P::Packet, ParseError>, usize) {
|
||||
let mut src = BytesBinarySource::new(&bs);
|
||||
let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes);
|
||||
let mut dec = ActivatedMembranes {
|
||||
turn: t,
|
||||
tr_ref: &self.self_ref,
|
||||
membranes: &mut self.membranes,
|
||||
};
|
||||
match src.peek() {
|
||||
Ok(v) => if v >= 128 {
|
||||
self.output_text = false;
|
||||
|
@ -248,7 +294,7 @@ impl TunnelRelay {
|
|||
}
|
||||
|
||||
fn handle_inbound_packet(&mut self, t: &mut Activation, p: P::Packet) -> ActorResult {
|
||||
// tracing::trace!(packet = ?p, "-->");
|
||||
tracing::trace!(packet = ?p, "-->");
|
||||
match p {
|
||||
P::Packet::Error(b) => {
|
||||
tracing::info!(message = ?b.message.clone(),
|
||||
|
@ -260,69 +306,80 @@ impl TunnelRelay {
|
|||
let P::Turn(events) = *b;
|
||||
for P::TurnEvent { oid, event } in events {
|
||||
let target = match self.membranes.exported.oid_map.get(&sturdy::Oid(oid.0.clone())) {
|
||||
Some(ws) => &ws.obj,
|
||||
None => return Err(error("Cannot deliver event: nonexistent oid",
|
||||
AnyValue::from(&P::TurnEvent { oid, event }))),
|
||||
Some(ws) =>
|
||||
ws.inc_ref(),
|
||||
None => {
|
||||
tracing::warn!(
|
||||
event = ?AnyValue::from(&P::TurnEvent { oid, event }),
|
||||
"Cannot deliver event: nonexistent oid");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
let mut pins = vec![target.clone()];
|
||||
let target = Arc::clone(&target.obj);
|
||||
match event {
|
||||
P::Event::Assert(b) => {
|
||||
let P::Assert { assertion: P::Assertion(a), handle: remote_handle } = *b;
|
||||
let mut imported = vec![];
|
||||
let imported_membrane = &mut self.membranes.imported;
|
||||
a.foreach_embedded::<_, Error>(&mut |r| {
|
||||
Ok(imported.push(imported_membrane.acquire(r)))
|
||||
})?;
|
||||
a.foreach_embedded::<_, Error>(
|
||||
&mut |r| Ok(pins.push(self.membranes.lookup_ref(r))))?;
|
||||
if let Some(local_handle) = target.assert(t, a) {
|
||||
if let Some(_) = self.inbound_assertions.insert(remote_handle, (local_handle, imported)) {
|
||||
if let Some(_) = self.inbound_assertions.insert(remote_handle, (local_handle, pins)) {
|
||||
return Err(error("Assertion with duplicate handle", AnyValue::new(false)));
|
||||
}
|
||||
} else {
|
||||
self.membranes.release(pins);
|
||||
}
|
||||
dump_membranes!(self.membranes);
|
||||
}
|
||||
P::Event::Retract(b) => {
|
||||
let P::Retract { handle: remote_handle } = *b;
|
||||
let (local_handle, imported) = match self.inbound_assertions.remove(&remote_handle) {
|
||||
let (local_handle, previous_pins) = match self.inbound_assertions.remove(&remote_handle) {
|
||||
None => return Err(error("Retraction of nonexistent handle", AnyValue::from(&remote_handle))),
|
||||
Some(wss) => wss,
|
||||
};
|
||||
for ws in imported.into_iter() {
|
||||
self.membranes.imported.release(&ws);
|
||||
}
|
||||
self.membranes.release(previous_pins);
|
||||
self.membranes.release(pins);
|
||||
t.retract(local_handle);
|
||||
dump_membranes!(self.membranes);
|
||||
}
|
||||
P::Event::Message(b) => {
|
||||
let P::Message { body: P::Assertion(a) } = *b;
|
||||
let imported_membrane = &mut self.membranes.imported;
|
||||
a.foreach_embedded(&mut |r| {
|
||||
let ws = imported_membrane.acquire(r);
|
||||
match ws.ref_count.load(Ordering::SeqCst) {
|
||||
let ws = self.membranes.lookup_ref(r);
|
||||
let rc = ws.current_ref_count();
|
||||
pins.push(ws);
|
||||
match rc {
|
||||
1 => Err(error("Cannot receive transient reference", AnyValue::new(false))),
|
||||
_ => Ok(())
|
||||
}
|
||||
})?;
|
||||
target.message(t, a);
|
||||
self.membranes.release(pins);
|
||||
dump_membranes!(self.membranes);
|
||||
}
|
||||
P::Event::Sync(b) => {
|
||||
let P::Sync { peer } = *b;
|
||||
self.membranes.imported.acquire(&peer);
|
||||
pins.push(self.membranes.lookup_ref(&peer));
|
||||
dump_membranes!(self.membranes);
|
||||
struct SyncPeer {
|
||||
relay_ref: TunnelRelayRef,
|
||||
peer: Arc<Cap>,
|
||||
pins: Vec<Arc<WireSymbol>>,
|
||||
}
|
||||
impl Entity<Synced> for SyncPeer {
|
||||
fn message(&mut self, t: &mut Activation, _a: Synced) -> ActorResult {
|
||||
self.peer.message(t, AnyValue::new(true));
|
||||
let mut g = self.relay_ref.lock().expect("unpoisoned");
|
||||
let tr = g.as_mut().expect("initialized");
|
||||
if let Some(ws) = tr.membranes.imported.ref_map.get(&self.peer) {
|
||||
let ws = Arc::clone(ws); // cloned to release the borrow to permit the release
|
||||
tr.membranes.imported.release(&ws);
|
||||
}
|
||||
tr.membranes.release(std::mem::take(&mut self.pins));
|
||||
dump_membranes!(tr.membranes);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
let k = t.create(SyncPeer {
|
||||
relay_ref: Arc::clone(&self.self_ref),
|
||||
peer: Arc::clone(&peer),
|
||||
pins,
|
||||
});
|
||||
t.sync(&peer.underlying, k);
|
||||
}
|
||||
|
@ -334,83 +391,79 @@ impl TunnelRelay {
|
|||
}
|
||||
}
|
||||
|
||||
fn handle_outbound_event(&mut self, t: &mut Activation, event: P::Event) -> Result<P::Event, Error> {
|
||||
match &event {
|
||||
fn outbound_event_bookkeeping(
|
||||
&mut self,
|
||||
t: &mut Activation,
|
||||
remote_oid: sturdy::Oid,
|
||||
event: &P::Event,
|
||||
) -> ActorResult {
|
||||
match event {
|
||||
P::Event::Assert(b) => {
|
||||
let P::Assert { assertion: P::Assertion(a), handle } = &**b;
|
||||
let mut outbound = Vec::new();
|
||||
let target_ws = self.membranes.imported.oid_map.get(&remote_oid).map(Arc::clone)
|
||||
.expect("RelayEntity to have a valid oid");
|
||||
target_ws.inc_ref(); // encoding won't do this; target oid is syntactically special
|
||||
let mut pins = vec![target_ws];
|
||||
a.foreach_embedded::<_, Error>(
|
||||
&mut |r| Ok(outbound.push(self.membranes.export_ref(Arc::clone(r), true))))?;
|
||||
self.outbound_assertions.insert(handle.clone(), outbound);
|
||||
&mut |r| Ok(pins.push(self.membranes.lookup_ref(r))))?;
|
||||
self.outbound_assertions.insert(handle.clone(), pins);
|
||||
dump_membranes!(self.membranes);
|
||||
}
|
||||
P::Event::Retract(b) => {
|
||||
let P::Retract { handle } = &**b;
|
||||
if let Some(outbound) = self.outbound_assertions.remove(handle) {
|
||||
for ws in outbound.into_iter() {
|
||||
self.membranes.exported.release(&ws);
|
||||
}
|
||||
if let Some(pins) = self.outbound_assertions.remove(handle) {
|
||||
self.membranes.release(pins);
|
||||
}
|
||||
dump_membranes!(self.membranes);
|
||||
}
|
||||
P::Event::Message(b) => {
|
||||
let P::Message { body: P::Assertion(a) } = &**b;
|
||||
a.foreach_embedded(&mut |r| {
|
||||
let ws = self.membranes.export_ref(Arc::clone(r), false);
|
||||
match ws.ref_count.load(Ordering::SeqCst) {
|
||||
0 => Err(error("Cannot send transient reference", AnyValue::new(false))),
|
||||
_ => Ok(())
|
||||
let ws = self.membranes.lookup_ref(r);
|
||||
if self.membranes.release_one(ws) { // undo the inc_ref from encoding
|
||||
Err(error("Sent transient reference", AnyValue::new(false)))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
})?;
|
||||
dump_membranes!(self.membranes);
|
||||
},
|
||||
P::Event::Sync(_b) => panic!("TODO not yet implemented"),
|
||||
}
|
||||
Ok(event)
|
||||
}
|
||||
|
||||
fn encode_packet(&mut self, p: P::Packet) -> Result<Vec<u8>, Error> {
|
||||
let item = AnyValue::from(&p);
|
||||
// tracing::trace!(packet = ?item, "<--");
|
||||
if self.output_text {
|
||||
let mut s = TextWriter::encode::<_, AnyValue, _>(&mut self.membranes, &item)?;
|
||||
s.push('\n');
|
||||
Ok(s.into_bytes())
|
||||
} else {
|
||||
Ok(PackedWriter::encode::<_, AnyValue, _>(&mut self.membranes, &item)?)
|
||||
P::Event::Sync(_b) =>
|
||||
todo!(),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn send_packet(&mut self, account: &Arc<Account>, cost: usize, p: P::Packet) -> ActorResult {
|
||||
let bs = self.encode_packet(p)?;
|
||||
let item = AnyValue::from(&p);
|
||||
tracing::trace!(packet = ?item, "<--");
|
||||
|
||||
let bs = if self.output_text {
|
||||
let mut s = TextWriter::encode::<_, AnyValue, _>(&mut self.membranes, &item)?;
|
||||
s.push('\n');
|
||||
s.into_bytes()
|
||||
} else {
|
||||
PackedWriter::encode::<_, AnyValue, _>(&mut self.membranes, &item)?
|
||||
};
|
||||
|
||||
let _ = self.output.send(LoanedItem::new(account, cost, bs));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn send_event(&mut self, t: &mut Activation, oid: sturdy::Oid, event: P::Event) -> ActorResult {
|
||||
pub fn send_event(&mut self, t: &mut Activation, remote_oid: sturdy::Oid, event: P::Event) -> ActorResult {
|
||||
if self.pending_outbound.is_empty() {
|
||||
t.message_for_myself(&self.self_entity, ());
|
||||
}
|
||||
let turn_event = P::TurnEvent {
|
||||
oid: P::Oid(oid.0),
|
||||
event: self.handle_outbound_event(t, event)?,
|
||||
};
|
||||
self.pending_outbound.push(turn_event);
|
||||
self.pending_outbound.push(P::TurnEvent { oid: P::Oid(remote_oid.0), event });
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Membranes {
|
||||
fn export_ref(&mut self, obj: Arc<Cap>, and_acquire: bool) -> Arc<WireSymbol> {
|
||||
let ws = match self.exported.ref_map.get(&obj) {
|
||||
None => {
|
||||
let oid = sturdy::Oid(SignedInteger::from(self.next_export_oid as u128));
|
||||
self.next_export_oid += 1;
|
||||
self.exported.insert(oid, obj)
|
||||
}
|
||||
Some(ws) => Arc::clone(ws)
|
||||
};
|
||||
if and_acquire {
|
||||
ws.acquire();
|
||||
}
|
||||
ws
|
||||
fn export_ref(&mut self, obj: Arc<Cap>) -> Arc<WireSymbol> {
|
||||
let oid = sturdy::Oid(SignedInteger::from(self.next_export_oid as u128));
|
||||
self.next_export_oid += 1;
|
||||
self.exported.insert(oid, obj)
|
||||
}
|
||||
|
||||
fn import_oid(
|
||||
|
@ -423,6 +476,39 @@ impl Membranes {
|
|||
self.imported.insert(oid, Cap::new(&obj))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn lookup_ref(&mut self, r: &Arc<Cap>) -> Arc<WireSymbol> {
|
||||
self.imported.ref_map.get(r).or_else(|| self.exported.ref_map.get(r)).map(Arc::clone)
|
||||
.expect("WireSymbol must be present at lookup_ref() time")
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn membrane(&mut self, side: WireSymbolSide) -> &mut Membrane {
|
||||
match side {
|
||||
WireSymbolSide::Imported => &mut self.imported,
|
||||
WireSymbolSide::Exported => &mut self.exported,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn release_one(&mut self, ws: Arc<WireSymbol>) -> bool {
|
||||
if ws.dec_ref() {
|
||||
let membrane = self.membrane(ws.side);
|
||||
membrane.oid_map.remove(&ws.oid);
|
||||
membrane.ref_map.remove(&ws.obj);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn release<I: IntoIterator<Item = Arc<WireSymbol>>>(&mut self, wss: I) {
|
||||
for ws in wss {
|
||||
self.release_one(ws);
|
||||
}
|
||||
}
|
||||
|
||||
fn decode_embedded<'de, 'src, S: BinarySource<'de>>(
|
||||
&mut self,
|
||||
t: &mut Activation,
|
||||
|
@ -430,47 +516,44 @@ impl Membranes {
|
|||
src: &'src mut S,
|
||||
_read_annotations: bool,
|
||||
) -> io::Result<P::_Ptr> {
|
||||
match sturdy::WireRef::deserialize(&mut src.packed(NoEmbeddedDomainCodec))? {
|
||||
let ws = match sturdy::WireRef::deserialize(&mut src.packed(NoEmbeddedDomainCodec))? {
|
||||
sturdy::WireRef::Mine{ oid: b } => {
|
||||
let oid = *b;
|
||||
match self.imported.oid_map.get(&oid) {
|
||||
Some(ws) => Ok(Arc::clone(&ws.obj)),
|
||||
None => Ok(Arc::clone(&self.import_oid(t, relay_ref, oid).obj)),
|
||||
}
|
||||
self.imported.oid_map.get(&oid).map(Arc::clone)
|
||||
.unwrap_or_else(|| self.import_oid(t, relay_ref, oid))
|
||||
}
|
||||
sturdy::WireRef::Yours { oid: b, attenuation } => {
|
||||
let oid = *b;
|
||||
match self.exported.oid_map.get(&oid) {
|
||||
Some(ws) => {
|
||||
if attenuation.is_empty() {
|
||||
Ok(Arc::clone(&ws.obj))
|
||||
} else {
|
||||
Ok(ws.obj.attenuate(&sturdy::Attenuation(attenuation))
|
||||
.map_err(|e| {
|
||||
io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
format!("Invalid capability attenuation: {:?}", e))
|
||||
})?)
|
||||
if attenuation.is_empty() {
|
||||
self.exported.oid_map.get(&oid).map(Arc::clone).unwrap_or_else(
|
||||
|| self.exported.insert_inert_entity(t, oid))
|
||||
} else {
|
||||
match self.exported.oid_map.get(&oid) {
|
||||
None => self.exported.insert_inert_entity(t, oid),
|
||||
Some(ws) => {
|
||||
let attenuated_obj = ws.obj.attenuate(&sturdy::Attenuation(attenuation))
|
||||
.map_err(|e| {
|
||||
io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
format!("Invalid capability attenuation: {:?}", e))
|
||||
})?;
|
||||
self.exported.insert(oid, attenuated_obj)
|
||||
}
|
||||
}
|
||||
None => Ok(Cap::new(&t.inert_entity())),
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
Ok(Arc::clone(&ws.inc_ref().obj))
|
||||
}
|
||||
}
|
||||
|
||||
struct ActivatedMembranes<'a, 'activation, 'm>(&'a mut Activation<'activation>,
|
||||
&'m TunnelRelayRef,
|
||||
&'m mut Membranes);
|
||||
|
||||
impl<'a, 'activation, 'm> DomainDecode<P::_Ptr> for ActivatedMembranes<'a, 'activation, 'm> {
|
||||
fn decode_embedded<'de, 'src, S: BinarySource<'de>>(
|
||||
&mut self,
|
||||
src: &'src mut S,
|
||||
read_annotations: bool,
|
||||
) -> io::Result<P::_Ptr> {
|
||||
self.2.decode_embedded(self.0, self.1, src, read_annotations)
|
||||
self.membranes.decode_embedded(self.turn, self.tr_ref, src, read_annotations)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -482,13 +565,13 @@ impl DomainEncode<P::_Ptr> for Membranes {
|
|||
) -> io::Result<()> {
|
||||
w.write(&mut NoEmbeddedDomainCodec, &AnyValue::from(&match self.exported.ref_map.get(d) {
|
||||
Some(ws) => sturdy::WireRef::Mine {
|
||||
oid: Box::new(ws.oid.clone()),
|
||||
oid: Box::new(ws.inc_ref().oid.clone()),
|
||||
},
|
||||
None => match self.imported.ref_map.get(d) {
|
||||
Some(ws) => {
|
||||
if d.attenuation.is_empty() {
|
||||
sturdy::WireRef::Yours {
|
||||
oid: Box::new(ws.oid.clone()),
|
||||
oid: Box::new(ws.inc_ref().oid.clone()),
|
||||
attenuation: vec![],
|
||||
}
|
||||
} else {
|
||||
|
@ -496,13 +579,13 @@ impl DomainEncode<P::_Ptr> for Membranes {
|
|||
// which case we can return sturdy::WireRef::Yours with an attenuation
|
||||
// attached here, but for now we don't.
|
||||
sturdy::WireRef::Mine {
|
||||
oid: Box::new(self.export_ref(Arc::clone(d), false).oid.clone()),
|
||||
oid: Box::new(self.export_ref(Arc::clone(d)).inc_ref().oid.clone()),
|
||||
}
|
||||
}
|
||||
}
|
||||
None =>
|
||||
sturdy::WireRef::Mine {
|
||||
oid: Box::new(self.export_ref(Arc::clone(d), false).oid.clone()),
|
||||
oid: Box::new(self.export_ref(Arc::clone(d)).inc_ref().oid.clone()),
|
||||
},
|
||||
}
|
||||
}))
|
||||
|
@ -583,7 +666,11 @@ impl Entity<()> for TunnelRefEntity {
|
|||
let mut g = self.relay_ref.lock().expect("unpoisoned");
|
||||
let tr = g.as_mut().expect("initialized");
|
||||
let events = std::mem::take(&mut tr.pending_outbound);
|
||||
tr.send_packet(&t.account(), events.len(), P::Packet::Turn(Box::new(P::Turn(events))))
|
||||
tr.send_packet(&t.account(), events.len(), P::Packet::Turn(Box::new(P::Turn(events.clone()))))?;
|
||||
for P::TurnEvent { oid, event } in events.into_iter() {
|
||||
tr.outbound_event_bookkeeping(t, sturdy::Oid(oid.0), &event)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc<ActorResult>) -> ActorResult {
|
||||
|
|
Loading…
Reference in New Issue