Simplify again by moving away from excessive internal buffering

This commit is contained in:
Tony Garnock-Jones 2020-05-25 15:29:21 +02:00
parent f07d2e6a5d
commit 94058b5ec2
4 changed files with 34 additions and 64 deletions

View File

@ -16,4 +16,3 @@ num = "0.2"
num_enum = "0.4.1" num_enum = "0.4.1"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_bytes = "0.11" serde_bytes = "0.11"
bytes = "0.5.4"

View File

@ -234,7 +234,7 @@ mod value_tests {
#[cfg(test)] #[cfg(test)]
mod decoder_tests { mod decoder_tests {
use crate::value::{Decoder, BinaryReader, Reader}; use crate::value::{Decoder, BinaryReader};
use crate::value::{Value, PlainValue, NestedValue}; use crate::value::{Value, PlainValue, NestedValue};
use super::dom::Dom; use super::dom::Dom;
@ -274,10 +274,10 @@ mod decoder_tests {
let mut r = BinaryReader::new(&mut buf); let mut r = BinaryReader::new(&mut buf);
let mut d = Decoder::<_, PlainValue<Dom>, Dom>::new(&mut r, None); let mut d = Decoder::<_, PlainValue<Dom>, Dom>::new(&mut r, None);
assert_eq!(d.next_or_err().unwrap().value(), &Value::simple_record("Ping", vec![])); assert_eq!(d.next_or_err().unwrap().value(), &Value::simple_record("Ping", vec![]));
assert!(r.buffered_len().unwrap() > 0); assert_eq!(buf.len(), 6);
let mut r = BinaryReader::new(&mut buf);
let mut d = Decoder::<_, PlainValue<Dom>, Dom>::new(&mut r, None); let mut d = Decoder::<_, PlainValue<Dom>, Dom>::new(&mut r, None);
assert_eq!(d.next_or_err().unwrap().value(), &Value::simple_record("Pong", vec![])); assert_eq!(d.next_or_err().unwrap().value(), &Value::simple_record("Pong", vec![]));
assert!(r.buffered_len().unwrap() == 0);
assert_eq!(buf.len(), 0); assert_eq!(buf.len(), 0);
} }
} }

View File

@ -3,7 +3,7 @@ use super::{
decoder::{self, Decoder, DecodePlaceholderMap}, decoder::{self, Decoder, DecodePlaceholderMap},
encoder::{Encoder, EncodePlaceholderMap}, encoder::{Encoder, EncodePlaceholderMap},
invert_map, invert_map,
reader::{Reader, BinaryReader, Error}, reader::{BinaryReader, Error, is_eof_error},
value::{ value::{
NestedValue, Domain, NestedValue, Domain,
}, },
@ -35,9 +35,10 @@ impl<N: NestedValue<D>, D: Domain> Codec<N, D> {
pub fn decode_all<'r, R: Read>(&self, read: &'r mut R) -> decoder::Result<Vec<N>> { pub fn decode_all<'r, R: Read>(&self, read: &'r mut R) -> decoder::Result<Vec<N>> {
let mut r = BinaryReader::new(read); let mut r = BinaryReader::new(read);
let vs: Vec<N> = Decoder::new(&mut r, self.decode_placeholders.as_ref()).collect::<decoder::Result<Vec<N>>>()?; let vs: Vec<N> = Decoder::new(&mut r, self.decode_placeholders.as_ref()).collect::<decoder::Result<Vec<N>>>()?;
match r.buffered_len()? { match r.peek() {
0 => Ok(vs), Err(e) if is_eof_error(&e) => Ok(vs),
count => Err(Error::new(std::io::ErrorKind::Other, format!("{} trailing bytes", count))) Err(e) => Err(e),
Ok(_) => Err(Error::new(std::io::ErrorKind::Other, "trailing bytes")),
} }
} }

View File

@ -1,4 +1,3 @@
use bytes::{Buf, BufMut, BytesMut};
use num::bigint::BigInt; use num::bigint::BigInt;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::convert::TryInto; use std::convert::TryInto;
@ -12,7 +11,8 @@ pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug)] #[derive(Debug)]
enum PeekState { enum PeekState {
Eof, Eof,
Buffer(BytesMut), Empty,
Full(u8),
} }
pub type DecodePlaceholderMap<N, D> = Map<usize, Value<N, D>>; pub type DecodePlaceholderMap<N, D> = Map<usize, Value<N, D>>;
@ -23,8 +23,6 @@ pub trait Reader {
placeholders: Option<&DecodePlaceholderMap<N, Dom>>, placeholders: Option<&DecodePlaceholderMap<N, Dom>>,
read_annotations: bool, read_annotations: bool,
) -> Result<N>; ) -> Result<N>;
fn buffered_len(&mut self) -> Result<usize>;
} }
impl<'re, R: Reader> Reader for &'re mut R { impl<'re, R: Reader> Reader for &'re mut R {
@ -35,16 +33,11 @@ impl<'re, R: Reader> Reader for &'re mut R {
) -> Result<N> { ) -> Result<N> {
(*self).next(placeholders, read_annotations) (*self).next(placeholders, read_annotations)
} }
fn buffered_len(&mut self) -> Result<usize> {
(*self).buffered_len()
}
} }
pub struct BinaryReader<'a, R: Read> { pub struct BinaryReader<'a, R: Read> {
read: &'a mut R, read: &'a mut R,
buf: PeekState, buf: PeekState,
chunksize: usize,
} }
struct ConfiguredBinaryReader<'de, 'pl, 'a, R: Read, N: NestedValue<Dom>, Dom: Domain> { struct ConfiguredBinaryReader<'de, 'pl, 'a, R: Read, N: NestedValue<Dom>, Dom: Domain> {
@ -98,7 +91,7 @@ pub fn decodestr(bs: &[u8]) -> Result<&str> {
std::str::from_utf8(bs).map_err(|_| err("Invalid UTF-8")) std::str::from_utf8(bs).map_err(|_| err("Invalid UTF-8"))
} }
pub fn decodebinary<N: NestedValue<Dom>, Dom: Domain>(minor: AtomMinor, bs: BytesMut) -> Result<N> { pub fn decodebinary<N: NestedValue<Dom>, Dom: Domain>(minor: AtomMinor, bs: Vec<u8>) -> Result<N> {
match minor { match minor {
AtomMinor::SignedInteger => Ok(Value::from(decodeint(&bs)).wrap()), AtomMinor::SignedInteger => Ok(Value::from(decodeint(&bs)).wrap()),
AtomMinor::String => Ok(Value::from(decodestr(&bs)?).wrap()), AtomMinor::String => Ok(Value::from(decodestr(&bs)?).wrap()),
@ -170,33 +163,21 @@ pub fn is_eof_error(e: &Error) -> bool {
} }
} }
fn read_buffer(buf: &mut BytesMut, count: usize) -> &mut [u8] {
buf.reserve(count);
unsafe {
let m = &mut buf.bytes_mut()[..count];
core::ptr::write_bytes(m.as_mut_ptr(), 0, count);
&mut *(m as *mut [core::mem::MaybeUninit<u8>] as *mut [u8])
}
}
impl<'a, R: Read> BinaryReader<'a, R> { impl<'a, R: Read> BinaryReader<'a, R> {
pub fn new(read: &'a mut R) -> Self { pub fn new(read: &'a mut R) -> Self {
BinaryReader { BinaryReader {
read, read,
buf: PeekState::Buffer(BytesMut::new()), buf: PeekState::Empty,
chunksize: 64,
} }
} }
fn prime(&mut self) -> Result<()> { fn prime(&mut self) -> Result<()> {
if let PeekState::Buffer(ref mut buf) = self.buf { if let PeekState::Empty = self.buf {
if buf.remaining() == 0 { let b = &mut [0];
let nbytes = self.read.read(read_buffer(buf, self.chunksize))?; match self.read.read(b)? {
if nbytes == 0 { 0 => self.buf = PeekState::Eof,
self.buf = PeekState::Eof; 1 => self.buf = PeekState::Full(b[0]),
} else { _ => unreachable!(),
unsafe { buf.advance_mut(nbytes); }
}
} }
} }
Ok(()) Ok(())
@ -204,8 +185,8 @@ impl<'a, R: Read> BinaryReader<'a, R> {
pub fn skip(&mut self) -> Result<()> { pub fn skip(&mut self) -> Result<()> {
self.prime()?; self.prime()?;
if let PeekState::Buffer(ref mut buf) = self.buf { if let PeekState::Full(_) = self.buf {
buf.advance(1); self.buf = PeekState::Empty;
} }
Ok(()) Ok(())
} }
@ -214,30 +195,26 @@ impl<'a, R: Read> BinaryReader<'a, R> {
self.prime()?; self.prime()?;
match self.buf { match self.buf {
PeekState::Eof => Err(eof()), PeekState::Eof => Err(eof()),
PeekState::Buffer(ref mut buf) => Ok(buf[0]), PeekState::Empty => unreachable!(),
PeekState::Full(b) => Ok(b),
} }
} }
pub fn read(&mut self) -> Result<u8> { pub fn read(&mut self) -> Result<u8> {
let v = self.peek()?; let v = self.peek()?;
if let PeekState::Buffer(ref mut buf) = self.buf { if let PeekState::Full(_) = self.buf {
buf.advance(1); self.buf = PeekState::Empty;
} }
Ok(v) Ok(v)
} }
pub fn readbytes(&mut self, req: usize) -> Result<BytesMut> { pub fn readbytes(&mut self, bs: &mut [u8]) -> Result<()> {
let buf = match self.buf { match self.buf {
PeekState::Eof => unreachable!(), PeekState::Eof => unreachable!(),
PeekState::Buffer(ref mut buf) => buf, PeekState::Empty => (),
PeekState::Full(_) => unreachable!(),
}; };
let avail = buf.remaining(); self.read.read_exact(bs)
if avail < req {
let count = req - avail;
self.read.read_exact(read_buffer(buf, count))?;
unsafe { buf.advance_mut(count); }
}
Ok(buf.split_to(req))
} }
pub fn varint(&mut self) -> Result<usize> { pub fn varint(&mut self) -> Result<usize> {
@ -281,12 +258,12 @@ impl<'re, 'a, R: Read> Reader for BinaryReader<'a, R> {
(Op::Misc(0), 1) => Ok(Value::from(true).wrap()), (Op::Misc(0), 1) => Ok(Value::from(true).wrap()),
(Op::Misc(0), 2) => { (Op::Misc(0), 2) => {
let mut bs = [0; 4]; let mut bs = [0; 4];
bs.copy_from_slice(&self.readbytes(4)?); self.readbytes(&mut bs)?;
Ok(Value::from(f32::from_bits(u32::from_be_bytes(bs.try_into().unwrap()))).wrap()) Ok(Value::from(f32::from_bits(u32::from_be_bytes(bs.try_into().unwrap()))).wrap())
} }
(Op::Misc(0), 3) => { (Op::Misc(0), 3) => {
let mut bs = [0; 8]; let mut bs = [0; 8];
bs.copy_from_slice(&self.readbytes(8)?); self.readbytes(&mut bs)?;
Ok(Value::from(f64::from_bits(u64::from_be_bytes(bs.try_into().unwrap()))).wrap()) Ok(Value::from(f64::from_bits(u64::from_be_bytes(bs.try_into().unwrap()))).wrap())
} }
(Op::Misc(0), 5) => { (Op::Misc(0), 5) => {
@ -314,7 +291,7 @@ impl<'re, 'a, R: Read> Reader for BinaryReader<'a, R> {
} }
(Op::Misc(2), arg) => match Op::try_from(arg)? { (Op::Misc(2), arg) => match Op::try_from(arg)? {
Op::Atom(minor) => { Op::Atom(minor) => {
let mut bs = BytesMut::with_capacity(256); let mut bs = Vec::with_capacity(256);
while !self.peekend()? { while !self.peekend()? {
match self.next(placeholders, false)?.value().as_bytestring() { match self.next(placeholders, false)?.value().as_bytestring() {
Some(chunk) => bs.extend_from_slice(chunk), Some(chunk) => bs.extend_from_slice(chunk),
@ -339,7 +316,8 @@ impl<'re, 'a, R: Read> Reader for BinaryReader<'a, R> {
(Op::Misc(_), _) => unreachable!(), (Op::Misc(_), _) => unreachable!(),
(Op::Atom(minor), arg) => { (Op::Atom(minor), arg) => {
let count = self.wirelength(arg)?; let count = self.wirelength(arg)?;
let bs = self.readbytes(count)?; let mut bs = vec![0; count];
self.readbytes(&mut bs)?;
decodebinary(minor, bs) decodebinary(minor, bs)
} }
(Op::Compound(minor), arg) => { (Op::Compound(minor), arg) => {
@ -358,12 +336,4 @@ impl<'re, 'a, R: Read> Reader for BinaryReader<'a, R> {
} }
} }
} }
fn buffered_len(&mut self) -> Result<usize> {
self.prime()?;
match self.buf {
PeekState::Eof => Ok(0),
PeekState::Buffer(ref b) => Ok(b.remaining()),
}
}
} }