Adapt to new Preserves major version; stub daemon basis

This commit is contained in:
Tony Garnock-Jones 2021-09-19 16:53:37 +02:00
parent 3763b9ac86
commit ccd54be3b2
36 changed files with 473 additions and 245 deletions

16
Cargo.lock generated
View File

@ -1053,9 +1053,9 @@ checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
[[package]]
name = "preserves"
version = "1.0.0"
version = "2.0.0-rc1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41e4496df1095b2d85d6ac4cad0b2b8c77ae9224aca1789ad41b0654c79c64eb"
checksum = "e281800f60acacfaf49115f3032d3c9939a4eb4d6921cb280d7593744cada262"
dependencies = [
"base64",
"dtoa",
@ -1067,9 +1067,9 @@ dependencies = [
[[package]]
name = "preserves-schema"
version = "1.0.0"
version = "2.0.0-rc1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04ed3d65a4e263db4117a86e209ef1732e52749aa8fbaeb6b9b6ed3b49da1d94"
checksum = "192e1fb003f58e76d60a70536deb6bc731ad9132ffb095ea31e7ecebaaf8de0e"
dependencies = [
"convert_case",
"glob",
@ -1458,13 +1458,14 @@ dependencies = [
[[package]]
name = "syndicate"
version = "0.11.0"
version = "0.12.0"
dependencies = [
"bytes",
"criterion",
"futures",
"getrandom",
"hmac",
"lazy_static",
"openssl",
"preserves",
"preserves-schema",
@ -1478,7 +1479,7 @@ dependencies = [
[[package]]
name = "syndicate-macros"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"proc-macro2",
"quote",
@ -1488,9 +1489,10 @@ dependencies = [
[[package]]
name = "syndicate-server"
version = "0.11.0"
version = "0.12.0"
dependencies = [
"futures",
"lazy_static",
"notify",
"preserves-schema",
"structopt",

View File

@ -8,8 +8,8 @@ members = [
]
# [patch.crates-io]
# preserves = { path = "/home/tonyg/src/preserves/implementations/rust/preserves" }
# preserves-schema = { path = "/home/tonyg/src/preserves/implementations/rust/preserves-schema" }
# preserves = { path = "../preserves/implementations/rust/preserves" }
# preserves-schema = { path = "../preserves/implementations/rust/preserves-schema" }
[profile.release]
strip = true

View File

@ -1,6 +1,6 @@
[package]
name = "syndicate-macros"
version = "0.6.0"
version = "0.7.0"
authors = ["Tony Garnock-Jones <tonyg@leastfixedpoint.com>"]
edition = "2018"
@ -13,7 +13,7 @@ license = "Apache-2.0"
proc-macro = true
[dependencies]
syndicate = { path = "../syndicate", version = "^0.11.0"}
syndicate = { path = "../syndicate", version = "^0.12.0"}
proc-macro2 = { version = "^1.0", features = ["span-locations"] }
quote = "^1.0"

View File

@ -0,0 +1,87 @@
use proc_macro2::Span;
use quote::quote_spanned;
use syn::parse_macro_input;
use syn::Expr;
use syn::Ident;
use syn::LitInt;
use syn::Token;
use syn::Type;
use syn::parse::Error;
use syn::parse::Parse;
use syn::parse::ParseStream;
use crate::stx::Stx;
use crate::pat;
#[derive(Debug)]
struct During {
turn_stx: Expr,
ds_stx: Expr,
pat_stx: Stx,
body_stx: Expr,
}
fn comma_parse<T: Parse>(input: ParseStream) -> syn::parse::Result<T> {
let _: Token![,] = input.parse()?;
input.parse()
}
impl Parse for During {
fn parse(input: ParseStream) -> syn::parse::Result<Self> {
Ok(During {
turn_stx: input.parse()?,
ds_stx: comma_parse(input)?,
pat_stx: comma_parse(input)?,
body_stx: comma_parse(input)?,
})
}
}
impl During {
fn bindings(&self) -> (Vec<Ident>, Vec<Type>, Vec<LitInt>) {
let mut ids = vec![];
let mut tys = vec![];
let mut indexes = vec![];
for (i, (maybe_id, ty)) in self.pat_stx.bindings().into_iter().enumerate() {
if let Some(id) = maybe_id {
indexes.push(LitInt::new(&i.to_string(), id.span()));
ids.push(id);
tys.push(ty);
}
}
(ids, tys, indexes)
}
}
pub fn during(src: proc_macro::TokenStream) -> proc_macro::TokenStream {
let d = parse_macro_input!(src as During);
let During { turn_stx, ds_stx, pat_stx, body_stx } = &d;
let (varname_stx, type_stx, index_stx) = d.bindings();
let binding_count = varname_stx.len();
let pat_stx_expr = match pat::to_pattern_expr(pat_stx) {
Ok(e) => e,
Err(e) => return Error::new(Span::call_site(), e).to_compile_error().into(),
};
(quote_spanned!{Span::mixed_site()=> {
let monitor = syndicate::during::entity(())
.on_asserted_facet(|_, t, captures: syndicate::actor::AnyValue| {
if let Some(captures) = captures.value().as_sequence() {
if captures.len() == #binding_count {
#(let #varname_stx = match #type_stx::try_from(captures[#index_stx]) {
Ok(v) => v,
Err(_) => return Ok(()),
};)*
return (#body_stx)(t);
}
}
Ok(())
})
.create_cap(#turn_stx);
#ds_stx.assert(#turn_stx, &syndicate::schemas::dataspace::Observe {
pattern: #pat_stx_expr,
observer: monitor,
});
}}).into()
}

View File

@ -18,8 +18,9 @@ use syn::Ident;
use syn::Lit;
use syn::LitByteStr;
mod stx;
mod dur;
mod pat;
mod stx;
mod val;
use pat::lit;
@ -261,3 +262,10 @@ pub fn template(src: proc_macro::TokenStream) -> proc_macro::TokenStream {
pub fn pattern(src: proc_macro::TokenStream) -> proc_macro::TokenStream {
pat::pattern(src)
}
//---------------------------------------------------------------------------
#[proc_macro]
pub fn during(src: proc_macro::TokenStream) -> proc_macro::TokenStream {
dur::during(src)
}

View File

@ -17,14 +17,14 @@ pub fn lit<T: ToTokens>(e: T) -> TokenStream2 {
syndicate::schemas::dataspace_patterns::DLit { value: #e })))
}
fn compile_sequence_members(stxs: Vec<Stx>) -> Result<Vec<TokenStream2>, &'static str> {
stxs.into_iter().enumerate().map(|(i, stx)| {
fn compile_sequence_members(stxs: &Vec<Stx>) -> Result<Vec<TokenStream2>, &'static str> {
stxs.iter().enumerate().map(|(i, stx)| {
let p = to_pattern_expr(stx)?;
Ok(quote!((#i .into(), #p)))
}).collect()
}
fn to_pattern_expr(stx: Stx) -> Result<TokenStream2, &'static str> {
pub fn to_pattern_expr(stx: &Stx) -> Result<TokenStream2, &'static str> {
#[allow(non_snake_case)]
let P_: TokenStream2 = quote!(syndicate::schemas::dataspace_patterns);
#[allow(non_snake_case)]
@ -35,21 +35,24 @@ fn to_pattern_expr(stx: Stx) -> Result<TokenStream2, &'static str> {
match stx {
Stx::Atom(v) =>
Ok(lit(value_to_value_expr(&v))),
Stx::Binder(_, p) => {
let pe = to_pattern_expr(*p)?;
Ok(quote!(#P_::Pattern::DBind(Box::new(#P_::DBind { pattern: #pe }))))
Stx::Binder(_, maybe_ty, maybe_pat) => {
let inner_pat_expr = match maybe_pat {
Some(p) => to_pattern_expr(&*p)?,
None => match maybe_ty {
Some(ty) => quote!(#ty::wildcard_dataspace_pattern()),
None => to_pattern_expr(&Stx::Discard)?,
}
};
Ok(quote!(#P_::Pattern::DBind(Box::new(#P_::DBind { pattern: #inner_pat_expr }))))
}
Stx::Subst(e) =>
Ok(lit(e)),
Stx::Discard =>
Ok(quote!(#P_::Pattern::DDiscard(Box::new(#P_::DDiscard)))),
Stx::Ctor1(_, _) => todo!(),
Stx::CtorN(_, _) => todo!(),
Stx::Rec(l, fs) => {
let arity = fs.len() as u128;
let label = to_value_expr(*l)?;
let label = to_value_expr(&*l)?;
let members = compile_sequence_members(fs)?;
Ok(quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Rec {
ctor: Box::new(#P_::CRec { label: #label, arity: #arity .into() }),
@ -65,9 +68,9 @@ fn to_pattern_expr(stx: Stx) -> Result<TokenStream2, &'static str> {
}))))
}
Stx::Set(stxs) =>
Ok(lit(emit_set(&stxs.into_iter().map(to_value_expr).collect::<Result<Vec<_>,_>>()?))),
Ok(lit(emit_set(&stxs.iter().map(to_value_expr).collect::<Result<Vec<_>,_>>()?))),
Stx::Dict(d) => {
let members = d.into_iter().map(|(k, v)| {
let members = d.iter().map(|(k, v)| {
let k = to_value_expr(k)?;
let v = to_pattern_expr(v)?;
Ok(quote!((#k, #v)))
@ -82,7 +85,8 @@ fn to_pattern_expr(stx: Stx) -> Result<TokenStream2, &'static str> {
pub fn pattern(src: TokenStream) -> TokenStream {
let src2 = src.clone();
let e = to_pattern_expr(parse_macro_input!(src2 as Stx)).expect("Cannot compile pattern").into();
let e = to_pattern_expr(&parse_macro_input!(src2 as Stx))
.expect("Cannot compile pattern").into();
// println!("\n{:#} -->\n{:#}\n", &src, &e);
e
}

View File

@ -3,12 +3,16 @@ use proc_macro2::LineColumn;
use proc_macro2::TokenTree;
use syn::ExprLit;
use syn::Ident;
use syn::Lit;
use syn::Result;
use syn::Type;
use syn::buffer::Cursor;
use syn::parse::Error;
use syn::parse::Parse;
use syn::parse::Parser;
use syn::parse::ParseStream;
use syn::parse_str;
use syndicate::value::Float;
use syndicate::value::Double;
@ -18,11 +22,9 @@ use syndicate::value::NestedValue;
#[derive(Debug, Clone)]
pub enum Stx {
Atom(IOValue),
Binder(Option<String>, Box<Stx>),
Binder(Option<Ident>, Option<Type>, Option<Box<Stx>>),
Discard,
Subst(TokenTree),
Ctor1(String, Box<Stx>),
CtorN(String, Vec<(Stx, Stx)>),
Rec(Box<Stx>, Vec<Stx>),
Seq(Vec<Stx>),
Set(Vec<Stx>),
@ -35,6 +37,39 @@ impl Parse for Stx {
}
}
impl Stx {
pub fn bindings(&self) -> Vec<(Option<Ident>, Type)> {
let mut bs = vec![];
self._bindings(&mut bs);
bs
}
fn _bindings(&self, bs: &mut Vec<(Option<Ident>, Type)>) {
match self {
Stx::Atom(_) | Stx::Discard | Stx::Subst(_) => (),
Stx::Binder(id, ty, pat) => {
bs.push((id.clone(),
ty.clone().unwrap_or_else(
|| parse_str("syndicate::actor::AnyValue").unwrap())));
if let Some(p) = pat {
p._bindings(bs);
}
},
Stx::Rec(l, fs) => {
l._bindings(bs);
fs.iter().for_each(|f| f._bindings(bs));
},
Stx::Seq(vs) => vs.iter().for_each(|v| v._bindings(bs)),
Stx::Set(vs) => vs.iter().for_each(|v| v._bindings(bs)),
Stx::Dict(kvs) => kvs.iter().for_each(|(_k, v)| v._bindings(bs)),
}
}
}
fn punct_char(c: Cursor) -> Option<(char, Cursor)> {
c.punct().map(|(p, c)| (p.as_char(), c))
}
fn parse_id(mut c: Cursor) -> Result<(String, Cursor)> {
let mut id = String::new();
let mut prev_pos = c.span().start();
@ -83,11 +118,9 @@ fn parse_seq(delim_ch: char, mut c: Cursor) -> Result<(Vec<Stx>, Cursor)> {
fn skip_commas(mut c: Cursor) -> Cursor {
loop {
if let Some((p, next)) = c.punct() {
if p.as_char() == ',' {
c = next;
continue;
}
if let Some((',', next)) = punct_char(c) {
c = next;
continue;
}
return c;
}
@ -121,25 +154,20 @@ fn parse_group<'c, R, F: Fn(Cursor<'c>) -> Result<(R, Cursor<'c>)>>(
fn parse_kv(c: Cursor) -> Result<((Stx, Stx), Cursor)> {
let (k, c) = parse1(c)?;
if let Some((p, c)) = c.punct() {
if p.as_char() == ':' {
let (v, c) = parse1(c)?;
return Ok(((k, v), c));
}
if let Some((':', c)) = punct_char(c) {
let (v, c) = parse1(c)?;
return Ok(((k, v), c));
}
Err(Error::new(c.span(), "Expected ':'"))
}
fn adjacent_id(pos: LineColumn, c: Cursor) -> (Option<String>, Cursor) {
fn adjacent_ident(pos: LineColumn, c: Cursor) -> (Option<Ident>, Cursor) {
if c.span().start() != pos {
(None, c)
} else if let Some((id, next)) = c.ident() {
(Some(id), next)
} else {
let (s, next) = parse_id(c).unwrap();
if s.is_empty() {
(None, c)
} else {
(Some(s), next)
}
(None, c)
}
}
@ -151,6 +179,27 @@ fn parse_exactly_one<'c>(c: Cursor<'c>) -> Result<Stx> {
})
}
fn parse_generic<T: Parse>(mut c: Cursor) -> Option<(T, Cursor)> {
match T::parse.parse2(c.token_stream()) {
Ok(t) => Some((t, Cursor::empty())), // because parse2 checks for end-of-stream!
Err(e) => {
// OK, because parse2 checks for end-of-stream, let's chop
// the input at the position of the error and try again (!).
let mut collected = Vec::new();
let upto = e.span().start();
while !c.eof() && c.span().start() != upto {
let (tt, next) = c.token_tree().unwrap();
collected.push(tt);
c = next;
}
match T::parse.parse2(collected.into_iter().collect()) {
Ok(t) => Some((t, c)),
Err(_) => None,
}
}
}
}
fn parse1(c: Cursor) -> Result<(Stx, Cursor)> {
if let Some((p, next)) = c.punct() {
match p.as_char() {
@ -162,12 +211,20 @@ fn parse1(c: Cursor) -> Result<(Stx, Cursor)> {
'{' => parse_group(Delimiter::Brace, parse_kv, c).map(|(q,c)| (Stx::Dict(q),c)),
'[' => parse_group(Delimiter::Bracket, parse1, c).map(|(q,c)| (Stx::Seq(q),c)),
'$' => {
let (maybe_id, next) = adjacent_id(p.span().end(), next);
if let Some((inner, _, next)) = next.group(Delimiter::Parenthesis) {
parse_exactly_one(inner).map(
|q| (Stx::Binder(maybe_id, Box::new(q)), next))
let (maybe_id, next) = adjacent_ident(p.span().end(), next);
let (maybe_type, next) = if let Some((':', next)) = punct_char(next) {
match parse_generic::<Type>(next) {
Some((t, next)) => (Some(t), next),
None => (None, next)
}
} else {
Ok((Stx::Binder(maybe_id, Box::new(Stx::Discard)), next))
(None, next)
};
if let Some((inner, _, next)) = next.group(Delimiter::Brace) {
parse_exactly_one(inner).map(
|q| (Stx::Binder(maybe_id, maybe_type, Some(Box::new(q))), next))
} else {
Ok((Stx::Binder(maybe_id, maybe_type, None), next))
}
}
'#' => {
@ -185,17 +242,7 @@ fn parse1(c: Cursor) -> Result<(Stx, Cursor)> {
if i.to_string() == "_" {
Ok((Stx::Discard, next))
} else {
parse_id(c).and_then(|(q,c)| {
if let Some((inner, _, next)) = c.group(Delimiter::Parenthesis) {
match parse_group_inner(inner, parse_kv, next) {
Ok((kvs, next)) => Ok((Stx::CtorN(q, kvs), next)),
Err(_) => parse_exactly_one(inner).map(
|v| (Stx::Ctor1(q, Box::new(v)), next)),
}
} else {
Ok((Stx::Atom(IOValue::symbol(&q)), c))
}
})
parse_id(c).and_then(|(q,c)| Ok((Stx::Atom(IOValue::symbol(&q)), c)))
}
} else if let Some((literal, next)) = c.literal() {
let t: ExprLit = syn::parse_str(&literal.to_string())?;

View File

@ -84,18 +84,15 @@ pub fn value_to_value_expr(v: &IOValue) -> TokenStream2 {
}
}
pub fn to_value_expr(stx: Stx) -> Result<TokenStream2, &'static str> {
pub fn to_value_expr(stx: &Stx) -> Result<TokenStream2, &'static str> {
match stx {
Stx::Atom(v) => Ok(value_to_value_expr(&v)),
Stx::Binder(_, _) => Err("Cannot use binder in literal value"),
Stx::Binder(_, _, _) => Err("Cannot use binder in literal value"),
Stx::Discard => Err("Cannot use discard in literal value"),
Stx::Subst(e) => Ok(e.into()),
Stx::Ctor1(_, _) => todo!(),
Stx::CtorN(_, _) => todo!(),
Stx::Subst(e) => Ok(e.clone().into()),
Stx::Rec(l, fs) =>
Ok(emit_record(to_value_expr(*l)?,
Ok(emit_record(to_value_expr(&*l)?,
&fs.into_iter().map(to_value_expr).collect::<Result<Vec<_>,_>>()?)),
Stx::Seq(vs) =>
Ok(emit_seq(&vs.into_iter().map(to_value_expr).collect::<Result<Vec<_>,_>>()?)),

View File

@ -1,6 +1,6 @@
[package]
name = "syndicate-server"
version = "0.11.0"
version = "0.12.0"
authors = ["Tony Garnock-Jones <tonyg@leastfixedpoint.com>"]
edition = "2018"
@ -10,17 +10,16 @@ repository = "https://git.syndicate-lang.org/syndicate-lang/syndicate-rs"
license = "Apache-2.0"
[build-dependencies]
preserves-schema = "1.0.0"
preserves-schema = "2.0.0-rc1"
[dependencies]
preserves-schema = "1.0.0"
syndicate = { path = "../syndicate", version = "^0.11.0"}
syndicate-macros = { path = "../syndicate-macros", version = "^0.6.0"}
preserves-schema = "2.0.0-rc1"
syndicate = { path = "../syndicate", version = "^0.12.0"}
syndicate-macros = { path = "../syndicate-macros", version = "^0.7.0"}
futures = "0.3"
lazy_static = "1.4"
notify = "4.0"
structopt = "0.3"
tungstenite = "0.13"

View File

@ -8,8 +8,12 @@ fn main() -> std::io::Result<()> {
let mut c = CompilerConfig::new(gen_dir, "crate::schemas".to_owned());
// c.plugins.push(Box::new(syndicate_plugins::PatternPlugin));
c.module_aliases.insert(vec!["EntityRef".to_owned()], "syndicate::actor".to_owned());
c.module_aliases.insert(vec!["TransportAddress".to_owned()], "syndicate::schemas::transport_address".to_owned());
c.add_external_module(ExternalModule::new(vec!["EntityRef".to_owned()], "syndicate::actor"));
c.add_external_module(
ExternalModule::new(vec!["TransportAddress".to_owned()],
"syndicate::schemas::transport_address")
.set_fallback_language_types(
|v| vec![format!("syndicate::schemas::Language<{}>", v)].into_iter().collect()));
let inputs = expand_inputs(&vec!["protocols/schema-bundle.bin".to_owned()])?;
c.load_schemas_and_bundles(&inputs)?;

View File

@ -3,6 +3,7 @@ use std::sync::Arc;
use structopt::StructOpt;
use syndicate::actor::*;
use syndicate::language;
use syndicate::relay;
use syndicate::schemas::dataspace::Observe;
use syndicate::sturdy;
@ -42,7 +43,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
})
.create_cap(t);
ds.assert(t, &Observe {
ds.assert(t, language(), &Observe {
pattern: syndicate_macros::pattern!{<Says $ $>},
observer: Arc::clone(&consumer),
});

View File

@ -4,6 +4,7 @@ use std::time::SystemTime;
use structopt::StructOpt;
use syndicate::actor::*;
use syndicate::language;
use syndicate::relay;
use syndicate::schemas::dataspace::Observe;
use syndicate::sturdy;
@ -137,9 +138,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let padding = &bindings[1];
if should_echo || (report_latency_every == 0) {
ds.message(t, simple_record2(&send_label,
timestamp.clone(),
padding.clone()));
ds.message(t, &(), &simple_record2(&send_label,
timestamp.clone(),
padding.clone()));
} else {
if let None = current_reply {
turn_counter += 1;
@ -159,7 +160,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Value::from(now()).wrap(),
padding.clone()));
}
ds.message(t, current_reply.as_ref().expect("some reply").clone());
ds.message(t, &(), current_reply.as_ref().expect("some reply"));
}
}
}
@ -168,7 +169,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Cap::new(&self_ref)
};
ds.assert(t, &Observe {
ds.assert(t, language(), &Observe {
pattern: {
let recv_label = AnyValue::symbol(recv_label);
syndicate_macros::pattern!{<#(recv_label) $ $>}

View File

@ -3,6 +3,7 @@ use std::sync::Arc;
use structopt::StructOpt;
use syndicate::actor::*;
use syndicate::language;
use syndicate::relay;
use syndicate::schemas::dataspace::Observe;
use syndicate::sturdy;
@ -63,7 +64,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}).create_cap(t)
};
ds.assert(t, &Observe {
ds.assert(t, language(), &Observe {
pattern: syndicate_macros::pattern!{<Present $>},
observer: Arc::clone(&consumer),
});

View File

@ -1 +1,4 @@
´³bundle·µ³internalServices„´³schema·³version³ definitions·³ DebtReporter´³lit³ debt-reporter„³ ConfigWatcher´³rec´³lit³config-watcher„´³tupleµ´³named³path´³atom³String„„„„„³TcpRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Tcp„„„„„³UnixRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Unix„„„„„„³ embeddedType€„„„„
´³bundle·µ³externalServices„´³schema·³version³ definitions·³Service´³refµ„³ DaemonService„³ClearEnv´³orµµ±present´³dict·³clearEnv´³named³clearEnv´³atom³Boolean„„„„„µ±absent´³dict·„„„„„³DaemonId³any³EnvValue´³orµµ±set´³atom³String„„µ±remove´³lit€„„„„³ DaemonDir´³orµµ±present´³dict·³dir´³named³dir´³atom³String„„„„„µ±absent´³dict·„„„„„³ DaemonEnv´³orµµ±present´³dict·³env´³named³env´³dictof´³atom³String„´³refµ„³EnvValue„„„„„„µ±absent´³dict·„„„„„³
DaemonSpec´³andµ´³dict·³argv´³named³argv´³seqof´³atom³String„„„„„´³named³env´³refµ„³ DaemonEnv„„´³named³dir´³refµ„³ DaemonDir„„´³named³clearEnv´³refµ„³ClearEnv„„„„³ DaemonProcess´³rec´³lit³daemon„´³tupleµ´³named³id´³refµ„³DaemonId„„´³named³config´³refµ„³
DaemonSpec„„„„„³ DaemonService´³rec´³lit³daemon„´³tupleµ´³named³id´³refµ„³DaemonId„„„„„³ServiceDependency´³rec´³lit³
depends-on„´³tupleµ´³named³depender³any„´³named³dependee³any„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„µ³internalServices„´³schema·³version³ definitions·³ Milestone´³rec´³lit³ milestone„´³tupleµ´³named³name³any„„„„³ DebtReporter´³lit³ debt-reporter„³ ConfigWatcher´³rec´³lit³config-watcher„´³tupleµ´³named³path´³atom³String„„„„„³TcpRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Tcp„„„„„³UnixRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Unix„„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„„„

View File

@ -0,0 +1,17 @@
version 1 .
embeddedType EntityRef.Cap .
Service = DaemonService .
DaemonService = <daemon @id DaemonId> .
ServiceDependency = <depends-on @depender any @dependee any> .
DaemonProcess = <daemon @id DaemonId @config DaemonSpec>.
DaemonId = any .
DaemonSpec = { argv: [string ...] } & @env DaemonEnv & @dir DaemonDir & @clearEnv ClearEnv .
DaemonEnv = @present { env: { string: EnvValue ...:... } } / @absent {} .
DaemonDir = @present { dir: string } / @absent {} .
ClearEnv = @present { clearEnv: bool } / @absent {} .
EnvValue = @set string / @remove #f .

View File

@ -1,7 +1,9 @@
version 1 .
embeddedType EntityRef.Cap .
DebtReporter = =debt-reporter .
TcpRelayListener = <relay-listener @addr TransportAddress.Tcp> .
UnixRelayListener = <relay-listener @addr TransportAddress.Unix> .
ConfigWatcher = <config-watcher @path string>.
Milestone = <milestone @name any>.

View File

@ -1,3 +1,5 @@
use preserves_schema::Codec;
use std::sync::Arc;
use syndicate::actor::*;
@ -6,6 +8,8 @@ use syndicate::schemas::gatekeeper;
use syndicate::sturdy;
use syndicate::value::NestedValue;
use crate::language::language;
pub fn bind(
t: &mut Activation,
ds: &Arc<Cap>,
@ -14,8 +18,8 @@ pub fn bind(
target: Arc<Cap>,
) {
let sr = sturdy::SturdyRef::mint(oid.clone(), &key);
tracing::info!(cap = ?AnyValue::from(&sr), hex = %sr.to_hex());
ds.assert(t, &gatekeeper::Bind { oid, key: key.to_vec(), target });
tracing::info!(cap = ?language().unparse(&sr), hex = %sr.to_hex());
ds.assert(t, language(), &gatekeeper::Bind { oid, key: key.to_vec(), target });
}
pub fn handle_resolve(
@ -34,15 +38,15 @@ pub fn handle_resolve(
let unattenuated_target = bindings[1].value().to_embedded()?;
match sturdyref.validate_and_attenuate(key, unattenuated_target) {
Err(e) => {
tracing::warn!(sturdyref = ?AnyValue::from(&sturdyref),
tracing::warn!(sturdyref = ?language().unparse(&sturdyref),
"sturdyref failed validation: {}", e);
Ok(None)
},
Ok(target) => {
tracing::trace!(sturdyref = ?AnyValue::from(&sturdyref),
tracing::trace!(sturdyref = ?language().unparse(&sturdyref),
?target,
"sturdyref resolved");
if let Some(h) = observer.assert(t, AnyValue::domain(target)) {
if let Some(h) = observer.assert(t, &(), &AnyValue::domain(target)) {
Ok(Some(Box::new(move |_observer, t| Ok(t.retract(h)))))
} else {
Ok(None)
@ -51,7 +55,7 @@ pub fn handle_resolve(
}
})
.create_cap(t);
if let Some(oh) = ds.assert(t, &dataspace::Observe {
if let Some(oh) = ds.assert(t, language(), &dataspace::Observe {
// TODO: codegen plugin to generate pattern constructors
pattern: syndicate_macros::pattern!{<bind #(queried_oid) $ $>},
observer: handler,

View File

@ -0,0 +1,6 @@
use syndicate::actor;
preserves_schema::define_language!(language(): Language<actor::AnyValue> {
syndicate: syndicate::schemas::Language,
server: crate::schemas::Language,
});

View File

@ -1,10 +1,11 @@
use preserves_schema::Codec;
use std::path::PathBuf;
use std::sync::Arc;
use structopt::StructOpt;
use syndicate::actor::*;
use syndicate::convert::from_io_value;
use syndicate::dataspace::*;
use syndicate::relay;
use syndicate::schemas::service;
@ -13,6 +14,7 @@ use syndicate::schemas::transport_address;
use syndicate::value::NestedValue;
mod gatekeeper;
mod language;
mod protocol;
mod services;
@ -20,6 +22,7 @@ mod schemas {
include!(concat!(env!("OUT_DIR"), "/src/schemas/mod.rs"));
}
use language::language;
use schemas::internal_services;
#[derive(Clone, StructOpt)]
@ -91,49 +94,47 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
gatekeeper::bind(t, &root_ds, AnyValue::new("server-config"), [0; 16],
Arc::clone(&server_config_ds));
let gateway = Cap::guard(&t.create(
let gateway = Cap::guard(Arc::clone(&language().syndicate), t.create(
syndicate::entity(Arc::clone(&root_ds)).on_asserted(gatekeeper::handle_resolve)));
services::debt_reporter::on_demand(t, Arc::clone(&server_config_ds));
services::tcp_relay_listener::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&gateway));
services::unix_relay_listener::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&gateway));
services::config_watcher::on_demand(t, Arc::clone(&server_config_ds));
services::daemon::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&root_ds));
if config.debt_reporter {
server_config_ds.assert(t, &service::RequireService {
service_name: from_io_value(&internal_services::DebtReporter)?,
server_config_ds.assert(t, language(), &service::RequireService {
service_name: language().unparse(&internal_services::DebtReporter),
});
}
for port in config.ports.clone() {
server_config_ds.assert(t, &service::RequireService {
service_name: from_io_value(
&internal_services::TcpRelayListener {
addr: transport_address::Tcp {
host: "0.0.0.0".to_owned(),
port: (port as i32).into(),
}
})?,
server_config_ds.assert(t, language(), &service::RequireService {
service_name: language().unparse(&internal_services::TcpRelayListener {
addr: transport_address::Tcp {
host: "0.0.0.0".to_owned(),
port: (port as i32).into(),
}
}),
});
}
for path in config.sockets.clone() {
server_config_ds.assert(t, &service::RequireService {
service_name: from_io_value(
&internal_services::UnixRelayListener {
addr: transport_address::Unix {
path: path.to_str().expect("representable UnixListener path").to_owned(),
}
})?,
server_config_ds.assert(t, language(), &service::RequireService {
service_name: language().unparse(&internal_services::UnixRelayListener {
addr: transport_address::Unix {
path: path.to_str().expect("representable UnixListener path").to_owned(),
}
}),
});
}
for path in config.config.clone() {
server_config_ds.assert(t, &service::RequireService {
service_name: from_io_value(
&internal_services::ConfigWatcher {
path: path.to_str().expect("representable ConfigWatcher path").to_owned(),
})?,
server_config_ds.assert(t, language(), &service::RequireService {
service_name: language().unparse(&internal_services::ConfigWatcher {
path: path.to_str().expect("representable ConfigWatcher path").to_owned(),
}),
});
}

View File

@ -3,7 +3,8 @@ use notify::Watcher;
use notify::RecursiveMode;
use notify::watcher;
use std::convert::TryFrom;
use preserves_schema::Codec;
use std::fs;
use std::future;
use std::io;
@ -14,7 +15,6 @@ use std::thread;
use std::time::Duration;
use syndicate::actor::*;
use syndicate::convert::*;
use syndicate::during::entity;
use syndicate::schemas::dataspace::Observe;
use syndicate::value::BinarySource;
@ -26,6 +26,7 @@ use syndicate::value::Reader;
use syndicate::value::Set;
use syndicate::value::ViaCodec;
use crate::language::language;
use crate::schemas::internal_services;
pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
@ -41,8 +42,8 @@ pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
}
})
.create_cap(t);
ds.assert(t, &Observe {
pattern: syndicate_macros::pattern!{<require-service $(<config-watcher _>)>},
ds.assert(t, language(), &Observe {
pattern: syndicate_macros::pattern!{<require-service ${<config-watcher _>}>},
observer: monitor,
});
Ok(())
@ -57,13 +58,13 @@ fn assertions_at_existing_file(t: &mut Activation, ds: &Arc<Cap>, path: &PathBuf
let mut handles = Set::new();
let fh = fs::File::open(path)?;
let mut src = IOBinarySource::new(fh);
let mut r = src.text::<_, AnyValue, _>(ViaCodec::new(NoEmbeddedDomainCodec));
let mut r = src.text::<AnyValue, _>(ViaCodec::new(NoEmbeddedDomainCodec));
let mut values = Vec::new();
while let Some(value) = Reader::<_, AnyValue>::next(&mut r, true)? {
while let Some(value) = Reader::<AnyValue>::next(&mut r, true)? {
values.push(value);
}
for value in values.into_iter() {
if let Some(handle) = ds.assert(t, value.clone()) {
if let Some(handle) = ds.assert(t, &(), &value) {
handles.insert(handle);
}
}
@ -145,11 +146,10 @@ fn initial_scan(
}
fn run(t: &mut Activation, ds: Arc<Cap>, captures: AnyValue) -> ActorResult {
let spec = internal_services::ConfigWatcher::try_from(&from_any_value(
&captures.value().to_sequence()?[0])?)?;
let spec: internal_services::ConfigWatcher = language().parse(&captures.value().to_sequence()?[0])?;
{
let spec = from_io_value(&spec)?;
ds.assert(t, syndicate_macros::template!("<service-running =spec>"));
let spec = language().unparse(&spec);
ds.assert(t, &(), &syndicate_macros::template!("<service-running =spec>"));
}
let path = fs::canonicalize(spec.path)?;

View File

@ -0,0 +1,47 @@
use std::convert::TryFrom;
use std::sync::Arc;
use syndicate::actor::*;
use syndicate::during::entity;
use syndicate::schemas::dataspace::Observe;
use syndicate::supervise::{Supervisor, SupervisorConfiguration};
use syndicate::value::NestedValue;
use crate::schemas::external_services;
use syndicate_macros::during;
pub fn on_demand(t: &mut Activation, config_ds: Arc<Cap>, root_ds: Arc<Cap>) {
// t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| {
// during!(t, config_ds, <require-service $spec: external_services::DaemonService>, |t| {
// let config_ds = Arc::clone(&config_ds);
// let root_ds = Arc::clone(&root_ds);
// Ok(Supervisor::start(
// t,
// syndicate::name!(parent: None, "daemon", service = ?spec_any),
// SupervisorConfiguration::default(),
// move |t| run(t, Arc::clone(&config_ds), Arc::clone(&root_ds), spec.clone())))
// });
// Ok(())
// });
}
// fn run(
// t: &mut Activation,
// config_ds: Arc<Cap>,
// _root_ds: Arc<Cap>,
// captures: AnyValue,
// ) -> ActorResult {
// let spec = external_services::DaemonService::try_from(&from_any_value(
// &captures.value().to_sequence()?[0])?)?;
// {
// let spec = from_io_value(&spec)?;
// config_ds.assert(t, syndicate_macros::template!("<service-running =spec>"));
// }
// tracing::info!("daemon {:?}", &spec);
// Ok(())
// }

View File

@ -1,10 +1,12 @@
use preserves_schema::Codec;
use std::sync::Arc;
use syndicate::actor::*;
use syndicate::convert::*;
use syndicate::during::entity;
use syndicate::schemas::dataspace::Observe;
use crate::language::language;
use crate::schemas::internal_services;
pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
@ -19,8 +21,8 @@ pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
}
})
.create_cap(t);
let spec = from_io_value(&internal_services::DebtReporter)?;
ds.assert(t, &Observe {
let spec = language().unparse(&internal_services::DebtReporter);
ds.assert(t, language(), &Observe {
pattern: syndicate_macros::pattern!{<require-service #(spec)>},
observer: monitor,
});
@ -29,8 +31,8 @@ pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
}
fn run(t: &mut Activation, ds: Arc<Cap>) -> ActorResult {
let spec = from_io_value(&internal_services::DebtReporter)?;
ds.assert(t, syndicate_macros::template!("<service-running =spec>"));
let spec = language().unparse(&internal_services::DebtReporter);
ds.assert(t, &(), &syndicate_macros::template!("<service-running =spec>"));
t.linked_task(syndicate::name!("tick"), async {
let mut timer = tokio::time::interval(core::time::Duration::from_secs(1));
loop {

View File

@ -1,4 +1,5 @@
pub mod config_watcher;
pub mod daemon;
pub mod debt_reporter;
pub mod tcp_relay_listener;
pub mod unix_relay_listener;

View File

@ -1,8 +1,9 @@
use preserves_schema::Codec;
use std::convert::TryFrom;
use std::sync::Arc;
use syndicate::actor::*;
use syndicate::convert::*;
use syndicate::during::entity;
use syndicate::schemas::dataspace::Observe;
use syndicate::supervise::{Supervisor, SupervisorConfiguration};
@ -10,6 +11,7 @@ use syndicate::value::NestedValue;
use tokio::net::TcpListener;
use crate::language::language;
use crate::protocol::detect_protocol;
use crate::schemas::internal_services;
@ -30,8 +32,8 @@ pub fn on_demand(t: &mut Activation, ds: Arc<Cap>, gateway: Arc<Cap>) {
}
})
.create_cap(t);
ds.assert(t, &Observe {
pattern: syndicate_macros::pattern!{<require-service $(<relay-listener <tcp _ _>>)>},
ds.assert(t, language(), &Observe {
pattern: syndicate_macros::pattern!{<require-service ${<relay-listener <tcp _ _>>}>},
observer: monitor,
});
Ok(())
@ -44,13 +46,12 @@ fn run(
gateway: Arc<Cap>,
captures: AnyValue,
) -> ActorResult {
let spec = internal_services::TcpRelayListener::try_from(&from_any_value(
&captures.value().to_sequence()?[0])?)?;
let spec: internal_services::TcpRelayListener = language().parse(&captures.value().to_sequence()?[0])?;
let host = spec.addr.host.clone();
let port = u16::try_from(&spec.addr.port).map_err(|_| "Invalid TCP port number")?;
{
let spec = from_io_value(&spec)?;
ds.assert(t, syndicate_macros::template!("<service-running =spec>"));
let spec = language().unparse(&spec);
ds.assert(t, &(), &syndicate_macros::template!("<service-running =spec>"));
}
let parent_span = tracing::Span::current();
t.linked_task(syndicate::name!("listener"), async move {

View File

@ -1,10 +1,10 @@
use std::convert::TryFrom;
use preserves_schema::Codec;
use std::io;
use std::path::PathBuf;
use std::sync::Arc;
use syndicate::actor::*;
use syndicate::convert::*;
use syndicate::during::entity;
use syndicate::error::Error;
use syndicate::relay;
@ -14,6 +14,7 @@ use syndicate::value::NestedValue;
use tokio::net::UnixListener;
use tokio::net::UnixStream;
use crate::language::language;
use crate::protocol::run_connection;
use crate::schemas::internal_services;
@ -31,8 +32,8 @@ pub fn on_demand(t: &mut Activation, ds: Arc<Cap>, gateway: Arc<Cap>) {
}
})
.create_cap(t);
ds.assert(t, &Observe {
pattern: syndicate_macros::pattern!{<require-service $(<relay-listener <unix _>>)>},
ds.assert(t, language(), &Observe {
pattern: syndicate_macros::pattern!{<require-service ${<relay-listener <unix _>>}>},
observer: monitor,
});
Ok(())
@ -45,12 +46,11 @@ fn run(
gateway: Arc<Cap>,
captures: AnyValue,
) -> ActorResult {
let spec = internal_services::UnixRelayListener::try_from(&from_any_value(
&captures.value().to_sequence()?[0])?)?;
let spec: internal_services::UnixRelayListener = language().parse(&captures.value().to_sequence()?[0])?;
let path_str = spec.addr.path.clone();
{
let spec = from_io_value(&spec)?;
ds.assert(t, syndicate_macros::template!("<service-running =spec>"));
let spec = language().unparse(&spec);
ds.assert(t, &(), &syndicate_macros::template!("<service-running =spec>"));
}
let parent_span = tracing::Span::current();
t.linked_task(syndicate::name!("listener"), async move {

View File

@ -1,6 +1,6 @@
[package]
name = "syndicate"
version = "0.11.0"
version = "0.12.0"
authors = ["Tony Garnock-Jones <tonyg@leastfixedpoint.com>"]
edition = "2018"
@ -13,11 +13,11 @@ license = "Apache-2.0"
vendored-openssl = ["openssl/vendored"]
[build-dependencies]
preserves-schema = "1.0.0"
preserves-schema = "2.0.0-rc1"
[dependencies]
preserves = "1.0.0"
preserves-schema = "1.0.0"
preserves = "2.0.0-rc1"
preserves-schema = "2.0.0-rc1"
tokio = { version = "1.10", features = ["io-util", "macros", "rt", "rt-multi-thread", "time"] }
tokio-util = "0.6"
@ -27,6 +27,7 @@ futures = "0.3"
getrandom = "0.2"
hmac = "0.11"
lazy_static = "1.4"
sha2 = "0.9"
tracing = "0.1"

View File

@ -6,6 +6,7 @@ use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Instant;
use syndicate::language;
use syndicate::actor::*;
use syndicate::during::entity;
use syndicate::dataspace::Dataspace;
@ -101,7 +102,7 @@ pub fn bench_pub(c: &mut Criterion) {
})
.create_cap(t);
ds.assert(t, &Observe {
ds.assert(t, language(), &Observe {
pattern: p::Pattern::DBind(Box::new(p::DBind {
pattern: p::Pattern::DLit(Box::new(p::DLit {
value: AnyValue::symbol("consumer"),
@ -122,8 +123,8 @@ pub fn bench_pub(c: &mut Criterion) {
let shutdown = Cap::new(&t.create(ShutdownEntity));
let receiver = Cap::new(&t.create(Receiver(Arc::clone(&turn_count))));
ds.assert(t, Value::<AnyValue, _>::symbol("consumer").wrap());
ds.assert(t, &Observe {
ds.assert(t, &(), &AnyValue::symbol("consumer"));
ds.assert(t, language(), &Observe {
pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec {
ctor: Box::new(p::CRec {
label: AnyValue::symbol("Says"),
@ -140,7 +141,7 @@ pub fn bench_pub(c: &mut Criterion) {
})),
observer: receiver,
});
ds.assert(t, &Observe {
ds.assert(t, language(), &Observe {
pattern: p::Pattern::DBind(Box::new(p::DBind {
pattern: p::Pattern::DLit(Box::new(p::DLit {
value: AnyValue::new(true),

View File

@ -9,16 +9,12 @@ mod syndicate_plugins {
pub(super) struct PatternPlugin;
impl Plugin for PatternPlugin {
fn generate(
fn generate_definition(
&self,
m: &mut context::ModuleContext,
_m: &mut context::ModuleContext,
_definition_name: &str,
_definition: &Definition,
) {
if m.mode.is_some() {
return;
}
// TODO: Emit code for building instances of sturdy.Pattern and sturdy.Template
}
}
@ -32,7 +28,7 @@ fn main() -> std::io::Result<()> {
let mut c = CompilerConfig::new(gen_dir, "crate::schemas".to_owned());
c.plugins.push(Box::new(syndicate_plugins::PatternPlugin));
c.module_aliases.insert(vec!["EntityRef".to_owned()], "crate::actor".to_owned());
c.add_external_module(ExternalModule::new(vec!["EntityRef".to_owned()], "crate::actor"));
let inputs = expand_inputs(&vec!["protocols/schema-bundle.bin".to_owned()])?;
c.load_schemas_and_bundles(&inputs)?;

View File

@ -17,7 +17,9 @@ use preserves::value::IOValue;
use preserves::value::Map;
use preserves::value::NestedValue;
use preserves::value::Set;
use preserves_schema::support::ParseError;
use preserves_schema::ParseError;
use preserves_schema::support::Parse;
use preserves_schema::support::Unparse;
use std::boxed::Box;
use std::collections::hash_map::HashMap;
@ -401,12 +403,13 @@ pub struct Cap {
/// The [`Entity`] implementation for `Guard` decodes `AnyValue`
/// assertions/messages to type `M` before passing them on to the
/// underlying entity.
pub struct Guard<M>
pub struct Guard<L, M>
where
for<'a> &'a M: Into<AnyValue>,
for<'a> M: TryFrom<&'a AnyValue>,
M: for<'a> Unparse<&'a L, AnyValue>,
M: for<'a> Parse<&'a L, AnyValue>,
{
underlying: Arc<Ref<M>>
underlying: Arc<Ref<M>>,
literals: Arc<L>,
}
/// Simple entity that stops its containing facet when any assertion it receives is
@ -472,6 +475,18 @@ impl From<&Synced> for AnyValue {
}
}
impl<'a> Parse<&'a (), AnyValue> for Synced {
fn parse(_language: &'a (), value: &AnyValue) -> Result<Self, ParseError> {
Synced::try_from(value)
}
}
impl<'a> Unparse<&'a (), AnyValue> for Synced {
fn unparse(&self, _language: &'a ()) -> AnyValue {
self.into()
}
}
impl FacetRef {
/// Executes `f` in a new "[turn][Activation]" for `actor`. If `f` returns `Ok(())`,
/// [commits the turn][Activation::deliver] and performs the buffered actions; otherwise,
@ -1555,15 +1570,18 @@ impl Cap {
/// `AnyValue`, yields a `Cap` for the referenced entity. The
/// `Cap` automatically decodes presented `AnyValue`s into
/// instances of `M`.
pub fn guard<M: 'static + Send>(underlying: &Arc<Ref<M>>) -> Arc<Self>
pub fn guard<L: 'static + Sync + Send, M: 'static + Send>(
literals: Arc<L>,
underlying: Arc<Ref<M>>,
) -> Arc<Self>
where
for<'a> &'a M: Into<AnyValue>,
for<'a> M: TryFrom<&'a AnyValue>,
M: for<'a> Unparse<&'a L, AnyValue>,
M: for<'a> Parse<&'a L, AnyValue>,
{
Self::new(&Arc::new(Ref {
mailbox: Arc::clone(&underlying.mailbox),
facet_id: underlying.facet_id,
target: Mutex::new(Some(Box::new(Guard { underlying: underlying.clone() }))),
target: Mutex::new(Some(Box::new(Guard { underlying: underlying, literals }))),
}))
}
@ -1600,15 +1618,17 @@ impl Cap {
/// Translates `m` into an `AnyValue`, passes it through
/// [`rewrite`][Self::rewrite], and then
/// [`assert`s][Activation::assert] it using the activation `t`.
pub fn assert<M: Into<AnyValue>>(&self, t: &mut Activation, m: M) -> Option<Handle> {
self.rewrite(m.into()).map(|m| t.assert(&self.underlying, m))
pub fn assert<L, M: Unparse<L, AnyValue>>(&self, t: &mut Activation, literals: L, m: &M) -> Option<Handle>
{
self.rewrite(m.unparse(literals)).map(|m| t.assert(&self.underlying, m))
}
/// Translates `m` into an `AnyValue`, passes it through
/// [`rewrite`][Self::rewrite], and then sends it via method
/// [`message`][Activation::message] on the activation `t`.
pub fn message<M: Into<AnyValue>>(&self, t: &mut Activation, m: M) {
if let Some(m) = self.rewrite(m.into()) {
pub fn message<L, M: Unparse<L, AnyValue>>(&self, t: &mut Activation, literals: L, m: &M)
{
if let Some(m) = self.rewrite(m.unparse(literals)) {
t.message(&self.underlying, m)
}
}
@ -1643,13 +1663,13 @@ impl std::convert::From<&Cap> for IOValue {
}
}
impl<M> Entity<AnyValue> for Guard<M>
impl<L: Sync + Send, M> Entity<AnyValue> for Guard<L, M>
where
for<'a> &'a M: Into<AnyValue>,
for<'a> M: TryFrom<&'a AnyValue>,
M: for<'a> Unparse<&'a L, AnyValue>,
M: for<'a> Parse<&'a L, AnyValue>,
{
fn assert(&mut self, t: &mut Activation, a: AnyValue, h: Handle) -> ActorResult {
match M::try_from(&a) {
match M::parse(&*self.literals, &a) {
Ok(a) => t.with_entity(&self.underlying, |t, e| e.assert(t, a, h)),
Err(_) => Ok(()),
}
@ -1658,7 +1678,7 @@ where
t.with_entity(&self.underlying, |t, e| e.retract(t, h))
}
fn message(&mut self, t: &mut Activation, m: AnyValue) -> ActorResult {
match M::try_from(&m) {
match M::parse(&*self.literals, &m) {
Ok(m) => t.with_entity(&self.underlying, |t, e| e.message(t, m)),
Err(_) => Ok(()),
}

View File

@ -1,32 +0,0 @@
//! Useful utilities for working with [`AnyValue`]s.
use preserves::value::Embeddable;
use preserves::value::IOValue;
use preserves::value::NestedValue;
use std::convert::TryInto;
use crate::actor::*;
/// Converts an `AnyValue` to any [`crate::value::NestedValue`],
/// signalling an error if any embedded values are found in `v`.
pub fn from_any_value<N: NestedValue<D>, D: Embeddable>(v: &AnyValue) -> Result<N, &'static str> {
v.copy_via(&mut |_| Err("Embedded values cannot be converted")?)
}
/// Converts any [`crate::value::NestedValue`] to an `AnyValue`,
/// signalling an error if any embedded values are found in `v`.
pub fn to_any_value<N: NestedValue<D>, D: Embeddable>(v: &N) -> Result<AnyValue, &'static str> {
v.copy_via(&mut |_| Err("Embedded values cannot be converted")?)
}
/// Special case of [`to_any_value`] for [`IOValue`].
pub fn from_io_value<V: TryInto<IOValue>>(v: V) -> Result<AnyValue, &'static str> {
to_any_value(&v.try_into().map_err(|_| "Could not convert to IOValue")?)
}
/// Identity function for helping `rustc` decide which
/// [`crate::value::NestedValue`] to use (namely, [`AnyValue`]).
pub fn any_value(v: AnyValue) -> AnyValue {
v
}

View File

@ -8,14 +8,14 @@
//! on the web](https://syndicate-lang.org/tonyg-dissertation/).
//! [PDF](https://syndicate-lang.org/papers/conversational-concurrency-201712310922.pdf).
use super::language;
use super::skeleton;
use super::actor::*;
use super::schemas::dataspace::*;
use super::schemas::dataspace::_Any;
use preserves::value::Map;
use std::convert::TryFrom;
use preserves_schema::Codec;
// #[derive(Debug)]
// pub struct Churn {
@ -99,7 +99,7 @@ impl Entity<_Any> for Dataspace {
// self.churn.assertions_added += self.index.assertion_count() - old_assertions;
// self.churn.endpoints_added += 1;
if let Ok(o) = Observe::try_from(&a) {
if let Ok(o) = language().parse::<Observe>(&a) {
self.index.add_observer(t, &o.pattern, &o.observer);
// self.churn.observers_added += 1;
self.handle_map.insert(h, (a, Some(o)));

View File

@ -1,5 +1,6 @@
//! Actor errors.
use super::language;
use super::schemas::internal_protocol::_Any;
#[doc(inline)]
@ -7,7 +8,8 @@ pub use super::schemas::internal_protocol::Error;
use preserves::value::NestedValue;
use preserves::value::Value;
use preserves_schema::support::ParseError;
use preserves_schema::Codec;
use preserves_schema::ParseError;
impl std::error::Error for Error {}
@ -42,7 +44,7 @@ pub fn encode_error(result: Result<(), Error>) -> _Any {
}
Err(e) => {
let mut r = Value::record(_Any::symbol("Err"), 1);
r.fields_vec_mut().push((&e).into());
r.fields_vec_mut().push(language().unparse(&e));
r.finish().wrap()
}
}

View File

@ -8,7 +8,6 @@ pub use preserves;
pub mod actor;
pub mod bag;
pub mod convert;
pub mod dataspace;
pub mod during;
pub mod error;
@ -33,3 +32,7 @@ pub use during::entity;
#[doc(inline)]
pub use tracer::convenient_logging;
preserves_schema::define_language!(language(): Language<actor::AnyValue> {
syndicate: schemas::Language,
});

View File

@ -1,6 +1,7 @@
use bytes::Buf;
use bytes::BytesMut;
use crate::language;
use crate::actor::*;
use crate::during;
use crate::error::Error;
@ -29,8 +30,9 @@ use preserves::value::ViaCodec;
use preserves::value::Writer;
use preserves::value::signed_integer::SignedInteger;
use preserves_schema::support::Deserialize;
use preserves_schema::support::ParseError;
use preserves_schema::Codec;
use preserves_schema::Deserialize;
use preserves_schema::ParseError;
use std::io;
use std::pin::Pin;
@ -191,7 +193,7 @@ pub fn connect_stream<I, O, E, F>(
let denotation = a.value().to_embedded()?;
f(state, t, Arc::clone(denotation))
}));
gatekeeper.assert(t, &gatekeeper::Resolve {
gatekeeper.assert(t, language(), &gatekeeper::Resolve {
sturdyref,
observer: Cap::new(&main_entity),
});
@ -259,13 +261,13 @@ impl TunnelRelay {
match src.peek() {
Ok(v) => if v >= 128 {
self.output_text = false;
let mut r = src.packed::<_, AnyValue, _>(&mut dec);
let mut r = src.packed(&mut dec);
let res = P::Packet::deserialize(&mut r);
(res, r.source.index)
} else {
self.output_text = true;
let mut dec = ViaCodec::new(dec);
let mut r = src.text::<_, AnyValue, _>(&mut dec);
let mut r = src.text::<AnyValue, _>(&mut dec);
let res = P::Packet::deserialize(&mut r);
(res, r.source.index)
},
@ -310,7 +312,7 @@ impl TunnelRelay {
ws.inc_ref(),
None => {
tracing::warn!(
event = ?AnyValue::from(&P::TurnEvent { oid, event }),
event = ?language().unparse(&P::TurnEvent { oid, event }),
"Cannot deliver event: nonexistent oid");
return Ok(());
}
@ -322,7 +324,7 @@ impl TunnelRelay {
let P::Assert { assertion: P::Assertion(a), handle: remote_handle } = *b;
a.foreach_embedded::<_, Error>(
&mut |r| Ok(pins.push(self.membranes.lookup_ref(r))))?;
if let Some(local_handle) = target.assert(t, a) {
if let Some(local_handle) = target.assert(t, &(), &a) {
if let Some(_) = self.inbound_assertions.insert(remote_handle, (local_handle, pins)) {
return Err(error("Assertion with duplicate handle", AnyValue::new(false)));
}
@ -334,7 +336,7 @@ impl TunnelRelay {
P::Event::Retract(b) => {
let P::Retract { handle: remote_handle } = *b;
let (local_handle, previous_pins) = match self.inbound_assertions.remove(&remote_handle) {
None => return Err(error("Retraction of nonexistent handle", AnyValue::from(&remote_handle))),
None => return Err(error("Retraction of nonexistent handle", language().unparse(&remote_handle))),
Some(wss) => wss,
};
self.membranes.release(previous_pins);
@ -353,7 +355,7 @@ impl TunnelRelay {
_ => Ok(())
}
})?;
target.message(t, a);
target.message(t, &(), &a);
self.membranes.release(pins);
dump_membranes!(self.membranes);
}
@ -368,7 +370,7 @@ impl TunnelRelay {
}
impl Entity<Synced> for SyncPeer {
fn message(&mut self, t: &mut Activation, _a: Synced) -> ActorResult {
self.peer.message(t, AnyValue::new(true));
self.peer.message(t, &(), &AnyValue::new(true));
let mut g = self.relay_ref.lock().expect("unpoisoned");
let tr = g.as_mut().expect("initialized");
tr.membranes.release(std::mem::take(&mut self.pins));
@ -393,7 +395,7 @@ impl TunnelRelay {
fn outbound_event_bookkeeping(
&mut self,
t: &mut Activation,
_t: &mut Activation,
remote_oid: sturdy::Oid,
event: &P::Event,
) -> ActorResult {
@ -435,15 +437,15 @@ impl TunnelRelay {
}
pub fn send_packet(&mut self, account: &Arc<Account>, cost: usize, p: P::Packet) -> ActorResult {
let item = AnyValue::from(&p);
let item = language().unparse(&p);
tracing::trace!(packet = ?item, "<--");
let bs = if self.output_text {
let mut s = TextWriter::encode::<_, AnyValue, _>(&mut self.membranes, &item)?;
let mut s = TextWriter::encode(&mut self.membranes, &item)?;
s.push('\n');
s.into_bytes()
} else {
PackedWriter::encode::<_, AnyValue, _>(&mut self.membranes, &item)?
PackedWriter::encode(&mut self.membranes, &item)?
};
let _ = self.output.send(LoanedItem::new(account, cost, bs));
@ -563,7 +565,7 @@ impl DomainEncode<P::_Ptr> for Membranes {
w: &mut W,
d: &P::_Ptr,
) -> io::Result<()> {
w.write(&mut NoEmbeddedDomainCodec, &AnyValue::from(&match self.exported.ref_map.get(d) {
w.write(&mut NoEmbeddedDomainCodec, &language().unparse(&match self.exported.ref_map.get(d) {
Some(ws) => sturdy::WireRef::Mine {
oid: Box::new(ws.inc_ref().oid.clone()),
},
@ -711,7 +713,7 @@ impl Entity<AnyValue> for RelayEntity {
let mut g = self.relay_ref.lock().expect("unpoisoned");
let tr = g.as_mut().expect("initialized");
tr.send_event(t, self.oid.clone(), P::Event::Sync(Box::new(P::Sync {
peer: Cap::guard(&peer)
peer: Cap::guard(Arc::new(()), peer)
})))
}
}

View File

@ -126,7 +126,7 @@ impl Index {
|es, cs| {
if es.cached_captures.change(cs.clone(), 1) == bag::Net::AbsentToPresent {
for (observer, capture_map) in &mut es.endpoints {
if let Some(h) = observer.assert(t, cs.clone()) {
if let Some(h) = observer.assert(t, &(), &cs) {
capture_map.insert(cs.clone(), h);
}
}
@ -175,7 +175,7 @@ impl Index {
|es, cs| {
// *delivery_count += es.endpoints.len();
for observer in es.endpoints.keys() {
observer.message(t, cs.clone());
observer.message(t, &(), &cs);
}
}).perform(&mut self.root);
}
@ -446,7 +446,7 @@ impl Continuation {
});
let mut capture_map = Map::new();
for cs in endpoints.cached_captures.keys() {
if let Some(h) = observer.assert(t, cs.clone()) {
if let Some(h) = observer.assert(t, &(), cs) {
capture_map.insert(cs.clone(), h);
}
}

View File

@ -4,17 +4,17 @@ use hmac::{Hmac, Mac, NewMac, crypto_mac::MacError};
use preserves::hex::HexParser;
use preserves::hex::HexFormatter;
use preserves::value::Embeddable;
use preserves::value::NestedValue;
use preserves::value::NoEmbeddedDomainCodec;
use preserves::value::packed::PackedWriter;
use preserves::value::packed::from_bytes;
use preserves_schema::Codec;
use sha2::Sha256;
use std::convert::TryFrom;
use std::io;
use super::language;
use super::error::Error;
use super::rewrite::CaveatError;
pub use super::schemas::sturdy::*;
@ -54,11 +54,11 @@ pub fn new_key() -> Vec<u8> {
buf
}
pub fn encode<D: Embeddable, N: NestedValue<D>>(v: &N) -> Vec<u8> {
PackedWriter::encode::<D, N, _>(&mut NoEmbeddedDomainCodec, v).expect("no io errors")
pub fn encode<N: NestedValue>(v: &N) -> Vec<u8> {
PackedWriter::encode(&mut NoEmbeddedDomainCodec, v).expect("no io errors")
}
pub fn decode<D: Embeddable, N: NestedValue<D>>(bs: &[u8]) -> io::Result<N> {
pub fn decode<N: NestedValue>(bs: &[u8]) -> io::Result<N> {
from_bytes(bs, &mut NoEmbeddedDomainCodec)
}
@ -70,11 +70,11 @@ impl SturdyRef {
pub fn from_hex(s: &str) -> Result<Self, Error> {
let binary = HexParser::Liberal.decode(s).expect("hex encoded sturdyref");
Ok(Self::try_from(&decode::<_, _Any>(&binary)?)?)
Ok(language().parse(&decode(&binary)?)?)
}
pub fn to_hex(&self) -> String {
HexFormatter::Packed.encode(&encode::<_, _Any>(&self.into()))
HexFormatter::Packed.encode(&encode(&language().unparse(self)))
}
pub fn validate_and_attenuate(
@ -99,7 +99,7 @@ impl SturdyRef {
let mut key = key.to_vec();
key = signature(&key, &encode(oid));
for c in caveat_chain {
key = signature(&key, &encode(&_Any::from(c)));
key = signature(&key, &encode(&language().unparse(c)));
}
if &key == sig {
Ok(())
@ -114,7 +114,7 @@ impl SturdyRef {
let oid = oid.clone();
let mut caveat_chain = caveat_chain.clone();
caveat_chain.push(attenuation.clone());
let sig = signature(&sig, &encode(&_Any::from(attenuation)));
let sig = signature(&sig, &encode(&language().unparse(attenuation)));
Ok(SturdyRef { oid, caveat_chain, sig })
}
}