diff --git a/implementations/rust/Cargo.toml b/implementations/rust/Cargo.toml
index 648eae1..fffe3c8 100644
--- a/implementations/rust/Cargo.toml
+++ b/implementations/rust/Cargo.toml
@@ -17,6 +17,7 @@ num_enum = "0.4.1"
 serde = { version = "1.0", features = ["derive"] }
 serde_bytes = "0.11"
 lazy_static = "1.4.0"
+pretty-hex = "0.1.1"
 
 [dev-dependencies]
 criterion = "0.3"
diff --git a/implementations/rust/benches/codec.rs b/implementations/rust/benches/codec.rs
index 94a395d..f9aa4de 100644
--- a/implementations/rust/benches/codec.rs
+++ b/implementations/rust/benches/codec.rs
@@ -1,5 +1,5 @@
 use criterion::{criterion_group, criterion_main, Criterion};
-use preserves::value::{self, Reader, Writer, PackedReader, PackedWriter};
+use preserves::value::{self, Reader, Writer, PackedReader, PackedWriter, IndexedWriter};
 use preserves::{de, ser};
 use std::io::Read;
 use std::io::BufReader;
@@ -43,6 +43,16 @@ pub fn bench_encoder(c: &mut Criterion) {
     }));
 }
 
+pub fn bench_indexed_encoder(c: &mut Criterion) {
+    let mut fh = std::fs::File::open("../../tests/samples.bin").unwrap();
+    let v = PackedReader::decode_read(&mut fh).demand_next(true).unwrap();
+    c.bench_function("encode samples.bin (indexed)", |b| b.iter_with_large_drop(|| {
+        let mut bs = vec![];
+        IndexedWriter::new(&mut bs).write(&v).unwrap();
+        bs
+    }));
+}
+
 pub fn bench_de(c: &mut Criterion) {
     let mut fh = std::fs::File::open("../../tests/samples.bin").unwrap();
     let mut bs = vec![];
@@ -122,14 +132,33 @@ pub fn large_testdata_encoder(c: &mut Criterion) {
     });
 }
 
+pub fn large_testdata_indexed_encoder(c: &mut Criterion) {
+    c.bench_function("encode testdata.bin (indexed)", |b| {
+        let mut fh = BufReader::new(std::fs::File::open("benches/testdata.bin").unwrap());
+        let mut vs = vec![];
+        let mut r = PackedReader::decode_read(&mut fh);
+        while let Some(v) = r.next(true).unwrap() {
+            vs.push(v);
+        }
+        b.iter_with_large_drop(|| {
+            let mut bs = vec![];
+            let mut w = IndexedWriter::new(&mut bs);
+            for v in &vs {
+                w.write(&v).unwrap();
+            }
+            bs
+        })
+    });
+}
+
 criterion_group!(codec,
                  bench_decoder_bytes, bench_decoder_file, bench_decoder_buffered_file,
-                 bench_encoder);
+                 bench_encoder, bench_indexed_encoder);
 criterion_group!(serde, bench_de, bench_ser);
 criterion_group!(codec_then_serde, bench_decoder_de, bench_ser_encoder);
 criterion_group!{
     name = large_testdata;
     config = Criterion::default().sample_size(10);
-    targets = large_testdata_decoder_with_ann, large_testdata_decoder_without_ann, large_testdata_encoder
+    targets = large_testdata_decoder_with_ann, large_testdata_decoder_without_ann, large_testdata_encoder, large_testdata_indexed_encoder
 }
 criterion_main!(codec, serde, codec_then_serde, large_testdata);
diff --git a/implementations/rust/src/value/indexed/mod.rs b/implementations/rust/src/value/indexed/mod.rs
new file mode 100644
index 0000000..bc488dc
--- /dev/null
+++ b/implementations/rust/src/value/indexed/mod.rs
@@ -0,0 +1,5 @@
+pub mod reader;
+pub mod writer;
+
+pub use reader::IndexedReader;
+pub use writer::IndexedWriter;
diff --git a/implementations/rust/src/value/indexed/reader.rs b/implementations/rust/src/value/indexed/reader.rs
new file mode 100644
index 0000000..842f2c9
--- /dev/null
+++ b/implementations/rust/src/value/indexed/reader.rs
@@ -0,0 +1,2 @@
+pub struct IndexedReader {
+}
diff --git a/implementations/rust/src/value/indexed/writer.rs b/implementations/rust/src/value/indexed/writer.rs
new file mode 100644
index 0000000..10fdb43
--- /dev/null
+++ b/implementations/rust/src/value/indexed/writer.rs
@@ -0,0 +1,464 @@
+use num;
+use num::bigint::{BigInt, Sign};
+use num::cast::ToPrimitive;
+use std::cmp::{max, min};
+use std::convert::TryInto;
+use super::super::{IOValue, ZERO};
+
+use super::super::writer::{Writer, Result, varint};
+
+pub type StackBase = usize;
+pub type Position = u64;
+pub type Tag = u8;
+
+/// A reference to a value produced by the writer, held on the writer's stack until
+/// the enclosing compound is finished and its pointer table is emitted.
+#[derive(Clone, Copy, Debug)]
+pub enum Pointer {
+    /// The value itself travels inside the pointer word.
+    Immediate {
+        value: i64,
+        tag: Tag,
+    },
+    /// Absolute output position of the value; `append_to` encodes the distance back to it.
+    Relative {
+        position: Position,
+        tag: Tag,
+    },
+    /// No payload at all, only a tag.
+    Empty {
+        tag: Tag,
+    },
+}
+
+pub struct CountingWrite<'w, W: std::io::Write> {
+    pub w: &'w mut W,
+    pub position: Position,
+}
+
+pub struct IndexedWriter<'w, W: std::io::Write> {
+    pub w: CountingWrite<'w, W>,
+    pub annotations: Vec<(Pointer, Vec<Pointer>)>,
+    pub stack: Vec<Pointer>,
+}
+
+pub enum NotSupported {}
+
+/// Smallest power-of-two byte count able to hold the significant bits of `v`
+/// plus `extra_bits` additional bits (never less than one byte).
+fn pointer_len(v: u64, extra_bits: u32) -> usize {
+    let value_bits = 64 - v.leading_zeros();
+    let total_bits = value_bits + extra_bits;
+    let total_bytes = (total_bits + 7) >> 3;
+    total_bytes.next_power_of_two() as usize
+}
+
+impl Pointer {
+    fn size_estimate(&self, clamp_immediate: usize) -> usize {
+        match *self {
+            Pointer::Immediate { value: v, .. } =>
+                min(pointer_len(if v < 0 { !(v as u64) } else { v as u64 }, 4), clamp_immediate),
+            Pointer::Relative { position: p, .. } =>
+                // NOTE(review): this sizes the pointer from the absolute position,
+                // but `append_to` encodes `cw.position - p`; confirm that distance
+                // can never need more bytes than this estimate allows.
+                pointer_len(p, 4),
+            Pointer::Empty { .. } =>
+                1,
+        }
+    }
+
+    fn maybe_spill<'w, W: std::io::Write>(&mut self, cw: &mut CountingWrite<'w, W>, pointer_size: usize) -> Result<()> {
+        match *self {
+            Pointer::Immediate { value: v, tag: 0 } => {
+                *self = Pointer::Immediate { value: v, tag: 12 };
+                Ok(())
+            }
+            Pointer::Immediate { value: v, tag: 3 } => {
+                if pointer_size < 8 && !signed_in_pointer_range(v, pointer_size) {
+                    *self = cw.write_out_of_line_immediate(v, 3)?;
+                } else {
+                    *self = Pointer::Immediate { value: v, tag: 13 };
+                }
+                Ok(())
+            }
+            Pointer::Immediate { value: v, tag: 4 } => {
+                if pointer_size < 8 && !unsigned_in_pointer_range(v as u64, pointer_size) {
+                    *self = cw.write_out_of_line_immediate(v, 4)?;
+                } else {
+                    *self = Pointer::Immediate { value: v, tag: 14 };
+                }
+                Ok(())
+            }
+            Pointer::Immediate { .. } => unreachable!(),
+            Pointer::Relative { .. } => Ok(()),
+            Pointer::Empty { .. } => Ok(()),
+        }
+    }
+
+    fn append_to<'w, W: std::io::Write>(&self, cw: &mut CountingWrite<'w, W>, pointer_size: usize) -> Result<()> {
+        let w = match *self {
+            Pointer::Immediate { value: v, tag: t } =>
+                ((v as u64) << 4) | (t & 15) as u64,
+            Pointer::Relative { position: p, tag: t } =>
+                ((cw.position - p) << 4) | (t & 15) as u64,
+            Pointer::Empty { tag: t } =>
+                (t & 15) as u64,
+        };
+        cw.emit(&w.to_le_bytes()[..pointer_size])?;
+        Ok(())
+    }
+}
+
+
+impl<'w, W: std::io::Write> CountingWrite<'w, W> {
+    pub fn emit(&mut self, bs: &[u8]) -> Result<Position> {
+        self.w.write_all(bs)?;
+        let p = self.position;
+        self.position = self.position + bs.len() as Position;
+        Ok(p)
+    }
+
+    pub fn varint(&mut self, v: u64) -> Result<Position> {
+        let p = self.position;
+        self.position = self.position + varint(self.w, v)? as Position;
+        Ok(p)
+    }
+
+    pub fn write_out_of_line_immediate(&mut self, value: i64, tag: Tag) -> Result<Pointer> {
+        // value is really unsigned EXCEPT when tag == 3 (negative integer)
+        let u = value as u64;
+        let len = if tag == 3 {
+            pointer_len(if value < 0 { !u } else { u }, 0)
+        } else {
+            pointer_len(u, 0)
+        };
+        self.emit(&u.to_le_bytes()[..len])?;
+        Ok(Pointer::Relative { position: self.varint(len as u64)?, tag })
+    }
+}
+
+impl<'w> IndexedWriter<'w, &'w mut Vec<u8>> {
+    pub fn encode(v: &IOValue) -> std::io::Result<Vec<u8>> {
+        // TODO: annotations
+        let mut buf: Vec<u8> = Vec::new();
+        let mut w = IndexedWriter::new(&mut buf);
+        let mut c = w.start_record(1)?;
+        let p = w.write(&ZERO)?;
+        w.extend_compound(&mut c, p)?;
+        let p = w.write(v)?;
+        w.extend_compound(&mut c, p)?;
+        let _final_pointer = w.end_compound(c)?;
+        {
+            // NOTE(review): debugging aid; this prints a hex dump to stdout on every call.
+            use pretty_hex::*;
+            println!("Indexed:\n{:?}\n", &buf.hex_dump());
+        }
+        Ok(buf)
+    }
+}
+
+impl<'w, W: std::io::Write> IndexedWriter<'w, W> {
+    pub fn new(w: &'w mut W) -> Self {
+        IndexedWriter {
+            w: CountingWrite { w, position: 0 },
+            annotations: vec![],
+            stack: Vec::with_capacity(32),
+        }
+    }
+
+    pub fn write_atom(&mut self, bs: &[u8], tag: Tag) -> Result<Pointer> {
+        if bs.is_empty() {
+            Ok(Pointer::Empty { tag })
+        } else {
+            self.w.emit(bs)?;
+            Ok(Pointer::Relative { position: self.w.varint(bs.len() as u64)?, tag })
+        }
+    }
+}
+
+pub fn simple_in_immediate_range(v: i64, tag_bits: u8) -> bool {
+    let available_bits = 64 - tag_bits;
+    v >= -(1 << available_bits) && v < (1 << available_bits)
+}
+
+/// Width in bits of the tag nibble stored in the low bits of every pointer word.
+const POINTER_TAG_BITS: usize = 4;
+
+/// Payload bits left in a `pointer_size`-byte pointer word after the tag nibble.
+fn pointer_payload_bits(pointer_size: usize) -> usize {
+    (pointer_size << 3).saturating_sub(POINTER_TAG_BITS)
+}
+
+/// True if `v` can be carried as an immediate in a `pointer_size`-byte pointer.
+///
+/// `Pointer::append_to` shifts the value left past the tag nibble before
+/// truncating to `pointer_size` bytes, so only `8 * pointer_size - 4` bits of
+/// payload survive; anything wider must be spilled out of line.
+pub fn signed_in_pointer_range(v: i64, pointer_size: usize) -> bool {
+    let bits = pointer_payload_bits(pointer_size);
+    if bits >= 63 {
+        // Every i64 fits; also avoids overflowing the shift below.
+        return true;
+    }
+    let limit = 1i64 << bits;
+    v >= -limit && v < limit
+}
+
+/// Unsigned counterpart of [`signed_in_pointer_range`].
+pub fn unsigned_in_pointer_range(v: u64, pointer_size: usize) -> bool {
+    let bits = pointer_payload_bits(pointer_size);
+    if bits >= 64 {
+        // Every u64 fits; also avoids overflowing the shift below.
+        return true;
+    }
+    let limit = 1u64 << bits;
+    v < limit
+}
+
+pub fn int_tag<T: num::Zero + PartialOrd>(v: T) -> Tag {
+    if v < T::zero() { 3 } else { 4 }
+}
+
+pub fn pointer_size_for(v: &[Pointer], clamp_immediate: usize) -> usize {
+    let mut biggest: usize = 1;
+    for p in v {
+        biggest = max(biggest, p.size_estimate(clamp_immediate));
+    }
+    biggest
+}
+
+impl<'w, W: std::io::Write> Writer for IndexedWriter<'w, W> {
+    type Pointer = Pointer;
+    type Annotation = StackBase;
+    type Compound = (StackBase, Tag);
+    type Dictionary = StackBase;
+    type StreamedAtom = NotSupported;
+    type KeyPointer = Pointer;
+
+    fn align(&mut self, natural_chunksize: u64) -> Result<()> {
+        let overlap = self.w.position % natural_chunksize;
+        let mut remaining = ((natural_chunksize - overlap) % natural_chunksize) as usize;
+        const ZEROS: [u8; 16] = [0; 16];
+        while remaining > 0 {
+            let chunk = min(16, remaining);
+            self.w.emit(&ZEROS[..chunk])?;
+            remaining = remaining - chunk;
+        }
+        Ok(())
+    }
+
+    fn start_annotation(&mut self) -> Result<Self::Annotation> {
+        Ok(self.stack.len())
+    }
+
+    fn extend_annotation(&mut self, _a: &mut Self::Annotation, annotation: &IOValue) -> Result<()> {
+        let p = self.write(annotation)?;
+        self.stack.push(p);
+        Ok(())
+    }
+
+    fn end_annotation(&mut self, a: Self::Annotation, value_p: Self::Pointer) -> Result<Self::Pointer> {
+        let value_p = match value_p {
+            Pointer::Immediate { value: v, tag: t } => self.w.write_out_of_line_immediate(v, t)?,
+            Pointer::Relative { .. } => value_p,
+            Pointer::Empty { tag: t } => Pointer::Relative { position: self.w.varint(0)?, tag: t },
+        };
+        self.annotations.push((value_p, self.stack.split_off(a)));
+        Ok(value_p)
+    }
+
+    fn write_bool(&mut self, v: bool) -> Result<Self::Pointer> {
+        Ok(Pointer::Immediate { value: if v { 1 } else { 0 }, tag: 0 })
+    }
+
+    fn write_f32(&mut self, v: f32) -> Result<Self::Pointer> {
+        self.write_atom(&u32::to_le_bytes(f32::to_bits(v)), 1)
+    }
+
+    fn write_f64(&mut self, v: f64) -> Result<Self::Pointer> {
+        self.write_atom(&u64::to_le_bytes(f64::to_bits(v)), 2)
+    }
+
+    fn write_i8(&mut self, v: i8) -> Result<Self::Pointer> {
+        Ok(Pointer::Immediate { value: v as i64, tag: int_tag(v) })
+    }
+
+    fn write_u8(&mut self, v: u8) -> Result<Self::Pointer> {
+        Ok(Pointer::Immediate { value: v as i64, tag: 4 })
+    }
+
+    fn write_i16(&mut self, v: i16) -> Result<Self::Pointer> {
+        Ok(Pointer::Immediate { value: v as i64, tag: int_tag(v) })
+    }
+
+    fn write_u16(&mut self, v: u16) -> Result<Self::Pointer> {
+        Ok(Pointer::Immediate { value: v as i64, tag: 4 })
+    }
+
+    fn write_i32(&mut self, v: i32) -> Result<Self::Pointer> {
+        Ok(Pointer::Immediate { value: v as i64, tag: int_tag(v) })
+    }
+
+    fn write_u32(&mut self, v: u32) -> Result<Self::Pointer> {
+        Ok(Pointer::Immediate { value: v as i64, tag: 4 })
+    }
+
+    fn write_i64(&mut self, v: i64) -> Result<Self::Pointer> {
+        if simple_in_immediate_range(v, 4) {
+            Ok(Pointer::Immediate { value: v, tag: int_tag(v) })
+        } else {
+            self.write_atom(&i64::to_le_bytes(v), int_tag(v))
+        }
+    }
+
+    fn write_u64(&mut self, v: u64) -> Result<Self::Pointer> {
+        if let Ok(w) = v.try_into() { return self.write_i64(w) }
+        self.write_atom(&u64::to_le_bytes(v), 4)
+    }
+
+    fn write_i128(&mut self, v: i128) -> Result<Self::Pointer> {
+        if let Ok(w) = v.try_into() { return self.write_i64(w) }
+        self.write_atom(&i128::to_le_bytes(v), int_tag(v))
+    }
+
+    fn write_u128(&mut self, v: u128) -> Result<Self::Pointer> {
+        if let Ok(w) = v.try_into() { return self.write_i64(w) }
+        self.write_atom(&u128::to_le_bytes(v), 4)
+    }
+
+    fn write_int(&mut self, v: &BigInt) -> Result<Self::Pointer> {
+        if let Some(w) = v.to_i64() { return self.write_i64(w) }
+        self.write_atom(&v.to_signed_bytes_le(), if let Sign::Minus = v.sign() { 3 } else { 4 })
+    }
+
+    fn write_string(&mut self, v: &str) -> Result<Self::Pointer> {
+        self.write_atom(v.as_bytes(), 5)
+    }
+
+    fn write_bytes(&mut self, v: &[u8]) -> Result<Self::Pointer> {
+        self.write_atom(v, 6)
+    }
+
+    fn write_symbol(&mut self, v: &str) -> Result<Self::Pointer> {
+        self.write_atom(v.as_bytes(), 7)
+    }
+
+    fn stream_string(&mut self) -> Result<Option<Self::StreamedAtom>> {
+        Ok(None)
+    }
+
+    fn stream_bytes(&mut self) -> Result<Option<Self::StreamedAtom>> {
+        Ok(None)
+    }
+
+    fn stream_symbol(&mut self) -> Result<Option<Self::StreamedAtom>> {
+        Ok(None)
+    }
+
+    fn extend_atom(&mut self, _s: &mut Self::StreamedAtom, _bs: &[u8]) -> Result<()> {
+        unreachable!()
+    }
+
+    fn end_atom(&mut self, _s: Self::StreamedAtom) -> Result<Self::Pointer> {
+        unreachable!()
+    }
+
+    fn start_record(&mut self, _field_count: usize) -> Result<Self::Compound> {
+        Ok((self.stack.len(), 8))
+    }
+
+    fn start_sequence(&mut self, _item_count: usize) -> Result<Self::Compound> {
+        Ok((self.stack.len(), 9))
+    }
+
+    fn start_set(&mut self, _item_count: usize) -> Result<Self::Compound> {
+        Ok((self.stack.len(), 10))
+    }
+
+    fn stream_record(&mut self) -> Result<Option<Self::Compound>> {
+        Ok(Some((self.stack.len(), 8)))
+    }
+
+    fn stream_sequence(&mut self) -> Result<Option<Self::Compound>> {
+        Ok(Some((self.stack.len(), 9)))
+    }
+
+    fn stream_set(&mut self) -> Result<Option<Self::Compound>> {
+        Ok(Some((self.stack.len(), 10)))
+    }
+
+    fn extend_compound(&mut self, _s: &mut Self::Compound, value_p: Self::Pointer) -> Result<()> {
+        self.stack.push(value_p);
+        Ok(())
+    }
+
+    fn end_compound(&mut self, s: Self::Compound) -> Result<Self::Pointer> {
+        let (stack_base, tag) = s;
+        let stack_limit = self.stack.len();
+        let value_count = stack_limit - stack_base;
+
+        if value_count == 0 {
+            Ok(Pointer::Empty { tag })
+        } else {
+            let pointer_size = pointer_size_for(&self.stack[stack_base..stack_limit], 2);
+            self.align(pointer_size as u64)?;
+            for index in stack_base..stack_limit {
+                self.stack[index].maybe_spill(&mut self.w, pointer_size)?;
+            }
+            for index in stack_base..stack_limit {
+                let p = self.stack[index];
+                p.append_to(&mut self.w, pointer_size)?;
+            }
+            let len_num = (value_count << 2) as u64 | pointer_size.trailing_zeros() as u64;
+            self.stack.truncate(stack_base);
+            Ok(Pointer::Relative { position: self.w.varint(len_num)?, tag })
+        }
+    }
+
+    fn start_dictionary(&mut self, _value_count: usize) -> Result<Self::Dictionary> {
+        Ok(self.stack.len())
+    }
+
+    fn stream_dictionary(&mut self) -> Result<Option<Self::Dictionary>> {
+        Ok(Some(self.stack.len()))
+    }
+
+    fn extend_dictionary_key(&mut self, _s: &mut Self::Dictionary, key_p: Self::Pointer) -> Result<Self::KeyPointer> {
+        Ok(key_p)
+    }
+
+    fn extend_dictionary_value(&mut self, _s: &mut Self::Dictionary, key_p: Self::KeyPointer, value_p: Self::Pointer) -> Result<()> {
+        self.stack.push(key_p);
+        self.stack.push(value_p);
+        Ok(())
+    }
+
+    fn end_dictionary(&mut self, s: Self::Dictionary) -> Result<Self::Pointer> {
+        let stack_base = s;
+        let stack_limit = self.stack.len();
+        let value_count = stack_limit - stack_base;
+        let tag = 11;
+
+        if value_count == 0 {
+            Ok(Pointer::Empty { tag })
+        } else {
+            let pointer_size = pointer_size_for(&self.stack[stack_base..], 2);
+            self.align(pointer_size as u64)?;
+            for index in stack_base..stack_limit {
+                self.stack[index].maybe_spill(&mut self.w, pointer_size)?;
+            }
+            for index in (stack_base..stack_limit).step_by(2) {
+                let p = self.stack[index];
+                p.append_to(&mut self.w, pointer_size)?;
+            }
+            for index in (stack_base..stack_limit).step_by(2) {
+                let p = self.stack[index + 1];
+                p.append_to(&mut self.w, pointer_size)?;
+            }
+            let len_num = (value_count << 2) as u64 | pointer_size.trailing_zeros() as u64;
+            self.stack.truncate(stack_base);
+            Ok(Pointer::Relative { position: self.w.varint(len_num)?, tag })
+        }
+    }
+}
diff --git a/implementations/rust/src/value/mod.rs b/implementations/rust/src/value/mod.rs
index 97469a6..11b0634 100644
--- a/implementations/rust/src/value/mod.rs
+++ b/implementations/rust/src/value/mod.rs
@@ -1,4 +1,5 @@
 pub mod de;
+pub mod indexed;
 pub mod magic;
 pub mod packed;
 pub mod reader;
@@ -9,6 +10,8 @@ pub mod writer;
 
 pub use de::Deserializer;
 pub use de::from_value;
+pub use indexed::IndexedReader;
+pub use indexed::IndexedWriter;
 pub use packed::PackedReader;
 pub use packed::PackedWriter;
 pub use reader::ConfiguredReader;
@@ -32,6 +35,7 @@ pub use writer::Writer;
 pub use value::FALSE;
 pub use value::TRUE;
 pub use value::EMPTY_SEQ;
+pub use value::ZERO;
 
 pub fn invert_map<A, B>(m: &Map<A, B>) -> Map<B, A>
     where A: Clone, B: Clone + Ord
diff --git a/implementations/rust/src/value/packed/writer.rs b/implementations/rust/src/value/packed/writer.rs
index 01dceb8..c7e6f77 100644
--- a/implementations/rust/src/value/packed/writer.rs
+++ b/implementations/rust/src/value/packed/writer.rs
@@ -45,6 +45,13 @@ impl<'w> PackedWriter<'w, &'w mut Vec<u8>> {
     pub fn encode(v: &IOValue) -> std::io::Result<Vec<u8>> {
         let mut buf: Vec<u8> = Vec::new();
         PackedWriter(&mut buf).write(v)?;
+        {
+            // NOTE(review): debugging aid; prints to stdout and runs the indexed
+            // encoder on every packed encode, failing if that encoder fails.
+            use pretty_hex::*;
+            println!("Packed:\n{:?}\n", &buf.hex_dump());
+            super::super::indexed::IndexedWriter::encode(v)?;
+        }
         Ok(buf)
     }
 }
diff --git a/implementations/rust/src/value/value.rs b/implementations/rust/src/value/value.rs
index 43428de..21579f3 100644
--- a/implementations/rust/src/value/value.rs
+++ b/implementations/rust/src/value/value.rs
@@ -1056,6 +1056,7 @@ lazy_static! {
     pub static ref FALSE: IOValue = IOValue(Arc::new(AnnotatedValue(Annotations::empty(), Value::Boolean(false))));
     pub static ref TRUE: IOValue = IOValue(Arc::new(AnnotatedValue(Annotations::empty(), Value::Boolean(true))));
     pub static ref EMPTY_SEQ: IOValue = IOValue(Arc::new(AnnotatedValue(Annotations::empty(), Value::Sequence(Vec::new()))));
+    pub static ref ZERO: IOValue = IOValue(Arc::new(AnnotatedValue(Annotations::empty(), Value::SignedInteger(0u128.into()))));
 }
 
 impl NestedValue for IOValue {