Binary/text autodetect
This commit is contained in:
parent
4db9511b12
commit
54454e608b
54
src/relay.rs
54
src/relay.rs
|
@ -27,6 +27,8 @@ use preserves::value::NoEmbeddedDomainCodec;
|
|||
use preserves::value::PackedReader;
|
||||
use preserves::value::PackedWriter;
|
||||
use preserves::value::Reader;
|
||||
use preserves::value::TextWriter;
|
||||
use preserves::value::ViaCodec;
|
||||
use preserves::value::Writer;
|
||||
use preserves::value::signed_integer::SignedInteger;
|
||||
|
||||
|
@ -87,6 +89,7 @@ pub struct TunnelRelay
|
|||
pending_outbound: Vec<P::TurnEvent>,
|
||||
self_entity: Arc<Ref<()>>,
|
||||
output: UnboundedSender<LoanedItem<Vec<u8>>>,
|
||||
output_text: bool,
|
||||
}
|
||||
|
||||
struct RelayEntity {
|
||||
|
@ -185,6 +188,7 @@ impl TunnelRelay {
|
|||
let mut tr = TunnelRelay {
|
||||
self_ref: Arc::clone(&tr_ref),
|
||||
output: output_tx,
|
||||
output_text: true,
|
||||
inbound_assertions: Map::new(),
|
||||
outbound_assertions: Map::new(),
|
||||
membranes: Membranes {
|
||||
|
@ -207,31 +211,39 @@ impl TunnelRelay {
|
|||
result
|
||||
}
|
||||
|
||||
fn handle_inbound_datagram(&mut self, t: &mut Activation, bs: &[u8]) -> ActorResult {
|
||||
fn deserialize_one(&mut self, t: &mut Activation, bs: &[u8]) -> (Result<P::Packet, ParseError>, usize) {
|
||||
let mut src = BytesBinarySource::new(&bs);
|
||||
let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes);
|
||||
let mut r = src.packed::<_, _Any, _>(&mut dec);
|
||||
let item = P::Packet::deserialize(&mut r)?;
|
||||
match src.peek() {
|
||||
Ok(v) => if v >= 128 {
|
||||
self.output_text = false;
|
||||
let mut r = src.packed::<_, _Any, _>(&mut dec);
|
||||
let res = P::Packet::deserialize(&mut r);
|
||||
(res, r.source.index)
|
||||
} else {
|
||||
self.output_text = true;
|
||||
let mut dec = ViaCodec::new(dec);
|
||||
let mut r = src.text::<_, _Any, _>(&mut dec);
|
||||
let res = P::Packet::deserialize(&mut r);
|
||||
(res, r.source.index)
|
||||
},
|
||||
Err(e) => (Err(e.into()), 0)
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_inbound_datagram(&mut self, t: &mut Activation, bs: &[u8]) -> ActorResult {
|
||||
let item = self.deserialize_one(t, bs).0?;
|
||||
self.handle_inbound_packet(t, item)
|
||||
}
|
||||
|
||||
fn handle_inbound_stream(&mut self, t: &mut Activation, buf: &mut BytesMut) -> ActorResult {
|
||||
loop {
|
||||
let (e, count) = {
|
||||
let mut src = BytesBinarySource::new(buf);
|
||||
let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes);
|
||||
let mut r = src.packed::<_, _Any, _>(&mut dec);
|
||||
let e = match P::Packet::deserialize(&mut r) {
|
||||
Err(ParseError::Preserves(PreservesError::Io(e)))
|
||||
if is_eof_io_error(&e) =>
|
||||
None,
|
||||
result => Some(result?),
|
||||
};
|
||||
(e, r.source.index)
|
||||
};
|
||||
match e {
|
||||
None => return Ok(()),
|
||||
Some(item) => {
|
||||
let (result, count) = self.deserialize_one(t, buf);
|
||||
match result {
|
||||
Err(ParseError::Preserves(PreservesError::Io(e)))
|
||||
if is_eof_io_error(&e) => return Ok(()),
|
||||
Err(e) => return Err(e)?,
|
||||
Ok(item) => {
|
||||
buf.advance(count);
|
||||
self.handle_inbound_packet(t, item)?;
|
||||
}
|
||||
|
@ -361,7 +373,11 @@ impl TunnelRelay {
|
|||
fn encode_packet(&mut self, p: P::Packet) -> Result<Vec<u8>, Error> {
|
||||
let item = _Any::from(&p);
|
||||
// tracing::trace!(packet = debug(&item), "<--");
|
||||
Ok(PackedWriter::encode::<_, _Any, _>(&mut self.membranes, &item)?)
|
||||
if self.output_text {
|
||||
Ok(TextWriter::encode::<_, _Any, _>(&mut self.membranes, &item)?.into_bytes())
|
||||
} else {
|
||||
Ok(PackedWriter::encode::<_, _Any, _>(&mut self.membranes, &item)?)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send_packet(&mut self, debtor: &Arc<Debtor>, cost: usize, p: P::Packet) -> ActorResult {
|
||||
|
|
Loading…
Reference in New Issue