Draw the rest of the bloody owl

This commit is contained in:
Tony Garnock-Jones 2021-07-15 09:13:31 +02:00
parent bc99dad13e
commit 94fd0d3f14
21 changed files with 1377 additions and 589 deletions

2
.gitignore vendored
View File

@ -1,4 +1,4 @@
/target
**/*.rs.bk
scratch/
src/gen/**/*.rs
src/schemas/**/*.rs

42
Cargo.lock generated
View File

@ -256,6 +256,16 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "crypto-mac"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25fab6889090c8133f3deb8f73ba3c65a7f456f66436fc012a1b1e272b1e103e"
dependencies = [
"generic-array",
"subtle",
]
[[package]]
name = "csv"
version = "1.1.6"
@ -469,6 +479,16 @@ dependencies = [
"libc",
]
[[package]]
name = "hmac"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a2a2320eb7ec0ebe8da8f744d7812d9fc4cb4d09344ac01898dbcb6a20ae69b"
dependencies = [
"crypto-mac",
"digest",
]
[[package]]
name = "http"
version = "0.2.4"
@ -1202,6 +1222,19 @@ dependencies = [
"opaque-debug",
]
[[package]]
name = "sha2"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b362ae5752fd2137731f9fa25fd4d9058af34666ca1966fb969119cc35719f12"
dependencies = [
"block-buffer",
"cfg-if",
"cpufeatures",
"digest",
"opaque-debug",
]
[[package]]
name = "sharded-slab"
version = "0.1.1"
@ -1253,6 +1286,12 @@ dependencies = [
"syn",
]
[[package]]
name = "subtle"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]]
name = "syn"
version = "1.0.73"
@ -1271,11 +1310,14 @@ dependencies = [
"bytes",
"criterion",
"futures",
"getrandom",
"hmac",
"openssl",
"preserves",
"preserves-schema",
"serde",
"serde_bytes",
"sha2",
"structopt",
"tokio",
"tokio-tungstenite",

View File

@ -29,11 +29,15 @@ serde = { version = "1.0", features = ["derive", "rc"] }
serde_bytes = "0.11"
tokio = { version = "1.7.1", features = ["macros", "sync", "net", "rt", "rt-multi-thread", "time"] }
tokio-util = { version = "0.6.7", features = ["codec"] }
tokio-util = "0.6.7"
bytes = "1.0.1"
futures = "0.3.5"
getrandom = "0.2.3"
hmac = "0.11.0"
sha2 = "0.9.5"
structopt = "0.3.14"
tungstenite = "0.13.0"

View File

@ -1,57 +1,41 @@
use criterion::{criterion_group, criterion_main, Criterion};
use futures::Sink;
use std::mem::drop;
use std::pin::Pin;
use std::sync::{Arc, Mutex, atomic::{AtomicU64, Ordering}};
use std::task::{Context, Poll};
use std::thread;
use std::iter::FromIterator;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Instant;
use structopt::StructOpt;
use syndicate::peer::Peer;
use syndicate::{config, spaces, packets, value::{Value, IOValue}};
use syndicate::actor::*;
use syndicate::dataspace::Dataspace;
use syndicate::schemas::dataspace::Observe;
use syndicate::schemas::dataspace_patterns as p;
use syndicate::schemas::internal_protocol::*;
use syndicate::value::Map;
use syndicate::value::NestedValue;
use syndicate::value::Value;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tracing::Level;
struct SinkTx<T> {
tx: Option<UnboundedSender<T>>,
}
impl<T> SinkTx<T> {
fn new(tx: UnboundedSender<T>) -> Self {
SinkTx { tx: Some(tx) }
}
}
impl<T> Sink<T> for SinkTx<T> {
type Error = packets::Error;
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), packets::Error>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, v: T) -> Result<(), packets::Error> {
self.tx.as_ref().unwrap().send(v).map_err(|e| packets::Error::Message(e.to_string()))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), packets::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), packets::Error>> {
(&mut self).tx = None;
Poll::Ready(Ok(()))
}
}
#[inline]
fn says(who: IOValue, what: IOValue) -> IOValue {
fn says(who: _Any, what: _Any) -> _Any {
let mut r = Value::simple_record("Says", 2);
r.fields_vec_mut().push(who);
r.fields_vec_mut().push(what);
r.finish().wrap()
}
struct ShutdownEntity;
impl Entity for ShutdownEntity {
fn message(&mut self, t: &mut Activation, _m: _Any) -> ActorResult {
t.actor.shutdown();
Ok(())
}
}
pub fn bench_pub(c: &mut Criterion) {
let filter = tracing_subscriber::filter::EnvFilter::from_default_env()
.add_directive(tracing_subscriber::filter::LevelFilter::INFO.into());
@ -63,119 +47,103 @@ pub fn bench_pub(c: &mut Criterion) {
tracing::subscriber::set_global_default(subscriber)
.expect("Could not set tracing global subscriber");
let rt = Runtime::new().unwrap();
c.bench_function("publication alone", |b| {
b.iter_custom(|iters| {
let no_args: Vec<String> = vec![];
let config = Arc::new(config::ServerConfig::from_iter(no_args.iter()));
let spaces = Arc::new(Mutex::new(spaces::Spaces::new()));
let (c2s_tx, c2s_rx) = unbounded_channel();
let (s2c_tx, _s2c_rx) = unbounded_channel();
let runtime_handle = thread::spawn(move || {
let mut rt = Runtime::new().unwrap();
rt.block_on(async {
Peer::new(0, c2s_rx, SinkTx::new(s2c_tx)).run(spaces, &config).await.unwrap();
})
});
c2s_tx.send(Ok(packets::C2S::Connect(Value::from("bench_pub").wrap()))).unwrap();
let turn = packets::C2S::Turn(vec![
packets::Action::Message(says(Value::from("bench_pub").wrap(),
Value::ByteString(vec![]).wrap()))]);
let start = Instant::now();
for _ in 0..iters {
c2s_tx.send(Ok(turn.clone())).unwrap();
}
drop(c2s_tx);
runtime_handle.join().unwrap();
rt.block_on(async move {
let mut ac = Actor::new();
let ds = ac.create(Dataspace::new());
let shutdown = ac.create(ShutdownEntity);
ac.linked_task(syndicate::name!("sender"), async move {
for _ in 0..iters {
ds.external_event(Event::Message(Box::new(Message {
body: Assertion(says(_Any::new("bench_pub"),
Value::ByteString(vec![]).wrap())),
}))).await
}
shutdown.external_event(Event::Message(Box::new(Message {
body: Assertion(_Any::new(true)),
}))).await;
Ok(())
});
ac.start(syndicate::name!("dataspace")).await.unwrap().unwrap();
});
start.elapsed()
})
});
c.bench_function("publication and subscription", |b| {
b.iter_custom(|iters| {
let no_args: Vec<String> = vec![];
let config = Arc::new(config::ServerConfig::from_iter(no_args.iter()));
let spaces = Arc::new(Mutex::new(spaces::Spaces::new()));
let start = Instant::now();
rt.block_on(async move {
let ds = Actor::create_and_start(syndicate::name!("dataspace"), Dataspace::new());
let turn_count = Arc::new(AtomicU64::new(0));
let turn_count = Arc::new(AtomicU64::new(0));
let (c2s_tx, c2s_rx) = unbounded_channel();
let c2s_tx = Arc::new(c2s_tx);
{
let c2s_tx = c2s_tx.clone();
c2s_tx.send(Ok(packets::C2S::Connect(Value::from("bench_pub").wrap()))).unwrap();
let discard: IOValue = Value::simple_record0("discard").wrap();
let capture: IOValue = Value::simple_record1("capture", discard).wrap();
c2s_tx.send(Ok(packets::C2S::Turn(vec![
packets::Action::Assert(Value::from(0).wrap(),
Value::simple_record1(
"observe",
says(Value::from("bench_pub").wrap(),
capture)).wrap())]))).unwrap();
// tracing::info!("Sending {} messages", iters);
let turn = packets::C2S::Turn(vec![
packets::Action::Message(says(Value::from("bench_pub").wrap(),
Value::ByteString(vec![]).wrap()))]);
for _ in 0..iters {
c2s_tx.send(Ok(turn.clone())).unwrap();
struct Receiver(Arc<AtomicU64>);
impl Entity for Receiver {
fn message(&mut self, _t: &mut Activation, _m: _Any) -> ActorResult {
self.0.fetch_add(1, Ordering::Relaxed);
Ok(())
}
}
c2s_tx.send(Ok(packets::C2S::Turn(vec![
packets::Action::Clear(Value::from(0).wrap())]))).unwrap();
}
let mut ac = Actor::new();
let shutdown = ac.create(ShutdownEntity);
let receiver = ac.create(Receiver(Arc::clone(&turn_count)));
let start = Instant::now();
let runtime_handle = {
let turn_count = turn_count.clone();
let mut c2s_tx = Some(c2s_tx.clone());
thread::spawn(move || {
let mut rt = Runtime::new().unwrap();
rt.block_on(async move {
let (s2c_tx, mut s2c_rx) = unbounded_channel();
let consumer_handle = tokio::spawn(async move {
while let Some(p) = s2c_rx.recv().await {
// tracing::info!("Consumer got {:?}", &p);
match p {
packets::S2C::Ping() => (),
packets::S2C::Turn(actions) => {
for a in actions {
match a {
packets::Event::Msg(_, _) => {
turn_count.fetch_add(1, Ordering::Relaxed);
},
packets::Event::End(_) => {
c2s_tx.take();
}
_ => panic!("Unexpected action: {:?}", a),
}
}
},
_ => panic!("Unexpected packet: {:?}", p),
}
}
// tracing::info!("Consumer terminating");
{
let iters = iters.clone();
ac.boot(syndicate::name!("dataspace"), move |t| Box::pin(async move {
t.assert(&ds, &Observe {
pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec {
ctor: Box::new(p::CRec {
label: Value::symbol("Says").wrap(),
arity: 2.into(),
}),
members: Map::from_iter(vec![
(0.into(), p::Pattern::DLit(Box::new(p::DLit {
value: _Any::new("bench_pub"),
}))),
(1.into(), p::Pattern::DBind(Box::new(p::DBind {
name: "what".to_owned(),
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)),
}))),
].into_iter()),
})),
observer: receiver,
});
Peer::new(0, c2s_rx, SinkTx::new(s2c_tx)).run(spaces, &config).await.unwrap();
consumer_handle.await.unwrap();
})
})
};
drop(c2s_tx);
runtime_handle.join().unwrap();
let elapsed = start.elapsed();
let actual_turns = turn_count.load(Ordering::SeqCst);
if actual_turns != iters {
panic!("Expected {}, got {} messages", iters, actual_turns);
}
elapsed
t.assert(&ds, &Observe {
pattern: p::Pattern::DBind(Box::new(p::DBind {
name: "shutdownTrigger".to_owned(),
pattern: p::Pattern::DLit(Box::new(p::DLit {
value: _Any::new(true),
})),
})),
observer: shutdown,
});
t.actor.linked_task(syndicate::name!("sender"), async move {
for _ in 0..iters {
ds.external_event(Event::Message(Box::new(Message {
body: Assertion(says(_Any::new("bench_pub"),
Value::ByteString(vec![]).wrap())),
}))).await
}
ds.external_event(Event::Message(Box::new(Message {
body: Assertion(_Any::new(true)),
}))).await;
Ok(())
});
Ok(())
})).await.unwrap().unwrap();
}
let actual_turns = turn_count.load(Ordering::SeqCst);
if actual_turns != iters {
panic!("Expected {}, got {} messages", iters, actual_turns);
}
});
start.elapsed()
})
});
}

View File

@ -13,9 +13,6 @@ fn main() -> Result<(), Error> {
let inputs = expand_inputs(&vec!["protocols/schema-bundle.bin".to_owned(),
"local-protocols/schema-bundle.bin".to_owned()])?;
for i in &inputs {
println!("cargo:rerun-if-changed={:?}", i);
}
c.load_schemas_and_bundles(&inputs)?;
compile(&c)
}

View File

@ -1,61 +1,81 @@
#![recursion_limit = "256"]
use std::iter::FromIterator;
use std::sync::Arc;
use structopt::StructOpt;
use syndicate::actor::*;
use syndicate::relay;
use syndicate::schemas::dataspace::Observe;
use syndicate::schemas::dataspace_patterns as p;
use syndicate::schemas::internal_protocol::*;
use syndicate::sturdy;
use syndicate::value::Map;
use syndicate::value::NestedValue;
use syndicate::value::Value;
use syndicate::{V, value::Value};
use syndicate::packets::{ClientCodec, C2S, S2C, Action};
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use futures::SinkExt;
use futures::StreamExt;
use futures::FutureExt;
use futures::select;
use core::time::Duration;
use tokio::time::interval;
#[inline]
fn says(who: V, what: V) -> V {
let mut r = Value::simple_record("Says", 2);
r.fields_vec_mut().push(who);
r.fields_vec_mut().push(what);
r.finish().wrap()
#[derive(Clone, Debug, StructOpt)]
pub struct Config {
#[structopt(short = "d", default_value = "b4b303726566b10973796e646963617465b584b210a6480df5306611ddd0d3882b546e197784")]
dataspace: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let discard: V = Value::simple_record0("discard").wrap();
let capture: V = Value::simple_record1("capture", discard).wrap();
syndicate::convenient_logging()?;
Actor::new().boot(syndicate::name!("consumer"), |t| Box::pin(async move {
let config = Config::from_args();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| {
let consumer = syndicate::entity(0)
.on_message(|message_count, _t, m| {
if m.value().is_boolean() {
tracing::info!("{:?} messages in the last second", message_count);
*message_count = 0;
} else {
*message_count += 1;
}
Ok(())
})
.create(t.actor);
let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?, ClientCodec::new());
frames.send(C2S::Connect(Value::from("chat").wrap())).await?;
frames.send(
C2S::Turn(vec![Action::Assert(
Value::from(0).wrap(),
Value::simple_record1("observe", says(capture.clone(), capture)).wrap())]))
.await?;
t.assert(&ds, &Observe {
pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec {
ctor: Box::new(p::CRec {
label: Value::symbol("Says").wrap(),
arity: 2.into(),
}),
members: Map::from_iter(vec![
(0.into(), p::Pattern::DBind(Box::new(p::DBind {
name: "who".to_owned(),
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)),
}))),
(1.into(), p::Pattern::DBind(Box::new(p::DBind {
name: "what".to_owned(),
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)),
}))),
].into_iter()),
})),
observer: Arc::clone(&consumer),
});
let mut stats_timer = interval(Duration::from_secs(1));
let mut turn_counter = 0;
let mut event_counter = 0;
loop {
select! {
_instant = stats_timer.next().boxed().fuse() => {
print!("{:?} turns, {:?} events in the last second\n", turn_counter, event_counter);
turn_counter = 0;
event_counter = 0;
},
frame = frames.next().boxed().fuse() => match frame {
None => return Ok(()),
Some(res) => match res? {
S2C::Err(msg, _) => return Err(msg.into()),
S2C::Turn(es) => {
// print!("{:?}\n", es);
turn_counter = turn_counter + 1;
event_counter = event_counter + es.len();
},
S2C::Ping() => frames.send(C2S::Pong()).await?,
S2C::Pong() => (),
t.actor.linked_task(syndicate::name!("tick"), async move {
let mut stats_timer = interval(Duration::from_secs(1));
loop {
stats_timer.tick().await;
consumer.external_event(Event::Message(Box::new(Message {
body: Assertion(_Any::new(true)),
}))).await;
}
},
}
}
});
Ok(None)
});
Ok(())
})).await??;
Ok(())
}

View File

@ -1,18 +1,23 @@
#![recursion_limit = "512"]
use std::iter::FromIterator;
use std::sync::Arc;
use std::time::SystemTime;
use structopt::StructOpt;
use syndicate::actor::*;
use syndicate::relay;
use syndicate::schemas::dataspace::Observe;
use syndicate::schemas::dataspace_patterns as p;
use syndicate::schemas::internal_protocol::*;
use syndicate::sturdy;
use syndicate::value::Map;
use syndicate::value::NestedValue;
use syndicate::value::Value;
use tokio::net::TcpStream;
use core::time::Duration;
use futures::FutureExt;
use futures::SinkExt;
use futures::StreamExt;
use futures::select;
use std::time::{SystemTime, SystemTimeError};
use structopt::StructOpt;
use tokio::net::TcpStream;
use tokio::time::interval;
use tokio_util::codec::Framed;
use syndicate::packets::{ClientCodec, C2S, S2C, Action, Event};
use syndicate::value::{NestedValue, Value, IOValue};
#[derive(Clone, Debug, StructOpt)]
pub struct PingConfig {
@ -40,15 +45,15 @@ pub struct Config {
#[structopt(subcommand)]
mode: PingPongMode,
#[structopt(default_value = "pingpong")]
#[structopt(short = "d", default_value = "b4b303726566b10973796e646963617465b584b210a6480df5306611ddd0d3882b546e197784")]
dataspace: String,
}
fn now() -> Result<u64, SystemTimeError> {
Ok(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?.as_nanos() as u64)
fn now() -> u64 {
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).expect("time after epoch").as_nanos() as u64
}
fn simple_record2(label: &str, v1: IOValue, v2: IOValue) -> IOValue {
fn simple_record2(label: &str, v1: _Any, v2: _Any) -> _Any {
let mut r = Value::simple_record(label, 2);
r.fields_vec_mut().push(v1);
r.fields_vec_mut().push(v2);
@ -87,107 +92,134 @@ fn report_latencies(rtt_ns_samples: &Vec<u64>) {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = Config::from_args();
syndicate::convenient_logging()?;
let (send_label, recv_label, report_latency_every, should_echo, bytes_padding) =
match config.mode {
PingPongMode::Ping(ref c) =>
("Ping", "Pong", c.report_latency_every, false, c.bytes_padding),
PingPongMode::Pong =>
("Pong", "Ping", 0, true, 0),
};
Actor::new().boot(syndicate::name!("pingpong"), move |t| Box::pin(async move {
let config = Config::from_args();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| {
let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?, ClientCodec::new());
frames.send(C2S::Connect(Value::from(config.dataspace).wrap())).await?;
let (send_label, recv_label, report_latency_every, should_echo, bytes_padding) =
match config.mode {
PingPongMode::Ping(ref c) =>
("Ping", "Pong", c.report_latency_every, false, c.bytes_padding),
PingPongMode::Pong =>
("Pong", "Ping", 0, true, 0),
};
let discard: IOValue = Value::simple_record0("discard").wrap();
let capture: IOValue = Value::simple_record1("capture", discard).wrap();
let pat: IOValue = simple_record2(recv_label, capture.clone(), capture);
frames.send(
C2S::Turn(vec![Action::Assert(
Value::from(0).wrap(),
Value::simple_record1("observe", pat).wrap())]))
.await?;
let consumer = {
let ds = Arc::clone(&ds);
let mut turn_counter: u64 = 0;
let mut event_counter: u64 = 0;
let mut rtt_ns_samples: Vec<u64> = vec![0; report_latency_every];
let mut rtt_batch_count: usize = 0;
let mut current_reply = None;
syndicate::entity(Arc::clone(&*INERT_REF))
.on_message(move |self_ref, t, m| {
match m.value().as_boolean() {
Some(true) => {
tracing::info!("{:?} turns, {:?} events in the last second",
turn_counter,
event_counter);
turn_counter = 0;
event_counter = 0;
}
Some(false) => {
current_reply = None;
}
None => {
event_counter += 1;
let bindings = m.value().to_sequence()?;
let timestamp = &bindings[0];
let padding = &bindings[1];
let padding: IOValue = Value::ByteString(vec![0; bytes_padding]).wrap();
if should_echo || (report_latency_every == 0) {
t.message(&ds, simple_record2(&send_label,
timestamp.clone(),
padding.clone()));
} else {
if let None = current_reply {
turn_counter += 1;
t.message_immediate_self(&self_ref, _Any::new(false));
let rtt_ns = now() - timestamp.value().to_u64()?;
rtt_ns_samples[rtt_batch_count] = rtt_ns;
rtt_batch_count += 1;
let mut stats_timer = interval(Duration::from_secs(1));
let mut turn_counter = 0;
let mut event_counter = 0;
let mut current_rec: IOValue = simple_record2(send_label,
Value::from(0).wrap(),
padding.clone());
if let PingPongMode::Ping(ref c) = config.mode {
for _ in 0..c.turn_count {
let mut actions = vec![];
current_rec = simple_record2(send_label,
Value::from(now()?).wrap(),
padding.clone());
for _ in 0..c.action_count {
actions.push(Action::Message(current_rec.clone()));
}
frames.send(C2S::Turn(actions)).await?;
}
}
let mut rtt_ns_samples: Vec<u64> = vec![0; report_latency_every];
let mut rtt_batch_count = 0;
loop {
select! {
_instant = stats_timer.next().boxed().fuse() => {
print!("{:?} turns, {:?} events in the last second\n", turn_counter, event_counter);
turn_counter = 0;
event_counter = 0;
},
frame = frames.next().boxed().fuse() => match frame {
None => return Ok(()),
Some(res) => match res? {
S2C::Err(msg, _) => return Err(msg.into()),
S2C::Turn(events) => {
turn_counter = turn_counter + 1;
event_counter = event_counter + events.len();
let mut actions = vec![];
let mut have_sample = false;
for e in events {
match e {
Event::Msg(_, captures) => {
if should_echo || (report_latency_every == 0) {
actions.push(Action::Message(
simple_record2(send_label,
captures[0].clone(),
captures[1].clone())));
} else {
if !have_sample {
let rtt_ns = now()? - captures[0].value().to_u64()?;
rtt_ns_samples[rtt_batch_count] = rtt_ns;
rtt_batch_count = rtt_batch_count + 1;
if rtt_batch_count == report_latency_every {
rtt_ns_samples.sort();
report_latencies(&rtt_ns_samples);
rtt_batch_count = 0;
}
have_sample = true;
current_rec = simple_record2(send_label,
Value::from(now()?).wrap(),
padding.clone());
if rtt_batch_count == report_latency_every {
rtt_ns_samples.sort();
report_latencies(&rtt_ns_samples);
rtt_batch_count = 0;
}
actions.push(Action::Message(current_rec.clone()));
current_reply = Some(
simple_record2(&send_label,
Value::from(now()).wrap(),
padding.clone()));
}
t.message(&ds, current_reply.as_ref().expect("some reply").clone());
}
_ =>
()
}
}
frames.send(C2S::Turn(actions)).await?;
},
S2C::Ping() => frames.send(C2S::Pong()).await?,
S2C::Pong() => (),
Ok(())
})
.create_rec(t.actor, |_ac, self_ref, e_ref| *self_ref = Arc::clone(e_ref))
};
t.assert(&ds, &Observe {
pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec {
ctor: Box::new(p::CRec {
label: Value::symbol(recv_label).wrap(),
arity: 2.into(),
}),
members: Map::from_iter(vec![
(0.into(), p::Pattern::DBind(Box::new(p::DBind {
name: "timestamp".to_owned(),
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)),
}))),
(1.into(), p::Pattern::DBind(Box::new(p::DBind {
name: "padding".to_owned(),
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)),
}))),
].into_iter()),
})),
observer: Arc::clone(&consumer),
});
t.actor.linked_task(syndicate::name!("tick"), async move {
let mut stats_timer = interval(Duration::from_secs(1));
loop {
stats_timer.tick().await;
consumer.external_event(Event::Message(Box::new(Message {
body: Assertion(_Any::new(true)),
}))).await;
}
},
}
}
});
if let PingPongMode::Ping(c) = &config.mode {
let turn_count = c.turn_count;
let action_count = c.action_count;
t.actor.linked_task(syndicate::name!("boot-ping"), async move {
let padding: _Any = Value::ByteString(vec![0; bytes_padding]).wrap();
for _ in 0..turn_count {
let mut events = vec![];
let current_rec = simple_record2(send_label,
Value::from(now()).wrap(),
padding.clone());
for _ in 0..action_count {
events.push(Event::Message(Box::new(Message {
body: Assertion(current_rec.clone()),
})));
}
ds.external_events(events).await
}
Ok(())
});
}
Ok(None)
});
Ok(())
})).await??;
Ok(())
}

View File

@ -1,11 +1,14 @@
use futures::{SinkExt, StreamExt, poll};
use std::task::Poll;
use structopt::StructOpt;
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use std::sync::Arc;
use syndicate::packets::{ClientCodec, C2S, S2C, Action};
use syndicate::value::{Value, IOValue};
use structopt::StructOpt;
use syndicate::actor::*;
use syndicate::relay;
use syndicate::sturdy;
use syndicate::value::NestedValue;
use syndicate::value::Value;
use tokio::net::TcpStream;
#[derive(Clone, Debug, StructOpt)]
pub struct Config {
@ -14,10 +17,13 @@ pub struct Config {
#[structopt(short = "b", default_value = "0")]
bytes_padding: usize,
#[structopt(short = "d", default_value = "b4b303726566b10973796e646963617465b584b210a6480df5306611ddd0d3882b546e197784")]
dataspace: String,
}
#[inline]
fn says(who: IOValue, what: IOValue) -> IOValue {
fn says(who: _Any, what: _Any) -> _Any {
let mut r = Value::simple_record("Says", 2);
r.fields_vec_mut().push(who);
r.fields_vec_mut().push(what);
@ -26,34 +32,29 @@ fn says(who: IOValue, what: IOValue) -> IOValue {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = Config::from_args();
syndicate::convenient_logging()?;
Actor::new().boot(syndicate::name!("producer"), |t| Box::pin(async move {
let config = Config::from_args();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| {
let padding: _Any = Value::ByteString(vec![0; config.bytes_padding]).wrap();
let action_count = config.action_count;
let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?, ClientCodec::new());
frames.send(C2S::Connect(Value::from("chat").wrap())).await?;
let producer = syndicate::entity(Arc::clone(&*INERT_REF))
.on_message(move |self_ref, t, _m| {
for _ in 0..action_count {
t.message(&ds, says(Value::from("producer").wrap(), padding.clone()));
}
t.message(&self_ref, _Any::new(true));
Ok(())
})
.create_rec(t.actor, |_ac, self_ref, p_ref| *self_ref = Arc::clone(p_ref));
let padding: IOValue = Value::ByteString(vec![0; config.bytes_padding]).wrap();
loop {
let mut actions = vec![];
for _ in 0..config.action_count {
actions.push(Action::Message(says(Value::from("producer").wrap(),
padding.clone())));
}
frames.send(C2S::Turn(actions)).await?;
loop {
match poll!(frames.next()) {
Poll::Pending => break,
Poll::Ready(None) => {
print!("Server closed connection");
return Ok(());
}
Poll::Ready(Some(res)) => {
let p = res?;
print!("{:?}\n", p);
if let S2C::Ping() = p { frames.send(C2S::Pong()).await? }
}
}
}
}
t.message(&producer, _Any::new(true));
Ok(None)
});
Ok(())
})).await??;
Ok(())
}

View File

@ -1,76 +1,97 @@
#![recursion_limit = "256"]
use std::iter::FromIterator;
use std::sync::Arc;
use structopt::StructOpt;
use syndicate::actor::*;
use syndicate::relay;
use syndicate::schemas::dataspace::Observe;
use syndicate::schemas::dataspace_patterns as p;
use syndicate::schemas::internal_protocol::*;
use syndicate::sturdy;
use syndicate::value::Map;
use syndicate::value::NestedValue;
use syndicate::value::Value;
use syndicate::{V, value::Value};
use syndicate::packets::{ClientCodec, C2S, S2C, Action, Event};
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use futures::SinkExt;
use futures::StreamExt;
use futures::FutureExt;
use futures::select;
use core::time::Duration;
use tokio::time::interval;
#[derive(Clone, Debug, StructOpt)]
pub struct Config {
#[structopt(short = "d", default_value = "b4b303726566b10973796e646963617465b584b210a6480df5306611ddd0d3882b546e197784")]
dataspace: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let discard: V = Value::simple_record0("discard").wrap();
let capture: V = Value::simple_record1("capture", discard).wrap();
let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?, ClientCodec::new());
frames.send(C2S::Connect(Value::from("chat").wrap())).await?;
frames.send(
C2S::Turn(vec![Action::Assert(
Value::from(0).wrap(),
Value::simple_record1("observe",
Value::simple_record1("Present", capture).wrap()).wrap())]))
.await?;
let mut stats_timer = interval(Duration::from_secs(1));
let mut turn_counter = 0;
let mut event_counter = 0;
let mut arrival_counter = 0;
let mut departure_counter = 0;
let mut occupancy = 0;
loop {
select! {
_instant = stats_timer.next().boxed().fuse() => {
print!("{:?} turns, {:?} events, {:?} arrivals, {:?} departures, {:?} present in the last second\n",
turn_counter,
event_counter,
arrival_counter,
departure_counter,
occupancy);
turn_counter = 0;
event_counter = 0;
arrival_counter = 0;
departure_counter = 0;
},
frame = frames.next().boxed().fuse() => match frame {
None => return Ok(()),
Some(res) => match res? {
S2C::Err(msg, _) => return Err(msg.into()),
S2C::Turn(events) => {
turn_counter = turn_counter + 1;
event_counter = event_counter + events.len();
for e in events {
match e {
Event::Add(_, _) => {
arrival_counter = arrival_counter + 1;
occupancy = occupancy + 1;
},
Event::Del(_, _) => {
departure_counter = departure_counter + 1;
occupancy = occupancy - 1;
},
_ => ()
}
}
},
S2C::Ping() => frames.send(C2S::Pong()).await?,
S2C::Pong() => (),
syndicate::convenient_logging()?;
Actor::new().boot(syndicate::name!("consumer"), |t| Box::pin(async move {
let config = Config::from_args();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
relay::connect_stream(t, i, o, sturdyref, (), |_state, t, ds| {
let consumer = {
#[derive(Default)]
struct State {
event_counter: u64,
arrival_counter: u64,
departure_counter: u64,
occupancy: u64,
}
},
}
}
syndicate::entity(State::default()).on_asserted(move |s, _, _| {
s.event_counter += 1;
s.arrival_counter += 1;
s.occupancy += 1;
Ok(Some(Box::new(|s, _| {
s.event_counter += 1;
s.departure_counter += 1;
s.occupancy -= 1;
Ok(())
})))
}).on_message(move |s, _, _| {
tracing::info!(
"{:?} events, {:?} arrivals, {:?} departures, {:?} present in the last second",
s.event_counter,
s.arrival_counter,
s.departure_counter,
s.occupancy);
s.event_counter = 0;
s.arrival_counter = 0;
s.departure_counter = 0;
Ok(())
}).create(t.actor)
};
t.assert(&ds, &Observe {
pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec {
ctor: Box::new(p::CRec {
label: Value::symbol("Present").wrap(),
arity: 1.into(),
}),
members: Map::from_iter(vec![
(0.into(), p::Pattern::DBind(Box::new(p::DBind {
name: "who".to_owned(),
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)),
}))),
].into_iter()),
})),
observer: Arc::clone(&consumer),
});
t.actor.linked_task(syndicate::name!("tick"), async move {
let mut stats_timer = interval(Duration::from_secs(1));
loop {
stats_timer.tick().await;
consumer.external_event(Event::Message(Box::new(Message {
body: Assertion(_Any::new(true)),
}))).await;
}
});
Ok(None)
});
Ok(())
})).await??;
Ok(())
}

View File

@ -1,49 +1,55 @@
use futures::{SinkExt, StreamExt, poll};
use std::task::Poll;
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use std::sync::Arc;
use syndicate::packets::{ClientCodec, C2S, S2C, Action, Event};
use structopt::StructOpt;
use syndicate::actor::*;
use syndicate::relay;
use syndicate::sturdy;
use syndicate::value::NestedValue;
use syndicate::value::Value;
use tokio::net::TcpStream;
#[derive(Clone, Debug, StructOpt)]
pub struct Config {
#[structopt(short = "d", default_value = "b4b303726566b10973796e646963617465b584b210a6480df5306611ddd0d3882b546e197784")]
dataspace: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut frames = Framed::new(TcpStream::connect("127.0.0.1:8001").await?, ClientCodec::new());
frames.send(C2S::Connect(Value::from("chat").wrap())).await?;
syndicate::convenient_logging()?;
Actor::new().boot(syndicate::name!("producer"), |t| Box::pin(async move {
let config = Config::from_args();
let sturdyref = sturdy::SturdyRef::from_hex(&config.dataspace)?;
let (i, o) = TcpStream::connect("127.0.0.1:8001").await?.into_split();
relay::connect_stream(t, i, o, sturdyref, (), move |_state, t, ds| {
let presence: _Any = Value::simple_record1(
"Present",
Value::from(std::process::id()).wrap()).wrap();
let present_action = Action::Assert(
Value::from(0).wrap(),
Value::simple_record1("Present", Value::from(std::process::id()).wrap()).wrap());
let absent_action = Action::Clear(
Value::from(0).wrap());
let mut handle = Some(t.assert(&ds, presence.clone()));
frames.send(C2S::Turn(vec![present_action.clone()])).await?;
loop {
frames.send(C2S::Turn(vec![absent_action.clone()])).await?;
frames.send(C2S::Turn(vec![present_action.clone()])).await?;
loop {
match poll!(frames.next()) {
Poll::Pending => break,
Poll::Ready(None) => {
print!("Server closed connection");
return Ok(());
}
Poll::Ready(Some(res)) => {
match res? {
S2C::Turn(events) => {
for e in events {
match e {
Event::End(_) => (),
_ => println!("{:?}", e),
}
}
let producer = syndicate::entity(Arc::clone(&*INERT_REF))
.on_message(move |self_ref, t, m| {
match m.value().to_boolean()? {
true => {
handle = Some(t.assert(&ds, presence.clone()));
t.message(&self_ref, _Any::new(false));
}
false => {
t.retract(handle.take().unwrap());
t.message(&self_ref, _Any::new(true));
}
S2C::Ping() => frames.send(C2S::Pong()).await?,
p => println!("{:?}", p),
}
}
}
}
}
Ok(())
})
.create_rec(t.actor, |_ac, self_ref, p_ref| *self_ref = Arc::clone(p_ref));
t.message(&producer, _Any::new(false));
Ok(None)
});
Ok(())
})).await??;
Ok(())
}

View File

@ -4,8 +4,11 @@ pub use std::future::ready;
use super::ActorId;
use super::schemas::internal_protocol::*;
use super::schemas::sturdy;
use super::error::Error;
use super::error::error;
use super::rewrite::CaveatError;
use super::rewrite::CheckedCaveat;
use preserves::value::Domain;
use preserves::value::IOValue;
@ -24,7 +27,7 @@ use tokio_util::sync::CancellationToken;
use tracing;
use tracing::Instrument;
pub type Assertion = super::schemas::dataspace::_Any;
pub use super::schemas::internal_protocol::_Any;
pub use super::schemas::internal_protocol::Handle;
pub use super::schemas::internal_protocol::Oid;
@ -32,17 +35,17 @@ pub type ActorResult = Result<(), Error>;
pub type ActorHandle = tokio::task::JoinHandle<ActorResult>;
pub trait Entity: Send {
fn assert(&mut self, _t: &mut Activation, _a: Assertion, _h: Handle) -> ActorResult {
fn assert(&mut self, _t: &mut Activation, _a: _Any, _h: Handle) -> ActorResult {
Ok(())
}
fn retract(&mut self, _t: &mut Activation, _h: Handle) -> ActorResult {
Ok(())
}
fn message(&mut self, _t: &mut Activation, _m: Assertion) -> ActorResult {
fn message(&mut self, _t: &mut Activation, _m: _Any) -> ActorResult {
Ok(())
}
fn sync(&mut self, t: &mut Activation, peer: Arc<Ref>) -> ActorResult {
t.message(&peer, Assertion::new(true));
t.message(&peer, _Any::new(true));
Ok(())
}
fn turn_end(&mut self, _t: &mut Activation) -> ActorResult {
@ -88,21 +91,26 @@ pub struct Mailbox {
pub struct Actor {
actor_id: ActorId,
tx: UnboundedSender<SystemMessage>,
rx: UnboundedReceiver<SystemMessage>,
rx: Option<UnboundedReceiver<SystemMessage>>,
queue_depth: Arc<AtomicUsize>,
mailbox_count: Arc<AtomicUsize>,
outbound_assertions: OutboundAssertions,
oid_map: Map<Oid, Box<dyn Entity + Send>>,
next_task_id: u64,
linked_tasks: Map<u64, CancellationToken>,
exit_hooks: Vec<Oid>,
exit_hooks: Vec<Arc<Ref>>,
}
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ObjectAddress {
pub mailbox: Mailbox,
pub oid: Oid,
}
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Ref {
pub relay: Mailbox,
pub target: Oid,
/* TODO: attenuation */
pub addr: Arc<ObjectAddress>,
pub attenuation: Vec<CheckedCaveat>,
}
//---------------------------------------------------------------------------
@ -129,29 +137,33 @@ impl<'activation> Activation<'activation> {
}
fn immediate_oid(&self, r: &Arc<Ref>) -> Oid {
if r.relay.actor_id != self.actor.actor_id {
if r.addr.mailbox.actor_id != self.actor.actor_id {
panic!("Cannot use immediate_self to send to remote peers");
}
r.target.clone()
r.addr.oid.clone()
}
pub fn assert<M>(&mut self, r: &Arc<Ref>, a: M) -> Handle where M: Into<Assertion> {
pub fn assert<M>(&mut self, r: &Arc<Ref>, a: M) -> Handle where M: Into<_Any> {
let handle = crate::next_handle();
self.queue_for(r).push((Arc::clone(r), Event::Assert(Box::new(
Assert { assertion: Assertion(a.into()), handle: handle.clone() }))));
self.actor.outbound_assertions.insert(handle.clone(), Destination::Remote(Arc::clone(r)));
if let Some(assertion) = r.rewrite(a.into()) {
self.queue_for(r).push((Arc::clone(r), Event::Assert(Box::new(
Assert { assertion, handle: handle.clone() }))));
self.actor.outbound_assertions.insert(handle.clone(), Destination::Remote(Arc::clone(r)));
}
handle
}
pub fn assert_immediate_self<M>(&mut self, r: &Arc<Ref>, a: M) -> Handle where M: Into<Assertion> {
pub fn assert_immediate_self<M>(&mut self, r: &Arc<Ref>, a: M) -> Handle where M: Into<_Any> {
let oid = self.immediate_oid(r);
let handle = crate::next_handle();
self.immediate_self.push(TurnEvent {
oid: oid.clone(),
event: Event::Assert(Box::new(
Assert { assertion: Assertion(a.into()), handle: handle.clone() })),
});
self.actor.outbound_assertions.insert(handle.clone(), Destination::ImmediateSelf(oid));
if let Some(assertion) = r.rewrite(a.into()) {
self.immediate_self.push(TurnEvent {
oid: oid.clone(),
event: Event::Assert(Box::new(
Assert { assertion, handle: handle.clone() })),
});
self.actor.outbound_assertions.insert(handle.clone(), Destination::ImmediateSelf(oid));
}
handle
}
@ -173,16 +185,20 @@ impl<'activation> Activation<'activation> {
}
}
pub fn message<M>(&mut self, r: &Arc<Ref>, m: M) where M: Into<Assertion> {
self.queue_for(r).push((Arc::clone(r), Event::Message(Box::new(
Message { body: Assertion(m.into()) }))))
pub fn message<M>(&mut self, r: &Arc<Ref>, m: M) where M: Into<_Any> {
if let Some(body) = r.rewrite(m.into()) {
self.queue_for(r).push((Arc::clone(r), Event::Message(Box::new(
Message { body }))))
}
}
pub fn message_immediate_self<M>(&mut self, r: &Arc<Ref>, m: M) where M: Into<Assertion> {
self.immediate_self.push(TurnEvent {
oid: self.immediate_oid(r),
event: Event::Message(Box::new(Message { body: Assertion(m.into()) })),
})
pub fn message_immediate_self<M>(&mut self, r: &Arc<Ref>, m: M) where M: Into<_Any> {
if let Some(body) = r.rewrite(m.into()) {
self.immediate_self.push(TurnEvent {
oid: self.immediate_oid(r),
event: Event::Message(Box::new(Message { body })),
})
}
}
pub fn sync(&mut self, r: &Arc<Ref>, peer: Arc<Ref>) {
@ -190,7 +206,7 @@ impl<'activation> Activation<'activation> {
}
fn queue_for(&mut self, r: &Arc<Ref>) -> &mut PendingEventQueue {
self.queues.entry(r.relay.actor_id).or_default()
self.queues.entry(r.addr.mailbox.actor_id).or_default()
}
fn deliver(&mut self) {
@ -200,9 +216,9 @@ impl<'activation> Activation<'activation> {
for (_actor_id, turn) in std::mem::take(&mut self.queues).into_iter() {
if turn.len() == 0 { continue; }
let first_ref = Arc::clone(&turn[0].0);
let target = &first_ref.relay;
let target = &first_ref.addr.mailbox;
target.send(Turn(turn.into_iter().map(
|(r, e)| TurnEvent { oid: r.target.clone(), event: e }).collect()));
|(r, e)| TurnEvent { oid: r.addr.oid.clone(), event: e }).collect()));
}
}
@ -233,8 +249,13 @@ impl<'activation> Drop for Activation<'activation> {
impl Mailbox {
pub fn send(&self, t: Turn) {
let _ = self.tx.send(SystemMessage::Turn(t));
self.queue_depth.fetch_add(1, Ordering::Relaxed);
if let Ok(()) = self.tx.send(SystemMessage::Turn(t)) {
self.queue_depth.fetch_add(1, Ordering::Relaxed);
}
}
pub fn current_queue_depth(&self) -> usize {
self.queue_depth.load(Ordering::Relaxed)
}
}
@ -308,7 +329,7 @@ impl Actor {
Actor {
actor_id,
tx,
rx,
rx: Some(rx),
queue_depth: Arc::new(AtomicUsize::new(0)),
mailbox_count: Arc::new(AtomicUsize::new(0)),
outbound_assertions: Map::new(),
@ -369,7 +390,13 @@ impl Actor {
f: F,
) -> Arc<Ref> {
let oid = crate::next_oid();
let r = Arc::new(Ref { relay: self.mailbox(), target: oid.clone() });
let r = Arc::new(Ref {
addr: Arc::new(ObjectAddress {
mailbox: self.mailbox(),
oid: oid.clone(),
}),
attenuation: Vec::new(),
});
f(self, &mut e, &r);
self.oid_map.insert(oid, Box::new(e));
r
@ -383,16 +410,26 @@ impl Actor {
name.record("actor_id", &self.id());
tokio::spawn(async move {
tracing::trace!("start");
{
let queue_depth = Arc::clone(&self.queue_depth);
self.linked_task(crate::name!("queue-monitor"), async move {
let mut timer = tokio::time::interval(core::time::Duration::from_secs(1));
loop {
timer.tick().await;
tracing::info!(queue_depth = debug(queue_depth.load(Ordering::Relaxed)));
}
});
}
let result = self.run(boot).await;
{
let mut t = Activation::for_actor(&mut self);
for oid in std::mem::take(&mut t.actor.exit_hooks) {
match t.actor.oid_map.remove_entry(&oid) {
for r in std::mem::take(&mut t.actor.exit_hooks) {
match t.actor.oid_map.remove_entry(&r.addr.oid) {
None => (),
Some((k, mut e)) => {
if let Err(err) = e.exit_hook(&mut t, &result).await {
tracing::error!(err = debug(err),
oid = debug(oid),
r = debug(&r),
"error in exit hook");
}
t.actor.oid_map.insert(k, e);
@ -424,7 +461,7 @@ impl Actor {
boot(&mut Activation::for_actor(self)).await?;
// tracing::trace!(_id, "run");
loop {
match self.rx.recv().await {
match self.rx.as_mut().expect("present rx channel half").recv().await {
None =>
Err(error("Unexpected channel close", _Any::new(false)))?,
Some(m) => {
@ -442,8 +479,8 @@ impl Actor {
}
}
pub fn add_exit_hook(&mut self, oid: &Oid) {
self.exit_hooks.push(oid.clone())
pub fn add_exit_hook(&mut self, r: &Arc<Ref>) {
self.exit_hooks.push(Arc::clone(r))
}
async fn handle(&mut self, m: SystemMessage) -> Result<bool, Error> {
@ -535,35 +572,78 @@ impl Actor {
impl Drop for Actor {
fn drop(&mut self) {
let mut rx = self.rx.take().expect("present rx channel half during drop");
rx.close();
for (_task_id, token) in std::mem::take(&mut self.linked_tasks).into_iter() {
token.cancel();
}
let to_clear = std::mem::take(&mut self.outbound_assertions);
let mut t = Activation::for_actor(self);
for (handle, r) in to_clear.into_iter() {
tracing::trace!(h = debug(&handle), "retract on termination");
t.retract_known_ref(r, handle);
{
let mut t = Activation::for_actor(self);
for (handle, r) in to_clear.into_iter() {
tracing::trace!(h = debug(&handle), "retract on termination");
t.retract_known_ref(r, handle);
}
}
// In future, could do this:
// tokio::spawn(async move {
// while let Some(m) = rx.recv().await {
// match m { ... }
// }
// });
tracing::trace!("Actor::drop");
}
}
impl Ref {
pub fn external_event(&self, event: Event) {
self.relay.send(Turn(vec![TurnEvent { oid: self.target.clone(), event }]))
pub async fn external_event(&self, event: Event) {
self.addr.mailbox.send(Turn(vec![TurnEvent { oid: self.addr.oid.clone(), event }]))
}
pub async fn external_events(&self, events: Vec<Event>) {
self.addr.mailbox.send(Turn(events.into_iter().map(|event| TurnEvent {
oid: self.addr.oid.clone(),
event,
}).collect()))
}
pub fn attenuate(&self, attenuation: &sturdy::Attenuation) -> Result<Arc<Self>, CaveatError> {
let mut r = Ref {
addr: Arc::clone(&self.addr),
attenuation: self.attenuation.clone(),
};
r.attenuation.extend(attenuation.check()?);
Ok(Arc::new(r))
}
pub fn rewrite(&self, mut a: _Any) -> Option<Assertion> {
for c in &self.attenuation {
match c.rewrite(&a) {
Some(v) => a = v,
None => return None,
}
}
Some(Assertion(a))
}
}
impl std::fmt::Debug for Ref {
fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
write!(f, "⌜{}:{}⌝", self.relay.actor_id, self.target.0)
if self.attenuation.is_empty() {
write!(f, "⌜{}:{}⌝", self.addr.mailbox.actor_id, self.addr.oid.0)
} else {
write!(f, "⌜{}:{}\\{:?}⌝", self.addr.mailbox.actor_id, self.addr.oid.0, self.attenuation)
}
}
}
impl Drop for Ref {
impl Drop for ObjectAddress {
fn drop(&mut self) {
let _ = self.relay.tx.send(SystemMessage::ReleaseOid(self.target.clone()));
let _ = self.mailbox.tx.send(SystemMessage::ReleaseOid(self.oid.clone()));
()
}
}

View File

@ -14,32 +14,24 @@ use structopt::StructOpt; // for from_args in main
use syndicate::actor::*;
use syndicate::dataspace::*;
use syndicate::during::DuringEntity;
use syndicate::during::DuringResult;
use syndicate::error::Error;
use syndicate::error::error;
use syndicate::config;
use syndicate::relay;
use syndicate::schemas::internal_protocol::_Any;
use syndicate::sturdy;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tracing::{Level, info, trace};
use tracing::{info, trace};
use tungstenite::Message;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let filter = tracing_subscriber::filter::EnvFilter::from_default_env()
.add_directive(tracing_subscriber::filter::LevelFilter::TRACE.into());
let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_ansi(true)
.with_max_level(Level::TRACE)
.with_env_filter(filter)
.finish();
tracing::subscriber::set_global_default(subscriber)
.expect("Could not set tracing global subscriber");
syndicate::convenient_logging()?;
{
const BRIGHT_GREEN: &str = "\x1b[92m";
@ -81,17 +73,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
trace!("startup");
let ds = Actor::create_and_start(syndicate::name!("dataspace"), Dataspace::new());
let gateway = Actor::create_and_start(syndicate::name!("gateway"),
DuringEntity::new(Arc::clone(&ds), handle_resolve));
let gateway = Actor::create_and_start(
syndicate::name!("gateway"),
syndicate::entity(Arc::clone(&ds)).on_asserted(handle_resolve));
{
let ds = Arc::clone(&ds);
Actor::new().boot(syndicate::name!("rootcap"), |t| Box::pin(async move {
use syndicate::schemas::gatekeeper;
t.assert(&ds, &gatekeeper::Bind {
oid: _Any::new("syndicate"),
key: vec![0; 16],
target: ds.clone(),
});
let key = vec![0; 16];
let sr = sturdy::SturdyRef::mint(_Any::new("syndicate"), &key);
tracing::info!(rootcap = debug(&_Any::from(&sr)));
tracing::info!(rootcap = display(sr.to_hex()));
t.assert(&ds, &gatekeeper::Bind { oid: sr.oid.clone(), key, target: ds.clone() });
Ok(())
}));
}
@ -157,12 +150,22 @@ async fn run_connection(
_ => {
info!(protocol = display("raw"), peer = debug(addr));
let (i, o) = stream.into_split();
(relay::Input::Bytes(Box::pin(i)), relay::Output::Bytes(Box::pin(o)))
(relay::Input::Bytes(Box::pin(i)),
relay::Output::Bytes(Box::pin(o /* BufWriter::new(o) */)))
}
}
0 => Err(error("closed before starting", _Any::new(false)))?,
_ => unreachable!()
};
struct ExitListener;
impl Entity for ExitListener {
fn exit_hook(&mut self, _t: &mut Activation, exit_status: &ActorResult) -> BoxFuture<ActorResult> {
info!(exit_status = debug(exit_status), "disconnect");
Box::pin(ready(Ok(())))
}
}
let exit_listener = t.actor.create(ExitListener);
t.actor.add_exit_hook(&exit_listener);
relay::TunnelRelay::run(t, i, o, Some(gateway), None);
Ok(())
}
@ -172,7 +175,9 @@ async fn run_listener(
port: u16,
config: Arc<config::ServerConfig>,
) -> ActorResult {
let listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?;
let listen_addr = format!("0.0.0.0:{}", port);
tracing::info!("Listening on {}", listen_addr);
let listener = TcpListener::bind(listen_addr).await?;
loop {
let (stream, addr) = listener.accept().await?;
let gateway = Arc::clone(&gateway);
@ -185,31 +190,46 @@ async fn run_listener(
//---------------------------------------------------------------------------
fn handle_resolve(ds: &mut Arc<Ref>, t: &mut Activation, a: Assertion) -> DuringResult<Arc<Ref>> {
fn handle_resolve(ds: &mut Arc<Ref>, t: &mut Activation, a: _Any) -> DuringResult<Arc<Ref>> {
use syndicate::schemas::dataspace;
use syndicate::schemas::dataspace_patterns as p;
use syndicate::schemas::gatekeeper;
match gatekeeper::Resolve::try_from(&a) {
Err(_) => Ok(None),
Ok(gatekeeper::Resolve { sturdyref, observer }) => {
// TODO: codegen plugin to generate pattern constructors
let handler = t.actor.create(DuringEntity::new(observer, |observer, t, a| {
let bindings = a.value().to_sequence()?;
let key = bindings[0].value().to_bytestring()?;
let target = bindings[1].value().to_embedded()?;
tracing::trace!(key = debug(&key), target = debug(&target), "resolved!");
// TODO attenuation and validity checks
let h = t.assert(observer, _Any::domain(Arc::clone(target)));
Ok(Some(Box::new(|_observer, t| Ok(t.retract(h)))))
}));
let queried_oid = sturdyref.oid.clone();
let handler = syndicate::entity(observer)
.on_asserted(move |observer, t, a| {
let bindings = a.value().to_sequence()?;
let key = bindings[0].value().to_bytestring()?;
let unattenuated_target = bindings[1].value().to_embedded()?;
match sturdyref.validate_and_attenuate(key, unattenuated_target) {
Err(e) => {
tracing::warn!(sturdyref = debug(&_Any::from(&sturdyref)),
"sturdyref failed validation: {}", e);
Ok(None)
},
Ok(target) => {
tracing::trace!(sturdyref = debug(&_Any::from(&sturdyref)),
target = debug(&target),
"sturdyref resolved");
let h = t.assert(observer, _Any::domain(target));
Ok(Some(Box::new(|_observer, t| Ok(t.retract(h)))))
}
}
})
.create(t.actor);
let oh = t.assert(ds, &dataspace::Observe {
// TODO: codegen plugin to generate pattern constructors
pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec {
ctor: Box::new(p::CRec {
label: Value::symbol("bind").wrap(),
arity: 3.into(),
}),
members: Map::from_iter(vec![
(0.into(), p::Pattern::DLit(Box::new(p::DLit { value: sturdyref.oid }))),
(0.into(), p::Pattern::DLit(Box::new(p::DLit {
value: queried_oid,
}))),
(1.into(), p::Pattern::DBind(Box::new(p::DBind {
name: "key".to_owned(),
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)),

View File

@ -1,6 +1,7 @@
use super::skeleton;
use super::actor::*;
use super::schemas::dataspace::*;
use super::schemas::dataspace::_Any;
use preserves::value::Map;
@ -40,7 +41,7 @@ impl Churn {
#[derive(Debug)]
pub struct Dataspace {
pub index: skeleton::Index,
pub handle_map: Map<Handle, (Assertion, Option<Observe>)>,
pub handle_map: Map<Handle, (_Any, Option<Observe>)>,
pub churn: Churn,
}
@ -67,7 +68,7 @@ impl Dataspace {
}
impl Entity for Dataspace {
fn assert(&mut self, t: &mut Activation, a: Assertion, h: Handle) -> ActorResult {
fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult {
tracing::trace!(assertion = debug(&a), handle = debug(&h), "assert");
let old_assertions = self.index.assertion_count();
@ -102,7 +103,7 @@ impl Entity for Dataspace {
Ok(())
}
fn message(&mut self, t: &mut Activation, m: Assertion) -> ActorResult {
fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult {
tracing::trace!(body = debug(&m), "message");
self.index.send(t, &m, &mut self.churn.messages_delivered);

View File

@ -3,6 +3,8 @@ use crate::error::Error;
use preserves::value::Map;
use std::sync::Arc;
pub type DuringRetractionHandler<T> = Box<dyn Send + FnOnce(&mut T, &mut Activation) -> ActorResult>;
pub struct During<T>(Map<Handle, DuringRetractionHandler<T>>);
@ -10,13 +12,15 @@ pub type DuringResult<E> =
Result<Option<Box<dyn 'static + Send + FnOnce(&mut E, &mut Activation) -> ActorResult>>,
Error>;
pub struct DuringEntity<E, Fa>
pub struct DuringEntity<E, Fa, Fm>
where
E: 'static + Send,
Fa: Send + FnMut(&mut E, &mut Activation, Assertion) -> DuringResult<E>,
Fa: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> DuringResult<E>,
Fm: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> ActorResult,
{
state: E,
handler: Fa,
assertion_handler: Option<Fa>,
message_handler: Option<Fm>,
during: During<E>,
}
@ -39,33 +43,92 @@ impl<T> During<T> {
}
}
impl<E, Fa> DuringEntity<E, Fa>
pub fn entity<E>(
state: E
) -> DuringEntity<E,
fn (&mut E, &mut Activation, _Any) -> DuringResult<E>,
fn (&mut E, &mut Activation, _Any) -> ActorResult>
where
E: 'static + Send,
Fa: Send + FnMut(&mut E, &mut Activation, Assertion) -> DuringResult<E>,
{
pub fn new(state: E, handler: Fa) -> Self {
DuringEntity::new(state, None, None)
}
impl<E, Fa, Fm> DuringEntity<E, Fa, Fm>
where
E: 'static + Send,
Fa: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> DuringResult<E>,
Fm: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> ActorResult,
{
pub fn new(state: E, assertion_handler: Option<Fa>, message_handler: Option<Fm>) -> Self {
DuringEntity {
state,
handler,
assertion_handler,
message_handler,
during: During::new(),
}
}
pub fn on_asserted<Fa1>(self, assertion_handler: Fa1) -> DuringEntity<E, Fa1, Fm>
where
Fa1: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> DuringResult<E>,
{
DuringEntity {
state: self.state,
assertion_handler: Some(assertion_handler),
message_handler: self.message_handler,
during: self.during,
}
}
pub fn on_message<Fm1>(self, message_handler: Fm1) -> DuringEntity<E, Fa, Fm1>
where
Fm1: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> ActorResult,
{
DuringEntity {
state: self.state,
assertion_handler: self.assertion_handler,
message_handler: Some(message_handler),
during: self.during,
}
}
pub fn create(self, ac: &mut Actor) -> Arc<Ref> {
ac.create(self)
}
pub fn create_rec<F>(self, ac: &mut Actor, f: F) -> Arc<Ref>
where
F: FnOnce(&mut Actor, &mut E, &Arc<Ref>) -> ()
{
ac.create_rec(self, |ac, e, e_ref| f(ac, &mut e.state, e_ref))
}
}
impl<E, Fa> Entity for DuringEntity<E, Fa>
impl<E, Fa, Fm> Entity for DuringEntity<E, Fa, Fm>
where
E: 'static + Send,
Fa: Send + FnMut(&mut E, &mut Activation, Assertion) -> DuringResult<E>,
Fa: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> DuringResult<E>,
Fm: 'static + Send + FnMut(&mut E, &mut Activation, _Any) -> ActorResult,
{
fn assert(&mut self, t: &mut Activation, a: Assertion, h: Handle) -> ActorResult {
match (self.handler)(&mut self.state, t, a)? {
Some(f) => self.during.await_retraction(h, f),
None => Ok(())
fn assert(&mut self, t: &mut Activation, a: _Any, h: Handle) -> ActorResult {
match &mut self.assertion_handler {
Some(handler) => match handler(&mut self.state, t, a)? {
Some(f) => self.during.await_retraction(h, f),
None => Ok(())
}
None => Ok(()),
}
}
fn retract(&mut self, t: &mut Activation, h: Handle) -> ActorResult {
self.during.retract(h)(&mut self.state, t)
}
fn message(&mut self, t: &mut Activation, m: _Any) -> ActorResult {
match &mut self.message_handler {
Some(handler) => handler(&mut self.state, t, m),
None => Ok(()),
}
}
}

View File

@ -13,12 +13,17 @@ pub mod during;
pub mod error;
pub mod pattern;
pub mod relay;
pub mod rewrite;
pub mod schemas;
pub mod skeleton;
pub mod sturdy;
pub mod tracer;
pub use during::entity;
pub use tracer::tracer;
pub use tracer::tracer_top;
pub use tracer::convenient_logging;
pub type ActorId = u64;

View File

@ -1,4 +1,3 @@
use crate::actor::Assertion;
use crate::schemas::dataspace_patterns::*;
use preserves::value::NestedValue;
@ -8,7 +7,7 @@ use std::convert::TryFrom;
#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq)]
pub enum PathStep {
Index(usize),
Key(Assertion),
Key(_Any),
}
pub type Path = Vec<PathStep>;

View File

@ -2,8 +2,10 @@ use bytes::Buf;
use bytes::BytesMut;
use crate::actor::*;
use crate::during;
use crate::error::Error;
use crate::error::error;
use crate::schemas::gatekeeper;
use crate::schemas::internal_protocol::*;
use crate::schemas::sturdy;
use crate::schemas::tunnel_relay;
@ -13,6 +15,7 @@ use futures::SinkExt;
use futures::Stream;
use futures::StreamExt;
use preserves::error::is_eof_io_error;
use preserves::value::BinarySource;
use preserves::value::BytesBinarySource;
use preserves::value::DomainDecode;
@ -132,6 +135,32 @@ impl Membrane {
}
}
pub fn connect_stream<I, O, E, F>(
t: &mut Activation,
i: I,
o: O,
sturdyref: sturdy::SturdyRef,
initial_state: E,
mut f: F,
) where
I: 'static + Send + AsyncRead,
O: 'static + Send + AsyncWrite,
E: 'static + Send,
F: 'static + Send + FnMut(&mut E, &mut Activation, Arc<Ref>) -> during::DuringResult<E>
{
let i = Input::Bytes(Box::pin(i));
let o = Output::Bytes(Box::pin(o));
let gatekeeper = TunnelRelay::run(t, i, o, None, Some(sturdy::Oid(0.into()))).unwrap();
let main_entity = t.actor.create(during::entity(initial_state).on_asserted(move |state, t, a| {
let denotation = a.value().to_embedded()?;
f(state, t, Arc::clone(denotation))
}));
t.assert(&gatekeeper, &gatekeeper::Resolve {
sturdyref,
observer: main_entity,
});
}
impl TunnelRelay {
pub fn run(
t: &mut Activation,
@ -164,7 +193,7 @@ impl TunnelRelay {
result = Some(Arc::clone(&tr.membranes.import_oid(ac, tr_ref, io).obj));
}
});
t.actor.add_exit_hook(&tr_ref.target);
t.actor.add_exit_hook(&tr_ref);
t.actor.linked_task(crate::name!("writer"), output_loop(o, output_rx));
t.actor.linked_task(crate::name!("reader"), input_loop(i, tr_ref));
result
@ -179,6 +208,7 @@ impl TunnelRelay {
Err(*b)
},
Packet::Turn(b) => {
let t = &mut Activation::for_actor(t.actor);
let Turn(events) = *b;
for TurnEvent { oid, event } in events {
let target = match self.membranes.exported.oid_map.get(&sturdy::Oid(oid.0.clone())) {
@ -347,8 +377,12 @@ impl Membranes {
if attenuation.is_empty() {
Ok(Arc::clone(&ws.obj))
} else {
// TODO
panic!("Non-empty attenuation not yet implemented")
Ok(ws.obj.attenuate(&sturdy::Attenuation(attenuation))
.map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidInput,
format!("Invalid capability attenuation: {:?}", e))
})?)
}
}
None => Ok(Arc::clone(&*INERT_REF)),
@ -379,14 +413,29 @@ impl DomainEncode<_Ptr> for Membranes {
d: &_Ptr,
) -> io::Result<()> {
w.write(&mut NoEmbeddedDomainCodec, &_Any::from(&match self.exported.ref_map.get(d) {
Some(ws) => sturdy::WireRef::Mine { oid: Box::new(ws.oid.clone()) },
Some(ws) => sturdy::WireRef::Mine {
oid: Box::new(ws.oid.clone()),
},
None => match self.imported.ref_map.get(d) {
Some(ws) => {
// TODO: attenuation check
sturdy::WireRef::Yours { oid: Box::new(ws.oid.clone()), attenuation: vec![] }
if d.attenuation.is_empty() {
sturdy::WireRef::Yours {
oid: Box::new(ws.oid.clone()),
attenuation: vec![],
}
} else {
// We may trust the peer to enforce attenuation on our behalf, in
// which case we can return sturdy::WireRef::Yours with an attenuation
// attached here, but for now we don't.
sturdy::WireRef::Mine {
oid: Box::new(self.export_ref(Arc::clone(d), false).oid.clone()),
}
}
}
None =>
sturdy::WireRef::Mine { oid: Box::new(self.export_ref(Arc::clone(d), false).oid.clone()) },
sturdy::WireRef::Mine {
oid: Box::new(self.export_ref(Arc::clone(d), false).oid.clone()),
},
}
}))
}
@ -396,8 +445,8 @@ pub async fn input_loop(
i: Input,
relay: Arc<Ref>,
) -> ActorResult {
fn s<M: Into<_Any>>(relay: &Arc<Ref>, m: M) {
relay.external_event(Event::Message(Box::new(Message { body: Assertion(m.into()) })))
async fn s<M: Into<_Any>>(relay: &Arc<Ref>, m: M) -> () {
relay.external_event(Event::Message(Box::new(Message { body: Assertion(m.into()) }))).await
}
match i {
@ -405,28 +454,39 @@ pub async fn input_loop(
loop {
match src.next().await {
None => {
s(&relay, &tunnel_relay::Input::Eof);
s(&relay, &tunnel_relay::Input::Eof).await;
return Ok(());
}
Some(bs) => s(&relay, &tunnel_relay::Input::Packet { bs: bs? }),
Some(bs) => {
s(&relay, &tunnel_relay::Input::Packet { bs: bs? }).await;
}
}
}
}
Input::Bytes(mut r) => {
let mut buf = BytesMut::with_capacity(1024);
loop {
buf.reserve(1024);
let n = r.read_buf(&mut buf).await?;
buf.reserve(8192);
let n = match r.read_buf(&mut buf).await {
Ok(n) => n,
Err(e) =>
if e.kind() == io::ErrorKind::ConnectionReset {
s(&relay, &tunnel_relay::Input::Eof).await;
return Ok(());
} else {
return Err(e)?;
},
};
match n {
0 => {
s(&relay, &tunnel_relay::Input::Eof);
s(&relay, &tunnel_relay::Input::Eof).await;
return Ok(());
}
_ => {
while buf.has_remaining() {
let bs = buf.chunk();
let n = bs.len();
s(&relay, &tunnel_relay::Input::Segment { bs: bs.to_vec() });
s(&relay, &tunnel_relay::Input::Segment { bs: bs.to_vec() }).await;
buf.advance(n);
}
}
@ -446,7 +506,10 @@ pub async fn output_loop(
return Ok(()),
Some(bs) => match &mut o {
Output::Packets(sink) => sink.send(bs).await?,
Output::Bytes(w) => w.write_all(&bs).await?,
Output::Bytes(w) => {
w.write_all(&bs).await?;
w.flush().await?;
}
}
}
}
@ -466,7 +529,7 @@ impl Entity for TunnelRelay {
&mut ActivatedMembranes(t, &self.self_ref, &mut self.membranes))
.demand_next(false)?;
tracing::trace!(packet = debug(&item), "-->");
self.handle_inbound_packet(t, Packet::try_from(&item)?)?
self.handle_inbound_packet(t, Packet::try_from(&item)?)?;
}
tunnel_relay::Input::Segment { bs } => {
self.input_buffer.extend_from_slice(&bs);
@ -475,7 +538,10 @@ impl Entity for TunnelRelay {
let mut src = BytesBinarySource::new(&self.input_buffer);
let mut dec = ActivatedMembranes(t, &self.self_ref, &mut self.membranes);
let mut r = src.packed::<_, _Any, _>(&mut dec);
let e = r.next(false)?;
let e = match r.next(false) {
Err(e) if is_eof_io_error(&e) => None,
result => result?,
};
(e, r.source.index)
};
match e {

325
src/rewrite.rs Normal file
View File

@ -0,0 +1,325 @@
use preserves::value::Map;
use preserves::value::NestedValue;
use preserves::value::Value;
use std::convert::TryFrom;
use super::schemas::sturdy::*;
pub type CheckedRewrite = (usize, Pattern, Template);
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct CheckedCaveat { alts: Vec<CheckedRewrite> }
#[derive(Debug)]
pub enum CaveatError {
UnboundRef,
BindingUnderNegation,
LudicrousArity,
IndexOutOfBounds,
InvalidIndex,
IncompleteTemplate,
}
impl Attenuation {
pub fn validate(&self) -> Result<(), CaveatError> {
for c in &self.0 { c.validate()? }
Ok(())
}
pub fn check(&self) -> Result<Vec<CheckedCaveat>, CaveatError> {
self.0.iter().map(Caveat::check).collect()
}
}
impl Caveat {
pub fn validate(&self) -> Result<(), CaveatError> {
match self {
Caveat::Rewrite(b) => (&**b).validate(),
Caveat::Alts(b) => (&**b).alternatives.iter().map(Rewrite::validate).collect::<Result<(), _>>(),
}
}
pub fn check(&self) -> Result<CheckedCaveat, CaveatError> {
match self {
Caveat::Rewrite(b) =>
Ok(CheckedCaveat {
alts: vec![ (*b).check()? ]
}),
Caveat::Alts(b) => {
let Alts { alternatives } = &**b;
Ok(CheckedCaveat {
alts: alternatives.into_iter().map(Rewrite::check)
.collect::<Result<Vec<CheckedRewrite>, CaveatError>>()?
})
}
}
}
}
impl ConstructorSpec {
fn arity(&self) -> Result<Option<usize>, CaveatError> {
match self {
ConstructorSpec::CRec(b) => match usize::try_from(&(&**b).arity) {
Err(_) => Err(CaveatError::LudicrousArity),
Ok(a) => Ok(Some(a)),
}
ConstructorSpec::CArr(b) => match usize::try_from(&(&**b).arity) {
Err(_) => return Err(CaveatError::LudicrousArity),
Ok(a) => Ok(Some(a)),
}
ConstructorSpec::CDict(_) => Ok(None),
}
}
}
fn check_member_key(limit: usize, k: &_Any) -> Result<(), CaveatError> {
match k.value().as_signedinteger() {
None => Err(CaveatError::InvalidIndex),
Some(k) => match usize::try_from(k) {
Err(_) => Err(CaveatError::IndexOutOfBounds),
Ok(k) =>
if k >= limit {
Err(CaveatError::IndexOutOfBounds)
} else {
Ok(())
},
}
}
}
impl Pattern {
fn binding_count(&self) -> Result<usize, CaveatError> {
match self {
Pattern::PDiscard(_) |
Pattern::PAtom(_) |
Pattern::PEmbedded(_) |
Pattern::Lit(_) => Ok(0),
Pattern::PBind(b) => Ok(1 + (&**b).pattern.binding_count()?),
Pattern::PNot(b) => {
let PNot { pattern } = &**b;
if pattern.binding_count()? == 0 {
Ok(0)
} else {
Err(CaveatError::BindingUnderNegation)
}
}
Pattern::PAnd(b) => {
let mut count = 0;
for p in &(&**b).patterns {
count += p.binding_count()?
}
Ok(count)
}
Pattern::PCompound(b) => {
let PCompound { ctor, members: PCompoundMembers(ms) } = &**b;
let arity = ctor.arity()?;
let mut count = 0;
for (k, p) in ms.iter() {
if let Some(limit) = arity {
check_member_key(limit, k)?;
}
count += p.binding_count()?
}
Ok(count)
}
}
}
fn matches(&self, a: &_Any, bindings: &mut Vec<_Any>) -> bool {
match self {
Pattern::PDiscard(_) => true,
Pattern::PAtom(b) => match &**b {
PAtom::Boolean => a.value().is_boolean(),
PAtom::Float => a.value().is_float(),
PAtom::Double => a.value().is_double(),
PAtom::SignedInteger => a.value().is_signedinteger(),
PAtom::String => a.value().is_string(),
PAtom::ByteString => a.value().is_bytestring(),
PAtom::Symbol => a.value().is_symbol(),
}
Pattern::PEmbedded(_) => a.value().is_embedded(),
Pattern::PBind(b) => {
bindings.push(a.clone());
(&**b).pattern.matches(a, bindings)
}
Pattern::PAnd(b) => {
for p in &(&**b).patterns {
if !p.matches(a, bindings) { return false; }
}
true
},
Pattern::PNot(b) => !(&**b).pattern.matches(a, bindings),
Pattern::Lit(b) => &(&**b).value == a,
Pattern::PCompound(b) => match &**b {
PCompound { ctor: ConstructorSpec::CRec(b), members: PCompoundMembers(ms) } => {
let CRec { label, arity } = &**b;
let arity = usize::try_from(arity).expect("in-range arity");
match a.value().as_record(Some(arity)) {
Some(r) => {
if r.label() != label { return false; }
for (k, p) in ms.iter() {
let k = k.value().as_signedinteger().expect("integer index");
let k = usize::try_from(k).expect("in-range index");
if !p.matches(&r.fields()[k], bindings) { return false; }
}
true
},
None => false,
}
}
PCompound { ctor: ConstructorSpec::CArr(b), members: PCompoundMembers(ms) } => {
let CArr { arity } = &**b;
let arity = usize::try_from(arity).expect("in-range arity");
match a.value().as_sequence() {
Some(vs) => {
if vs.len() < arity { return false; }
for (k, p) in ms.iter() {
let k = k.value().as_signedinteger().expect("integer index");
let k = usize::try_from(k).expect("in-range index");
if !p.matches(&vs[k], bindings) { return false; }
}
true
},
None => false,
}
}
PCompound { ctor: ConstructorSpec::CDict(_), members: PCompoundMembers(ms) } => {
match a.value().as_dictionary() {
Some(es) => {
for (k, p) in ms.iter() {
match es.get(k) {
Some(v) => if !p.matches(v, bindings) { return false; },
None => return false,
}
}
true
}
None => false,
}
}
}
}
}
}
impl Template {
fn implied_binding_count(&self) -> Result<usize, CaveatError> {
match self {
Template::TAttenuate(b) => {
let TAttenuate { template, attenuation } = &**b;
attenuation.validate()?;
Ok(template.implied_binding_count()?)
}
Template::TRef(b) => match usize::try_from(&(&**b).binding) {
Ok(v) => Ok(1 + v),
Err(_) => Err(CaveatError::UnboundRef),
},
Template::Lit(_) => Ok(0),
Template::TCompound(b) => {
let TCompound { ctor, members: TCompoundMembers(ms) } = &**b;
let arity = ctor.arity()?;
let mut max = 0;
if let Some(limit) = arity {
if ms.len() != limit {
return Err(CaveatError::IncompleteTemplate);
}
}
for (k, t) in ms.iter() {
if let Some(limit) = arity {
check_member_key(limit, k)?;
}
max = max.max(t.implied_binding_count()?);
}
Ok(max)
}
}
}
fn instantiate(&self, bindings: &Vec<_Any>) -> Option<_Any> {
match self {
Template::TAttenuate(b) => {
let TAttenuate { template, attenuation } = &**b;
template.instantiate(bindings)
.and_then(|r| r.value().as_embedded().cloned())
.map(|r| Value::Embedded(r.attenuate(attenuation).expect("checked attenuation")).wrap())
}
Template::TRef(b) => Some(bindings[usize::try_from(&(&**b).binding).expect("in-range index")].clone()),
Template::Lit(b) => Some((&**b).value.clone()),
Template::TCompound(b) => {
let TCompound { ctor, members: TCompoundMembers(ms) } = &**b;
match ctor {
ConstructorSpec::CRec(b) => {
let CRec { label, arity } = &**b;
let arity = usize::try_from(arity).expect("in-range arity");
let mut r = Value::record(label.clone(), arity);
for i in 0..arity {
let t = ms.get(&Value::from(i).wrap()).expect("entry for each index");
match t.instantiate(bindings) {
None => return None,
Some(v) => r.fields_vec_mut().push(v),
}
}
Some(r.finish().wrap())
}
ConstructorSpec::CArr(b) => {
let CArr { arity } = &**b;
let arity = usize::try_from(arity).expect("in-range arity");
let mut r = Vec::with_capacity(arity);
for i in 0..arity {
let t = ms.get(&Value::from(i).wrap()).expect("entry for each index");
match t.instantiate(bindings) {
None => return None,
Some(v) => r.push(v),
}
}
Some(Value::from(r).wrap())
}
ConstructorSpec::CDict(_) => {
let mut r = Map::new();
for (k, t) in ms.iter() {
match t.instantiate(bindings) {
None => return None,
Some(v) => {
r.insert(k.clone(), v);
()
}
}
}
Some(Value::from(r).wrap())
}
}
}
}
}
}
impl Rewrite {
fn validated_binding_count(&self) -> Result<usize, CaveatError> {
let binding_count = self.pattern.binding_count()?;
let implied_binding_count = self.template.implied_binding_count()?;
if implied_binding_count > binding_count { return Err(CaveatError::UnboundRef); }
Ok(binding_count)
}
fn validate(&self) -> Result<(), CaveatError> {
let _ = self.validated_binding_count()?;
Ok(())
}
fn check(&self) -> Result<CheckedRewrite, CaveatError> {
Ok((self.validated_binding_count()?, self.pattern.clone(), self.template.clone()))
}
}
impl CheckedCaveat {
pub fn rewrite(&self, a: &_Any) -> Option<_Any> {
for (n, p, t) in &self.alts {
let mut bindings = Vec::with_capacity(*n);
if let true = p.matches(a, &mut bindings) {
return t.instantiate(&bindings);
}
}
None
}
}

View File

@ -6,7 +6,7 @@ use std::convert::TryFrom;
use std::convert::TryInto;
use std::sync::Arc;
use crate::actor::Assertion;
use crate::actor::_Any;
use crate::actor::Activation;
use crate::actor::Ref;
use crate::schemas::internal_protocol::Handle;
@ -15,18 +15,18 @@ use crate::pattern::{self, PathStep, Path, Paths};
type Bag<A> = bag::BTreeBag<A>;
type Captures = Assertion;
type Captures = _Any;
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub enum Guard {
Rec(Assertion, usize),
Rec(_Any, usize),
Seq(usize),
Map,
}
#[derive(Debug)]
pub struct Index {
all_assertions: Bag<Assertion>,
all_assertions: Bag<_Any>,
observer_count: usize,
root: Node,
}
@ -39,7 +39,7 @@ struct Node {
#[derive(Debug)]
struct Continuation {
cached_assertions: Set<Assertion>,
cached_assertions: Set<_Any>,
leaf_map: Map<Paths, Map<Captures, Leaf>>,
}
@ -51,7 +51,7 @@ struct Selector {
#[derive(Debug)]
struct Leaf { // aka Topic
cached_assertions: Set<Assertion>,
cached_assertions: Set<_Any>,
endpoints_map: Map<Paths, Endpoints>,
}
@ -94,7 +94,7 @@ impl Index {
self.observer_count -= 1;
}
pub fn insert(&mut self, t: &mut Activation, outer_value: &Assertion) {
pub fn insert(&mut self, t: &mut Activation, outer_value: &_Any) {
let net = self.all_assertions.change(outer_value.clone(), 1);
match net {
bag::Net::AbsentToPresent => {
@ -117,7 +117,7 @@ impl Index {
}
}
pub fn remove(&mut self, t: &mut Activation, outer_value: &Assertion) {
pub fn remove(&mut self, t: &mut Activation, outer_value: &_Any) {
let net = self.all_assertions.change(outer_value.clone(), -1);
match net {
bag::Net::PresentToAbsent => {
@ -142,7 +142,7 @@ impl Index {
}
}
pub fn send(&mut self, t: &mut Activation, outer_value: &Assertion, delivery_count: &mut usize) {
pub fn send(&mut self, t: &mut Activation, outer_value: &_Any, delivery_count: &mut usize) {
Modification::new(
false,
&outer_value,
@ -254,24 +254,24 @@ impl<'a, T> Stack<'a, T> {
}
struct Modification<'op, FCont, FLeaf, FEndpoints>
where FCont: FnMut(&mut Continuation, &Assertion) -> (),
FLeaf: FnMut(&mut Leaf, &Assertion) -> (),
where FCont: FnMut(&mut Continuation, &_Any) -> (),
FLeaf: FnMut(&mut Leaf, &_Any) -> (),
FEndpoints: FnMut(&mut Endpoints, Captures) -> ()
{
create_leaf_if_absent: bool,
outer_value: &'op Assertion,
outer_value: &'op _Any,
m_cont: FCont,
m_leaf: FLeaf,
m_endpoints: FEndpoints,
}
impl<'op, FCont, FLeaf, FEndpoints> Modification<'op, FCont, FLeaf, FEndpoints>
where FCont: FnMut(&mut Continuation, &Assertion) -> (),
FLeaf: FnMut(&mut Leaf, &Assertion) -> (),
where FCont: FnMut(&mut Continuation, &_Any) -> (),
FLeaf: FnMut(&mut Leaf, &_Any) -> (),
FEndpoints: FnMut(&mut Endpoints, Captures) -> ()
{
fn new(create_leaf_if_absent: bool,
outer_value: &'op Assertion,
outer_value: &'op _Any,
m_cont: FCont,
m_leaf: FLeaf,
m_endpoints: FEndpoints,
@ -289,7 +289,7 @@ where FCont: FnMut(&mut Continuation, &Assertion) -> (),
self.node(n, &Stack::Item(&Value::from(vec![self.outer_value.clone()]).wrap(), &Stack::Empty))
}
fn node(&mut self, n: &mut Node, term_stack: &Stack<&Assertion>) {
fn node(&mut self, n: &mut Node, term_stack: &Stack<&_Any>) {
self.continuation(&mut n.continuation);
for (selector, table) in &mut n.edges {
let mut next_stack = term_stack;
@ -336,7 +336,7 @@ where FCont: FnMut(&mut Continuation, &Assertion) -> (),
}
}
fn class_of(v: &Assertion) -> Option<Guard> {
fn class_of(v: &_Any) -> Option<Guard> {
match v.value() {
Value::Sequence(vs) => Some(Guard::Seq(vs.len())),
Value::Record(r) => Some(Guard::Rec(r.label().clone(), r.arity())),
@ -345,7 +345,7 @@ fn class_of(v: &Assertion) -> Option<Guard> {
}
}
fn project_path<'a>(v: &'a Assertion, p: &Path) -> Option<&'a Assertion> {
fn project_path<'a>(v: &'a _Any, p: &Path) -> Option<&'a _Any> {
let mut v = v;
for i in p {
match step(v, i) {
@ -356,7 +356,7 @@ fn project_path<'a>(v: &'a Assertion, p: &Path) -> Option<&'a Assertion> {
Some(v)
}
fn project_paths<'a>(v: &'a Assertion, ps: &Paths) -> Option<Captures> {
fn project_paths<'a>(v: &'a _Any, ps: &Paths) -> Option<Captures> {
let mut vs = Vec::new();
for p in ps {
match project_path(v, p) {
@ -367,7 +367,7 @@ fn project_paths<'a>(v: &'a Assertion, ps: &Paths) -> Option<Captures> {
Some(Captures::new(vs))
}
fn step<'a>(v: &'a Assertion, s: &PathStep) -> Option<&'a Assertion> {
fn step<'a>(v: &'a _Any, s: &PathStep) -> Option<&'a _Any> {
match (v.value(), s) {
(Value::Sequence(vs), PathStep::Index(i)) =>
if *i < vs.len() { Some(&vs[*i]) } else { None },
@ -381,7 +381,7 @@ fn step<'a>(v: &'a Assertion, s: &PathStep) -> Option<&'a Assertion> {
}
impl Continuation {
fn new(cached_assertions: Set<Assertion>) -> Self {
fn new(cached_assertions: Set<_Any>) -> Self {
Continuation { cached_assertions, leaf_map: Map::new() }
}

120
src/sturdy.rs Normal file
View File

@ -0,0 +1,120 @@
use getrandom::getrandom;
use hmac::{Hmac, Mac, NewMac, crypto_mac::MacError};
use preserves::hex::HexParser;
use preserves::hex::HexFormatter;
use preserves::value::Embeddable;
use preserves::value::NestedValue;
use preserves::value::NoEmbeddedDomainCodec;
use preserves::value::packed::PackedWriter;
use preserves::value::packed::from_bytes;
use sha2::Sha256;
use std::convert::TryFrom;
use std::io;
use super::error::Error;
use super::rewrite::CaveatError;
pub use super::schemas::sturdy::*;
#[derive(Debug)]
pub enum ValidationError {
SignatureError(MacError),
AttenuationError(CaveatError),
}
impl std::fmt::Display for ValidationError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
match self {
ValidationError::SignatureError(_) =>
write!(f, "Invalid SturdyRef signature"),
ValidationError::AttenuationError(e) =>
write!(f, "Invalid SturdyRef attenuation: {:?}", e),
}
}
}
impl std::error::Error for ValidationError {}
const KEY_LENGTH: usize = 16; // bytes; 128 bits
fn signature(key: &[u8], data: &[u8]) -> Vec<u8> {
let mut m = Hmac::<Sha256>::new_from_slice(key).expect("valid key length");
m.update(data);
let mut result = m.finalize().into_bytes().to_vec();
result.truncate(KEY_LENGTH);
result
}
pub fn new_key() -> Vec<u8> {
let mut buf = vec![0; KEY_LENGTH];
getrandom(&mut buf).expect("successful random number generation");
buf
}
pub fn encode<D: Embeddable, N: NestedValue<D>>(v: &N) -> Vec<u8> {
PackedWriter::encode::<D, N, _>(&mut NoEmbeddedDomainCodec, v).expect("no io errors")
}
pub fn decode<D: Embeddable, N: NestedValue<D>>(bs: &[u8]) -> io::Result<N> {
from_bytes(bs, &mut NoEmbeddedDomainCodec)
}
impl SturdyRef {
pub fn mint(oid: _Any, key: &[u8]) -> Self {
let sig = signature(key, &encode(&oid));
SturdyRef { oid, caveat_chain: Vec::new(), sig }
}
pub fn from_hex(s: &str) -> Result<Self, Error> {
let binary = HexParser::Liberal.decode(s).expect("hex encoded sturdyref");
Ok(Self::try_from(&decode::<_, _Any>(&binary)?)?)
}
pub fn to_hex(&self) -> String {
HexFormatter::Packed.encode(&encode::<_, _Any>(&self.into()))
}
pub fn validate_and_attenuate(
&self,
key: &[u8],
unattenuated_target: &_Ptr,
) -> Result<_Ptr, ValidationError> {
self.validate(key).map_err(ValidationError::SignatureError)?;
let mut attenuation = Vec::new();
// TODO:: Make sure of the ordering here!!
for a in self.caveat_chain.iter().rev() {
attenuation.extend(a.0.iter().rev().cloned());
}
let target = unattenuated_target
.attenuate(&Attenuation(attenuation))
.map_err(ValidationError::AttenuationError)?;
Ok(target)
}
pub fn validate(&self, key: &[u8]) -> Result<(), MacError> {
let SturdyRef { oid, caveat_chain, sig } = self;
let mut key = key.to_vec();
key = signature(&key, &encode(oid));
for c in caveat_chain {
key = signature(&key, &encode(&_Any::from(c)));
}
if &key == sig {
Ok(())
} else {
Err(MacError)
}
}
pub fn attenuate(&self, attenuation: &Attenuation) -> Result<Self, CaveatError> {
attenuation.validate()?;
let SturdyRef { oid, caveat_chain, sig } = self;
let oid = oid.clone();
let mut caveat_chain = caveat_chain.clone();
caveat_chain.push(attenuation.clone());
let sig = signature(&sig, &encode(&_Any::from(attenuation)));
Ok(SturdyRef { oid, caveat_chain, sig })
}
}

View File

@ -7,7 +7,7 @@ use std::sync::Arc;
struct Tracer(tracing::Span);
fn set_name_oid(_ac: &mut Actor, t: &mut Tracer, r: &Arc<Ref>) {
t.0.record("oid", &tracing::field::display(&r.target.0));
t.0.record("oid", &tracing::field::display(&r.addr.oid.0));
}
pub fn tracer(ac: &mut Actor, name: tracing::Span) -> Arc<Ref> {
@ -19,7 +19,7 @@ pub fn tracer_top(name: tracing::Span) -> Arc<Ref> {
}
impl Entity for Tracer {
fn assert(&mut self, _t: &mut Activation, a: Assertion, h: Handle) -> ActorResult {
fn assert(&mut self, _t: &mut Activation, a: _Any, h: Handle) -> ActorResult {
let _guard = self.0.enter();
tracing::trace!(a = debug(&a), h = debug(&h), "assert");
Ok(())
@ -29,7 +29,7 @@ impl Entity for Tracer {
tracing::trace!(h = debug(&h), "retract");
Ok(())
}
fn message(&mut self, _t: &mut Activation, m: Assertion) -> ActorResult {
fn message(&mut self, _t: &mut Activation, m: _Any) -> ActorResult {
let _guard = self.0.enter();
tracing::trace!(m = debug(&m), "message");
Ok(())
@ -37,7 +37,25 @@ impl Entity for Tracer {
fn sync(&mut self, t: &mut Activation, peer: Arc<Ref>) -> ActorResult {
let _guard = self.0.enter();
tracing::trace!(peer = debug(&peer), "sync");
t.message(&peer, Assertion::new(true));
t.message(&peer, _Any::new(true));
Ok(())
}
}
pub fn convenient_logging() -> Result<(), Box<dyn std::error::Error>> {
let filter = match std::env::var(tracing_subscriber::filter::EnvFilter::DEFAULT_ENV) {
Err(std::env::VarError::NotPresent) =>
tracing_subscriber::filter::EnvFilter::default()
.add_directive(tracing_subscriber::filter::LevelFilter::INFO.into()),
_ =>
tracing_subscriber::filter::EnvFilter::try_from_default_env()?,
};
let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_ansi(true)
.with_max_level(tracing::Level::TRACE)
.with_env_filter(filter)
.finish();
tracing::subscriber::set_global_default(subscriber)
.expect("Could not set tracing global subscriber");
Ok(())
}