Expose a more flexible interface to relays

This commit is contained in:
Tony Garnock-Jones 2023-01-30 17:28:20 +01:00
parent dbbbc8c1c6
commit c3571a2faf
1 changed files with 20 additions and 9 deletions

View File

@ -16,7 +16,7 @@ use futures::SinkExt;
use futures::Stream;
use futures::StreamExt;
use parking_lot::Mutex;
pub use parking_lot::Mutex;
use preserves::error::Error as PreservesError;
use preserves::error::is_eof_io_error;
@ -89,6 +89,7 @@ pub enum Output {
type TunnelRelayRef = Arc<Mutex<Option<TunnelRelay>>>;
// There are other kinds of relay. This one has exactly two participants connected to each other.
#[derive(Debug)]
pub struct TunnelRelay
{
self_ref: TunnelRelayRef,
@ -222,6 +223,20 @@ impl TunnelRelay {
initial_oid: Option<sturdy::Oid>,
output_text: bool,
) -> Option<Arc<Cap>> {
let (result, tr_ref, output_rx) = TunnelRelay::_run(t, initial_ref, initial_oid, output_text);
t.linked_task(Some(AnyValue::symbol("writer")),
output_loop(o, output_rx));
t.linked_task(Some(AnyValue::symbol("reader")),
input_loop(t.trace_collector(), t.facet.clone(), i, tr_ref));
result
}
pub fn _run(
t: &mut Activation,
initial_ref: Option<Arc<Cap>>,
initial_oid: Option<sturdy::Oid>,
output_text: bool,
) -> (Option<Arc<Cap>>, Arc<Mutex<Option<TunnelRelay>>>, UnboundedReceiver<LoanedItem<Vec<u8>>>) {
let (output_tx, output_rx) = unbounded_channel();
let tr_ref = Arc::new(Mutex::new(None));
let self_entity = t.create(TunnelRefEntity {
@ -247,12 +262,8 @@ impl TunnelRelay {
|io| Arc::clone(&tr.membranes.import_oid(t, &tr_ref, io).inc_ref().obj));
dump_membranes!(tr.membranes);
*tr_ref.lock() = Some(tr);
t.linked_task(Some(AnyValue::symbol("writer")),
output_loop(o, output_rx));
t.linked_task(Some(AnyValue::symbol("reader")),
input_loop(t.trace_collector(), t.facet.clone(), i, tr_ref));
t.state.add_exit_hook(&self_entity);
result
(result, tr_ref, output_rx)
}
fn deserialize_one(&mut self, t: &mut Activation, bs: &[u8]) -> (Result<P::Packet<AnyValue>, ParseError>, usize) {
@ -279,13 +290,13 @@ impl TunnelRelay {
}
}
fn handle_inbound_datagram(&mut self, t: &mut Activation, bs: &[u8]) -> ActorResult {
pub fn handle_inbound_datagram(&mut self, t: &mut Activation, bs: &[u8]) -> ActorResult {
tracing::trace!(bytes = ?bs, "inbound datagram");
let item = self.deserialize_one(t, bs).0?;
self.handle_inbound_packet(t, item)
}
fn handle_inbound_stream(&mut self, t: &mut Activation, buf: &mut BytesMut) -> ActorResult {
pub fn handle_inbound_stream(&mut self, t: &mut Activation, buf: &mut BytesMut) -> ActorResult {
loop {
tracing::trace!(buffer = ?buf, "inbound stream");
let (result, count) = self.deserialize_one(t, buf);
@ -301,7 +312,7 @@ impl TunnelRelay {
}
}
fn handle_inbound_packet(&mut self, t: &mut Activation, p: P::Packet<AnyValue>) -> ActorResult {
pub fn handle_inbound_packet(&mut self, t: &mut Activation, p: P::Packet<AnyValue>) -> ActorResult {
tracing::debug!(packet = ?p, "-->");
match p {
P::Packet::Extension(b) => {