Interoperable observe/capture/discard record labels; new state-consumer/state-producer examples
This commit is contained in:
parent
f5021bacec
commit
51f3ccc9c4
|
@ -13,15 +13,15 @@ use tokio::time::interval;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let discard: V = Value::simple_record("Discard", vec![]).wrap();
|
let discard: V = Value::simple_record("discard", vec![]).wrap();
|
||||||
let capture: V = Value::simple_record("Capture", vec![discard]).wrap();
|
let capture: V = Value::simple_record("capture", vec![discard]).wrap();
|
||||||
|
|
||||||
let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?, ClientCodec::new());
|
let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?, ClientCodec::new());
|
||||||
frames.send(C2S::Connect(Value::from("chat").wrap())).await?;
|
frames.send(C2S::Connect(Value::from("chat").wrap())).await?;
|
||||||
frames.send(
|
frames.send(
|
||||||
C2S::Turn(vec![Action::Assert(
|
C2S::Turn(vec![Action::Assert(
|
||||||
Value::from(0).wrap(),
|
Value::from(0).wrap(),
|
||||||
Value::simple_record("Observe", vec![
|
Value::simple_record("observe", vec![
|
||||||
Value::simple_record("Says", vec![capture.clone(), capture]).wrap()]).wrap())]))
|
Value::simple_record("Says", vec![capture.clone(), capture]).wrap()]).wrap())]))
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
|
|
@ -50,7 +50,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
frames.send(
|
frames.send(
|
||||||
C2S::Turn(vec![Action::Assert(
|
C2S::Turn(vec![Action::Assert(
|
||||||
Value::from(0).wrap(),
|
Value::from(0).wrap(),
|
||||||
Value::simple_record("Observe", vec![
|
Value::simple_record("observe", vec![
|
||||||
Value::simple_record(recv_label, vec![]).wrap()]).wrap())]))
|
Value::simple_record(recv_label, vec![]).wrap()]).wrap())]))
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,76 @@
|
||||||
|
#![recursion_limit = "256"]
|
||||||
|
|
||||||
|
use syndicate::{V, value::Value};
|
||||||
|
use syndicate::packets::{ClientCodec, C2S, S2C, Action, Event};
|
||||||
|
use tokio::net::TcpStream;
|
||||||
|
use tokio_util::codec::Framed;
|
||||||
|
use futures::SinkExt;
|
||||||
|
use futures::StreamExt;
|
||||||
|
use futures::FutureExt;
|
||||||
|
use futures::select;
|
||||||
|
use core::time::Duration;
|
||||||
|
use tokio::time::interval;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let discard: V = Value::simple_record("discard", vec![]).wrap();
|
||||||
|
let capture: V = Value::simple_record("capture", vec![discard]).wrap();
|
||||||
|
|
||||||
|
let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?, ClientCodec::new());
|
||||||
|
frames.send(C2S::Connect(Value::from("chat").wrap())).await?;
|
||||||
|
frames.send(
|
||||||
|
C2S::Turn(vec![Action::Assert(
|
||||||
|
Value::from(0).wrap(),
|
||||||
|
Value::simple_record("observe", vec![
|
||||||
|
Value::simple_record("Present", vec![capture]).wrap()]).wrap())]))
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let mut stats_timer = interval(Duration::from_secs(1));
|
||||||
|
let mut turn_counter = 0;
|
||||||
|
let mut event_counter = 0;
|
||||||
|
let mut arrival_counter = 0;
|
||||||
|
let mut departure_counter = 0;
|
||||||
|
let mut occupancy = 0;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
select! {
|
||||||
|
_instant = stats_timer.next().boxed().fuse() => {
|
||||||
|
print!("{:?} turns, {:?} events, {:?} arrivals, {:?} departures, {:?} present in the last second\n",
|
||||||
|
turn_counter,
|
||||||
|
event_counter,
|
||||||
|
arrival_counter,
|
||||||
|
departure_counter,
|
||||||
|
occupancy);
|
||||||
|
turn_counter = 0;
|
||||||
|
event_counter = 0;
|
||||||
|
arrival_counter = 0;
|
||||||
|
departure_counter = 0;
|
||||||
|
},
|
||||||
|
frame = frames.next().boxed().fuse() => match frame {
|
||||||
|
None => return Ok(()),
|
||||||
|
Some(res) => match res? {
|
||||||
|
S2C::Err(msg, _) => return Err(msg.into()),
|
||||||
|
S2C::Turn(events) => {
|
||||||
|
turn_counter = turn_counter + 1;
|
||||||
|
event_counter = event_counter + events.len();
|
||||||
|
for e in events {
|
||||||
|
match e {
|
||||||
|
Event::Add(_, _) => {
|
||||||
|
arrival_counter = arrival_counter + 1;
|
||||||
|
occupancy = occupancy + 1;
|
||||||
|
},
|
||||||
|
Event::Del(_, _) => {
|
||||||
|
departure_counter = departure_counter + 1;
|
||||||
|
occupancy = occupancy - 1;
|
||||||
|
},
|
||||||
|
_ => ()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
S2C::Ping() => frames.send(C2S::Pong()).await?,
|
||||||
|
S2C::Pong() => (),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,49 @@
|
||||||
|
use futures::{SinkExt, StreamExt, poll};
|
||||||
|
use std::task::Poll;
|
||||||
|
use tokio::net::TcpStream;
|
||||||
|
use tokio_util::codec::Framed;
|
||||||
|
|
||||||
|
use syndicate::packets::{ClientCodec, C2S, S2C, Action, Event};
|
||||||
|
use syndicate::value::Value;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?, ClientCodec::new());
|
||||||
|
frames.send(C2S::Connect(Value::from("chat").wrap())).await?;
|
||||||
|
|
||||||
|
let present_action = Action::Assert(
|
||||||
|
Value::from(0).wrap(),
|
||||||
|
Value::simple_record("Present", vec![Value::from(std::process::id()).wrap()]).wrap());
|
||||||
|
let absent_action = Action::Clear(
|
||||||
|
Value::from(0).wrap());
|
||||||
|
|
||||||
|
frames.send(C2S::Turn(vec![present_action.clone()])).await?;
|
||||||
|
loop {
|
||||||
|
frames.send(C2S::Turn(vec![absent_action.clone()])).await?;
|
||||||
|
frames.send(C2S::Turn(vec![present_action.clone()])).await?;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match poll!(frames.next()) {
|
||||||
|
Poll::Pending => break,
|
||||||
|
Poll::Ready(None) => {
|
||||||
|
print!("Server closed connection");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
Poll::Ready(Some(res)) => {
|
||||||
|
match res? {
|
||||||
|
S2C::Turn(events) => {
|
||||||
|
for e in events {
|
||||||
|
match e {
|
||||||
|
Event::End(_) => (),
|
||||||
|
_ => println!("{:?}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
S2C::Ping() => frames.send(C2S::Pong()).await?,
|
||||||
|
p => println!("{:?}", p),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -142,7 +142,7 @@ impl Dataspace {
|
||||||
}
|
}
|
||||||
|
|
||||||
let ar =
|
let ar =
|
||||||
if let Some(fs) = assertion.value().as_simple_record("Observe", Some(1)) {
|
if let Some(fs) = assertion.value().as_simple_record("observe", Some(1)) {
|
||||||
let ar = skeleton::analyze(&fs[0]);
|
let ar = skeleton::analyze(&fs[0]);
|
||||||
let events = self.index.add_endpoint(&ar, skeleton::Endpoint {
|
let events = self.index.add_endpoint(&ar, skeleton::Endpoint {
|
||||||
connection: id,
|
connection: id,
|
||||||
|
|
|
@ -507,12 +507,12 @@ pub struct Analyzer {
|
||||||
|
|
||||||
impl Analyzer {
|
impl Analyzer {
|
||||||
fn walk(&mut self, mut a: &Assertion) -> Skeleton {
|
fn walk(&mut self, mut a: &Assertion) -> Skeleton {
|
||||||
while let Some(fields) = a.value().as_simple_record("Capture", Some(1)) {
|
while let Some(fields) = a.value().as_simple_record("capture", Some(1)) {
|
||||||
self.capture_paths.push(self.path.clone());
|
self.capture_paths.push(self.path.clone());
|
||||||
a = &fields[0];
|
a = &fields[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
if a.value().is_simple_record("Discard", Some(0)) {
|
if a.value().is_simple_record("discard", Some(0)) {
|
||||||
Skeleton::Blank
|
Skeleton::Blank
|
||||||
} else {
|
} else {
|
||||||
match class_of(a) {
|
match class_of(a) {
|
||||||
|
@ -566,12 +566,12 @@ pub fn analyze(a: &Assertion) -> AnalysisResults {
|
||||||
// path: &mut Path,
|
// path: &mut Path,
|
||||||
// vs: &mut Vec<Assertion>,
|
// vs: &mut Vec<Assertion>,
|
||||||
// a: &Assertion) -> Assertion {
|
// a: &Assertion) -> Assertion {
|
||||||
// if let Some(fields) = a.value().as_simple_record("Capture", Some(1)) {
|
// if let Some(fields) = a.value().as_simple_record("capture", Some(1)) {
|
||||||
// capture_paths.push(path.clone());
|
// capture_paths.push(path.clone());
|
||||||
// let v = vs.pop().unwrap();
|
// let v = vs.pop().unwrap();
|
||||||
// instantiate_assertion_walk(capture_paths, path, vs, &fields[0]);
|
// instantiate_assertion_walk(capture_paths, path, vs, &fields[0]);
|
||||||
// v
|
// v
|
||||||
// } else if a.value().is_simple_record("Discard", Some(0)) {
|
// } else if a.value().is_simple_record("discard", Some(0)) {
|
||||||
// Value::Domain(Syndicate::new_placeholder()).wrap()
|
// Value::Domain(Syndicate::new_placeholder()).wrap()
|
||||||
// } else {
|
// } else {
|
||||||
// let f = |(i, aa)| {
|
// let f = |(i, aa)| {
|
||||||
|
|
Loading…
Reference in New Issue