dirty-consumer, dirty-producer

This commit is contained in:
Tony Garnock-Jones 2021-10-05 14:10:57 +02:00
parent e214d9dce3
commit 81dfae92d8
5 changed files with 212 additions and 2 deletions

View File

@ -1,2 +1,2 @@
#!/bin/sh
while true; do ../target/release/examples/consumer "$@"; sleep 2; done
while true; do ../target/release/examples/dirty-consumer "$@"; sleep 2; done

View File

@ -1,2 +1,2 @@
#!/bin/sh
while true; do ../target/release/examples/producer "$@"; sleep 2; done
while true; do ../target/release/examples/dirty-producer "$@"; sleep 2; done

View File

@ -0,0 +1,96 @@
//! I am a low-level hack intended to consume bytes as quickly as
//! possible, so that the consumer isn't the bottleneck in
//! single-producer/single-consumer broker throughput measurement.
use preserves_schema::Codec;
use structopt::StructOpt;
use syndicate::schemas::Language;
use syndicate::schemas::gatekeeper;
use syndicate::schemas::protocol as P;
use syndicate::schemas::dataspace::Observe;
use syndicate::sturdy;
use syndicate::value::BinarySource;
use syndicate::value::BytesBinarySource;
use syndicate::value::IOValue;
use syndicate::value::NestedValue;
use syndicate::value::PackedWriter;
use syndicate::value::Reader;
use syndicate::value::Value;
use std::io::Read;
use std::io::Write;
use std::net::TcpStream;
use std::time::Duration;
use std::time::Instant;
mod dirty;
#[derive(Clone, Debug, StructOpt)]
pub struct Config {
#[structopt(short = "d", default_value = "b4b303726566b10973796e646963617465b584b210a6480df5306611ddd0d3882b546e197784")]
dataspace: String,
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = Config::from_args();
let mut stream = TcpStream::connect("127.0.0.1:8001")?;
dirty::dirty_resolve(&mut stream, &config.dataspace)?;
let iolang = Language::<IOValue>::default();
{
let turn = P::Turn::<IOValue>(vec![
P::TurnEvent {
oid: P::Oid(1.into()),
event: P::Event::Assert(Box::new(P::Assert {
assertion: P::Assertion(iolang.unparse(&Observe {
pattern: syndicate_macros::pattern!{<Says $ $>},
observer: iolang.unparse(&sturdy::WireRef::Mine {
oid: Box::new(sturdy::Oid(2.into())),
}),
})),
handle: P::Handle(2.into()),
})),
}
]);
stream.write_all(&PackedWriter::encode_iovalue(&iolang.unparse(&turn))?)?;
}
let mut buf = [0; 131072];
let turn_size = {
stream.read(&mut buf)?;
let mut src = BytesBinarySource::new(&buf);
src.packed_iovalues().demand_next(false)?;
src.index
};
let mut start = Instant::now();
let interval = Duration::from_secs(1);
let mut deadline = start + interval;
let mut total_bytes = 0;
loop {
let n = stream.read(&mut buf)?;
if n == 0 {
break;
}
total_bytes += n;
let now = Instant::now();
if now >= deadline {
let delta = now - start;
let message_count = total_bytes as f64 / turn_size as f64;
println!("{} messages in the last second ({} Hz)",
message_count,
message_count / delta.as_secs_f64());
start = now;
total_bytes = 0;
deadline = deadline + interval;
}
}
Ok(())
}

View File

@ -0,0 +1,67 @@
//! I am a low-level hack intended to shovel bytes out the gate as
//! quickly as possible, so that the producer isn't the bottleneck in
//! single-producer/single-consumer broker throughput measurement.
use preserves_schema::Codec;
use structopt::StructOpt;
use syndicate::schemas::Language;
use syndicate::schemas::protocol as P;
use syndicate::value::IOValue;
use syndicate::value::PackedWriter;
use syndicate::value::Value;
use std::io::Write;
use std::net::TcpStream;
mod dirty;
#[derive(Clone, Debug, StructOpt)]
pub struct Config {
#[structopt(short = "a", default_value = "1")]
action_count: u32,
#[structopt(short = "b", default_value = "0")]
bytes_padding: usize,
#[structopt(short = "d", default_value = "b4b303726566b10973796e646963617465b584b210a6480df5306611ddd0d3882b546e197784")]
dataspace: String,
}
#[inline]
fn says(who: IOValue, what: IOValue) -> IOValue {
let mut r = Value::simple_record("Says", 2);
r.fields_vec_mut().push(who);
r.fields_vec_mut().push(what);
r.finish().wrap()
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = Config::from_args();
let mut stream = TcpStream::connect("127.0.0.1:8001")?;
dirty::dirty_resolve(&mut stream, &config.dataspace)?;
let padding: IOValue = Value::ByteString(vec![0; config.bytes_padding]).wrap();
let mut events = Vec::new();
for _ in 0 .. config.action_count {
events.push(P::TurnEvent::<IOValue> {
oid: P::Oid(1.into()),
event: P::Event::Message(Box::new(P::Message {
body: P::Assertion(says(Value::from("producer").wrap(), padding.clone())),
})),
});
}
let turn = P::Turn(events);
let mut buf: Vec<u8> = vec![];
let iolang = Language::<IOValue>::default();
while buf.len() < 16384 {
buf.extend(&PackedWriter::encode_iovalue(&iolang.unparse(&turn))?);
}
loop {
stream.write_all(&buf)?;
}
}

View File

@ -0,0 +1,47 @@
use preserves_schema::Codec;
use syndicate::schemas::Language;
use syndicate::schemas::gatekeeper;
use syndicate::schemas::protocol as P;
use syndicate::sturdy;
use syndicate::value::IOValue;
use syndicate::value::NestedValue;
use syndicate::value::PackedWriter;
use syndicate::value::Value;
use std::io::Read;
use std::io::Write;
use std::net::TcpStream;
pub fn dirty_resolve(stream: &mut TcpStream, dataspace: &str) -> Result<(), Box<dyn std::error::Error>> {
let iolang = Language::<IOValue>::default();
let sturdyref = sturdy::SturdyRef::from_hex(dataspace)?;
let sturdyref = iolang.parse(&syndicate::language().unparse(&sturdyref)
.copy_via(&mut |_| Err("no!"))?)?;
let resolve_turn = P::Turn(vec![
P::TurnEvent {
oid: P::Oid(0.into()),
event: P::Event::Assert(Box::new(P::Assert {
assertion: P::Assertion(iolang.unparse(&gatekeeper::Resolve::<IOValue> {
sturdyref,
observer: iolang.unparse(&sturdy::WireRef::Mine {
oid: Box::new(sturdy::Oid(0.into())),
}),
})),
handle: P::Handle(1.into()),
})),
}
]);
stream.write_all(&PackedWriter::encode_iovalue(&iolang.unparse(&resolve_turn))?)?;
{
let mut buf = [0; 1024];
stream.read(&mut buf)?;
// We just assume we got a positive response here!!
// We further assume that the resolved dataspace was assigned peer-oid 1
}
Ok(())
}