diff --git a/Cargo.lock b/Cargo.lock index 3c6c7a3..f24f18a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1053,9 +1053,9 @@ checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" [[package]] name = "preserves" -version = "1.0.0" +version = "2.0.0-rc1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41e4496df1095b2d85d6ac4cad0b2b8c77ae9224aca1789ad41b0654c79c64eb" +checksum = "e281800f60acacfaf49115f3032d3c9939a4eb4d6921cb280d7593744cada262" dependencies = [ "base64", "dtoa", @@ -1067,9 +1067,9 @@ dependencies = [ [[package]] name = "preserves-schema" -version = "1.0.0" +version = "2.0.0-rc1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04ed3d65a4e263db4117a86e209ef1732e52749aa8fbaeb6b9b6ed3b49da1d94" +checksum = "192e1fb003f58e76d60a70536deb6bc731ad9132ffb095ea31e7ecebaaf8de0e" dependencies = [ "convert_case", "glob", @@ -1458,13 +1458,14 @@ dependencies = [ [[package]] name = "syndicate" -version = "0.11.0" +version = "0.12.0" dependencies = [ "bytes", "criterion", "futures", "getrandom", "hmac", + "lazy_static", "openssl", "preserves", "preserves-schema", @@ -1478,7 +1479,7 @@ dependencies = [ [[package]] name = "syndicate-macros" -version = "0.6.0" +version = "0.7.0" dependencies = [ "proc-macro2", "quote", @@ -1488,9 +1489,10 @@ dependencies = [ [[package]] name = "syndicate-server" -version = "0.11.0" +version = "0.12.0" dependencies = [ "futures", + "lazy_static", "notify", "preserves-schema", "structopt", diff --git a/Cargo.toml b/Cargo.toml index fd6256c..05a0a4f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,8 +8,8 @@ members = [ ] # [patch.crates-io] -# preserves = { path = "/home/tonyg/src/preserves/implementations/rust/preserves" } -# preserves-schema = { path = "/home/tonyg/src/preserves/implementations/rust/preserves-schema" } +# preserves = { path = "../preserves/implementations/rust/preserves" } +# preserves-schema = { path = "../preserves/implementations/rust/preserves-schema" } [profile.release] strip = true diff --git a/syndicate-macros/Cargo.toml b/syndicate-macros/Cargo.toml index 6db6073..a6b93d4 100644 --- a/syndicate-macros/Cargo.toml +++ b/syndicate-macros/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "syndicate-macros" -version = "0.6.0" +version = "0.7.0" authors = ["Tony Garnock-Jones "] edition = "2018" @@ -13,7 +13,7 @@ license = "Apache-2.0" proc-macro = true [dependencies] -syndicate = { path = "../syndicate", version = "^0.11.0"} +syndicate = { path = "../syndicate", version = "^0.12.0"} proc-macro2 = { version = "^1.0", features = ["span-locations"] } quote = "^1.0" diff --git a/syndicate-macros/src/dur.rs b/syndicate-macros/src/dur.rs new file mode 100644 index 0000000..d995c46 --- /dev/null +++ b/syndicate-macros/src/dur.rs @@ -0,0 +1,87 @@ +use proc_macro2::Span; + +use quote::quote_spanned; + +use syn::parse_macro_input; +use syn::Expr; +use syn::Ident; +use syn::LitInt; +use syn::Token; +use syn::Type; +use syn::parse::Error; +use syn::parse::Parse; +use syn::parse::ParseStream; + +use crate::stx::Stx; +use crate::pat; + +#[derive(Debug)] +struct During { + turn_stx: Expr, + ds_stx: Expr, + pat_stx: Stx, + body_stx: Expr, +} + +fn comma_parse(input: ParseStream) -> syn::parse::Result { + let _: Token![,] = input.parse()?; + input.parse() +} + +impl Parse for During { + fn parse(input: ParseStream) -> syn::parse::Result { + Ok(During { + turn_stx: input.parse()?, + ds_stx: comma_parse(input)?, + pat_stx: comma_parse(input)?, + body_stx: comma_parse(input)?, + }) + } +} + +impl During { + fn bindings(&self) -> (Vec, Vec, Vec) { + let mut ids = vec![]; + let mut tys = vec![]; + let mut indexes = vec![]; + for (i, (maybe_id, ty)) in self.pat_stx.bindings().into_iter().enumerate() { + if let Some(id) = maybe_id { + indexes.push(LitInt::new(&i.to_string(), id.span())); + ids.push(id); + tys.push(ty); + } + } + (ids, tys, indexes) + } +} + +pub fn during(src: proc_macro::TokenStream) -> proc_macro::TokenStream { + let d = parse_macro_input!(src as During); + let During { turn_stx, ds_stx, pat_stx, body_stx } = &d; + let (varname_stx, type_stx, index_stx) = d.bindings(); + let binding_count = varname_stx.len(); + let pat_stx_expr = match pat::to_pattern_expr(pat_stx) { + Ok(e) => e, + Err(e) => return Error::new(Span::call_site(), e).to_compile_error().into(), + }; + (quote_spanned!{Span::mixed_site()=> { + let monitor = syndicate::during::entity(()) + .on_asserted_facet(|_, t, captures: syndicate::actor::AnyValue| { + if let Some(captures) = captures.value().as_sequence() { + if captures.len() == #binding_count { + #(let #varname_stx = match #type_stx::try_from(captures[#index_stx]) { + Ok(v) => v, + Err(_) => return Ok(()), + };)* + return (#body_stx)(t); + } + } + Ok(()) + }) + .create_cap(#turn_stx); + #ds_stx.assert(#turn_stx, &syndicate::schemas::dataspace::Observe { + pattern: #pat_stx_expr, + observer: monitor, + }); + }}).into() +} diff --git a/syndicate-macros/src/lib.rs b/syndicate-macros/src/lib.rs index 66978ae..28303d5 100644 --- a/syndicate-macros/src/lib.rs +++ b/syndicate-macros/src/lib.rs @@ -18,8 +18,9 @@ use syn::Ident; use syn::Lit; use syn::LitByteStr; -mod stx; +mod dur; mod pat; +mod stx; mod val; use pat::lit; @@ -261,3 +262,10 @@ pub fn template(src: proc_macro::TokenStream) -> proc_macro::TokenStream { pub fn pattern(src: proc_macro::TokenStream) -> proc_macro::TokenStream { pat::pattern(src) } + +//--------------------------------------------------------------------------- + +#[proc_macro] +pub fn during(src: proc_macro::TokenStream) -> proc_macro::TokenStream { + dur::during(src) +} diff --git a/syndicate-macros/src/pat.rs b/syndicate-macros/src/pat.rs index bece22d..8e5e3d6 100644 --- a/syndicate-macros/src/pat.rs +++ b/syndicate-macros/src/pat.rs @@ -17,14 +17,14 @@ pub fn lit(e: T) -> TokenStream2 { syndicate::schemas::dataspace_patterns::DLit { value: #e }))) } -fn compile_sequence_members(stxs: Vec) -> Result, &'static str> { - stxs.into_iter().enumerate().map(|(i, stx)| { +fn compile_sequence_members(stxs: &Vec) -> Result, &'static str> { + stxs.iter().enumerate().map(|(i, stx)| { let p = to_pattern_expr(stx)?; Ok(quote!((#i .into(), #p))) }).collect() } -fn to_pattern_expr(stx: Stx) -> Result { +pub fn to_pattern_expr(stx: &Stx) -> Result { #[allow(non_snake_case)] let P_: TokenStream2 = quote!(syndicate::schemas::dataspace_patterns); #[allow(non_snake_case)] @@ -35,21 +35,24 @@ fn to_pattern_expr(stx: Stx) -> Result { match stx { Stx::Atom(v) => Ok(lit(value_to_value_expr(&v))), - Stx::Binder(_, p) => { - let pe = to_pattern_expr(*p)?; - Ok(quote!(#P_::Pattern::DBind(Box::new(#P_::DBind { pattern: #pe })))) + Stx::Binder(_, maybe_ty, maybe_pat) => { + let inner_pat_expr = match maybe_pat { + Some(p) => to_pattern_expr(&*p)?, + None => match maybe_ty { + Some(ty) => quote!(#ty::wildcard_dataspace_pattern()), + None => to_pattern_expr(&Stx::Discard)?, + } + }; + Ok(quote!(#P_::Pattern::DBind(Box::new(#P_::DBind { pattern: #inner_pat_expr })))) } Stx::Subst(e) => Ok(lit(e)), Stx::Discard => Ok(quote!(#P_::Pattern::DDiscard(Box::new(#P_::DDiscard)))), - Stx::Ctor1(_, _) => todo!(), - Stx::CtorN(_, _) => todo!(), - Stx::Rec(l, fs) => { let arity = fs.len() as u128; - let label = to_value_expr(*l)?; + let label = to_value_expr(&*l)?; let members = compile_sequence_members(fs)?; Ok(quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Rec { ctor: Box::new(#P_::CRec { label: #label, arity: #arity .into() }), @@ -65,9 +68,9 @@ fn to_pattern_expr(stx: Stx) -> Result { })))) } Stx::Set(stxs) => - Ok(lit(emit_set(&stxs.into_iter().map(to_value_expr).collect::,_>>()?))), + Ok(lit(emit_set(&stxs.iter().map(to_value_expr).collect::,_>>()?))), Stx::Dict(d) => { - let members = d.into_iter().map(|(k, v)| { + let members = d.iter().map(|(k, v)| { let k = to_value_expr(k)?; let v = to_pattern_expr(v)?; Ok(quote!((#k, #v))) @@ -82,7 +85,8 @@ fn to_pattern_expr(stx: Stx) -> Result { pub fn pattern(src: TokenStream) -> TokenStream { let src2 = src.clone(); - let e = to_pattern_expr(parse_macro_input!(src2 as Stx)).expect("Cannot compile pattern").into(); + let e = to_pattern_expr(&parse_macro_input!(src2 as Stx)) + .expect("Cannot compile pattern").into(); // println!("\n{:#} -->\n{:#}\n", &src, &e); e } diff --git a/syndicate-macros/src/stx.rs b/syndicate-macros/src/stx.rs index 1ae3baf..194e56b 100644 --- a/syndicate-macros/src/stx.rs +++ b/syndicate-macros/src/stx.rs @@ -3,12 +3,16 @@ use proc_macro2::LineColumn; use proc_macro2::TokenTree; use syn::ExprLit; +use syn::Ident; use syn::Lit; use syn::Result; +use syn::Type; use syn::buffer::Cursor; use syn::parse::Error; use syn::parse::Parse; +use syn::parse::Parser; use syn::parse::ParseStream; +use syn::parse_str; use syndicate::value::Float; use syndicate::value::Double; @@ -18,11 +22,9 @@ use syndicate::value::NestedValue; #[derive(Debug, Clone)] pub enum Stx { Atom(IOValue), - Binder(Option, Box), + Binder(Option, Option, Option>), Discard, Subst(TokenTree), - Ctor1(String, Box), - CtorN(String, Vec<(Stx, Stx)>), Rec(Box, Vec), Seq(Vec), Set(Vec), @@ -35,6 +37,39 @@ impl Parse for Stx { } } +impl Stx { + pub fn bindings(&self) -> Vec<(Option, Type)> { + let mut bs = vec![]; + self._bindings(&mut bs); + bs + } + + fn _bindings(&self, bs: &mut Vec<(Option, Type)>) { + match self { + Stx::Atom(_) | Stx::Discard | Stx::Subst(_) => (), + Stx::Binder(id, ty, pat) => { + bs.push((id.clone(), + ty.clone().unwrap_or_else( + || parse_str("syndicate::actor::AnyValue").unwrap()))); + if let Some(p) = pat { + p._bindings(bs); + } + }, + Stx::Rec(l, fs) => { + l._bindings(bs); + fs.iter().for_each(|f| f._bindings(bs)); + }, + Stx::Seq(vs) => vs.iter().for_each(|v| v._bindings(bs)), + Stx::Set(vs) => vs.iter().for_each(|v| v._bindings(bs)), + Stx::Dict(kvs) => kvs.iter().for_each(|(_k, v)| v._bindings(bs)), + } + } +} + +fn punct_char(c: Cursor) -> Option<(char, Cursor)> { + c.punct().map(|(p, c)| (p.as_char(), c)) +} + fn parse_id(mut c: Cursor) -> Result<(String, Cursor)> { let mut id = String::new(); let mut prev_pos = c.span().start(); @@ -83,11 +118,9 @@ fn parse_seq(delim_ch: char, mut c: Cursor) -> Result<(Vec, Cursor)> { fn skip_commas(mut c: Cursor) -> Cursor { loop { - if let Some((p, next)) = c.punct() { - if p.as_char() == ',' { - c = next; - continue; - } + if let Some((',', next)) = punct_char(c) { + c = next; + continue; } return c; } @@ -121,25 +154,20 @@ fn parse_group<'c, R, F: Fn(Cursor<'c>) -> Result<(R, Cursor<'c>)>>( fn parse_kv(c: Cursor) -> Result<((Stx, Stx), Cursor)> { let (k, c) = parse1(c)?; - if let Some((p, c)) = c.punct() { - if p.as_char() == ':' { - let (v, c) = parse1(c)?; - return Ok(((k, v), c)); - } + if let Some((':', c)) = punct_char(c) { + let (v, c) = parse1(c)?; + return Ok(((k, v), c)); } Err(Error::new(c.span(), "Expected ':'")) } -fn adjacent_id(pos: LineColumn, c: Cursor) -> (Option, Cursor) { +fn adjacent_ident(pos: LineColumn, c: Cursor) -> (Option, Cursor) { if c.span().start() != pos { (None, c) + } else if let Some((id, next)) = c.ident() { + (Some(id), next) } else { - let (s, next) = parse_id(c).unwrap(); - if s.is_empty() { - (None, c) - } else { - (Some(s), next) - } + (None, c) } } @@ -151,6 +179,27 @@ fn parse_exactly_one<'c>(c: Cursor<'c>) -> Result { }) } +fn parse_generic(mut c: Cursor) -> Option<(T, Cursor)> { + match T::parse.parse2(c.token_stream()) { + Ok(t) => Some((t, Cursor::empty())), // because parse2 checks for end-of-stream! + Err(e) => { + // OK, because parse2 checks for end-of-stream, let's chop + // the input at the position of the error and try again (!). + let mut collected = Vec::new(); + let upto = e.span().start(); + while !c.eof() && c.span().start() != upto { + let (tt, next) = c.token_tree().unwrap(); + collected.push(tt); + c = next; + } + match T::parse.parse2(collected.into_iter().collect()) { + Ok(t) => Some((t, c)), + Err(_) => None, + } + } + } +} + fn parse1(c: Cursor) -> Result<(Stx, Cursor)> { if let Some((p, next)) = c.punct() { match p.as_char() { @@ -162,12 +211,20 @@ fn parse1(c: Cursor) -> Result<(Stx, Cursor)> { '{' => parse_group(Delimiter::Brace, parse_kv, c).map(|(q,c)| (Stx::Dict(q),c)), '[' => parse_group(Delimiter::Bracket, parse1, c).map(|(q,c)| (Stx::Seq(q),c)), '$' => { - let (maybe_id, next) = adjacent_id(p.span().end(), next); - if let Some((inner, _, next)) = next.group(Delimiter::Parenthesis) { - parse_exactly_one(inner).map( - |q| (Stx::Binder(maybe_id, Box::new(q)), next)) + let (maybe_id, next) = adjacent_ident(p.span().end(), next); + let (maybe_type, next) = if let Some((':', next)) = punct_char(next) { + match parse_generic::(next) { + Some((t, next)) => (Some(t), next), + None => (None, next) + } } else { - Ok((Stx::Binder(maybe_id, Box::new(Stx::Discard)), next)) + (None, next) + }; + if let Some((inner, _, next)) = next.group(Delimiter::Brace) { + parse_exactly_one(inner).map( + |q| (Stx::Binder(maybe_id, maybe_type, Some(Box::new(q))), next)) + } else { + Ok((Stx::Binder(maybe_id, maybe_type, None), next)) } } '#' => { @@ -185,17 +242,7 @@ fn parse1(c: Cursor) -> Result<(Stx, Cursor)> { if i.to_string() == "_" { Ok((Stx::Discard, next)) } else { - parse_id(c).and_then(|(q,c)| { - if let Some((inner, _, next)) = c.group(Delimiter::Parenthesis) { - match parse_group_inner(inner, parse_kv, next) { - Ok((kvs, next)) => Ok((Stx::CtorN(q, kvs), next)), - Err(_) => parse_exactly_one(inner).map( - |v| (Stx::Ctor1(q, Box::new(v)), next)), - } - } else { - Ok((Stx::Atom(IOValue::symbol(&q)), c)) - } - }) + parse_id(c).and_then(|(q,c)| Ok((Stx::Atom(IOValue::symbol(&q)), c))) } } else if let Some((literal, next)) = c.literal() { let t: ExprLit = syn::parse_str(&literal.to_string())?; diff --git a/syndicate-macros/src/val.rs b/syndicate-macros/src/val.rs index 05a12da..5902123 100644 --- a/syndicate-macros/src/val.rs +++ b/syndicate-macros/src/val.rs @@ -84,18 +84,15 @@ pub fn value_to_value_expr(v: &IOValue) -> TokenStream2 { } } -pub fn to_value_expr(stx: Stx) -> Result { +pub fn to_value_expr(stx: &Stx) -> Result { match stx { Stx::Atom(v) => Ok(value_to_value_expr(&v)), - Stx::Binder(_, _) => Err("Cannot use binder in literal value"), + Stx::Binder(_, _, _) => Err("Cannot use binder in literal value"), Stx::Discard => Err("Cannot use discard in literal value"), - Stx::Subst(e) => Ok(e.into()), - - Stx::Ctor1(_, _) => todo!(), - Stx::CtorN(_, _) => todo!(), + Stx::Subst(e) => Ok(e.clone().into()), Stx::Rec(l, fs) => - Ok(emit_record(to_value_expr(*l)?, + Ok(emit_record(to_value_expr(&*l)?, &fs.into_iter().map(to_value_expr).collect::,_>>()?)), Stx::Seq(vs) => Ok(emit_seq(&vs.into_iter().map(to_value_expr).collect::,_>>()?)), diff --git a/syndicate-server/Cargo.toml b/syndicate-server/Cargo.toml index 236c124..6cb577d 100644 --- a/syndicate-server/Cargo.toml +++ b/syndicate-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "syndicate-server" -version = "0.11.0" +version = "0.12.0" authors = ["Tony Garnock-Jones "] edition = "2018" @@ -10,17 +10,16 @@ repository = "https://git.syndicate-lang.org/syndicate-lang/syndicate-rs" license = "Apache-2.0" [build-dependencies] -preserves-schema = "1.0.0" +preserves-schema = "2.0.0-rc1" [dependencies] -preserves-schema = "1.0.0" -syndicate = { path = "../syndicate", version = "^0.11.0"} -syndicate-macros = { path = "../syndicate-macros", version = "^0.6.0"} +preserves-schema = "2.0.0-rc1" +syndicate = { path = "../syndicate", version = "^0.12.0"} +syndicate-macros = { path = "../syndicate-macros", version = "^0.7.0"} futures = "0.3" - +lazy_static = "1.4" notify = "4.0" - structopt = "0.3" tungstenite = "0.13" diff --git a/syndicate-server/build.rs b/syndicate-server/build.rs index 34850d5..806fdc4 100644 --- a/syndicate-server/build.rs +++ b/syndicate-server/build.rs @@ -8,8 +8,12 @@ fn main() -> std::io::Result<()> { let mut c = CompilerConfig::new(gen_dir, "crate::schemas".to_owned()); // c.plugins.push(Box::new(syndicate_plugins::PatternPlugin)); - c.module_aliases.insert(vec!["EntityRef".to_owned()], "syndicate::actor".to_owned()); - c.module_aliases.insert(vec!["TransportAddress".to_owned()], "syndicate::schemas::transport_address".to_owned()); + c.add_external_module(ExternalModule::new(vec!["EntityRef".to_owned()], "syndicate::actor")); + c.add_external_module( + ExternalModule::new(vec!["TransportAddress".to_owned()], + "syndicate::schemas::transport_address") + .set_fallback_language_types( + |v| vec![format!("syndicate::schemas::Language<{}>", v)].into_iter().collect())); let inputs = expand_inputs(&vec!["protocols/schema-bundle.bin".to_owned()])?; c.load_schemas_and_bundles(&inputs)?; diff --git a/syndicate-server/examples/consumer.rs b/syndicate-server/examples/consumer.rs index bee600f..8ca6c19 100644 --- a/syndicate-server/examples/consumer.rs +++ b/syndicate-server/examples/consumer.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use structopt::StructOpt; use syndicate::actor::*; +use syndicate::language; use syndicate::relay; use syndicate::schemas::dataspace::Observe; use syndicate::sturdy; @@ -42,7 +43,7 @@ async fn main() -> Result<(), Box> { Ok(()) }) .create_cap(t); - ds.assert(t, &Observe { + ds.assert(t, language(), &Observe { pattern: syndicate_macros::pattern!{}, observer: Arc::clone(&consumer), }); diff --git a/syndicate-server/examples/pingpong.rs b/syndicate-server/examples/pingpong.rs index 6cbbd67..e8a0441 100644 --- a/syndicate-server/examples/pingpong.rs +++ b/syndicate-server/examples/pingpong.rs @@ -4,6 +4,7 @@ use std::time::SystemTime; use structopt::StructOpt; use syndicate::actor::*; +use syndicate::language; use syndicate::relay; use syndicate::schemas::dataspace::Observe; use syndicate::sturdy; @@ -137,9 +138,9 @@ async fn main() -> Result<(), Box> { let padding = &bindings[1]; if should_echo || (report_latency_every == 0) { - ds.message(t, simple_record2(&send_label, - timestamp.clone(), - padding.clone())); + ds.message(t, &(), &simple_record2(&send_label, + timestamp.clone(), + padding.clone())); } else { if let None = current_reply { turn_counter += 1; @@ -159,7 +160,7 @@ async fn main() -> Result<(), Box> { Value::from(now()).wrap(), padding.clone())); } - ds.message(t, current_reply.as_ref().expect("some reply").clone()); + ds.message(t, &(), current_reply.as_ref().expect("some reply")); } } } @@ -168,7 +169,7 @@ async fn main() -> Result<(), Box> { Cap::new(&self_ref) }; - ds.assert(t, &Observe { + ds.assert(t, language(), &Observe { pattern: { let recv_label = AnyValue::symbol(recv_label); syndicate_macros::pattern!{<#(recv_label) $ $>} diff --git a/syndicate-server/examples/state-consumer.rs b/syndicate-server/examples/state-consumer.rs index ac1d63f..9dbfaa6 100644 --- a/syndicate-server/examples/state-consumer.rs +++ b/syndicate-server/examples/state-consumer.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use structopt::StructOpt; use syndicate::actor::*; +use syndicate::language; use syndicate::relay; use syndicate::schemas::dataspace::Observe; use syndicate::sturdy; @@ -63,7 +64,7 @@ async fn main() -> Result<(), Box> { }).create_cap(t) }; - ds.assert(t, &Observe { + ds.assert(t, language(), &Observe { pattern: syndicate_macros::pattern!{}, observer: Arc::clone(&consumer), }); diff --git a/syndicate-server/protocols/schema-bundle.bin b/syndicate-server/protocols/schema-bundle.bin index 14e4804..48f3023 100644 --- a/syndicate-server/protocols/schema-bundle.bin +++ b/syndicate-server/protocols/schema-bundle.bin @@ -1 +1,4 @@ -´³bundle·µ³internalServices„´³schema·³version‘³ definitions·³ DebtReporter´³lit³ debt-reporter„³ ConfigWatcher´³rec´³lit³config-watcher„´³tupleµ´³named³path´³atom³String„„„„„³TcpRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Tcp„„„„„³UnixRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Unix„„„„„„³ embeddedType€„„„„ \ No newline at end of file +´³bundle·µ³externalServices„´³schema·³version‘³ definitions·³Service´³refµ„³ DaemonService„³ClearEnv´³orµµ±present´³dict·³clearEnv´³named³clearEnv´³atom³Boolean„„„„„µ±absent´³dict·„„„„„³DaemonId³any³EnvValue´³orµµ±set´³atom³String„„µ±remove´³lit€„„„„³ DaemonDir´³orµµ±present´³dict·³dir´³named³dir´³atom³String„„„„„µ±absent´³dict·„„„„„³ DaemonEnv´³orµµ±present´³dict·³env´³named³env´³dictof´³atom³String„´³refµ„³EnvValue„„„„„„µ±absent´³dict·„„„„„³ +DaemonSpec´³andµ´³dict·³argv´³named³argv´³seqof´³atom³String„„„„„´³named³env´³refµ„³ DaemonEnv„„´³named³dir´³refµ„³ DaemonDir„„´³named³clearEnv´³refµ„³ClearEnv„„„„³ DaemonProcess´³rec´³lit³daemon„´³tupleµ´³named³id´³refµ„³DaemonId„„´³named³config´³refµ„³ +DaemonSpec„„„„„³ DaemonService´³rec´³lit³daemon„´³tupleµ´³named³id´³refµ„³DaemonId„„„„„³ServiceDependency´³rec´³lit³ +depends-on„´³tupleµ´³named³depender³any„´³named³dependee³any„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„µ³internalServices„´³schema·³version‘³ definitions·³ Milestone´³rec´³lit³ milestone„´³tupleµ´³named³name³any„„„„³ DebtReporter´³lit³ debt-reporter„³ ConfigWatcher´³rec´³lit³config-watcher„´³tupleµ´³named³path´³atom³String„„„„„³TcpRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Tcp„„„„„³UnixRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Unix„„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„„„ \ No newline at end of file diff --git a/syndicate-server/protocols/schemas/externalServices.prs b/syndicate-server/protocols/schemas/externalServices.prs new file mode 100644 index 0000000..7eb8398 --- /dev/null +++ b/syndicate-server/protocols/schemas/externalServices.prs @@ -0,0 +1,17 @@ +version 1 . +embeddedType EntityRef.Cap . + +Service = DaemonService . + +DaemonService = . + +ServiceDependency = . + +DaemonProcess = . +DaemonId = any . +DaemonSpec = { argv: [string ...] } & @env DaemonEnv & @dir DaemonDir & @clearEnv ClearEnv . +DaemonEnv = @present { env: { string: EnvValue ...:... } } / @absent {} . +DaemonDir = @present { dir: string } / @absent {} . +ClearEnv = @present { clearEnv: bool } / @absent {} . + +EnvValue = @set string / @remove #f . diff --git a/syndicate-server/protocols/schemas/internalServices.prs b/syndicate-server/protocols/schemas/internalServices.prs index 1c8fdfd..b0f5f89 100644 --- a/syndicate-server/protocols/schemas/internalServices.prs +++ b/syndicate-server/protocols/schemas/internalServices.prs @@ -1,7 +1,9 @@ version 1 . +embeddedType EntityRef.Cap . DebtReporter = =debt-reporter . TcpRelayListener = . UnixRelayListener = . ConfigWatcher = . +Milestone = . diff --git a/syndicate-server/src/gatekeeper.rs b/syndicate-server/src/gatekeeper.rs index e5b9e01..aa594c1 100644 --- a/syndicate-server/src/gatekeeper.rs +++ b/syndicate-server/src/gatekeeper.rs @@ -1,3 +1,5 @@ +use preserves_schema::Codec; + use std::sync::Arc; use syndicate::actor::*; @@ -6,6 +8,8 @@ use syndicate::schemas::gatekeeper; use syndicate::sturdy; use syndicate::value::NestedValue; +use crate::language::language; + pub fn bind( t: &mut Activation, ds: &Arc, @@ -14,8 +18,8 @@ pub fn bind( target: Arc, ) { let sr = sturdy::SturdyRef::mint(oid.clone(), &key); - tracing::info!(cap = ?AnyValue::from(&sr), hex = %sr.to_hex()); - ds.assert(t, &gatekeeper::Bind { oid, key: key.to_vec(), target }); + tracing::info!(cap = ?language().unparse(&sr), hex = %sr.to_hex()); + ds.assert(t, language(), &gatekeeper::Bind { oid, key: key.to_vec(), target }); } pub fn handle_resolve( @@ -34,15 +38,15 @@ pub fn handle_resolve( let unattenuated_target = bindings[1].value().to_embedded()?; match sturdyref.validate_and_attenuate(key, unattenuated_target) { Err(e) => { - tracing::warn!(sturdyref = ?AnyValue::from(&sturdyref), + tracing::warn!(sturdyref = ?language().unparse(&sturdyref), "sturdyref failed validation: {}", e); Ok(None) }, Ok(target) => { - tracing::trace!(sturdyref = ?AnyValue::from(&sturdyref), + tracing::trace!(sturdyref = ?language().unparse(&sturdyref), ?target, "sturdyref resolved"); - if let Some(h) = observer.assert(t, AnyValue::domain(target)) { + if let Some(h) = observer.assert(t, &(), &AnyValue::domain(target)) { Ok(Some(Box::new(move |_observer, t| Ok(t.retract(h))))) } else { Ok(None) @@ -51,7 +55,7 @@ pub fn handle_resolve( } }) .create_cap(t); - if let Some(oh) = ds.assert(t, &dataspace::Observe { + if let Some(oh) = ds.assert(t, language(), &dataspace::Observe { // TODO: codegen plugin to generate pattern constructors pattern: syndicate_macros::pattern!{}, observer: handler, diff --git a/syndicate-server/src/language.rs b/syndicate-server/src/language.rs new file mode 100644 index 0000000..7d86836 --- /dev/null +++ b/syndicate-server/src/language.rs @@ -0,0 +1,6 @@ +use syndicate::actor; + +preserves_schema::define_language!(language(): Language { + syndicate: syndicate::schemas::Language, + server: crate::schemas::Language, +}); diff --git a/syndicate-server/src/main.rs b/syndicate-server/src/main.rs index 88b3d55..2564858 100644 --- a/syndicate-server/src/main.rs +++ b/syndicate-server/src/main.rs @@ -1,10 +1,11 @@ +use preserves_schema::Codec; + use std::path::PathBuf; use std::sync::Arc; use structopt::StructOpt; use syndicate::actor::*; -use syndicate::convert::from_io_value; use syndicate::dataspace::*; use syndicate::relay; use syndicate::schemas::service; @@ -13,6 +14,7 @@ use syndicate::schemas::transport_address; use syndicate::value::NestedValue; mod gatekeeper; +mod language; mod protocol; mod services; @@ -20,6 +22,7 @@ mod schemas { include!(concat!(env!("OUT_DIR"), "/src/schemas/mod.rs")); } +use language::language; use schemas::internal_services; #[derive(Clone, StructOpt)] @@ -91,49 +94,47 @@ async fn main() -> Result<(), Box> { gatekeeper::bind(t, &root_ds, AnyValue::new("server-config"), [0; 16], Arc::clone(&server_config_ds)); - let gateway = Cap::guard(&t.create( + let gateway = Cap::guard(Arc::clone(&language().syndicate), t.create( syndicate::entity(Arc::clone(&root_ds)).on_asserted(gatekeeper::handle_resolve))); services::debt_reporter::on_demand(t, Arc::clone(&server_config_ds)); services::tcp_relay_listener::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&gateway)); services::unix_relay_listener::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&gateway)); services::config_watcher::on_demand(t, Arc::clone(&server_config_ds)); + services::daemon::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&root_ds)); if config.debt_reporter { - server_config_ds.assert(t, &service::RequireService { - service_name: from_io_value(&internal_services::DebtReporter)?, + server_config_ds.assert(t, language(), &service::RequireService { + service_name: language().unparse(&internal_services::DebtReporter), }); } for port in config.ports.clone() { - server_config_ds.assert(t, &service::RequireService { - service_name: from_io_value( - &internal_services::TcpRelayListener { - addr: transport_address::Tcp { - host: "0.0.0.0".to_owned(), - port: (port as i32).into(), - } - })?, + server_config_ds.assert(t, language(), &service::RequireService { + service_name: language().unparse(&internal_services::TcpRelayListener { + addr: transport_address::Tcp { + host: "0.0.0.0".to_owned(), + port: (port as i32).into(), + } + }), }); } for path in config.sockets.clone() { - server_config_ds.assert(t, &service::RequireService { - service_name: from_io_value( - &internal_services::UnixRelayListener { - addr: transport_address::Unix { - path: path.to_str().expect("representable UnixListener path").to_owned(), - } - })?, + server_config_ds.assert(t, language(), &service::RequireService { + service_name: language().unparse(&internal_services::UnixRelayListener { + addr: transport_address::Unix { + path: path.to_str().expect("representable UnixListener path").to_owned(), + } + }), }); } for path in config.config.clone() { - server_config_ds.assert(t, &service::RequireService { - service_name: from_io_value( - &internal_services::ConfigWatcher { - path: path.to_str().expect("representable ConfigWatcher path").to_owned(), - })?, + server_config_ds.assert(t, language(), &service::RequireService { + service_name: language().unparse(&internal_services::ConfigWatcher { + path: path.to_str().expect("representable ConfigWatcher path").to_owned(), + }), }); } diff --git a/syndicate-server/src/services/config_watcher.rs b/syndicate-server/src/services/config_watcher.rs index fab1d6a..590ef41 100644 --- a/syndicate-server/src/services/config_watcher.rs +++ b/syndicate-server/src/services/config_watcher.rs @@ -3,7 +3,8 @@ use notify::Watcher; use notify::RecursiveMode; use notify::watcher; -use std::convert::TryFrom; +use preserves_schema::Codec; + use std::fs; use std::future; use std::io; @@ -14,7 +15,6 @@ use std::thread; use std::time::Duration; use syndicate::actor::*; -use syndicate::convert::*; use syndicate::during::entity; use syndicate::schemas::dataspace::Observe; use syndicate::value::BinarySource; @@ -26,6 +26,7 @@ use syndicate::value::Reader; use syndicate::value::Set; use syndicate::value::ViaCodec; +use crate::language::language; use crate::schemas::internal_services; pub fn on_demand(t: &mut Activation, ds: Arc) { @@ -41,8 +42,8 @@ pub fn on_demand(t: &mut Activation, ds: Arc) { } }) .create_cap(t); - ds.assert(t, &Observe { - pattern: syndicate_macros::pattern!{)>}, + ds.assert(t, language(), &Observe { + pattern: syndicate_macros::pattern!{}>}, observer: monitor, }); Ok(()) @@ -57,13 +58,13 @@ fn assertions_at_existing_file(t: &mut Activation, ds: &Arc, path: &PathBuf let mut handles = Set::new(); let fh = fs::File::open(path)?; let mut src = IOBinarySource::new(fh); - let mut r = src.text::<_, AnyValue, _>(ViaCodec::new(NoEmbeddedDomainCodec)); + let mut r = src.text::(ViaCodec::new(NoEmbeddedDomainCodec)); let mut values = Vec::new(); - while let Some(value) = Reader::<_, AnyValue>::next(&mut r, true)? { + while let Some(value) = Reader::::next(&mut r, true)? { values.push(value); } for value in values.into_iter() { - if let Some(handle) = ds.assert(t, value.clone()) { + if let Some(handle) = ds.assert(t, &(), &value) { handles.insert(handle); } } @@ -145,11 +146,10 @@ fn initial_scan( } fn run(t: &mut Activation, ds: Arc, captures: AnyValue) -> ActorResult { - let spec = internal_services::ConfigWatcher::try_from(&from_any_value( - &captures.value().to_sequence()?[0])?)?; + let spec: internal_services::ConfigWatcher = language().parse(&captures.value().to_sequence()?[0])?; { - let spec = from_io_value(&spec)?; - ds.assert(t, syndicate_macros::template!("")); + let spec = language().unparse(&spec); + ds.assert(t, &(), &syndicate_macros::template!("")); } let path = fs::canonicalize(spec.path)?; diff --git a/syndicate-server/src/services/daemon.rs b/syndicate-server/src/services/daemon.rs new file mode 100644 index 0000000..36005b2 --- /dev/null +++ b/syndicate-server/src/services/daemon.rs @@ -0,0 +1,47 @@ +use std::convert::TryFrom; +use std::sync::Arc; + +use syndicate::actor::*; +use syndicate::during::entity; +use syndicate::schemas::dataspace::Observe; +use syndicate::supervise::{Supervisor, SupervisorConfiguration}; +use syndicate::value::NestedValue; + +use crate::schemas::external_services; + +use syndicate_macros::during; + +pub fn on_demand(t: &mut Activation, config_ds: Arc, root_ds: Arc) { + // t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { + + // during!(t, config_ds, , |t| { + // let config_ds = Arc::clone(&config_ds); + // let root_ds = Arc::clone(&root_ds); + // Ok(Supervisor::start( + // t, + // syndicate::name!(parent: None, "daemon", service = ?spec_any), + // SupervisorConfiguration::default(), + // move |t| run(t, Arc::clone(&config_ds), Arc::clone(&root_ds), spec.clone()))) + // }); + + // Ok(()) + // }); +} + +// fn run( +// t: &mut Activation, +// config_ds: Arc, +// _root_ds: Arc, +// captures: AnyValue, +// ) -> ActorResult { +// let spec = external_services::DaemonService::try_from(&from_any_value( +// &captures.value().to_sequence()?[0])?)?; +// { +// let spec = from_io_value(&spec)?; +// config_ds.assert(t, syndicate_macros::template!("")); +// } + +// tracing::info!("daemon {:?}", &spec); + +// Ok(()) +// } diff --git a/syndicate-server/src/services/debt_reporter.rs b/syndicate-server/src/services/debt_reporter.rs index 7a63628..d8a6685 100644 --- a/syndicate-server/src/services/debt_reporter.rs +++ b/syndicate-server/src/services/debt_reporter.rs @@ -1,10 +1,12 @@ +use preserves_schema::Codec; + use std::sync::Arc; use syndicate::actor::*; -use syndicate::convert::*; use syndicate::during::entity; use syndicate::schemas::dataspace::Observe; +use crate::language::language; use crate::schemas::internal_services; pub fn on_demand(t: &mut Activation, ds: Arc) { @@ -19,8 +21,8 @@ pub fn on_demand(t: &mut Activation, ds: Arc) { } }) .create_cap(t); - let spec = from_io_value(&internal_services::DebtReporter)?; - ds.assert(t, &Observe { + let spec = language().unparse(&internal_services::DebtReporter); + ds.assert(t, language(), &Observe { pattern: syndicate_macros::pattern!{}, observer: monitor, }); @@ -29,8 +31,8 @@ pub fn on_demand(t: &mut Activation, ds: Arc) { } fn run(t: &mut Activation, ds: Arc) -> ActorResult { - let spec = from_io_value(&internal_services::DebtReporter)?; - ds.assert(t, syndicate_macros::template!("")); + let spec = language().unparse(&internal_services::DebtReporter); + ds.assert(t, &(), &syndicate_macros::template!("")); t.linked_task(syndicate::name!("tick"), async { let mut timer = tokio::time::interval(core::time::Duration::from_secs(1)); loop { diff --git a/syndicate-server/src/services/mod.rs b/syndicate-server/src/services/mod.rs index 520a9fd..a134df5 100644 --- a/syndicate-server/src/services/mod.rs +++ b/syndicate-server/src/services/mod.rs @@ -1,4 +1,5 @@ pub mod config_watcher; +pub mod daemon; pub mod debt_reporter; pub mod tcp_relay_listener; pub mod unix_relay_listener; diff --git a/syndicate-server/src/services/tcp_relay_listener.rs b/syndicate-server/src/services/tcp_relay_listener.rs index 91376e9..50cbbab 100644 --- a/syndicate-server/src/services/tcp_relay_listener.rs +++ b/syndicate-server/src/services/tcp_relay_listener.rs @@ -1,8 +1,9 @@ +use preserves_schema::Codec; + use std::convert::TryFrom; use std::sync::Arc; use syndicate::actor::*; -use syndicate::convert::*; use syndicate::during::entity; use syndicate::schemas::dataspace::Observe; use syndicate::supervise::{Supervisor, SupervisorConfiguration}; @@ -10,6 +11,7 @@ use syndicate::value::NestedValue; use tokio::net::TcpListener; +use crate::language::language; use crate::protocol::detect_protocol; use crate::schemas::internal_services; @@ -30,8 +32,8 @@ pub fn on_demand(t: &mut Activation, ds: Arc, gateway: Arc) { } }) .create_cap(t); - ds.assert(t, &Observe { - pattern: syndicate_macros::pattern!{>)>}, + ds.assert(t, language(), &Observe { + pattern: syndicate_macros::pattern!{>}>}, observer: monitor, }); Ok(()) @@ -44,13 +46,12 @@ fn run( gateway: Arc, captures: AnyValue, ) -> ActorResult { - let spec = internal_services::TcpRelayListener::try_from(&from_any_value( - &captures.value().to_sequence()?[0])?)?; + let spec: internal_services::TcpRelayListener = language().parse(&captures.value().to_sequence()?[0])?; let host = spec.addr.host.clone(); let port = u16::try_from(&spec.addr.port).map_err(|_| "Invalid TCP port number")?; { - let spec = from_io_value(&spec)?; - ds.assert(t, syndicate_macros::template!("")); + let spec = language().unparse(&spec); + ds.assert(t, &(), &syndicate_macros::template!("")); } let parent_span = tracing::Span::current(); t.linked_task(syndicate::name!("listener"), async move { diff --git a/syndicate-server/src/services/unix_relay_listener.rs b/syndicate-server/src/services/unix_relay_listener.rs index c6a3683..a424a47 100644 --- a/syndicate-server/src/services/unix_relay_listener.rs +++ b/syndicate-server/src/services/unix_relay_listener.rs @@ -1,10 +1,10 @@ -use std::convert::TryFrom; +use preserves_schema::Codec; + use std::io; use std::path::PathBuf; use std::sync::Arc; use syndicate::actor::*; -use syndicate::convert::*; use syndicate::during::entity; use syndicate::error::Error; use syndicate::relay; @@ -14,6 +14,7 @@ use syndicate::value::NestedValue; use tokio::net::UnixListener; use tokio::net::UnixStream; +use crate::language::language; use crate::protocol::run_connection; use crate::schemas::internal_services; @@ -31,8 +32,8 @@ pub fn on_demand(t: &mut Activation, ds: Arc, gateway: Arc) { } }) .create_cap(t); - ds.assert(t, &Observe { - pattern: syndicate_macros::pattern!{>)>}, + ds.assert(t, language(), &Observe { + pattern: syndicate_macros::pattern!{>}>}, observer: monitor, }); Ok(()) @@ -45,12 +46,11 @@ fn run( gateway: Arc, captures: AnyValue, ) -> ActorResult { - let spec = internal_services::UnixRelayListener::try_from(&from_any_value( - &captures.value().to_sequence()?[0])?)?; + let spec: internal_services::UnixRelayListener = language().parse(&captures.value().to_sequence()?[0])?; let path_str = spec.addr.path.clone(); { - let spec = from_io_value(&spec)?; - ds.assert(t, syndicate_macros::template!("")); + let spec = language().unparse(&spec); + ds.assert(t, &(), &syndicate_macros::template!("")); } let parent_span = tracing::Span::current(); t.linked_task(syndicate::name!("listener"), async move { diff --git a/syndicate/Cargo.toml b/syndicate/Cargo.toml index 1228d05..c7c43d6 100644 --- a/syndicate/Cargo.toml +++ b/syndicate/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "syndicate" -version = "0.11.0" +version = "0.12.0" authors = ["Tony Garnock-Jones "] edition = "2018" @@ -13,11 +13,11 @@ license = "Apache-2.0" vendored-openssl = ["openssl/vendored"] [build-dependencies] -preserves-schema = "1.0.0" +preserves-schema = "2.0.0-rc1" [dependencies] -preserves = "1.0.0" -preserves-schema = "1.0.0" +preserves = "2.0.0-rc1" +preserves-schema = "2.0.0-rc1" tokio = { version = "1.10", features = ["io-util", "macros", "rt", "rt-multi-thread", "time"] } tokio-util = "0.6" @@ -27,6 +27,7 @@ futures = "0.3" getrandom = "0.2" hmac = "0.11" +lazy_static = "1.4" sha2 = "0.9" tracing = "0.1" diff --git a/syndicate/benches/bench_dataspace.rs b/syndicate/benches/bench_dataspace.rs index 2619897..db95698 100644 --- a/syndicate/benches/bench_dataspace.rs +++ b/syndicate/benches/bench_dataspace.rs @@ -6,6 +6,7 @@ use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::time::Instant; +use syndicate::language; use syndicate::actor::*; use syndicate::during::entity; use syndicate::dataspace::Dataspace; @@ -101,7 +102,7 @@ pub fn bench_pub(c: &mut Criterion) { }) .create_cap(t); - ds.assert(t, &Observe { + ds.assert(t, language(), &Observe { pattern: p::Pattern::DBind(Box::new(p::DBind { pattern: p::Pattern::DLit(Box::new(p::DLit { value: AnyValue::symbol("consumer"), @@ -122,8 +123,8 @@ pub fn bench_pub(c: &mut Criterion) { let shutdown = Cap::new(&t.create(ShutdownEntity)); let receiver = Cap::new(&t.create(Receiver(Arc::clone(&turn_count)))); - ds.assert(t, Value::::symbol("consumer").wrap()); - ds.assert(t, &Observe { + ds.assert(t, &(), &AnyValue::symbol("consumer")); + ds.assert(t, language(), &Observe { pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { ctor: Box::new(p::CRec { label: AnyValue::symbol("Says"), @@ -140,7 +141,7 @@ pub fn bench_pub(c: &mut Criterion) { })), observer: receiver, }); - ds.assert(t, &Observe { + ds.assert(t, language(), &Observe { pattern: p::Pattern::DBind(Box::new(p::DBind { pattern: p::Pattern::DLit(Box::new(p::DLit { value: AnyValue::new(true), diff --git a/syndicate/build.rs b/syndicate/build.rs index aabe8a2..c425f6b 100644 --- a/syndicate/build.rs +++ b/syndicate/build.rs @@ -9,16 +9,12 @@ mod syndicate_plugins { pub(super) struct PatternPlugin; impl Plugin for PatternPlugin { - fn generate( + fn generate_definition( &self, - m: &mut context::ModuleContext, + _m: &mut context::ModuleContext, _definition_name: &str, _definition: &Definition, ) { - if m.mode.is_some() { - return; - } - // TODO: Emit code for building instances of sturdy.Pattern and sturdy.Template } } @@ -32,7 +28,7 @@ fn main() -> std::io::Result<()> { let mut c = CompilerConfig::new(gen_dir, "crate::schemas".to_owned()); c.plugins.push(Box::new(syndicate_plugins::PatternPlugin)); - c.module_aliases.insert(vec!["EntityRef".to_owned()], "crate::actor".to_owned()); + c.add_external_module(ExternalModule::new(vec!["EntityRef".to_owned()], "crate::actor")); let inputs = expand_inputs(&vec!["protocols/schema-bundle.bin".to_owned()])?; c.load_schemas_and_bundles(&inputs)?; diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index d9b6f6c..33fd28f 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -17,7 +17,9 @@ use preserves::value::IOValue; use preserves::value::Map; use preserves::value::NestedValue; use preserves::value::Set; -use preserves_schema::support::ParseError; +use preserves_schema::ParseError; +use preserves_schema::support::Parse; +use preserves_schema::support::Unparse; use std::boxed::Box; use std::collections::hash_map::HashMap; @@ -401,12 +403,13 @@ pub struct Cap { /// The [`Entity`] implementation for `Guard` decodes `AnyValue` /// assertions/messages to type `M` before passing them on to the /// underlying entity. -pub struct Guard +pub struct Guard where - for<'a> &'a M: Into, - for<'a> M: TryFrom<&'a AnyValue>, + M: for<'a> Unparse<&'a L, AnyValue>, + M: for<'a> Parse<&'a L, AnyValue>, { - underlying: Arc> + underlying: Arc>, + literals: Arc, } /// Simple entity that stops its containing facet when any assertion it receives is @@ -472,6 +475,18 @@ impl From<&Synced> for AnyValue { } } +impl<'a> Parse<&'a (), AnyValue> for Synced { + fn parse(_language: &'a (), value: &AnyValue) -> Result { + Synced::try_from(value) + } +} + +impl<'a> Unparse<&'a (), AnyValue> for Synced { + fn unparse(&self, _language: &'a ()) -> AnyValue { + self.into() + } +} + impl FacetRef { /// Executes `f` in a new "[turn][Activation]" for `actor`. If `f` returns `Ok(())`, /// [commits the turn][Activation::deliver] and performs the buffered actions; otherwise, @@ -1555,15 +1570,18 @@ impl Cap { /// `AnyValue`, yields a `Cap` for the referenced entity. The /// `Cap` automatically decodes presented `AnyValue`s into /// instances of `M`. - pub fn guard(underlying: &Arc>) -> Arc + pub fn guard( + literals: Arc, + underlying: Arc>, + ) -> Arc where - for<'a> &'a M: Into, - for<'a> M: TryFrom<&'a AnyValue>, + M: for<'a> Unparse<&'a L, AnyValue>, + M: for<'a> Parse<&'a L, AnyValue>, { Self::new(&Arc::new(Ref { mailbox: Arc::clone(&underlying.mailbox), facet_id: underlying.facet_id, - target: Mutex::new(Some(Box::new(Guard { underlying: underlying.clone() }))), + target: Mutex::new(Some(Box::new(Guard { underlying: underlying, literals }))), })) } @@ -1600,15 +1618,17 @@ impl Cap { /// Translates `m` into an `AnyValue`, passes it through /// [`rewrite`][Self::rewrite], and then /// [`assert`s][Activation::assert] it using the activation `t`. - pub fn assert>(&self, t: &mut Activation, m: M) -> Option { - self.rewrite(m.into()).map(|m| t.assert(&self.underlying, m)) + pub fn assert>(&self, t: &mut Activation, literals: L, m: &M) -> Option + { + self.rewrite(m.unparse(literals)).map(|m| t.assert(&self.underlying, m)) } /// Translates `m` into an `AnyValue`, passes it through /// [`rewrite`][Self::rewrite], and then sends it via method /// [`message`][Activation::message] on the activation `t`. - pub fn message>(&self, t: &mut Activation, m: M) { - if let Some(m) = self.rewrite(m.into()) { + pub fn message>(&self, t: &mut Activation, literals: L, m: &M) + { + if let Some(m) = self.rewrite(m.unparse(literals)) { t.message(&self.underlying, m) } } @@ -1643,13 +1663,13 @@ impl std::convert::From<&Cap> for IOValue { } } -impl Entity for Guard +impl Entity for Guard where - for<'a> &'a M: Into, - for<'a> M: TryFrom<&'a AnyValue>, + M: for<'a> Unparse<&'a L, AnyValue>, + M: for<'a> Parse<&'a L, AnyValue>, { fn assert(&mut self, t: &mut Activation, a: AnyValue, h: Handle) -> ActorResult { - match M::try_from(&a) { + match M::parse(&*self.literals, &a) { Ok(a) => t.with_entity(&self.underlying, |t, e| e.assert(t, a, h)), Err(_) => Ok(()), } @@ -1658,7 +1678,7 @@ where t.with_entity(&self.underlying, |t, e| e.retract(t, h)) } fn message(&mut self, t: &mut Activation, m: AnyValue) -> ActorResult { - match M::try_from(&m) { + match M::parse(&*self.literals, &m) { Ok(m) => t.with_entity(&self.underlying, |t, e| e.message(t, m)), Err(_) => Ok(()), } diff --git a/syndicate/src/convert.rs b/syndicate/src/convert.rs deleted file mode 100644 index 3964003..0000000 --- a/syndicate/src/convert.rs +++ /dev/null @@ -1,32 +0,0 @@ -//! Useful utilities for working with [`AnyValue`]s. - -use preserves::value::Embeddable; -use preserves::value::IOValue; -use preserves::value::NestedValue; - -use std::convert::TryInto; - -use crate::actor::*; - -/// Converts an `AnyValue` to any [`crate::value::NestedValue`], -/// signalling an error if any embedded values are found in `v`. -pub fn from_any_value, D: Embeddable>(v: &AnyValue) -> Result { - v.copy_via(&mut |_| Err("Embedded values cannot be converted")?) -} - -/// Converts any [`crate::value::NestedValue`] to an `AnyValue`, -/// signalling an error if any embedded values are found in `v`. -pub fn to_any_value, D: Embeddable>(v: &N) -> Result { - v.copy_via(&mut |_| Err("Embedded values cannot be converted")?) -} - -/// Special case of [`to_any_value`] for [`IOValue`]. -pub fn from_io_value>(v: V) -> Result { - to_any_value(&v.try_into().map_err(|_| "Could not convert to IOValue")?) -} - -/// Identity function for helping `rustc` decide which -/// [`crate::value::NestedValue`] to use (namely, [`AnyValue`]). -pub fn any_value(v: AnyValue) -> AnyValue { - v -} diff --git a/syndicate/src/dataspace.rs b/syndicate/src/dataspace.rs index 26df3a0..52d74e5 100644 --- a/syndicate/src/dataspace.rs +++ b/syndicate/src/dataspace.rs @@ -8,14 +8,14 @@ //! on the web](https://syndicate-lang.org/tonyg-dissertation/). //! [PDF](https://syndicate-lang.org/papers/conversational-concurrency-201712310922.pdf). +use super::language; use super::skeleton; use super::actor::*; use super::schemas::dataspace::*; use super::schemas::dataspace::_Any; use preserves::value::Map; - -use std::convert::TryFrom; +use preserves_schema::Codec; // #[derive(Debug)] // pub struct Churn { @@ -99,7 +99,7 @@ impl Entity<_Any> for Dataspace { // self.churn.assertions_added += self.index.assertion_count() - old_assertions; // self.churn.endpoints_added += 1; - if let Ok(o) = Observe::try_from(&a) { + if let Ok(o) = language().parse::(&a) { self.index.add_observer(t, &o.pattern, &o.observer); // self.churn.observers_added += 1; self.handle_map.insert(h, (a, Some(o))); diff --git a/syndicate/src/error.rs b/syndicate/src/error.rs index 8efd953..4c113c8 100644 --- a/syndicate/src/error.rs +++ b/syndicate/src/error.rs @@ -1,5 +1,6 @@ //! Actor errors. +use super::language; use super::schemas::internal_protocol::_Any; #[doc(inline)] @@ -7,7 +8,8 @@ pub use super::schemas::internal_protocol::Error; use preserves::value::NestedValue; use preserves::value::Value; -use preserves_schema::support::ParseError; +use preserves_schema::Codec; +use preserves_schema::ParseError; impl std::error::Error for Error {} @@ -42,7 +44,7 @@ pub fn encode_error(result: Result<(), Error>) -> _Any { } Err(e) => { let mut r = Value::record(_Any::symbol("Err"), 1); - r.fields_vec_mut().push((&e).into()); + r.fields_vec_mut().push(language().unparse(&e)); r.finish().wrap() } } diff --git a/syndicate/src/lib.rs b/syndicate/src/lib.rs index b261e28..1d8676d 100644 --- a/syndicate/src/lib.rs +++ b/syndicate/src/lib.rs @@ -8,7 +8,6 @@ pub use preserves; pub mod actor; pub mod bag; -pub mod convert; pub mod dataspace; pub mod during; pub mod error; @@ -33,3 +32,7 @@ pub use during::entity; #[doc(inline)] pub use tracer::convenient_logging; + +preserves_schema::define_language!(language(): Language { + syndicate: schemas::Language, +}); diff --git a/syndicate/src/relay.rs b/syndicate/src/relay.rs index 06b1e4d..3352ff0 100644 --- a/syndicate/src/relay.rs +++ b/syndicate/src/relay.rs @@ -1,6 +1,7 @@ use bytes::Buf; use bytes::BytesMut; +use crate::language; use crate::actor::*; use crate::during; use crate::error::Error; @@ -29,8 +30,9 @@ use preserves::value::ViaCodec; use preserves::value::Writer; use preserves::value::signed_integer::SignedInteger; -use preserves_schema::support::Deserialize; -use preserves_schema::support::ParseError; +use preserves_schema::Codec; +use preserves_schema::Deserialize; +use preserves_schema::ParseError; use std::io; use std::pin::Pin; @@ -191,7 +193,7 @@ pub fn connect_stream( let denotation = a.value().to_embedded()?; f(state, t, Arc::clone(denotation)) })); - gatekeeper.assert(t, &gatekeeper::Resolve { + gatekeeper.assert(t, language(), &gatekeeper::Resolve { sturdyref, observer: Cap::new(&main_entity), }); @@ -259,13 +261,13 @@ impl TunnelRelay { match src.peek() { Ok(v) => if v >= 128 { self.output_text = false; - let mut r = src.packed::<_, AnyValue, _>(&mut dec); + let mut r = src.packed(&mut dec); let res = P::Packet::deserialize(&mut r); (res, r.source.index) } else { self.output_text = true; let mut dec = ViaCodec::new(dec); - let mut r = src.text::<_, AnyValue, _>(&mut dec); + let mut r = src.text::(&mut dec); let res = P::Packet::deserialize(&mut r); (res, r.source.index) }, @@ -310,7 +312,7 @@ impl TunnelRelay { ws.inc_ref(), None => { tracing::warn!( - event = ?AnyValue::from(&P::TurnEvent { oid, event }), + event = ?language().unparse(&P::TurnEvent { oid, event }), "Cannot deliver event: nonexistent oid"); return Ok(()); } @@ -322,7 +324,7 @@ impl TunnelRelay { let P::Assert { assertion: P::Assertion(a), handle: remote_handle } = *b; a.foreach_embedded::<_, Error>( &mut |r| Ok(pins.push(self.membranes.lookup_ref(r))))?; - if let Some(local_handle) = target.assert(t, a) { + if let Some(local_handle) = target.assert(t, &(), &a) { if let Some(_) = self.inbound_assertions.insert(remote_handle, (local_handle, pins)) { return Err(error("Assertion with duplicate handle", AnyValue::new(false))); } @@ -334,7 +336,7 @@ impl TunnelRelay { P::Event::Retract(b) => { let P::Retract { handle: remote_handle } = *b; let (local_handle, previous_pins) = match self.inbound_assertions.remove(&remote_handle) { - None => return Err(error("Retraction of nonexistent handle", AnyValue::from(&remote_handle))), + None => return Err(error("Retraction of nonexistent handle", language().unparse(&remote_handle))), Some(wss) => wss, }; self.membranes.release(previous_pins); @@ -353,7 +355,7 @@ impl TunnelRelay { _ => Ok(()) } })?; - target.message(t, a); + target.message(t, &(), &a); self.membranes.release(pins); dump_membranes!(self.membranes); } @@ -368,7 +370,7 @@ impl TunnelRelay { } impl Entity for SyncPeer { fn message(&mut self, t: &mut Activation, _a: Synced) -> ActorResult { - self.peer.message(t, AnyValue::new(true)); + self.peer.message(t, &(), &AnyValue::new(true)); let mut g = self.relay_ref.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); tr.membranes.release(std::mem::take(&mut self.pins)); @@ -393,7 +395,7 @@ impl TunnelRelay { fn outbound_event_bookkeeping( &mut self, - t: &mut Activation, + _t: &mut Activation, remote_oid: sturdy::Oid, event: &P::Event, ) -> ActorResult { @@ -435,15 +437,15 @@ impl TunnelRelay { } pub fn send_packet(&mut self, account: &Arc, cost: usize, p: P::Packet) -> ActorResult { - let item = AnyValue::from(&p); + let item = language().unparse(&p); tracing::trace!(packet = ?item, "<--"); let bs = if self.output_text { - let mut s = TextWriter::encode::<_, AnyValue, _>(&mut self.membranes, &item)?; + let mut s = TextWriter::encode(&mut self.membranes, &item)?; s.push('\n'); s.into_bytes() } else { - PackedWriter::encode::<_, AnyValue, _>(&mut self.membranes, &item)? + PackedWriter::encode(&mut self.membranes, &item)? }; let _ = self.output.send(LoanedItem::new(account, cost, bs)); @@ -563,7 +565,7 @@ impl DomainEncode for Membranes { w: &mut W, d: &P::_Ptr, ) -> io::Result<()> { - w.write(&mut NoEmbeddedDomainCodec, &AnyValue::from(&match self.exported.ref_map.get(d) { + w.write(&mut NoEmbeddedDomainCodec, &language().unparse(&match self.exported.ref_map.get(d) { Some(ws) => sturdy::WireRef::Mine { oid: Box::new(ws.inc_ref().oid.clone()), }, @@ -711,7 +713,7 @@ impl Entity for RelayEntity { let mut g = self.relay_ref.lock().expect("unpoisoned"); let tr = g.as_mut().expect("initialized"); tr.send_event(t, self.oid.clone(), P::Event::Sync(Box::new(P::Sync { - peer: Cap::guard(&peer) + peer: Cap::guard(Arc::new(()), peer) }))) } } diff --git a/syndicate/src/skeleton.rs b/syndicate/src/skeleton.rs index 7d88d57..3857686 100644 --- a/syndicate/src/skeleton.rs +++ b/syndicate/src/skeleton.rs @@ -126,7 +126,7 @@ impl Index { |es, cs| { if es.cached_captures.change(cs.clone(), 1) == bag::Net::AbsentToPresent { for (observer, capture_map) in &mut es.endpoints { - if let Some(h) = observer.assert(t, cs.clone()) { + if let Some(h) = observer.assert(t, &(), &cs) { capture_map.insert(cs.clone(), h); } } @@ -175,7 +175,7 @@ impl Index { |es, cs| { // *delivery_count += es.endpoints.len(); for observer in es.endpoints.keys() { - observer.message(t, cs.clone()); + observer.message(t, &(), &cs); } }).perform(&mut self.root); } @@ -446,7 +446,7 @@ impl Continuation { }); let mut capture_map = Map::new(); for cs in endpoints.cached_captures.keys() { - if let Some(h) = observer.assert(t, cs.clone()) { + if let Some(h) = observer.assert(t, &(), cs) { capture_map.insert(cs.clone(), h); } } diff --git a/syndicate/src/sturdy.rs b/syndicate/src/sturdy.rs index 708053b..08a7cce 100644 --- a/syndicate/src/sturdy.rs +++ b/syndicate/src/sturdy.rs @@ -4,17 +4,17 @@ use hmac::{Hmac, Mac, NewMac, crypto_mac::MacError}; use preserves::hex::HexParser; use preserves::hex::HexFormatter; -use preserves::value::Embeddable; use preserves::value::NestedValue; use preserves::value::NoEmbeddedDomainCodec; use preserves::value::packed::PackedWriter; use preserves::value::packed::from_bytes; +use preserves_schema::Codec; use sha2::Sha256; -use std::convert::TryFrom; use std::io; +use super::language; use super::error::Error; use super::rewrite::CaveatError; pub use super::schemas::sturdy::*; @@ -54,11 +54,11 @@ pub fn new_key() -> Vec { buf } -pub fn encode>(v: &N) -> Vec { - PackedWriter::encode::(&mut NoEmbeddedDomainCodec, v).expect("no io errors") +pub fn encode(v: &N) -> Vec { + PackedWriter::encode(&mut NoEmbeddedDomainCodec, v).expect("no io errors") } -pub fn decode>(bs: &[u8]) -> io::Result { +pub fn decode(bs: &[u8]) -> io::Result { from_bytes(bs, &mut NoEmbeddedDomainCodec) } @@ -70,11 +70,11 @@ impl SturdyRef { pub fn from_hex(s: &str) -> Result { let binary = HexParser::Liberal.decode(s).expect("hex encoded sturdyref"); - Ok(Self::try_from(&decode::<_, _Any>(&binary)?)?) + Ok(language().parse(&decode(&binary)?)?) } pub fn to_hex(&self) -> String { - HexFormatter::Packed.encode(&encode::<_, _Any>(&self.into())) + HexFormatter::Packed.encode(&encode(&language().unparse(self))) } pub fn validate_and_attenuate( @@ -99,7 +99,7 @@ impl SturdyRef { let mut key = key.to_vec(); key = signature(&key, &encode(oid)); for c in caveat_chain { - key = signature(&key, &encode(&_Any::from(c))); + key = signature(&key, &encode(&language().unparse(c))); } if &key == sig { Ok(()) @@ -114,7 +114,7 @@ impl SturdyRef { let oid = oid.clone(); let mut caveat_chain = caveat_chain.clone(); caveat_chain.push(attenuation.clone()); - let sig = signature(&sig, &encode(&_Any::from(attenuation))); + let sig = signature(&sig, &encode(&language().unparse(attenuation))); Ok(SturdyRef { oid, caveat_chain, sig }) } }