diff --git a/syndicate/src/relay.rs b/syndicate/src/relay.rs index cebe515..06b1e4d 100644 --- a/syndicate/src/relay.rs +++ b/syndicate/src/relay.rs @@ -46,17 +46,26 @@ use tokio::io::AsyncWriteExt; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver}; +#[derive(Debug, Clone, Copy)] +enum WireSymbolSide { + Imported, + Exported, +} + struct WireSymbol { oid: sturdy::Oid, obj: Arc, ref_count: AtomicUsize, + side: WireSymbolSide, } struct Membrane { + side: WireSymbolSide, oid_map: Map>, ref_map: Map, Arc>, } +#[derive(Debug)] struct Membranes { exported: Membrane, imported: Membrane, @@ -97,21 +106,49 @@ struct TunnelRefEntity { relay_ref: TunnelRelayRef, } +struct ActivatedMembranes<'a, 'activation, 'm> { + turn: &'a mut Activation<'activation>, + tr_ref: &'m TunnelRelayRef, + membranes: &'m mut Membranes, +} + //--------------------------------------------------------------------------- impl WireSymbol { - fn acquire(&self) { + #[inline] + fn inc_ref<'a>(self: &'a Arc) -> &'a Arc { self.ref_count.fetch_add(1, Ordering::SeqCst); + tracing::trace!(?self, "after inc_ref"); + self } - fn release(&self) -> bool { - self.ref_count.fetch_sub(1, Ordering::SeqCst) == 1 + #[inline] + fn dec_ref(&self) -> bool { + let old_count = self.ref_count.fetch_sub(1, Ordering::SeqCst); + tracing::trace!(?self, "after dec_ref"); + old_count == 1 + } + + #[inline] + fn current_ref_count(&self) -> usize { + self.ref_count.load(Ordering::SeqCst) + } +} + +impl std::fmt::Debug for WireSymbol { + fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { + write!(f, "#", + self.side, + self.oid.0, + self.obj, + self.current_ref_count()) } } impl Membrane { - fn new() -> Self { + fn new(side: WireSymbolSide) -> Self { Membrane { + side, oid_map: Map::new(), ref_map: Map::new(), } @@ -122,23 +159,15 @@ impl Membrane { oid: oid.clone(), obj: Arc::clone(&obj), ref_count: AtomicUsize::new(0), + side: self.side, }); self.oid_map.insert(oid, Arc::clone(&ws)); self.ref_map.insert(obj, Arc::clone(&ws)); ws } - fn acquire(&mut self, r: &Arc) -> Arc { - let ws = self.ref_map.get(r).expect("WireSymbol must be present at acquire() time"); - ws.acquire(); - Arc::clone(ws) - } - - fn release(&mut self, ws: &Arc) { - if ws.release() { - self.oid_map.remove(&ws.oid); - self.ref_map.remove(&ws.obj); - } + fn insert_inert_entity(&mut self, t: &mut Activation, oid: sturdy::Oid) -> Arc { + self.insert(oid, Cap::new(&t.inert_entity())) } } @@ -168,6 +197,18 @@ pub fn connect_stream( }); } +impl std::fmt::Debug for Membrane { + fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { + f.debug_struct("Membrane") + .field("side", &self.side) + .field("refs", &self.oid_map.values()) + .finish() + } +} + +macro_rules! dump_membranes { ($e:expr) => { tracing::trace!("membranes: {:#?}", $e); } } +// macro_rules! dump_membranes { ($e:expr) => { (); } } + impl TunnelRelay { pub fn run( t: &mut Activation, @@ -188,18 +229,19 @@ impl TunnelRelay { inbound_assertions: Map::new(), outbound_assertions: Map::new(), membranes: Membranes { - exported: Membrane::new(), - imported: Membrane::new(), + exported: Membrane::new(WireSymbolSide::Exported), + imported: Membrane::new(WireSymbolSide::Imported), next_export_oid: 0, }, pending_outbound: Vec::new(), self_entity: self_entity.clone(), }; if let Some(ir) = initial_ref { - tr.membranes.export_ref(ir, true); + tr.membranes.export_ref(ir).inc_ref(); } let result = initial_oid.map( - |io| Arc::clone(&tr.membranes.import_oid(t, &tr_ref, io).obj)); + |io| Arc::clone(&tr.membranes.import_oid(t, &tr_ref, io).inc_ref().obj)); + dump_membranes!(tr.membranes); *tr_ref.lock().unwrap() = Some(tr); t.linked_task(crate::name!("writer"), output_loop(o, output_rx)); t.linked_task(crate::name!("reader"), input_loop(t.facet.clone(), i, tr_ref)); @@ -209,7 +251,11 @@ impl TunnelRelay { fn deserialize_one(&mut self, t: &mut Activation, bs: &[u8]) -> (Result, usize) { let mut src = BytesBinarySource::new(&bs); - let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes); + let mut dec = ActivatedMembranes { + turn: t, + tr_ref: &self.self_ref, + membranes: &mut self.membranes, + }; match src.peek() { Ok(v) => if v >= 128 { self.output_text = false; @@ -248,7 +294,7 @@ impl TunnelRelay { } fn handle_inbound_packet(&mut self, t: &mut Activation, p: P::Packet) -> ActorResult { - // tracing::trace!(packet = ?p, "-->"); + tracing::trace!(packet = ?p, "-->"); match p { P::Packet::Error(b) => { tracing::info!(message = ?b.message.clone(), @@ -260,69 +306,80 @@ impl TunnelRelay { let P::Turn(events) = *b; for P::TurnEvent { oid, event } in events { let target = match self.membranes.exported.oid_map.get(&sturdy::Oid(oid.0.clone())) { - Some(ws) => &ws.obj, - None => return Err(error("Cannot deliver event: nonexistent oid", - AnyValue::from(&P::TurnEvent { oid, event }))), + Some(ws) => + ws.inc_ref(), + None => { + tracing::warn!( + event = ?AnyValue::from(&P::TurnEvent { oid, event }), + "Cannot deliver event: nonexistent oid"); + return Ok(()); + } }; + let mut pins = vec![target.clone()]; + let target = Arc::clone(&target.obj); match event { P::Event::Assert(b) => { let P::Assert { assertion: P::Assertion(a), handle: remote_handle } = *b; - let mut imported = vec![]; - let imported_membrane = &mut self.membranes.imported; - a.foreach_embedded::<_, Error>(&mut |r| { - Ok(imported.push(imported_membrane.acquire(r))) - })?; + a.foreach_embedded::<_, Error>( + &mut |r| Ok(pins.push(self.membranes.lookup_ref(r))))?; if let Some(local_handle) = target.assert(t, a) { - if let Some(_) = self.inbound_assertions.insert(remote_handle, (local_handle, imported)) { + if let Some(_) = self.inbound_assertions.insert(remote_handle, (local_handle, pins)) { return Err(error("Assertion with duplicate handle", AnyValue::new(false))); } + } else { + self.membranes.release(pins); } + dump_membranes!(self.membranes); } P::Event::Retract(b) => { let P::Retract { handle: remote_handle } = *b; - let (local_handle, imported) = match self.inbound_assertions.remove(&remote_handle) { + let (local_handle, previous_pins) = match self.inbound_assertions.remove(&remote_handle) { None => return Err(error("Retraction of nonexistent handle", AnyValue::from(&remote_handle))), Some(wss) => wss, }; - for ws in imported.into_iter() { - self.membranes.imported.release(&ws); - } + self.membranes.release(previous_pins); + self.membranes.release(pins); t.retract(local_handle); + dump_membranes!(self.membranes); } P::Event::Message(b) => { let P::Message { body: P::Assertion(a) } = *b; - let imported_membrane = &mut self.membranes.imported; a.foreach_embedded(&mut |r| { - let ws = imported_membrane.acquire(r); - match ws.ref_count.load(Ordering::SeqCst) { + let ws = self.membranes.lookup_ref(r); + let rc = ws.current_ref_count(); + pins.push(ws); + match rc { 1 => Err(error("Cannot receive transient reference", AnyValue::new(false))), _ => Ok(()) } })?; target.message(t, a); + self.membranes.release(pins); + dump_membranes!(self.membranes); } P::Event::Sync(b) => { let P::Sync { peer } = *b; - self.membranes.imported.acquire(&peer); + pins.push(self.membranes.lookup_ref(&peer)); + dump_membranes!(self.membranes); struct SyncPeer { relay_ref: TunnelRelayRef, peer: Arc, + pins: Vec>, } impl Entity for SyncPeer { fn message(&mut self, t: &mut Activation, _a: Synced) -> ActorResult { self.peer.message(t, AnyValue::new(true)); let mut g = self.relay_ref.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); - if let Some(ws) = tr.membranes.imported.ref_map.get(&self.peer) { - let ws = Arc::clone(ws); // cloned to release the borrow to permit the release - tr.membranes.imported.release(&ws); - } + tr.membranes.release(std::mem::take(&mut self.pins)); + dump_membranes!(tr.membranes); Ok(()) } } let k = t.create(SyncPeer { relay_ref: Arc::clone(&self.self_ref), peer: Arc::clone(&peer), + pins, }); t.sync(&peer.underlying, k); } @@ -334,83 +391,79 @@ impl TunnelRelay { } } - fn handle_outbound_event(&mut self, t: &mut Activation, event: P::Event) -> Result { - match &event { + fn outbound_event_bookkeeping( + &mut self, + t: &mut Activation, + remote_oid: sturdy::Oid, + event: &P::Event, + ) -> ActorResult { + match event { P::Event::Assert(b) => { let P::Assert { assertion: P::Assertion(a), handle } = &**b; - let mut outbound = Vec::new(); + let target_ws = self.membranes.imported.oid_map.get(&remote_oid).map(Arc::clone) + .expect("RelayEntity to have a valid oid"); + target_ws.inc_ref(); // encoding won't do this; target oid is syntactically special + let mut pins = vec![target_ws]; a.foreach_embedded::<_, Error>( - &mut |r| Ok(outbound.push(self.membranes.export_ref(Arc::clone(r), true))))?; - self.outbound_assertions.insert(handle.clone(), outbound); + &mut |r| Ok(pins.push(self.membranes.lookup_ref(r))))?; + self.outbound_assertions.insert(handle.clone(), pins); + dump_membranes!(self.membranes); } P::Event::Retract(b) => { let P::Retract { handle } = &**b; - if let Some(outbound) = self.outbound_assertions.remove(handle) { - for ws in outbound.into_iter() { - self.membranes.exported.release(&ws); - } + if let Some(pins) = self.outbound_assertions.remove(handle) { + self.membranes.release(pins); } + dump_membranes!(self.membranes); } P::Event::Message(b) => { let P::Message { body: P::Assertion(a) } = &**b; a.foreach_embedded(&mut |r| { - let ws = self.membranes.export_ref(Arc::clone(r), false); - match ws.ref_count.load(Ordering::SeqCst) { - 0 => Err(error("Cannot send transient reference", AnyValue::new(false))), - _ => Ok(()) + let ws = self.membranes.lookup_ref(r); + if self.membranes.release_one(ws) { // undo the inc_ref from encoding + Err(error("Sent transient reference", AnyValue::new(false))) + } else { + Ok(()) } })?; + dump_membranes!(self.membranes); }, - P::Event::Sync(_b) => panic!("TODO not yet implemented"), - } - Ok(event) - } - - fn encode_packet(&mut self, p: P::Packet) -> Result, Error> { - let item = AnyValue::from(&p); - // tracing::trace!(packet = ?item, "<--"); - if self.output_text { - let mut s = TextWriter::encode::<_, AnyValue, _>(&mut self.membranes, &item)?; - s.push('\n'); - Ok(s.into_bytes()) - } else { - Ok(PackedWriter::encode::<_, AnyValue, _>(&mut self.membranes, &item)?) + P::Event::Sync(_b) => + todo!(), } + Ok(()) } pub fn send_packet(&mut self, account: &Arc, cost: usize, p: P::Packet) -> ActorResult { - let bs = self.encode_packet(p)?; + let item = AnyValue::from(&p); + tracing::trace!(packet = ?item, "<--"); + + let bs = if self.output_text { + let mut s = TextWriter::encode::<_, AnyValue, _>(&mut self.membranes, &item)?; + s.push('\n'); + s.into_bytes() + } else { + PackedWriter::encode::<_, AnyValue, _>(&mut self.membranes, &item)? + }; + let _ = self.output.send(LoanedItem::new(account, cost, bs)); Ok(()) } - pub fn send_event(&mut self, t: &mut Activation, oid: sturdy::Oid, event: P::Event) -> ActorResult { + pub fn send_event(&mut self, t: &mut Activation, remote_oid: sturdy::Oid, event: P::Event) -> ActorResult { if self.pending_outbound.is_empty() { t.message_for_myself(&self.self_entity, ()); } - let turn_event = P::TurnEvent { - oid: P::Oid(oid.0), - event: self.handle_outbound_event(t, event)?, - }; - self.pending_outbound.push(turn_event); + self.pending_outbound.push(P::TurnEvent { oid: P::Oid(remote_oid.0), event }); Ok(()) } } impl Membranes { - fn export_ref(&mut self, obj: Arc, and_acquire: bool) -> Arc { - let ws = match self.exported.ref_map.get(&obj) { - None => { - let oid = sturdy::Oid(SignedInteger::from(self.next_export_oid as u128)); - self.next_export_oid += 1; - self.exported.insert(oid, obj) - } - Some(ws) => Arc::clone(ws) - }; - if and_acquire { - ws.acquire(); - } - ws + fn export_ref(&mut self, obj: Arc) -> Arc { + let oid = sturdy::Oid(SignedInteger::from(self.next_export_oid as u128)); + self.next_export_oid += 1; + self.exported.insert(oid, obj) } fn import_oid( @@ -423,6 +476,39 @@ impl Membranes { self.imported.insert(oid, Cap::new(&obj)) } + #[inline] + fn lookup_ref(&mut self, r: &Arc) -> Arc { + self.imported.ref_map.get(r).or_else(|| self.exported.ref_map.get(r)).map(Arc::clone) + .expect("WireSymbol must be present at lookup_ref() time") + } + + #[inline] + fn membrane(&mut self, side: WireSymbolSide) -> &mut Membrane { + match side { + WireSymbolSide::Imported => &mut self.imported, + WireSymbolSide::Exported => &mut self.exported, + } + } + + #[inline] + fn release_one(&mut self, ws: Arc) -> bool { + if ws.dec_ref() { + let membrane = self.membrane(ws.side); + membrane.oid_map.remove(&ws.oid); + membrane.ref_map.remove(&ws.obj); + true + } else { + false + } + } + + #[inline] + fn release>>(&mut self, wss: I) { + for ws in wss { + self.release_one(ws); + } + } + fn decode_embedded<'de, 'src, S: BinarySource<'de>>( &mut self, t: &mut Activation, @@ -430,47 +516,44 @@ impl Membranes { src: &'src mut S, _read_annotations: bool, ) -> io::Result { - match sturdy::WireRef::deserialize(&mut src.packed(NoEmbeddedDomainCodec))? { + let ws = match sturdy::WireRef::deserialize(&mut src.packed(NoEmbeddedDomainCodec))? { sturdy::WireRef::Mine{ oid: b } => { let oid = *b; - match self.imported.oid_map.get(&oid) { - Some(ws) => Ok(Arc::clone(&ws.obj)), - None => Ok(Arc::clone(&self.import_oid(t, relay_ref, oid).obj)), - } + self.imported.oid_map.get(&oid).map(Arc::clone) + .unwrap_or_else(|| self.import_oid(t, relay_ref, oid)) } sturdy::WireRef::Yours { oid: b, attenuation } => { let oid = *b; - match self.exported.oid_map.get(&oid) { - Some(ws) => { - if attenuation.is_empty() { - Ok(Arc::clone(&ws.obj)) - } else { - Ok(ws.obj.attenuate(&sturdy::Attenuation(attenuation)) - .map_err(|e| { - io::Error::new( - io::ErrorKind::InvalidInput, - format!("Invalid capability attenuation: {:?}", e)) - })?) + if attenuation.is_empty() { + self.exported.oid_map.get(&oid).map(Arc::clone).unwrap_or_else( + || self.exported.insert_inert_entity(t, oid)) + } else { + match self.exported.oid_map.get(&oid) { + None => self.exported.insert_inert_entity(t, oid), + Some(ws) => { + let attenuated_obj = ws.obj.attenuate(&sturdy::Attenuation(attenuation)) + .map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidInput, + format!("Invalid capability attenuation: {:?}", e)) + })?; + self.exported.insert(oid, attenuated_obj) } } - None => Ok(Cap::new(&t.inert_entity())), } } - } + }; + Ok(Arc::clone(&ws.inc_ref().obj)) } } -struct ActivatedMembranes<'a, 'activation, 'm>(&'a mut Activation<'activation>, - &'m TunnelRelayRef, - &'m mut Membranes); - impl<'a, 'activation, 'm> DomainDecode for ActivatedMembranes<'a, 'activation, 'm> { fn decode_embedded<'de, 'src, S: BinarySource<'de>>( &mut self, src: &'src mut S, read_annotations: bool, ) -> io::Result { - self.2.decode_embedded(self.0, self.1, src, read_annotations) + self.membranes.decode_embedded(self.turn, self.tr_ref, src, read_annotations) } } @@ -482,13 +565,13 @@ impl DomainEncode for Membranes { ) -> io::Result<()> { w.write(&mut NoEmbeddedDomainCodec, &AnyValue::from(&match self.exported.ref_map.get(d) { Some(ws) => sturdy::WireRef::Mine { - oid: Box::new(ws.oid.clone()), + oid: Box::new(ws.inc_ref().oid.clone()), }, None => match self.imported.ref_map.get(d) { Some(ws) => { if d.attenuation.is_empty() { sturdy::WireRef::Yours { - oid: Box::new(ws.oid.clone()), + oid: Box::new(ws.inc_ref().oid.clone()), attenuation: vec![], } } else { @@ -496,13 +579,13 @@ impl DomainEncode for Membranes { // which case we can return sturdy::WireRef::Yours with an attenuation // attached here, but for now we don't. sturdy::WireRef::Mine { - oid: Box::new(self.export_ref(Arc::clone(d), false).oid.clone()), + oid: Box::new(self.export_ref(Arc::clone(d)).inc_ref().oid.clone()), } } } None => sturdy::WireRef::Mine { - oid: Box::new(self.export_ref(Arc::clone(d), false).oid.clone()), + oid: Box::new(self.export_ref(Arc::clone(d)).inc_ref().oid.clone()), }, } })) @@ -583,7 +666,11 @@ impl Entity<()> for TunnelRefEntity { let mut g = self.relay_ref.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); let events = std::mem::take(&mut tr.pending_outbound); - tr.send_packet(&t.account(), events.len(), P::Packet::Turn(Box::new(P::Turn(events)))) + tr.send_packet(&t.account(), events.len(), P::Packet::Turn(Box::new(P::Turn(events.clone()))))?; + for P::TurnEvent { oid, event } in events.into_iter() { + tr.outbound_event_bookkeeping(t, sturdy::Oid(oid.0), &event)?; + } + Ok(()) } fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc) -> ActorResult {