diff --git a/syndicate/src/relay.rs b/syndicate/src/relay.rs index 30cabcb..ae2ea31 100644 --- a/syndicate/src/relay.rs +++ b/syndicate/src/relay.rs @@ -16,7 +16,7 @@ use futures::SinkExt; use futures::Stream; use futures::StreamExt; -use parking_lot::Mutex; +pub use parking_lot::Mutex; use preserves::error::Error as PreservesError; use preserves::error::is_eof_io_error; @@ -89,6 +89,7 @@ pub enum Output { type TunnelRelayRef = Arc>>; // There are other kinds of relay. This one has exactly two participants connected to each other. +#[derive(Debug)] pub struct TunnelRelay { self_ref: TunnelRelayRef, @@ -222,6 +223,20 @@ impl TunnelRelay { initial_oid: Option, output_text: bool, ) -> Option> { + let (result, tr_ref, output_rx) = TunnelRelay::_run(t, initial_ref, initial_oid, output_text); + t.linked_task(Some(AnyValue::symbol("writer")), + output_loop(o, output_rx)); + t.linked_task(Some(AnyValue::symbol("reader")), + input_loop(t.trace_collector(), t.facet.clone(), i, tr_ref)); + result + } + + pub fn _run( + t: &mut Activation, + initial_ref: Option>, + initial_oid: Option, + output_text: bool, + ) -> (Option>, Arc>>, UnboundedReceiver>>) { let (output_tx, output_rx) = unbounded_channel(); let tr_ref = Arc::new(Mutex::new(None)); let self_entity = t.create(TunnelRefEntity { @@ -247,12 +262,8 @@ impl TunnelRelay { |io| Arc::clone(&tr.membranes.import_oid(t, &tr_ref, io).inc_ref().obj)); dump_membranes!(tr.membranes); *tr_ref.lock() = Some(tr); - t.linked_task(Some(AnyValue::symbol("writer")), - output_loop(o, output_rx)); - t.linked_task(Some(AnyValue::symbol("reader")), - input_loop(t.trace_collector(), t.facet.clone(), i, tr_ref)); t.state.add_exit_hook(&self_entity); - result + (result, tr_ref, output_rx) } fn deserialize_one(&mut self, t: &mut Activation, bs: &[u8]) -> (Result, ParseError>, usize) { @@ -279,13 +290,13 @@ impl TunnelRelay { } } - fn handle_inbound_datagram(&mut self, t: &mut Activation, bs: &[u8]) -> ActorResult { + pub fn handle_inbound_datagram(&mut self, t: &mut Activation, bs: &[u8]) -> ActorResult { tracing::trace!(bytes = ?bs, "inbound datagram"); let item = self.deserialize_one(t, bs).0?; self.handle_inbound_packet(t, item) } - fn handle_inbound_stream(&mut self, t: &mut Activation, buf: &mut BytesMut) -> ActorResult { + pub fn handle_inbound_stream(&mut self, t: &mut Activation, buf: &mut BytesMut) -> ActorResult { loop { tracing::trace!(buffer = ?buf, "inbound stream"); let (result, count) = self.deserialize_one(t, buf); @@ -301,7 +312,7 @@ impl TunnelRelay { } } - fn handle_inbound_packet(&mut self, t: &mut Activation, p: P::Packet) -> ActorResult { + pub fn handle_inbound_packet(&mut self, t: &mut Activation, p: P::Packet) -> ActorResult { tracing::debug!(packet = ?p, "-->"); match p { P::Packet::Extension(b) => {