diff --git a/dev-scripts/restarting-consumer b/dev-scripts/restarting-consumer index 414de5c..3b5673a 100755 --- a/dev-scripts/restarting-consumer +++ b/dev-scripts/restarting-consumer @@ -1,2 +1,2 @@ #!/bin/sh -while true; do ../target/release/examples/consumer "$@"; sleep 2; done +while true; do ../target/release/examples/dirty-consumer "$@"; sleep 2; done diff --git a/dev-scripts/restarting-producer b/dev-scripts/restarting-producer index 9553e8a..53be420 100755 --- a/dev-scripts/restarting-producer +++ b/dev-scripts/restarting-producer @@ -1,2 +1,2 @@ #!/bin/sh -while true; do ../target/release/examples/producer "$@"; sleep 2; done +while true; do ../target/release/examples/dirty-producer "$@"; sleep 2; done diff --git a/syndicate-server/examples/dirty-consumer.rs b/syndicate-server/examples/dirty-consumer.rs new file mode 100644 index 0000000..9c60667 --- /dev/null +++ b/syndicate-server/examples/dirty-consumer.rs @@ -0,0 +1,96 @@ +//! I am a low-level hack intended to consume bytes as quickly as +//! possible, so that the consumer isn't the bottleneck in +//! single-producer/single-consumer broker throughput measurement. + +use preserves_schema::Codec; + +use structopt::StructOpt; + +use syndicate::schemas::Language; +use syndicate::schemas::gatekeeper; +use syndicate::schemas::protocol as P; +use syndicate::schemas::dataspace::Observe; +use syndicate::sturdy; +use syndicate::value::BinarySource; +use syndicate::value::BytesBinarySource; +use syndicate::value::IOValue; +use syndicate::value::NestedValue; +use syndicate::value::PackedWriter; +use syndicate::value::Reader; +use syndicate::value::Value; + +use std::io::Read; +use std::io::Write; +use std::net::TcpStream; +use std::time::Duration; +use std::time::Instant; + +mod dirty; + +#[derive(Clone, Debug, StructOpt)] +pub struct Config { + #[structopt(short = "d", default_value = "b4b303726566b10973796e646963617465b584b210a6480df5306611ddd0d3882b546e197784")] + dataspace: String, +} + +fn main() -> Result<(), Box> { + let config = Config::from_args(); + + let mut stream = TcpStream::connect("127.0.0.1:8001")?; + dirty::dirty_resolve(&mut stream, &config.dataspace)?; + + let iolang = Language::::default(); + + { + let turn = P::Turn::(vec![ + P::TurnEvent { + oid: P::Oid(1.into()), + event: P::Event::Assert(Box::new(P::Assert { + assertion: P::Assertion(iolang.unparse(&Observe { + pattern: syndicate_macros::pattern!{}, + observer: iolang.unparse(&sturdy::WireRef::Mine { + oid: Box::new(sturdy::Oid(2.into())), + }), + })), + handle: P::Handle(2.into()), + })), + } + ]); + stream.write_all(&PackedWriter::encode_iovalue(&iolang.unparse(&turn))?)?; + } + + let mut buf = [0; 131072]; + let turn_size = { + stream.read(&mut buf)?; + let mut src = BytesBinarySource::new(&buf); + src.packed_iovalues().demand_next(false)?; + src.index + }; + + let mut start = Instant::now(); + let interval = Duration::from_secs(1); + let mut deadline = start + interval; + let mut total_bytes = 0; + loop { + let n = stream.read(&mut buf)?; + if n == 0 { + break; + } + total_bytes += n; + + let now = Instant::now(); + if now >= deadline { + let delta = now - start; + let message_count = total_bytes as f64 / turn_size as f64; + println!("{} messages in the last second ({} Hz)", + message_count, + message_count / delta.as_secs_f64()); + + start = now; + total_bytes = 0; + deadline = deadline + interval; + } + } + + Ok(()) +} diff --git a/syndicate-server/examples/dirty-producer.rs b/syndicate-server/examples/dirty-producer.rs new file mode 100644 index 0000000..fe5b080 --- /dev/null +++ b/syndicate-server/examples/dirty-producer.rs @@ -0,0 +1,67 @@ +//! I am a low-level hack intended to shovel bytes out the gate as +//! quickly as possible, so that the producer isn't the bottleneck in +//! single-producer/single-consumer broker throughput measurement. + +use preserves_schema::Codec; + +use structopt::StructOpt; + +use syndicate::schemas::Language; +use syndicate::schemas::protocol as P; +use syndicate::value::IOValue; +use syndicate::value::PackedWriter; +use syndicate::value::Value; + +use std::io::Write; +use std::net::TcpStream; + +mod dirty; + +#[derive(Clone, Debug, StructOpt)] +pub struct Config { + #[structopt(short = "a", default_value = "1")] + action_count: u32, + + #[structopt(short = "b", default_value = "0")] + bytes_padding: usize, + + #[structopt(short = "d", default_value = "b4b303726566b10973796e646963617465b584b210a6480df5306611ddd0d3882b546e197784")] + dataspace: String, +} + +#[inline] +fn says(who: IOValue, what: IOValue) -> IOValue { + let mut r = Value::simple_record("Says", 2); + r.fields_vec_mut().push(who); + r.fields_vec_mut().push(what); + r.finish().wrap() +} + +fn main() -> Result<(), Box> { + let config = Config::from_args(); + + let mut stream = TcpStream::connect("127.0.0.1:8001")?; + dirty::dirty_resolve(&mut stream, &config.dataspace)?; + + let padding: IOValue = Value::ByteString(vec![0; config.bytes_padding]).wrap(); + let mut events = Vec::new(); + for _ in 0 .. config.action_count { + events.push(P::TurnEvent:: { + oid: P::Oid(1.into()), + event: P::Event::Message(Box::new(P::Message { + body: P::Assertion(says(Value::from("producer").wrap(), padding.clone())), + })), + }); + } + let turn = P::Turn(events); + + let mut buf: Vec = vec![]; + let iolang = Language::::default(); + while buf.len() < 16384 { + buf.extend(&PackedWriter::encode_iovalue(&iolang.unparse(&turn))?); + } + + loop { + stream.write_all(&buf)?; + } +} diff --git a/syndicate-server/examples/dirty/mod.rs b/syndicate-server/examples/dirty/mod.rs new file mode 100644 index 0000000..2358a76 --- /dev/null +++ b/syndicate-server/examples/dirty/mod.rs @@ -0,0 +1,47 @@ +use preserves_schema::Codec; + +use syndicate::schemas::Language; +use syndicate::schemas::gatekeeper; +use syndicate::schemas::protocol as P; +use syndicate::sturdy; +use syndicate::value::IOValue; +use syndicate::value::NestedValue; +use syndicate::value::PackedWriter; +use syndicate::value::Value; + +use std::io::Read; +use std::io::Write; +use std::net::TcpStream; + +pub fn dirty_resolve(stream: &mut TcpStream, dataspace: &str) -> Result<(), Box> { + let iolang = Language::::default(); + + let sturdyref = sturdy::SturdyRef::from_hex(dataspace)?; + let sturdyref = iolang.parse(&syndicate::language().unparse(&sturdyref) + .copy_via(&mut |_| Err("no!"))?)?; + + let resolve_turn = P::Turn(vec![ + P::TurnEvent { + oid: P::Oid(0.into()), + event: P::Event::Assert(Box::new(P::Assert { + assertion: P::Assertion(iolang.unparse(&gatekeeper::Resolve:: { + sturdyref, + observer: iolang.unparse(&sturdy::WireRef::Mine { + oid: Box::new(sturdy::Oid(0.into())), + }), + })), + handle: P::Handle(1.into()), + })), + } + ]); + stream.write_all(&PackedWriter::encode_iovalue(&iolang.unparse(&resolve_turn))?)?; + + { + let mut buf = [0; 1024]; + stream.read(&mut buf)?; + // We just assume we got a positive response here!! + // We further assume that the resolved dataspace was assigned peer-oid 1 + } + + Ok(()) +}