From 54454e608bab0f1d6099fffff69c31fb73c6224d Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 9 Aug 2021 10:02:45 -0400 Subject: [PATCH] Binary/text autodetect --- src/relay.rs | 54 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 35 insertions(+), 19 deletions(-) diff --git a/src/relay.rs b/src/relay.rs index 2812a93..c57d4a4 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -27,6 +27,8 @@ use preserves::value::NoEmbeddedDomainCodec; use preserves::value::PackedReader; use preserves::value::PackedWriter; use preserves::value::Reader; +use preserves::value::TextWriter; +use preserves::value::ViaCodec; use preserves::value::Writer; use preserves::value::signed_integer::SignedInteger; @@ -87,6 +89,7 @@ pub struct TunnelRelay pending_outbound: Vec, self_entity: Arc>, output: UnboundedSender>>, + output_text: bool, } struct RelayEntity { @@ -185,6 +188,7 @@ impl TunnelRelay { let mut tr = TunnelRelay { self_ref: Arc::clone(&tr_ref), output: output_tx, + output_text: true, inbound_assertions: Map::new(), outbound_assertions: Map::new(), membranes: Membranes { @@ -207,31 +211,39 @@ impl TunnelRelay { result } - fn handle_inbound_datagram(&mut self, t: &mut Activation, bs: &[u8]) -> ActorResult { + fn deserialize_one(&mut self, t: &mut Activation, bs: &[u8]) -> (Result, usize) { let mut src = BytesBinarySource::new(&bs); let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes); - let mut r = src.packed::<_, _Any, _>(&mut dec); - let item = P::Packet::deserialize(&mut r)?; + match src.peek() { + Ok(v) => if v >= 128 { + self.output_text = false; + let mut r = src.packed::<_, _Any, _>(&mut dec); + let res = P::Packet::deserialize(&mut r); + (res, r.source.index) + } else { + self.output_text = true; + let mut dec = ViaCodec::new(dec); + let mut r = src.text::<_, _Any, _>(&mut dec); + let res = P::Packet::deserialize(&mut r); + (res, r.source.index) + }, + Err(e) => (Err(e.into()), 0) + } + } + + fn handle_inbound_datagram(&mut self, t: &mut Activation, bs: &[u8]) -> ActorResult { + let item = self.deserialize_one(t, bs).0?; self.handle_inbound_packet(t, item) } fn handle_inbound_stream(&mut self, t: &mut Activation, buf: &mut BytesMut) -> ActorResult { loop { - let (e, count) = { - let mut src = BytesBinarySource::new(buf); - let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes); - let mut r = src.packed::<_, _Any, _>(&mut dec); - let e = match P::Packet::deserialize(&mut r) { - Err(ParseError::Preserves(PreservesError::Io(e))) - if is_eof_io_error(&e) => - None, - result => Some(result?), - }; - (e, r.source.index) - }; - match e { - None => return Ok(()), - Some(item) => { + let (result, count) = self.deserialize_one(t, buf); + match result { + Err(ParseError::Preserves(PreservesError::Io(e))) + if is_eof_io_error(&e) => return Ok(()), + Err(e) => return Err(e)?, + Ok(item) => { buf.advance(count); self.handle_inbound_packet(t, item)?; } @@ -361,7 +373,11 @@ impl TunnelRelay { fn encode_packet(&mut self, p: P::Packet) -> Result, Error> { let item = _Any::from(&p); // tracing::trace!(packet = debug(&item), "<--"); - Ok(PackedWriter::encode::<_, _Any, _>(&mut self.membranes, &item)?) + if self.output_text { + Ok(TextWriter::encode::<_, _Any, _>(&mut self.membranes, &item)?.into_bytes()) + } else { + Ok(PackedWriter::encode::<_, _Any, _>(&mut self.membranes, &item)?) + } } pub fn send_packet(&mut self, debtor: &Arc, cost: usize, p: P::Packet) -> ActorResult {