diff --git a/examples/consumer.rs b/examples/consumer.rs index 62708b3..9adaeff 100644 --- a/examples/consumer.rs +++ b/examples/consumer.rs @@ -13,15 +13,15 @@ use tokio::time::interval; #[tokio::main] async fn main() -> Result<(), Box> { - let discard: V = Value::simple_record("Discard", vec![]).wrap(); - let capture: V = Value::simple_record("Capture", vec![discard]).wrap(); + let discard: V = Value::simple_record("discard", vec![]).wrap(); + let capture: V = Value::simple_record("capture", vec![discard]).wrap(); let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?, ClientCodec::new()); frames.send(C2S::Connect(Value::from("chat").wrap())).await?; frames.send( C2S::Turn(vec![Action::Assert( Value::from(0).wrap(), - Value::simple_record("Observe", vec![ + Value::simple_record("observe", vec![ Value::simple_record("Says", vec![capture.clone(), capture]).wrap()]).wrap())])) .await?; diff --git a/examples/pingpong.rs b/examples/pingpong.rs index 1191852..7c3f1ae 100644 --- a/examples/pingpong.rs +++ b/examples/pingpong.rs @@ -50,7 +50,7 @@ async fn main() -> Result<(), Box> { frames.send( C2S::Turn(vec![Action::Assert( Value::from(0).wrap(), - Value::simple_record("Observe", vec![ + Value::simple_record("observe", vec![ Value::simple_record(recv_label, vec![]).wrap()]).wrap())])) .await?; diff --git a/examples/state-consumer.rs b/examples/state-consumer.rs new file mode 100644 index 0000000..793b198 --- /dev/null +++ b/examples/state-consumer.rs @@ -0,0 +1,76 @@ +#![recursion_limit = "256"] + +use syndicate::{V, value::Value}; +use syndicate::packets::{ClientCodec, C2S, S2C, Action, Event}; +use tokio::net::TcpStream; +use tokio_util::codec::Framed; +use futures::SinkExt; +use futures::StreamExt; +use futures::FutureExt; +use futures::select; +use core::time::Duration; +use tokio::time::interval; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let discard: V = Value::simple_record("discard", vec![]).wrap(); + let capture: V = Value::simple_record("capture", vec![discard]).wrap(); + + let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?, ClientCodec::new()); + frames.send(C2S::Connect(Value::from("chat").wrap())).await?; + frames.send( + C2S::Turn(vec![Action::Assert( + Value::from(0).wrap(), + Value::simple_record("observe", vec![ + Value::simple_record("Present", vec![capture]).wrap()]).wrap())])) + .await?; + + let mut stats_timer = interval(Duration::from_secs(1)); + let mut turn_counter = 0; + let mut event_counter = 0; + let mut arrival_counter = 0; + let mut departure_counter = 0; + let mut occupancy = 0; + + loop { + select! { + _instant = stats_timer.next().boxed().fuse() => { + print!("{:?} turns, {:?} events, {:?} arrivals, {:?} departures, {:?} present in the last second\n", + turn_counter, + event_counter, + arrival_counter, + departure_counter, + occupancy); + turn_counter = 0; + event_counter = 0; + arrival_counter = 0; + departure_counter = 0; + }, + frame = frames.next().boxed().fuse() => match frame { + None => return Ok(()), + Some(res) => match res? { + S2C::Err(msg, _) => return Err(msg.into()), + S2C::Turn(events) => { + turn_counter = turn_counter + 1; + event_counter = event_counter + events.len(); + for e in events { + match e { + Event::Add(_, _) => { + arrival_counter = arrival_counter + 1; + occupancy = occupancy + 1; + }, + Event::Del(_, _) => { + departure_counter = departure_counter + 1; + occupancy = occupancy - 1; + }, + _ => () + } + } + }, + S2C::Ping() => frames.send(C2S::Pong()).await?, + S2C::Pong() => (), + } + }, + } + } +} diff --git a/examples/state-producer.rs b/examples/state-producer.rs new file mode 100644 index 0000000..c6b9c4f --- /dev/null +++ b/examples/state-producer.rs @@ -0,0 +1,49 @@ +use futures::{SinkExt, StreamExt, poll}; +use std::task::Poll; +use tokio::net::TcpStream; +use tokio_util::codec::Framed; + +use syndicate::packets::{ClientCodec, C2S, S2C, Action, Event}; +use syndicate::value::Value; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?, ClientCodec::new()); + frames.send(C2S::Connect(Value::from("chat").wrap())).await?; + + let present_action = Action::Assert( + Value::from(0).wrap(), + Value::simple_record("Present", vec![Value::from(std::process::id()).wrap()]).wrap()); + let absent_action = Action::Clear( + Value::from(0).wrap()); + + frames.send(C2S::Turn(vec![present_action.clone()])).await?; + loop { + frames.send(C2S::Turn(vec![absent_action.clone()])).await?; + frames.send(C2S::Turn(vec![present_action.clone()])).await?; + + loop { + match poll!(frames.next()) { + Poll::Pending => break, + Poll::Ready(None) => { + print!("Server closed connection"); + return Ok(()); + } + Poll::Ready(Some(res)) => { + match res? { + S2C::Turn(events) => { + for e in events { + match e { + Event::End(_) => (), + _ => println!("{:?}", e), + } + } + } + S2C::Ping() => frames.send(C2S::Pong()).await?, + p => println!("{:?}", p), + } + } + } + } + } +} diff --git a/src/dataspace.rs b/src/dataspace.rs index 4dd1cba..c7ca20c 100644 --- a/src/dataspace.rs +++ b/src/dataspace.rs @@ -142,7 +142,7 @@ impl Dataspace { } let ar = - if let Some(fs) = assertion.value().as_simple_record("Observe", Some(1)) { + if let Some(fs) = assertion.value().as_simple_record("observe", Some(1)) { let ar = skeleton::analyze(&fs[0]); let events = self.index.add_endpoint(&ar, skeleton::Endpoint { connection: id, diff --git a/src/skeleton.rs b/src/skeleton.rs index 5cb125c..08ff120 100644 --- a/src/skeleton.rs +++ b/src/skeleton.rs @@ -507,12 +507,12 @@ pub struct Analyzer { impl Analyzer { fn walk(&mut self, mut a: &Assertion) -> Skeleton { - while let Some(fields) = a.value().as_simple_record("Capture", Some(1)) { + while let Some(fields) = a.value().as_simple_record("capture", Some(1)) { self.capture_paths.push(self.path.clone()); a = &fields[0]; } - if a.value().is_simple_record("Discard", Some(0)) { + if a.value().is_simple_record("discard", Some(0)) { Skeleton::Blank } else { match class_of(a) { @@ -566,12 +566,12 @@ pub fn analyze(a: &Assertion) -> AnalysisResults { // path: &mut Path, // vs: &mut Vec, // a: &Assertion) -> Assertion { -// if let Some(fields) = a.value().as_simple_record("Capture", Some(1)) { +// if let Some(fields) = a.value().as_simple_record("capture", Some(1)) { // capture_paths.push(path.clone()); // let v = vs.pop().unwrap(); // instantiate_assertion_walk(capture_paths, path, vs, &fields[0]); // v -// } else if a.value().is_simple_record("Discard", Some(0)) { +// } else if a.value().is_simple_record("discard", Some(0)) { // Value::Domain(Syndicate::new_placeholder()).wrap() // } else { // let f = |(i, aa)| {