From c0477c07ba889796ad8843cf898f8d6cf31d92d2 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 11 May 2020 23:31:00 +0200 Subject: [PATCH] Simple producer-consumer. --- examples/consumer.rs | 46 ++++++++++++++++++++++++++++++++++++++++---- examples/producer.rs | 31 +++++++++++++++++++++++++++++ src/dataspace.rs | 2 +- src/packets.rs | 10 +++++++++- src/peer.rs | 6 ++++-- 5 files changed, 87 insertions(+), 8 deletions(-) create mode 100644 examples/producer.rs diff --git a/examples/consumer.rs b/examples/consumer.rs index 739dbf8..0eaeecb 100644 --- a/examples/consumer.rs +++ b/examples/consumer.rs @@ -1,12 +1,50 @@ -use syndicate::{packets, value::Value}; +use syndicate::{V, value::Value}; +use syndicate::packets::{ClientCodec, C2S, S2C, Action}; use tokio::net::TcpStream; use tokio_util::codec::Framed; use futures::SinkExt; +use futures::StreamExt; +use futures::FutureExt; +use futures::select; +use core::time::Duration; +use tokio::time::interval; #[tokio::main] async fn main() -> Result<(), Box> { + let discard: V = Value::simple_record("Discard", vec![]).wrap(); + let capture: V = Value::simple_record("Capture", vec![discard]).wrap(); + let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?, - packets::ClientCodec::standard()); - frames.send(packets::C2S::Connect(Value::from("producer-consumer-example").wrap())).await?; - Ok(()) + ClientCodec::standard()); + frames.send(C2S::Connect(Value::from("chat").wrap())).await?; + frames.send( + C2S::Turn(vec![Action::Assert( + Value::from(0).wrap(), + Value::simple_record("Observe", vec![ + Value::simple_record("Says", vec![capture.clone(), capture]).wrap()]).wrap())])) + .await?; + + let mut stats_timer = interval(Duration::from_secs(1)); + let mut turn_counter = 0; + + loop { + select! { + _instant = stats_timer.next().boxed().fuse() => { + print!("{:?} turns in the last second\n", turn_counter); + turn_counter = 0; + }, + frame = frames.next().boxed().fuse() => match frame { + None => return Ok(()), + Some(res) => match res? { + S2C::Err(msg, _) => return Err(msg.into()), + S2C::Turn(es) => { + // print!("{:?}\n", es); + turn_counter = turn_counter + 1; + }, + S2C::Ping() => frames.send(C2S::Pong()).await?, + S2C::Pong() => (), + } + }, + } + } } diff --git a/examples/producer.rs b/examples/producer.rs new file mode 100644 index 0000000..a4ba8f5 --- /dev/null +++ b/examples/producer.rs @@ -0,0 +1,31 @@ +use syndicate::value::Value; +use syndicate::packets::{ClientCodec, C2S, S2C, Action}; +use tokio::io::AsyncRead; +use tokio::net::TcpStream; +use tokio_util::codec::Framed; +use futures::SinkExt; +use futures::StreamExt; +use std::future::Future; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?, + ClientCodec::standard()); + frames.send(C2S::Connect(Value::from("chat").wrap())).await?; + let mut counter: u64 = 0; + loop { + counter = counter + 1; + frames.send(C2S::Turn(vec![Action::Message( + Value::simple_record("Says", vec![ + Value::from("producer").wrap(), + Value::from(counter).wrap(), + ]).wrap())])).await?; + // match frames.poll_read() { + // None => (), + // Some(res) => match res { + // S2C::Ping() => frames.send(C2S::Pong()).await?, + // other => print!("{:?}\n", other), + // } + // } + } +} diff --git a/src/dataspace.rs b/src/dataspace.rs index 2648fc0..1091771 100644 --- a/src/dataspace.rs +++ b/src/dataspace.rs @@ -79,7 +79,7 @@ impl Dataspace { { let mut outbound_turns: Map> = Map::new(); for a in actions { - println!("Turn action: {:?}", &a); + // println!("Turn action: {:?}", &a); match a { packets::Action::Assert(ref epname, ref assertion) => { let ac = self.peers.get_mut(&id).unwrap(); diff --git a/src/packets.rs b/src/packets.rs index fcc4d84..a06c965 100644 --- a/src/packets.rs +++ b/src/packets.rs @@ -56,6 +56,15 @@ impl From for DecodeError { } } +impl std::fmt::Display for DecodeError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +impl std::error::Error for DecodeError { +} + //--------------------------------------------------------------------------- #[derive(Debug)] @@ -94,7 +103,6 @@ impl std::fmt::Display for EncodeError { impl std::error::Error for EncodeError { } - //--------------------------------------------------------------------------- pub struct Codec { diff --git a/src/peer.rs b/src/peer.rs index 9c9e71a..b161c61 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -61,7 +61,7 @@ impl Peer { frame = self.frames.next().boxed().fuse() => match frame { Some(res) => match res { Ok(p) => { - println!("{:?}: input {:?}", self.id, &p); + // println!("{:?}: input {:?}", self.id, &p); match p { packets::C2S::Turn(actions) => { match self.space.as_ref().unwrap().write().unwrap() @@ -113,10 +113,12 @@ impl Peer { if let packets::S2C::Err(ref msg, ref ctx) = v { println!("{:?}: connection crashed: {}; context {:?}", self.id, msg, ctx); } else { - println!("{:?}: output {:?}", self.id, &v); + // println!("{:?}: output {:?}", self.id, &v); + () } self.frames.send(v).await?; } + tokio::task::yield_now().await; } Ok(()) }