Simple producer-consumer.

This commit is contained in:
Tony Garnock-Jones 2020-05-11 23:31:00 +02:00
parent 13c30843a3
commit c0477c07ba
5 changed files with 87 additions and 8 deletions

View File

@ -1,12 +1,50 @@
use syndicate::{packets, value::Value};
use syndicate::{V, value::Value};
use syndicate::packets::{ClientCodec, C2S, S2C, Action};
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use futures::SinkExt;
use futures::StreamExt;
use futures::FutureExt;
use futures::select;
use core::time::Duration;
use tokio::time::interval;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let discard: V = Value::simple_record("Discard", vec![]).wrap();
let capture: V = Value::simple_record("Capture", vec![discard]).wrap();
let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?,
packets::ClientCodec::standard());
frames.send(packets::C2S::Connect(Value::from("producer-consumer-example").wrap())).await?;
Ok(())
ClientCodec::standard());
frames.send(C2S::Connect(Value::from("chat").wrap())).await?;
frames.send(
C2S::Turn(vec![Action::Assert(
Value::from(0).wrap(),
Value::simple_record("Observe", vec![
Value::simple_record("Says", vec![capture.clone(), capture]).wrap()]).wrap())]))
.await?;
let mut stats_timer = interval(Duration::from_secs(1));
let mut turn_counter = 0;
loop {
select! {
_instant = stats_timer.next().boxed().fuse() => {
print!("{:?} turns in the last second\n", turn_counter);
turn_counter = 0;
},
frame = frames.next().boxed().fuse() => match frame {
None => return Ok(()),
Some(res) => match res? {
S2C::Err(msg, _) => return Err(msg.into()),
S2C::Turn(es) => {
// print!("{:?}\n", es);
turn_counter = turn_counter + 1;
},
S2C::Ping() => frames.send(C2S::Pong()).await?,
S2C::Pong() => (),
}
},
}
}
}

31
examples/producer.rs Normal file
View File

@ -0,0 +1,31 @@
use syndicate::value::Value;
use syndicate::packets::{ClientCodec, C2S, S2C, Action};
use tokio::io::AsyncRead;
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use futures::SinkExt;
use futures::StreamExt;
use std::future::Future;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?,
ClientCodec::standard());
frames.send(C2S::Connect(Value::from("chat").wrap())).await?;
let mut counter: u64 = 0;
loop {
counter = counter + 1;
frames.send(C2S::Turn(vec![Action::Message(
Value::simple_record("Says", vec![
Value::from("producer").wrap(),
Value::from(counter).wrap(),
]).wrap())])).await?;
// match frames.poll_read() {
// None => (),
// Some(res) => match res {
// S2C::Ping() => frames.send(C2S::Pong()).await?,
// other => print!("{:?}\n", other),
// }
// }
}
}

View File

@ -79,7 +79,7 @@ impl Dataspace {
{
let mut outbound_turns: Map<ConnId, Vec<packets::Event>> = Map::new();
for a in actions {
println!("Turn action: {:?}", &a);
// println!("Turn action: {:?}", &a);
match a {
packets::Action::Assert(ref epname, ref assertion) => {
let ac = self.peers.get_mut(&id).unwrap();

View File

@ -56,6 +56,15 @@ impl From<io::Error> for DecodeError {
}
}
impl std::fmt::Display for DecodeError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl std::error::Error for DecodeError {
}
//---------------------------------------------------------------------------
#[derive(Debug)]
@ -94,7 +103,6 @@ impl std::fmt::Display for EncodeError {
impl std::error::Error for EncodeError {
}
//---------------------------------------------------------------------------
pub struct Codec<InT, OutT> {

View File

@ -61,7 +61,7 @@ impl Peer {
frame = self.frames.next().boxed().fuse() => match frame {
Some(res) => match res {
Ok(p) => {
println!("{:?}: input {:?}", self.id, &p);
// println!("{:?}: input {:?}", self.id, &p);
match p {
packets::C2S::Turn(actions) => {
match self.space.as_ref().unwrap().write().unwrap()
@ -113,10 +113,12 @@ impl Peer {
if let packets::S2C::Err(ref msg, ref ctx) = v {
println!("{:?}: connection crashed: {}; context {:?}", self.id, msg, ctx);
} else {
println!("{:?}: output {:?}", self.id, &v);
// println!("{:?}: output {:?}", self.id, &v);
()
}
self.frames.send(v).await?;
}
tokio::task::yield_now().await;
}
Ok(())
}