Compare commits
1 Commits
main
...
relay-exte
Author | SHA1 | Date |
---|---|---|
Tony Garnock-Jones | e2fb023393 |
|
@ -6,7 +6,7 @@ use crate::during;
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
use crate::error::error;
|
use crate::error::error;
|
||||||
use crate::schemas::gatekeeper;
|
use crate::schemas::gatekeeper;
|
||||||
use crate::schemas::internal_protocol as P;
|
use crate::schemas::external_protocol as P;
|
||||||
use crate::schemas::sturdy;
|
use crate::schemas::sturdy;
|
||||||
|
|
||||||
use futures::Sink;
|
use futures::Sink;
|
||||||
|
@ -24,10 +24,9 @@ use preserves::value::IOValue;
|
||||||
use preserves::value::Map;
|
use preserves::value::Map;
|
||||||
use preserves::value::NestedValue;
|
use preserves::value::NestedValue;
|
||||||
use preserves::value::NoEmbeddedDomainCodec;
|
use preserves::value::NoEmbeddedDomainCodec;
|
||||||
use preserves::value::PackedReader;
|
|
||||||
use preserves::value::PackedWriter;
|
use preserves::value::PackedWriter;
|
||||||
use preserves::value::Reader;
|
|
||||||
use preserves::value::TextWriter;
|
use preserves::value::TextWriter;
|
||||||
|
use preserves::value::Value;
|
||||||
use preserves::value::ViaCodec;
|
use preserves::value::ViaCodec;
|
||||||
use preserves::value::Writer;
|
use preserves::value::Writer;
|
||||||
use preserves::value::signed_integer::SignedInteger;
|
use preserves::value::signed_integer::SignedInteger;
|
||||||
|
@ -35,7 +34,6 @@ use preserves::value::signed_integer::SignedInteger;
|
||||||
use preserves_schema::support::Deserialize;
|
use preserves_schema::support::Deserialize;
|
||||||
use preserves_schema::support::ParseError;
|
use preserves_schema::support::ParseError;
|
||||||
|
|
||||||
use std::convert::TryFrom;
|
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
@ -50,23 +48,34 @@ use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver};
|
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
enum WireSymbolSide {
|
||||||
|
Imported,
|
||||||
|
Exported,
|
||||||
|
}
|
||||||
|
|
||||||
struct WireSymbol {
|
struct WireSymbol {
|
||||||
oid: sturdy::Oid,
|
oid: sturdy::Oid,
|
||||||
obj: Arc<Cap>,
|
obj: Arc<Cap>,
|
||||||
ref_count: AtomicUsize,
|
ref_count: AtomicUsize,
|
||||||
|
side: WireSymbolSide,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Membrane {
|
struct Membrane {
|
||||||
|
side: WireSymbolSide,
|
||||||
oid_map: Map<sturdy::Oid, Arc<WireSymbol>>,
|
oid_map: Map<sturdy::Oid, Arc<WireSymbol>>,
|
||||||
ref_map: Map<Arc<Cap>, Arc<WireSymbol>>,
|
ref_map: Map<Arc<Cap>, Arc<WireSymbol>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
struct Membranes {
|
struct Membranes {
|
||||||
exported: Membrane,
|
exported: Membrane,
|
||||||
imported: Membrane,
|
imported: Membrane,
|
||||||
next_export_oid: usize,
|
next_export_oid: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct WireRefCodec;
|
||||||
|
|
||||||
pub enum Input {
|
pub enum Input {
|
||||||
Packets(Pin<Box<dyn Stream<Item = Result<Vec<u8>, Error>> + Send>>),
|
Packets(Pin<Box<dyn Stream<Item = Result<Vec<u8>, Error>> + Send>>),
|
||||||
Bytes(Pin<Box<dyn AsyncRead + Send>>),
|
Bytes(Pin<Box<dyn AsyncRead + Send>>),
|
||||||
|
@ -101,21 +110,46 @@ struct TunnelRefEntity {
|
||||||
relay_ref: TunnelRelayRef,
|
relay_ref: TunnelRelayRef,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Pins<'a> = &'a mut Vec<Arc<WireSymbol>>;
|
||||||
|
|
||||||
//---------------------------------------------------------------------------
|
//---------------------------------------------------------------------------
|
||||||
|
|
||||||
impl WireSymbol {
|
impl WireSymbol {
|
||||||
fn acquire(&self) {
|
#[inline]
|
||||||
|
fn inc_ref<'a>(self: &'a Arc<Self>, pins: Pins) -> &'a Arc<Self> {
|
||||||
self.ref_count.fetch_add(1, Ordering::SeqCst);
|
self.ref_count.fetch_add(1, Ordering::SeqCst);
|
||||||
|
pins.push(Arc::clone(&self));
|
||||||
|
tracing::trace!(?self, "acquire");
|
||||||
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
fn release(&self) -> bool {
|
#[inline]
|
||||||
self.ref_count.fetch_sub(1, Ordering::SeqCst) == 1
|
fn dec_ref(&self) -> bool {
|
||||||
|
let old_count = self.ref_count.fetch_sub(1, Ordering::SeqCst);
|
||||||
|
tracing::trace!(?self, "release");
|
||||||
|
old_count == 1
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn current_ref_count(&self) -> usize {
|
||||||
|
self.ref_count.load(Ordering::SeqCst)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Debug for WireSymbol {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
|
||||||
|
write!(f, "#<WireSymbol oid={:?}:{} obj={:?} ref_count={}>",
|
||||||
|
self.side,
|
||||||
|
self.oid.0,
|
||||||
|
self.obj,
|
||||||
|
self.current_ref_count())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Membrane {
|
impl Membrane {
|
||||||
fn new() -> Self {
|
fn new(side: WireSymbolSide) -> Self {
|
||||||
Membrane {
|
Membrane {
|
||||||
|
side,
|
||||||
oid_map: Map::new(),
|
oid_map: Map::new(),
|
||||||
ref_map: Map::new(),
|
ref_map: Map::new(),
|
||||||
}
|
}
|
||||||
|
@ -126,24 +160,83 @@ impl Membrane {
|
||||||
oid: oid.clone(),
|
oid: oid.clone(),
|
||||||
obj: Arc::clone(&obj),
|
obj: Arc::clone(&obj),
|
||||||
ref_count: AtomicUsize::new(0),
|
ref_count: AtomicUsize::new(0),
|
||||||
|
side: self.side,
|
||||||
});
|
});
|
||||||
self.oid_map.insert(oid, Arc::clone(&ws));
|
self.oid_map.insert(oid, Arc::clone(&ws));
|
||||||
self.ref_map.insert(obj, Arc::clone(&ws));
|
self.ref_map.insert(obj, Arc::clone(&ws));
|
||||||
ws
|
ws
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn acquire(&mut self, r: &Arc<Cap>) -> Arc<WireSymbol> {
|
impl std::fmt::Debug for Membrane {
|
||||||
let ws = self.ref_map.get(r).expect("WireSymbol must be present at acquire() time");
|
fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
|
||||||
ws.acquire();
|
f.debug_struct("Membrane")
|
||||||
Arc::clone(ws)
|
.field("side", &self.side)
|
||||||
|
.field("refs", &self.oid_map.values())
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Membranes {
|
||||||
|
fn export_ref(&mut self, obj: Arc<Cap>) -> Arc<WireSymbol> {
|
||||||
|
let ws = match self.exported.ref_map.get(&obj) {
|
||||||
|
None => {
|
||||||
|
let oid = sturdy::Oid(SignedInteger::from(self.next_export_oid as u128));
|
||||||
|
self.next_export_oid += 1;
|
||||||
|
self.exported.insert(oid, obj)
|
||||||
|
}
|
||||||
|
Some(ws) => Arc::clone(ws)
|
||||||
|
};
|
||||||
|
ws
|
||||||
}
|
}
|
||||||
|
|
||||||
fn release(&mut self, ws: &Arc<WireSymbol>) {
|
fn import_oid(
|
||||||
if ws.release() {
|
&mut self,
|
||||||
self.oid_map.remove(&ws.oid);
|
t: &mut Activation,
|
||||||
self.ref_map.remove(&ws.obj);
|
relay_ref: &TunnelRelayRef,
|
||||||
|
oid: sturdy::Oid,
|
||||||
|
) -> Arc<WireSymbol> {
|
||||||
|
let obj = t.create(RelayEntity { relay_ref: Arc::clone(relay_ref), oid: oid.clone() });
|
||||||
|
self.imported.insert(oid, Cap::new(&obj))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn membrane(&mut self, side: WireSymbolSide) -> &mut Membrane {
|
||||||
|
match side {
|
||||||
|
WireSymbolSide::Imported => &mut self.imported,
|
||||||
|
WireSymbolSide::Exported => &mut self.exported,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn release<I: IntoIterator<Item = Arc<WireSymbol>>>(&mut self, wss: I) {
|
||||||
|
for ws in wss {
|
||||||
|
if ws.dec_ref() {
|
||||||
|
let membrane = self.membrane(ws.side);
|
||||||
|
membrane.oid_map.remove(&ws.oid);
|
||||||
|
membrane.ref_map.remove(&ws.obj);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DomainEncode<P::_Ptr> for WireRefCodec {
|
||||||
|
fn encode_embedded<W: Writer>(
|
||||||
|
&mut self,
|
||||||
|
w: &mut W,
|
||||||
|
d: &P::_Ptr,
|
||||||
|
) -> io::Result<()> {
|
||||||
|
w.write(&mut NoEmbeddedDomainCodec, &IOValue::from(&**d))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DomainDecode<P::_Ptr> for WireRefCodec {
|
||||||
|
fn decode_embedded<'de, 'src, S: BinarySource<'de>>(
|
||||||
|
&mut self,
|
||||||
|
src: &'src mut S,
|
||||||
|
_read_annotations: bool,
|
||||||
|
) -> io::Result<P::_Ptr> {
|
||||||
|
Ok(Arc::new(P::_Dom::deserialize(&mut src.packed(NoEmbeddedDomainCodec))?))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn connect_stream<I, O, E, F>(
|
pub fn connect_stream<I, O, E, F>(
|
||||||
|
@ -172,6 +265,9 @@ pub fn connect_stream<I, O, E, F>(
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// macro_rules! dump_membranes { ($e:expr) => { tracing::trace!("membranes: {:#?}", $e); } }
|
||||||
|
macro_rules! dump_membranes { ($e:expr) => { (); } }
|
||||||
|
|
||||||
impl TunnelRelay {
|
impl TunnelRelay {
|
||||||
pub fn run(
|
pub fn run(
|
||||||
t: &mut Activation,
|
t: &mut Activation,
|
||||||
|
@ -192,18 +288,19 @@ impl TunnelRelay {
|
||||||
inbound_assertions: Map::new(),
|
inbound_assertions: Map::new(),
|
||||||
outbound_assertions: Map::new(),
|
outbound_assertions: Map::new(),
|
||||||
membranes: Membranes {
|
membranes: Membranes {
|
||||||
exported: Membrane::new(),
|
exported: Membrane::new(WireSymbolSide::Exported),
|
||||||
imported: Membrane::new(),
|
imported: Membrane::new(WireSymbolSide::Imported),
|
||||||
next_export_oid: 0,
|
next_export_oid: 0,
|
||||||
},
|
},
|
||||||
pending_outbound: Vec::new(),
|
pending_outbound: Vec::new(),
|
||||||
self_entity: self_entity.clone(),
|
self_entity: self_entity.clone(),
|
||||||
};
|
};
|
||||||
if let Some(ir) = initial_ref {
|
if let Some(ir) = initial_ref {
|
||||||
tr.membranes.export_ref(ir, true);
|
tr.membranes.export_ref(ir).inc_ref(&mut vec![]);
|
||||||
}
|
}
|
||||||
let result = initial_oid.map(
|
let result = initial_oid.map(
|
||||||
|io| Arc::clone(&tr.membranes.import_oid(t, &tr_ref, io).obj));
|
|io| Arc::clone(&tr.membranes.import_oid(t, &tr_ref, io).obj));
|
||||||
|
dump_membranes!(tr.membranes);
|
||||||
*tr_ref.lock().unwrap() = Some(tr);
|
*tr_ref.lock().unwrap() = Some(tr);
|
||||||
t.linked_task(crate::name!("writer"), output_loop(o, output_rx));
|
t.linked_task(crate::name!("writer"), output_loop(o, output_rx));
|
||||||
t.linked_task(crate::name!("reader"), input_loop(t.facet.clone(), i, tr_ref));
|
t.linked_task(crate::name!("reader"), input_loop(t.facet.clone(), i, tr_ref));
|
||||||
|
@ -211,19 +308,18 @@ impl TunnelRelay {
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
fn deserialize_one(&mut self, t: &mut Activation, bs: &[u8]) -> (Result<P::Packet, ParseError>, usize) {
|
fn deserialize_one(&mut self, bs: &[u8]) -> (Result<P::Packet, ParseError>, usize) {
|
||||||
let mut src = BytesBinarySource::new(&bs);
|
let mut src = BytesBinarySource::new(&bs);
|
||||||
let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes);
|
|
||||||
match src.peek() {
|
match src.peek() {
|
||||||
Ok(v) => if v >= 128 {
|
Ok(v) => if v >= 128 {
|
||||||
self.output_text = false;
|
self.output_text = false;
|
||||||
let mut r = src.packed::<_, AnyValue, _>(&mut dec);
|
let mut r = src.packed(WireRefCodec);
|
||||||
let res = P::Packet::deserialize(&mut r);
|
let res = P::Packet::deserialize(&mut r);
|
||||||
(res, r.source.index)
|
(res, r.source.index)
|
||||||
} else {
|
} else {
|
||||||
self.output_text = true;
|
self.output_text = true;
|
||||||
let mut dec = ViaCodec::new(dec);
|
let mut dec = ViaCodec::new(WireRefCodec);
|
||||||
let mut r = src.text::<_, AnyValue, _>(&mut dec);
|
let mut r = src.text::<_, P::_Any, _>(&mut dec);
|
||||||
let res = P::Packet::deserialize(&mut r);
|
let res = P::Packet::deserialize(&mut r);
|
||||||
(res, r.source.index)
|
(res, r.source.index)
|
||||||
},
|
},
|
||||||
|
@ -232,17 +328,18 @@ impl TunnelRelay {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_inbound_datagram(&mut self, t: &mut Activation, bs: &[u8]) -> ActorResult {
|
fn handle_inbound_datagram(&mut self, t: &mut Activation, bs: &[u8]) -> ActorResult {
|
||||||
let item = self.deserialize_one(t, bs).0?;
|
let item = self.deserialize_one(bs).0?;
|
||||||
self.handle_inbound_packet(t, item)
|
self.handle_inbound_packet(t, item)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_inbound_stream(&mut self, t: &mut Activation, buf: &mut BytesMut) -> ActorResult {
|
fn handle_inbound_stream(&mut self, t: &mut Activation, buf: &mut BytesMut) -> ActorResult {
|
||||||
loop {
|
loop {
|
||||||
let (result, count) = self.deserialize_one(t, buf);
|
let (result, count) = self.deserialize_one(buf);
|
||||||
match result {
|
match result {
|
||||||
Err(ParseError::Preserves(PreservesError::Io(e)))
|
Err(ParseError::Preserves(PreservesError::Io(e)))
|
||||||
if is_eof_io_error(&e) => return Ok(()),
|
if is_eof_io_error(&e) => return Ok(()),
|
||||||
Err(e) => return Err(e)?,
|
Err(e) =>
|
||||||
|
return Err(e)?,
|
||||||
Ok(item) => {
|
Ok(item) => {
|
||||||
buf.advance(count);
|
buf.advance(count);
|
||||||
self.handle_inbound_packet(t, item)?;
|
self.handle_inbound_packet(t, item)?;
|
||||||
|
@ -251,82 +348,160 @@ impl TunnelRelay {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn import_wire_ref(&mut self, t: &mut Activation, d: &Arc<sturdy::WireRef>, pins: Pins) -> io::Result<Arc<Cap>> {
|
||||||
|
match &**d {
|
||||||
|
sturdy::WireRef::Mine { oid: b } => {
|
||||||
|
let oid = &**b;
|
||||||
|
let ws = match self.membranes.imported.oid_map.get(&oid) {
|
||||||
|
Some(ws) => Arc::clone(ws),
|
||||||
|
None => self.membranes.import_oid(t, &self.self_ref, oid.clone()),
|
||||||
|
};
|
||||||
|
Ok(Arc::clone(&ws.inc_ref(pins).obj))
|
||||||
|
}
|
||||||
|
sturdy::WireRef::Yours { oid: b, attenuation } => {
|
||||||
|
let oid = &**b;
|
||||||
|
match self.membranes.exported.oid_map.get(&oid) {
|
||||||
|
Some(ws) => {
|
||||||
|
ws.inc_ref(pins);
|
||||||
|
if attenuation.is_empty() {
|
||||||
|
Ok(Arc::clone(&ws.obj))
|
||||||
|
} else {
|
||||||
|
ws.obj.attenuate(&sturdy::Attenuation(attenuation.clone()))
|
||||||
|
.map_err(|e| {
|
||||||
|
io::Error::new(
|
||||||
|
io::ErrorKind::InvalidInput,
|
||||||
|
format!("Invalid capability attenuation: {:?}", e))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => Ok(Cap::new(&t.inert_entity())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn import<V: Into<P::_Any>>(&mut self, t: &mut Activation, v: V, pins: Pins) -> io::Result<AnyValue> {
|
||||||
|
v.into().copy_via(&mut |d| Ok(Value::Embedded(self.import_wire_ref(t, d, pins)?)))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn wire_symbol_for_imported_oid(&mut self, oid: &sturdy::Oid) -> &Arc<WireSymbol> {
|
||||||
|
self.membranes.imported.oid_map.get(oid).expect("imported oid entry to exist for RelayEntity")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn export_cap(&mut self, d: &Arc<Cap>, pins: Pins) -> io::Result<Arc<sturdy::WireRef>> {
|
||||||
|
Ok(Arc::new(match self.membranes.exported.ref_map.get(d) {
|
||||||
|
Some(ws) => sturdy::WireRef::Mine {
|
||||||
|
oid: Box::new(ws.inc_ref(pins).oid.clone()),
|
||||||
|
},
|
||||||
|
None => match self.membranes.imported.ref_map.get(d) {
|
||||||
|
Some(ws) => {
|
||||||
|
if d.attenuation.is_empty() {
|
||||||
|
sturdy::WireRef::Yours {
|
||||||
|
oid: Box::new(ws.inc_ref(pins).oid.clone()),
|
||||||
|
attenuation: vec![],
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// We may trust the peer to enforce attenuation on our behalf, in
|
||||||
|
// which case we can return sturdy::WireRef::Yours with an attenuation
|
||||||
|
// attached here, but for now we don't.
|
||||||
|
sturdy::WireRef::Mine {
|
||||||
|
oid: Box::new(self.membranes.export_ref(Arc::clone(d)).inc_ref(pins).oid.clone()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None =>
|
||||||
|
sturdy::WireRef::Mine {
|
||||||
|
oid: Box::new(self.membranes.export_ref(Arc::clone(d)).inc_ref(pins).oid.clone()),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn export<V: Into<AnyValue>>(&mut self, v: V, pins: Pins) -> io::Result<P::_Any> {
|
||||||
|
v.into().copy_via(&mut |d| Ok(Value::Embedded(self.export_cap(d, pins)?)))
|
||||||
|
}
|
||||||
|
|
||||||
fn handle_inbound_packet(&mut self, t: &mut Activation, p: P::Packet) -> ActorResult {
|
fn handle_inbound_packet(&mut self, t: &mut Activation, p: P::Packet) -> ActorResult {
|
||||||
// tracing::trace!(packet = ?p, "-->");
|
tracing::trace!(packet = ?p, "-->");
|
||||||
match p {
|
match p {
|
||||||
P::Packet::Error(b) => {
|
P::Packet::Error(b) => {
|
||||||
tracing::info!(message = ?b.message.clone(),
|
tracing::info!(message = ?b.message.clone(),
|
||||||
detail = ?b.detail.clone(),
|
detail = ?b.detail.clone(),
|
||||||
"received Error from peer");
|
"received Error from peer");
|
||||||
Err(*b)
|
let P::Error { message, detail } = *b;
|
||||||
|
Err(error(&message, self.import(t, detail, &mut vec![])?))
|
||||||
},
|
},
|
||||||
P::Packet::Turn(b) => {
|
P::Packet::Turn(b) => {
|
||||||
let P::Turn(events) = *b;
|
let P::Turn(events) = *b;
|
||||||
for P::TurnEvent { oid, event } in events {
|
for P::TurnEvent { oid, event } in events {
|
||||||
let target = match self.membranes.exported.oid_map.get(&sturdy::Oid(oid.0.clone())) {
|
let target = match self.membranes.exported.oid_map.get(&sturdy::Oid(oid.0.clone())) {
|
||||||
Some(ws) => &ws.obj,
|
Some(ws) => Arc::clone(ws),
|
||||||
None => return Err(error("Cannot deliver event: nonexistent oid",
|
None => return Err(
|
||||||
AnyValue::from(&P::TurnEvent { oid, event }))),
|
error("Cannot deliver event: nonexistent oid",
|
||||||
|
self.import(t, &P::TurnEvent { oid, event }, &mut vec![])?)),
|
||||||
};
|
};
|
||||||
match event {
|
match event {
|
||||||
P::Event::Assert(b) => {
|
P::Event::Assert(b) => {
|
||||||
let P::Assert { assertion: P::Assertion(a), handle: remote_handle } = *b;
|
let P::Assert { assertion: P::Assertion(a), handle: remote_handle } = *b;
|
||||||
let mut imported = vec![];
|
let mut pins = vec![];
|
||||||
let imported_membrane = &mut self.membranes.imported;
|
target.inc_ref(&mut pins);
|
||||||
a.foreach_embedded::<_, Error>(&mut |r| {
|
let a = self.import(t, a, &mut pins)?;
|
||||||
Ok(imported.push(imported_membrane.acquire(r)))
|
dump_membranes!(self.membranes);
|
||||||
})?;
|
if let Some(local_handle) = target.obj.assert(t, a) {
|
||||||
if let Some(local_handle) = target.assert(t, a) {
|
if let Some(_) = self.inbound_assertions.insert(remote_handle, (local_handle, pins)) {
|
||||||
if let Some(_) = self.inbound_assertions.insert(remote_handle, (local_handle, imported)) {
|
return Err(error("Assertion with duplicate handle",
|
||||||
return Err(error("Assertion with duplicate handle", AnyValue::new(false)));
|
AnyValue::new(false)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
P::Event::Retract(b) => {
|
P::Event::Retract(b) => {
|
||||||
let P::Retract { handle: remote_handle } = *b;
|
let P::Retract { handle: remote_handle } = *b;
|
||||||
let (local_handle, imported) = match self.inbound_assertions.remove(&remote_handle) {
|
let (local_handle, pins) = match self.inbound_assertions.remove(&remote_handle) {
|
||||||
None => return Err(error("Retraction of nonexistent handle", AnyValue::from(&remote_handle))),
|
None => return Err(error("Retraction of nonexistent handle",
|
||||||
|
self.import(t, &remote_handle, &mut vec![])?)),
|
||||||
Some(wss) => wss,
|
Some(wss) => wss,
|
||||||
};
|
};
|
||||||
for ws in imported.into_iter() {
|
self.membranes.release(pins);
|
||||||
self.membranes.imported.release(&ws);
|
dump_membranes!(self.membranes);
|
||||||
}
|
|
||||||
t.retract(local_handle);
|
t.retract(local_handle);
|
||||||
}
|
}
|
||||||
P::Event::Message(b) => {
|
P::Event::Message(b) => {
|
||||||
let P::Message { body: P::Assertion(a) } = *b;
|
let P::Message { body: P::Assertion(a) } = *b;
|
||||||
let imported_membrane = &mut self.membranes.imported;
|
let mut pins = vec![];
|
||||||
a.foreach_embedded(&mut |r| {
|
let a = self.import(t, a, &mut pins)?;
|
||||||
let ws = imported_membrane.acquire(r);
|
ensure_no_transient_references(&pins)?;
|
||||||
match ws.ref_count.load(Ordering::SeqCst) {
|
target.obj.message(t, a);
|
||||||
1 => Err(error("Cannot receive transient reference", AnyValue::new(false))),
|
self.membranes.release(pins);
|
||||||
_ => Ok(())
|
dump_membranes!(self.membranes);
|
||||||
}
|
|
||||||
})?;
|
|
||||||
target.message(t, a);
|
|
||||||
}
|
}
|
||||||
P::Event::Sync(b) => {
|
P::Event::Sync(b) => {
|
||||||
let P::Sync { peer } = *b;
|
let P::Sync { peer } = *b;
|
||||||
self.membranes.imported.acquire(&peer);
|
let mut pins = vec![];
|
||||||
|
target.inc_ref(&mut pins);
|
||||||
|
let peer = self.import_wire_ref(t, &peer, &mut pins)?;
|
||||||
|
dump_membranes!(self.membranes);
|
||||||
struct SyncPeer {
|
struct SyncPeer {
|
||||||
relay_ref: TunnelRelayRef,
|
relay_ref: TunnelRelayRef,
|
||||||
peer: Arc<Cap>,
|
peer: Arc<Cap>,
|
||||||
|
pins: Vec<Arc<WireSymbol>>,
|
||||||
}
|
}
|
||||||
impl Entity<Synced> for SyncPeer {
|
impl Entity<Synced> for SyncPeer {
|
||||||
fn message(&mut self, t: &mut Activation, _a: Synced) -> ActorResult {
|
fn message(&mut self, t: &mut Activation, _a: Synced) -> ActorResult {
|
||||||
self.peer.message(t, AnyValue::new(true));
|
self.peer.message(t, AnyValue::new(true));
|
||||||
let mut g = self.relay_ref.lock().expect("unpoisoned");
|
let mut g = self.relay_ref.lock().expect("unpoisoned");
|
||||||
let tr = g.as_mut().expect("initialized");
|
let tr = g.as_mut().expect("initialized");
|
||||||
if let Some(ws) = tr.membranes.imported.ref_map.get(&self.peer) {
|
tr.membranes.release(std::mem::take(&mut self.pins));
|
||||||
let ws = Arc::clone(ws); // cloned to release the borrow to permit the release
|
dump_membranes!(tr.membranes);
|
||||||
tr.membranes.imported.release(&ws);
|
|
||||||
}
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let k = t.create(SyncPeer {
|
let k = t.create(SyncPeer {
|
||||||
relay_ref: Arc::clone(&self.self_ref),
|
relay_ref: Arc::clone(&self.self_ref),
|
||||||
peer: Arc::clone(&peer),
|
peer: Arc::clone(&peer),
|
||||||
|
pins,
|
||||||
});
|
});
|
||||||
t.sync(&peer.underlying, k);
|
t.sync(&peer.underlying, k);
|
||||||
}
|
}
|
||||||
|
@ -338,47 +513,15 @@ impl TunnelRelay {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_outbound_event(&mut self, t: &mut Activation, event: P::Event) -> Result<P::Event, Error> {
|
|
||||||
match &event {
|
|
||||||
P::Event::Assert(b) => {
|
|
||||||
let P::Assert { assertion: P::Assertion(a), handle } = &**b;
|
|
||||||
let mut outbound = Vec::new();
|
|
||||||
a.foreach_embedded::<_, Error>(
|
|
||||||
&mut |r| Ok(outbound.push(self.membranes.export_ref(Arc::clone(r), true))))?;
|
|
||||||
self.outbound_assertions.insert(handle.clone(), outbound);
|
|
||||||
}
|
|
||||||
P::Event::Retract(b) => {
|
|
||||||
let P::Retract { handle } = &**b;
|
|
||||||
if let Some(outbound) = self.outbound_assertions.remove(handle) {
|
|
||||||
for ws in outbound.into_iter() {
|
|
||||||
self.membranes.exported.release(&ws);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
P::Event::Message(b) => {
|
|
||||||
let P::Message { body: P::Assertion(a) } = &**b;
|
|
||||||
a.foreach_embedded(&mut |r| {
|
|
||||||
let ws = self.membranes.export_ref(Arc::clone(r), false);
|
|
||||||
match ws.ref_count.load(Ordering::SeqCst) {
|
|
||||||
0 => Err(error("Cannot send transient reference", AnyValue::new(false))),
|
|
||||||
_ => Ok(())
|
|
||||||
}
|
|
||||||
})?;
|
|
||||||
},
|
|
||||||
P::Event::Sync(_b) => panic!("TODO not yet implemented"),
|
|
||||||
}
|
|
||||||
Ok(event)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn encode_packet(&mut self, p: P::Packet) -> Result<Vec<u8>, Error> {
|
fn encode_packet(&mut self, p: P::Packet) -> Result<Vec<u8>, Error> {
|
||||||
let item = AnyValue::from(&p);
|
let item = P::_Any::from(&p);
|
||||||
// tracing::trace!(packet = ?item, "<--");
|
tracing::trace!(packet = ?item, "<--");
|
||||||
if self.output_text {
|
if self.output_text {
|
||||||
let mut s = TextWriter::encode::<_, AnyValue, _>(&mut self.membranes, &item)?;
|
let mut s = TextWriter::encode(&mut WireRefCodec, &item)?;
|
||||||
s.push('\n');
|
s.push('\n');
|
||||||
Ok(s.into_bytes())
|
Ok(s.into_bytes())
|
||||||
} else {
|
} else {
|
||||||
Ok(PackedWriter::encode::<_, AnyValue, _>(&mut self.membranes, &item)?)
|
Ok(PackedWriter::encode(&mut WireRefCodec, &item)?)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -394,126 +537,13 @@ impl TunnelRelay {
|
||||||
}
|
}
|
||||||
let turn_event = P::TurnEvent {
|
let turn_event = P::TurnEvent {
|
||||||
oid: P::Oid(oid.0),
|
oid: P::Oid(oid.0),
|
||||||
event: self.handle_outbound_event(t, event)?,
|
event,
|
||||||
};
|
};
|
||||||
self.pending_outbound.push(turn_event);
|
self.pending_outbound.push(turn_event);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Membranes {
|
|
||||||
fn export_ref(&mut self, obj: Arc<Cap>, and_acquire: bool) -> Arc<WireSymbol> {
|
|
||||||
let ws = match self.exported.ref_map.get(&obj) {
|
|
||||||
None => {
|
|
||||||
let oid = sturdy::Oid(SignedInteger::from(self.next_export_oid as u128));
|
|
||||||
self.next_export_oid += 1;
|
|
||||||
self.exported.insert(oid, obj)
|
|
||||||
}
|
|
||||||
Some(ws) => Arc::clone(ws)
|
|
||||||
};
|
|
||||||
if and_acquire {
|
|
||||||
ws.acquire();
|
|
||||||
}
|
|
||||||
ws
|
|
||||||
}
|
|
||||||
|
|
||||||
fn import_oid(
|
|
||||||
&mut self,
|
|
||||||
t: &mut Activation,
|
|
||||||
relay_ref: &TunnelRelayRef,
|
|
||||||
oid: sturdy::Oid,
|
|
||||||
) -> Arc<WireSymbol> {
|
|
||||||
let obj = t.create(RelayEntity { relay_ref: Arc::clone(relay_ref), oid: oid.clone() });
|
|
||||||
self.imported.insert(oid, Cap::new(&obj))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn decode_embedded<'de, 'src, S: BinarySource<'de>>(
|
|
||||||
&mut self,
|
|
||||||
t: &mut Activation,
|
|
||||||
relay_ref: &TunnelRelayRef,
|
|
||||||
src: &'src mut S,
|
|
||||||
_read_annotations: bool,
|
|
||||||
) -> io::Result<P::_Ptr> {
|
|
||||||
let v: IOValue = PackedReader::new(src, NoEmbeddedDomainCodec).demand_next(false)?;
|
|
||||||
match sturdy::WireRef::try_from(&v)? {
|
|
||||||
sturdy::WireRef::Mine{ oid: b } => {
|
|
||||||
let oid = *b;
|
|
||||||
match self.imported.oid_map.get(&oid) {
|
|
||||||
Some(ws) => Ok(Arc::clone(&ws.obj)),
|
|
||||||
None => Ok(Arc::clone(&self.import_oid(t, relay_ref, oid).obj)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sturdy::WireRef::Yours { oid: b, attenuation } => {
|
|
||||||
let oid = *b;
|
|
||||||
match self.exported.oid_map.get(&oid) {
|
|
||||||
Some(ws) => {
|
|
||||||
if attenuation.is_empty() {
|
|
||||||
Ok(Arc::clone(&ws.obj))
|
|
||||||
} else {
|
|
||||||
Ok(ws.obj.attenuate(&sturdy::Attenuation(attenuation))
|
|
||||||
.map_err(|e| {
|
|
||||||
io::Error::new(
|
|
||||||
io::ErrorKind::InvalidInput,
|
|
||||||
format!("Invalid capability attenuation: {:?}", e))
|
|
||||||
})?)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None => Ok(Cap::new(&t.inert_entity())),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct ActivatedMembranes<'a, 'activation, 'm>(&'a mut Activation<'activation>,
|
|
||||||
&'m TunnelRelayRef,
|
|
||||||
&'m mut Membranes);
|
|
||||||
|
|
||||||
impl<'a, 'activation, 'm> DomainDecode<P::_Ptr> for ActivatedMembranes<'a, 'activation, 'm> {
|
|
||||||
fn decode_embedded<'de, 'src, S: BinarySource<'de>>(
|
|
||||||
&mut self,
|
|
||||||
src: &'src mut S,
|
|
||||||
read_annotations: bool,
|
|
||||||
) -> io::Result<P::_Ptr> {
|
|
||||||
self.2.decode_embedded(self.0, self.1, src, read_annotations)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl DomainEncode<P::_Ptr> for Membranes {
|
|
||||||
fn encode_embedded<W: Writer>(
|
|
||||||
&mut self,
|
|
||||||
w: &mut W,
|
|
||||||
d: &P::_Ptr,
|
|
||||||
) -> io::Result<()> {
|
|
||||||
w.write(&mut NoEmbeddedDomainCodec, &AnyValue::from(&match self.exported.ref_map.get(d) {
|
|
||||||
Some(ws) => sturdy::WireRef::Mine {
|
|
||||||
oid: Box::new(ws.oid.clone()),
|
|
||||||
},
|
|
||||||
None => match self.imported.ref_map.get(d) {
|
|
||||||
Some(ws) => {
|
|
||||||
if d.attenuation.is_empty() {
|
|
||||||
sturdy::WireRef::Yours {
|
|
||||||
oid: Box::new(ws.oid.clone()),
|
|
||||||
attenuation: vec![],
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// We may trust the peer to enforce attenuation on our behalf, in
|
|
||||||
// which case we can return sturdy::WireRef::Yours with an attenuation
|
|
||||||
// attached here, but for now we don't.
|
|
||||||
sturdy::WireRef::Mine {
|
|
||||||
oid: Box::new(self.export_ref(Arc::clone(d), false).oid.clone()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None =>
|
|
||||||
sturdy::WireRef::Mine {
|
|
||||||
oid: Box::new(self.export_ref(Arc::clone(d), false).oid.clone()),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn input_loop(
|
async fn input_loop(
|
||||||
facet: FacetRef,
|
facet: FacetRef,
|
||||||
i: Input,
|
i: Input,
|
||||||
|
@ -593,43 +623,100 @@ impl Entity<()> for TunnelRefEntity {
|
||||||
|
|
||||||
fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc<ActorResult>) -> ActorResult {
|
fn exit_hook(&mut self, t: &mut Activation, exit_status: &Arc<ActorResult>) -> ActorResult {
|
||||||
if let Err(e) = &**exit_status {
|
if let Err(e) = &**exit_status {
|
||||||
let e = e.clone();
|
|
||||||
let mut g = self.relay_ref.lock().expect("unpoisoned");
|
let mut g = self.relay_ref.lock().expect("unpoisoned");
|
||||||
let tr = g.as_mut().expect("initialized");
|
let tr = g.as_mut().expect("initialized");
|
||||||
|
let crate::schemas::internal_protocol::Error { message, detail } = e;
|
||||||
|
let e = P::Error {
|
||||||
|
message: message.clone(),
|
||||||
|
detail: tr.export(detail.clone(), &mut vec![])?,
|
||||||
|
};
|
||||||
tr.send_packet(&t.account(), 1, P::Packet::Error(Box::new(e)))?;
|
tr.send_packet(&t.account(), 1, P::Packet::Error(Box::new(e)))?;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Entity<AnyValue> for RelayEntity {
|
#[inline]
|
||||||
fn assert(&mut self, t: &mut Activation, a: AnyValue, h: Handle) -> ActorResult {
|
fn ensure_no_transient_references(pins: &Vec<Arc<WireSymbol>>) -> ActorResult {
|
||||||
let mut g = self.relay_ref.lock().expect("unpoisoned");
|
for ws in pins.iter() {
|
||||||
let tr = g.as_mut().expect("initialized");
|
if ws.current_ref_count() == 1 {
|
||||||
tr.send_event(t, self.oid.clone(), P::Event::Assert(Box::new(P::Assert {
|
return Err(error("Cannot receive transient reference", AnyValue::new(false)));
|
||||||
assertion: P::Assertion(a),
|
}
|
||||||
handle: P::Handle(h.into()),
|
|
||||||
})))
|
|
||||||
}
|
}
|
||||||
fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult {
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RelayEntity {
|
||||||
|
fn with_tunnel_relay<F: FnOnce(&mut TunnelRelay) -> ActorResult>(
|
||||||
|
&mut self,
|
||||||
|
f: F,
|
||||||
|
) -> ActorResult {
|
||||||
let mut g = self.relay_ref.lock().expect("unpoisoned");
|
let mut g = self.relay_ref.lock().expect("unpoisoned");
|
||||||
let tr = g.as_mut().expect("initialized");
|
let tr = g.as_mut().expect("initialized");
|
||||||
tr.send_event(t, self.oid.clone(), P::Event::Retract(Box::new(P::Retract {
|
f(tr)
|
||||||
handle: P::Handle(h.into()),
|
}
|
||||||
})))
|
}
|
||||||
}
|
|
||||||
fn message(&mut self, t: &mut Activation, m: AnyValue) -> ActorResult {
|
impl Entity<AnyValue> for RelayEntity {
|
||||||
let mut g = self.relay_ref.lock().expect("unpoisoned");
|
fn assert(&mut self, t: &mut Activation, a: AnyValue, h: Handle) -> ActorResult {
|
||||||
let tr = g.as_mut().expect("initialized");
|
let oid = self.oid.clone();
|
||||||
tr.send_event(t, self.oid.clone(), P::Event::Message(Box::new(P::Message {
|
self.with_tunnel_relay(|tr| {
|
||||||
body: P::Assertion(m)
|
let handle = P::Handle(h.into());
|
||||||
})))
|
|
||||||
}
|
let mut pins = vec![];
|
||||||
fn sync(&mut self, t: &mut Activation, peer: Arc<Ref<Synced>>) -> ActorResult {
|
tr.wire_symbol_for_imported_oid(&oid).inc_ref(&mut pins);
|
||||||
let mut g = self.relay_ref.lock().expect("unpoisoned");
|
let a = tr.export(a, &mut pins)?;
|
||||||
let tr = g.as_mut().expect("initialized");
|
tr.outbound_assertions.insert(handle.clone(), pins);
|
||||||
tr.send_event(t, self.oid.clone(), P::Event::Sync(Box::new(P::Sync {
|
dump_membranes!(tr.membranes);
|
||||||
peer: Cap::guard(&peer)
|
|
||||||
})))
|
tr.send_event(t, oid, P::Event::Assert(Box::new(P::Assert {
|
||||||
|
assertion: P::Assertion(a),
|
||||||
|
handle,
|
||||||
|
})))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult {
|
||||||
|
let oid = self.oid.clone();
|
||||||
|
self.with_tunnel_relay(|tr| {
|
||||||
|
let handle = P::Handle(h.into());
|
||||||
|
|
||||||
|
if let Some(outbound) = tr.outbound_assertions.remove(&handle) {
|
||||||
|
tr.membranes.release(outbound);
|
||||||
|
}
|
||||||
|
dump_membranes!(tr.membranes);
|
||||||
|
|
||||||
|
tr.send_event(t, oid, P::Event::Retract(Box::new(P::Retract {
|
||||||
|
handle,
|
||||||
|
})))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
fn message(&mut self, t: &mut Activation, m: AnyValue) -> ActorResult {
|
||||||
|
let oid = self.oid.clone();
|
||||||
|
self.with_tunnel_relay(|tr| {
|
||||||
|
let mut pins = vec![];
|
||||||
|
let m = tr.export(m, &mut pins)?;
|
||||||
|
ensure_no_transient_references(&pins)?;
|
||||||
|
|
||||||
|
tr.send_event(t, oid, P::Event::Message(Box::new(P::Message {
|
||||||
|
body: P::Assertion(m)
|
||||||
|
})))?;
|
||||||
|
tr.membranes.release(pins);
|
||||||
|
dump_membranes!(tr.membranes);
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
fn sync(&mut self, t: &mut Activation, peer: Arc<Ref<Synced>>) -> ActorResult {
|
||||||
|
todo!("TODO not yet implemented");
|
||||||
|
|
||||||
|
// let oid = self.oid.clone();
|
||||||
|
// self.with_tunnel_relay(|tr| {
|
||||||
|
// ...
|
||||||
|
// tr.wire_symbol_for_imported_oid(&oid).inc_ref(&mut pins); etc. etc.
|
||||||
|
//
|
||||||
|
// tr.send_event(t, oid, P::Event::Sync(Box::new(P::Sync {
|
||||||
|
// peer: Cap::guard(&peer)
|
||||||
|
// })))
|
||||||
|
// dump_membranes!(tr.membranes); etc. etc.
|
||||||
|
// })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue