From 40025b90a6b6c83fc677229dc9a20bab9ede08f4 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 7 Oct 2021 17:00:04 +0200 Subject: [PATCH] More capability-oriented scripting language --- syndicate-server/protocols/schema-bundle.bin | 6 +- .../protocols/schemas/internalServices.prs | 8 +- syndicate-server/src/gatekeeper.rs | 23 +- syndicate-server/src/main.rs | 50 +- syndicate-server/src/script/mod.rs | 752 ++++++++++++++++++ .../src/services/config_watcher.rs | 390 ++------- syndicate-server/src/services/daemon.rs | 27 +- .../src/services/tcp_relay_listener.rs | 14 +- .../src/services/unix_relay_listener.rs | 34 +- syndicate/src/pattern.rs | 83 ++ 10 files changed, 975 insertions(+), 412 deletions(-) create mode 100644 syndicate-server/src/script/mod.rs diff --git a/syndicate-server/protocols/schema-bundle.bin b/syndicate-server/protocols/schema-bundle.bin index 2b85843..9c48d30 100644 --- a/syndicate-server/protocols/schema-bundle.bin +++ b/syndicate-server/protocols/schema-bundle.bin @@ -2,4 +2,8 @@ ProcessDir´³orµµ±present´³dict·³dir´³named³dir´³atom³String„„„„„µ±invalid´³dict·³dir´³named³dir³any„„„„µ±absent´³dict·„„„„„³ ProcessEnv´³orµµ±present´³dict·³env´³named³env´³dictof´³refµ„³ EnvVariable„´³refµ„³EnvValue„„„„„„µ±invalid´³dict·³env´³named³env³any„„„„µ±absent´³dict·„„„„„³ CommandLine´³orµµ±shell´³atom³String„„µ±full´³refµ„³FullCommandLine„„„„³ EnvVariable´³orµµ±string´³atom³String„„µ±symbol´³atom³Symbol„„µ±invalid³any„„„³ FullProcess´³andµ´³dict·³argv´³named³argv´³refµ„³ CommandLine„„„„´³named³env´³refµ„³ ProcessEnv„„´³named³dir´³refµ„³ -ProcessDir„„´³named³clearEnv´³refµ„³ClearEnv„„„„³ ReadyOnStart´³orµµ±present´³dict·³ readyOnStart´³named³ readyOnStart´³atom³Boolean„„„„„µ±invalid´³dict·³ readyOnStart´³named³ readyOnStart³any„„„„µ±absent´³dict·„„„„„³ RestartField´³orµµ±present´³dict·³restart´³named³restart´³refµ„³ RestartPolicy„„„„„µ±invalid´³dict·³restart´³named³restart³any„„„„µ±absent´³dict·„„„„„³ DaemonProcess´³rec´³lit³daemon„´³tupleµ´³named³id³any„´³named³config´³refµ„³DaemonProcessSpec„„„„„³ DaemonService´³rec´³lit³daemon„´³tupleµ´³named³id³any„„„„³ ProtocolField´³orµµ±present´³dict·³protocol´³named³protocol´³refµ„³Protocol„„„„„µ±invalid´³dict·³protocol´³named³protocol³any„„„„µ±absent´³dict·„„„„„³ RestartPolicy´³orµµ±always´³lit³always„„µ±onError´³lit³on-error„„µ±all´³lit³all„„„„³FullCommandLine´³ tuplePrefixµ´³named³program´³atom³String„„„´³named³args´³seqof´³atom³String„„„„³DaemonProcessSpec´³orµµ±simple´³refµ„³ CommandLine„„µ±full´³refµ„³FullDaemonProcess„„„„³FullDaemonProcess´³andµ´³named³process´³refµ„³ FullProcess„„´³named³ readyOnStart´³refµ„³ ReadyOnStart„„´³named³restart´³refµ„³ RestartField„„´³named³protocol´³refµ„³ ProtocolField„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„µ³internalServices„´³schema·³version‘³ definitions·³ Milestone´³rec´³lit³ milestone„´³tupleµ´³named³name³any„„„„³ DebtReporter´³lit³ debt-reporter„³ ConfigWatcher´³rec´³lit³config-watcher„´³tupleµ´³named³path´³atom³String„„„„„³TcpRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Tcp„„„„„³UnixRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Unix„„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„„„ \ No newline at end of file +ProcessDir„„´³named³clearEnv´³refµ„³ClearEnv„„„„³ ReadyOnStart´³orµµ±present´³dict·³ readyOnStart´³named³ readyOnStart´³atom³Boolean„„„„„µ±invalid´³dict·³ readyOnStart´³named³ readyOnStart³any„„„„µ±absent´³dict·„„„„„³ RestartField´³orµµ±present´³dict·³restart´³named³restart´³refµ„³ RestartPolicy„„„„„µ±invalid´³dict·³restart´³named³restart³any„„„„µ±absent´³dict·„„„„„³ DaemonProcess´³rec´³lit³daemon„´³tupleµ´³named³id³any„´³named³config´³refµ„³DaemonProcessSpec„„„„„³ DaemonService´³rec´³lit³daemon„´³tupleµ´³named³id³any„„„„³ ProtocolField´³orµµ±present´³dict·³protocol´³named³protocol´³refµ„³Protocol„„„„„µ±invalid´³dict·³protocol´³named³protocol³any„„„„µ±absent´³dict·„„„„„³ RestartPolicy´³orµµ±always´³lit³always„„µ±onError´³lit³on-error„„µ±all´³lit³all„„„„³FullCommandLine´³ tuplePrefixµ´³named³program´³atom³String„„„´³named³args´³seqof´³atom³String„„„„³DaemonProcessSpec´³orµµ±simple´³refµ„³ CommandLine„„µ±full´³refµ„³FullDaemonProcess„„„„³FullDaemonProcess´³andµ´³named³process´³refµ„³ FullProcess„„´³named³ readyOnStart´³refµ„³ ReadyOnStart„„´³named³restart´³refµ„³ RestartField„„´³named³protocol´³refµ„³ ProtocolField„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„µ³internalServices„´³schema·³version‘³ definitions·³ ConfigEnv´³dictof´³atom³Symbol„³any„³ Milestone´³rec´³lit³ milestone„´³tupleµ´³named³name³any„„„„³ DebtReporter´³lit³ debt-reporter„³ ConfigWatcher´³rec´³lit³config-watcher„´³tupleµ´³named³path´³atom³String„„´³named³env´³refµ„³ ConfigEnv„„„„„³TcpRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Tcp„„´³named³ +gatekeeper´³embedded´³refµ³ +gatekeeper„³Resolve„„„„„„³UnixRelayListener´³rec´³lit³relay-listener„´³tupleµ´³named³addr´³refµ³TransportAddress„³Unix„„´³named³ +gatekeeper´³embedded´³refµ³ +gatekeeper„³Resolve„„„„„„„³ embeddedType´³refµ³ EntityRef„³Cap„„„„„ \ No newline at end of file diff --git a/syndicate-server/protocols/schemas/internalServices.prs b/syndicate-server/protocols/schemas/internalServices.prs index b0f5f89..7d0c89b 100644 --- a/syndicate-server/protocols/schemas/internalServices.prs +++ b/syndicate-server/protocols/schemas/internalServices.prs @@ -3,7 +3,9 @@ embeddedType EntityRef.Cap . DebtReporter = =debt-reporter . -TcpRelayListener = . -UnixRelayListener = . -ConfigWatcher = . +TcpRelayListener = . +UnixRelayListener = . +ConfigWatcher = . Milestone = . + +ConfigEnv = { symbol: any ...:... }. diff --git a/syndicate-server/src/gatekeeper.rs b/syndicate-server/src/gatekeeper.rs index aa594c1..aaddd97 100644 --- a/syndicate-server/src/gatekeeper.rs +++ b/syndicate-server/src/gatekeeper.rs @@ -5,22 +5,21 @@ use std::sync::Arc; use syndicate::actor::*; use syndicate::during::DuringResult; use syndicate::schemas::gatekeeper; -use syndicate::sturdy; use syndicate::value::NestedValue; use crate::language::language; -pub fn bind( - t: &mut Activation, - ds: &Arc, - oid: syndicate::schemas::sturdy::_Any, - key: [u8; 16], - target: Arc, -) { - let sr = sturdy::SturdyRef::mint(oid.clone(), &key); - tracing::info!(cap = ?language().unparse(&sr), hex = %sr.to_hex()); - ds.assert(t, language(), &gatekeeper::Bind { oid, key: key.to_vec(), target }); -} +// pub fn bind( +// t: &mut Activation, +// ds: &Arc, +// oid: syndicate::schemas::sturdy::_Any, +// key: [u8; 16], +// target: Arc, +// ) { +// let sr = sturdy::SturdyRef::mint(oid.clone(), &key); +// tracing::info!(cap = ?language().unparse(&sr), hex = %sr.to_hex()); +// ds.assert(t, language(), &gatekeeper::Bind { oid, key: key.to_vec(), target }); +// } pub fn handle_resolve( ds: &mut Arc, diff --git a/syndicate-server/src/main.rs b/syndicate-server/src/main.rs index cb37b8e..df01f45 100644 --- a/syndicate-server/src/main.rs +++ b/syndicate-server/src/main.rs @@ -12,6 +12,7 @@ use syndicate::relay; use syndicate::schemas::service; use syndicate::schemas::transport_address; +use syndicate::value::Map; use syndicate::value::NestedValue; mod counter; @@ -20,6 +21,7 @@ mod gatekeeper; mod language; mod lifecycle; mod protocol; +mod script; mod services; mod schemas { @@ -82,34 +84,35 @@ async fn main() -> Result<(), Box> { tracing::trace!("startup"); Actor::new().boot(tracing::Span::current(), move |t| { - let root_ds = Cap::new(&t.create(Dataspace::new())); + let server_config_ds = Cap::new(&t.create(Dataspace::new())); + let log_ds = Cap::new(&t.create(Dataspace::new())); if config.inferior { tracing::info!("inferior server instance"); - t.spawn(syndicate::name!("parent"), enclose!((root_ds) move |t| protocol::run_io_relay( - t, - relay::Input::Bytes(Box::pin(tokio::io::stdin())), - relay::Output::Bytes(Box::pin(tokio::io::stdout())), - root_ds))); + t.spawn(syndicate::name!("parent"), enclose!((server_config_ds) move |t| { + protocol::run_io_relay(t, + relay::Input::Bytes(Box::pin(tokio::io::stdin())), + relay::Output::Bytes(Box::pin(tokio::io::stdout())), + server_config_ds) + })); } - let server_config_ds = Cap::new(&t.create(Dataspace::new())); + let gatekeeper = Cap::guard(Arc::clone(&language().syndicate), t.create( + syndicate::entity(Arc::clone(&server_config_ds)) + .on_asserted(gatekeeper::handle_resolve))); - gatekeeper::bind(t, &root_ds, AnyValue::new("syndicate"), [0; 16], - Arc::clone(&root_ds)); - gatekeeper::bind(t, &root_ds, AnyValue::new("server-config"), [0; 16], - Arc::clone(&server_config_ds)); - - let gateway = Cap::guard(Arc::clone(&language().syndicate), t.create( - syndicate::entity(Arc::clone(&root_ds)).on_asserted(gatekeeper::handle_resolve))); + let mut env = Map::new(); + env.insert("config".to_owned(), AnyValue::domain(Arc::clone(&server_config_ds))); + env.insert("log".to_owned(), AnyValue::domain(Arc::clone(&log_ds))); + env.insert("gatekeeper".to_owned(), AnyValue::domain(Arc::clone(&gatekeeper))); dependencies::boot(t, Arc::clone(&server_config_ds)); - services::config_watcher::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&root_ds)); - services::daemon::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&root_ds)); + services::config_watcher::on_demand(t, Arc::clone(&server_config_ds)); + services::daemon::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&log_ds)); services::debt_reporter::on_demand(t, Arc::clone(&server_config_ds)); services::milestone::on_demand(t, Arc::clone(&server_config_ds)); - services::tcp_relay_listener::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&gateway)); - services::unix_relay_listener::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&gateway)); + services::tcp_relay_listener::on_demand(t, Arc::clone(&server_config_ds)); + services::unix_relay_listener::on_demand(t, Arc::clone(&server_config_ds)); if config.debt_reporter { server_config_ds.assert(t, language(), &service::RunService { @@ -123,7 +126,8 @@ async fn main() -> Result<(), Box> { addr: transport_address::Tcp { host: "0.0.0.0".to_owned(), port: (port as i32).into(), - } + }, + gatekeeper: gatekeeper.clone(), }), }); } @@ -133,7 +137,8 @@ async fn main() -> Result<(), Box> { service_name: language().unparse(&internal_services::UnixRelayListener { addr: transport_address::Unix { path: path.to_str().expect("representable UnixListener path").to_owned(), - } + }, + gatekeeper: gatekeeper.clone(), }), }); } @@ -142,11 +147,12 @@ async fn main() -> Result<(), Box> { server_config_ds.assert(t, language(), &service::RunService { service_name: language().unparse(&internal_services::ConfigWatcher { path: path.to_str().expect("representable ConfigWatcher path").to_owned(), + env: internal_services::ConfigEnv(env.clone()), }), }); } - t.spawn(tracing::Span::current(), enclose!((root_ds) move |t| { + t.spawn(tracing::Span::current(), enclose!((log_ds) move |t| { let n_unknown: AnyValue = AnyValue::symbol("-"); let n_pid: AnyValue = AnyValue::symbol("pid"); let n_line: AnyValue = AnyValue::symbol("line"); @@ -182,7 +188,7 @@ async fn main() -> Result<(), Box> { Ok(()) }) .create_cap(t); - root_ds.assert(t, language(), &syndicate::schemas::dataspace::Observe { + log_ds.assert(t, language(), &syndicate::schemas::dataspace::Observe { pattern: syndicate_macros::pattern!(), observer: e, }); diff --git a/syndicate-server/src/script/mod.rs b/syndicate-server/src/script/mod.rs new file mode 100644 index 0000000..5d798b0 --- /dev/null +++ b/syndicate-server/src/script/mod.rs @@ -0,0 +1,752 @@ +use std::io; +use std::borrow::Cow; +use std::path::PathBuf; +use std::sync::Arc; + +use syndicate::actor::*; +use syndicate::dataspace::Dataspace; +use syndicate::during; +use syndicate::enclose; +use syndicate::schemas::dataspace; +use syndicate::schemas::dataspace_patterns as P; +use syndicate::schemas::sturdy; +use syndicate::value::Map; +use syndicate::value::NestedValue; +use syndicate::value::Record; +use syndicate::value::Set; +use syndicate::value::Value; +use syndicate::value::signed_integer::SignedInteger; + +use crate::language::language; + +#[derive(Debug)] +struct PatternInstantiator<'env> { + env: &'env Env, + binding_names: Vec, +} + +#[derive(Debug, Clone)] +pub struct Env { + pub path: PathBuf, + bindings: Map, +} + +#[derive(Debug)] +pub struct Parser<'t> { + tokens: &'t [AnyValue], + errors: Vec, +} + +#[derive(Debug)] +pub enum Parsed { + Value(T), + Skip, + Eof, +} + +#[derive(Debug, Clone)] +pub enum Instruction { + Assert { + target: String, + template: AnyValue, + }, + React { + target: String, + pattern_template: AnyValue, + body: Box, + }, + Sequence { + instructions: Vec, + }, + Let { + pattern_template: AnyValue, + expr: Expr, + }, +} + +#[derive(Debug, Clone)] +pub enum Expr { + Template { + template: AnyValue, + }, + Dataspace, +} + +#[derive(Debug, Clone)] +enum RewriteTemplate { + Filter { + pattern_template: AnyValue, + }, + Rewrite { + pattern_template: AnyValue, + template_template: AnyValue, + }, +} + +#[derive(Debug)] +enum Symbolic { + Reference(String), + Binder(String), + Discard, + Literal(String), + Bare(String), +} + +impl Default for Parsed { + fn default() -> Self { + Parsed::Skip + } +} + +fn analyze(s: &str) -> Symbolic { + if s == "_" { + Symbolic::Discard + } else if s.starts_with("?") { + Symbolic::Binder(s[1..].to_owned()) + } else if s.starts_with("$") { + Symbolic::Reference(s[1..].to_owned()) + } else if s.starts_with("=") { + Symbolic::Literal(s[1..].to_owned()) + } else { + Symbolic::Bare(s.to_owned()) + } +} + +fn bad_instruction(message: &str) -> io::Error { + io::Error::new(io::ErrorKind::InvalidData, message) +} + +fn discard() -> P::Pattern { + P::Pattern::DDiscard(Box::new(P::DDiscard)) +} + +fn dlit(value: AnyValue) -> P::Pattern { + P::Pattern::DLit(Box::new(P::DLit { value })) +} + +fn tlit(value: AnyValue) -> sturdy::Template { + sturdy::Template::Lit(Box::new(sturdy::Lit { value })) +} + +fn parse_attenuation(r: &Record) -> io::Result)>> { + if r.label() != &AnyValue::symbol("*") { + return Ok(None); + } + + if r.fields().len() != 2 { + Err(bad_instruction(&format!( + "Attenuation requires a reference and a sequence of rewrites; got {:?}", + r)))?; + } + + let base_name = match r.fields()[0].value().as_symbol().map(|s| analyze(&s)) { + Some(Symbolic::Reference(s)) => s, + _ => Err(bad_instruction(&format!( + "Attenuation must have variable reference as first argument; got {:?}", + r.fields()[0])))?, + }; + + let raw_alternatives = match r.fields()[1].value().as_sequence() { + None => Err(bad_instruction(&format!( + "Attenuation of {:?} must have sequence of rewrites; got {:?}", + r.fields()[0], + r.fields()[1])))?, + Some(vs) => vs, + }; + + let mut alternatives = Vec::new(); + + for e in raw_alternatives.iter() { + match e.value().as_simple_record("filter", Some(1)) { + Some(fields) => + alternatives.push(RewriteTemplate::Filter { + pattern_template: fields[0].clone() + }), + None => match e.value().as_simple_record("rewrite", Some(2)) { + Some(fields) => + alternatives.push(RewriteTemplate::Rewrite { + pattern_template: fields[0].clone(), + template_template: fields[1].clone(), + }), + None => Err(bad_instruction(&format!( + "Bad rewrite in attenuation of {:?}: {:?}", + r.fields()[0], + e)))?, + } + } + } + + Ok(Some((base_name, alternatives))) +} + +impl<'env> PatternInstantiator<'env> { + fn instantiate_pattern(&mut self, template: &AnyValue) -> io::Result { + Ok(match template.value() { + Value::Boolean(_) | + Value::Float(_) | + Value::Double(_) | + Value::SignedInteger(_) | + Value::String(_) | + Value::ByteString(_) | + Value::Embedded(_) => + dlit(template.clone()), + + Value::Symbol(s) => match analyze(s) { + Symbolic::Discard => discard(), + Symbolic::Binder(s) => { + self.binding_names.push(s); + P::Pattern::DBind(Box::new(P::DBind { pattern: discard() })) + } + Symbolic::Reference(s) => + dlit(self.env.lookup(&s, "pattern-template variable")?.clone()), + Symbolic::Literal(s) | Symbolic::Bare(s) => + dlit(Value::Symbol(s).wrap()), + }, + + Value::Record(r) => match parse_attenuation(r)? { + Some((base_name, alternatives)) => + dlit(self.env.eval_attenuation(base_name, alternatives)?), + None => { + // TODO: properly consolidate constant patterns into literals. + match self.instantiate_pattern(r.label())? { + P::Pattern::DLit(b) => + P::Pattern::DCompound(Box::new(P::DCompound::Rec { + ctor: Box::new(P::CRec { + label: b.value, + arity: r.fields().len().into(), + }), + members: r.fields().iter().enumerate() + .map(|(i, p)| Ok((i.into(), self.instantiate_pattern(p)?))) + .filter(|e| discard() != e.as_ref().unwrap().1) + .collect::>>()?, + })), + _ => Err(bad_instruction("Record pattern must have literal label"))?, + } + }, + }, + Value::Sequence(v) => + P::Pattern::DCompound(Box::new(P::DCompound::Arr { + ctor: Box::new(P::CArr { + arity: v.len().into(), + }), + members: v.iter().enumerate() + .map(|(i, p)| Ok((i.into(), self.instantiate_pattern(p)?))) + .filter(|e| discard() != e.as_ref().unwrap().1) + .collect::>>()?, + })), + Value::Set(_) => + Err(bad_instruction(&format!("Sets not permitted in patterns: {:?}", template)))?, + Value::Dictionary(v) => + P::Pattern::DCompound(Box::new(P::DCompound::Dict { + ctor: Box::new(P::CDict), + members: v.iter() + .map(|(a, b)| Ok((a.clone(), self.instantiate_pattern(b)?))) + .collect::>>()?, + })), + }) + } +} + +impl Env { + pub fn new(path: PathBuf, bindings: Map) -> Self { + Env { + path: path.clone(), + bindings, + } + } + + pub fn clone_with_path(&self, path: PathBuf) -> Self { + Env { + path, + bindings: self.bindings.clone(), + } + } + + fn lookup_target(&self, s: &str) -> io::Result> { + Ok(self.lookup(s, "target variable")?.value().to_embedded()?.clone()) + } + + fn lookup(&self, s: &str, what: &'static str) -> io::Result { + if s == "." { + Ok(AnyValue::new(self.bindings.iter().map(|(k, v)| (AnyValue::new(k), v.clone())) + .collect::>())) + } else { + Ok(self.bindings.get(s).ok_or_else( + || bad_instruction(&format!("Undefined {}: {:?}", what, s)))?.clone()) + } + } + + fn instantiate_pattern( + &self, + pattern_template: &AnyValue, + ) -> io::Result<(Vec, P::Pattern)> { + let mut inst = PatternInstantiator { + env: self, + binding_names: Vec::new(), + }; + let pattern = inst.instantiate_pattern(pattern_template)?; + Ok((inst.binding_names, pattern)) + } + + fn instantiate_value(&self, template: &AnyValue) -> io::Result { + Ok(match template.value() { + Value::Boolean(_) | + Value::Float(_) | + Value::Double(_) | + Value::SignedInteger(_) | + Value::String(_) | + Value::ByteString(_) | + Value::Embedded(_) => + template.clone(), + + Value::Symbol(s) => match analyze(s) { + Symbolic::Binder(_) | Symbolic::Discard => + Err(bad_instruction(&format!( + "Invalid use of wildcard in template: {:?}", template)))?, + Symbolic::Reference(s) => + self.lookup(&s, "template variable")?, + Symbolic::Literal(s) | Symbolic::Bare(s) => + Value::Symbol(s).wrap(), + }, + + Value::Record(r) => match parse_attenuation(r)? { + Some((base_name, alternatives)) => + self.eval_attenuation(base_name, alternatives)?, + None => + Value::Record(Record(r.fields_vec().iter().map(|a| self.instantiate_value(a)) + .collect::, _>>()?)).wrap(), + }, + Value::Sequence(v) => + Value::Sequence(v.iter().map(|a| self.instantiate_value(a)) + .collect::, _>>()?).wrap(), + Value::Set(v) => + Value::Set(v.iter().map(|a| self.instantiate_value(a)) + .collect::, _>>()?).wrap(), + Value::Dictionary(v) => + Value::Dictionary(v.iter().map(|(a,b)| Ok((self.instantiate_value(a)?, + self.instantiate_value(b)?))) + .collect::>>()?).wrap(), + }) + } + + pub fn safe_eval(&mut self, t: &mut Activation, i: &Instruction) -> bool { + match self.eval(t, i) { + Ok(()) => true, + Err(error) => { + tracing::error!(path = ?self.path, ?error); + t.stop(); + false + } + } + } + + pub fn extend(&mut self, binding_names: &Vec, captures: Vec) { + for (k, v) in binding_names.iter().zip(captures) { + self.bindings.insert(k.clone(), v); + } + } + + fn eval_attenuation( + &self, + base_name: String, + alternatives: Vec, + ) -> io::Result { + let base_value = self.lookup(&base_name, "attenuation-base variable")?; + match base_value.value().as_embedded() { + None => Err(bad_instruction(&format!( + "Value to be attenuated is {:?} but must be capability", + base_value))), + Some(base_cap) => { + match base_cap.attenuate(&sturdy::Attenuation(vec![ + self.instantiate_caveat(&alternatives)?])) + { + Ok(derived_cap) => Ok(AnyValue::domain(derived_cap)), + Err(caveat_error) => + Err(bad_instruction(&format!("Attenuation of {:?} failed: {:?}", + base_value, + caveat_error))), + } + } + } + } + + pub fn eval(&mut self, t: &mut Activation, i: &Instruction) -> io::Result<()> { + match i { + Instruction::Assert { target, template } => { + self.lookup_target(target)?.assert(t, &(), &self.instantiate_value(template)?); + } + Instruction::React { target, pattern_template, body } => { + let (binding_names, pattern) = self.instantiate_pattern(pattern_template)?; + let observer = during::entity(self.clone()) + .on_asserted_facet(enclose!( + (binding_names, body) move |env, t, captures: AnyValue| { + if let Some(captures) = captures.value_owned().into_sequence() { + let mut env = env.clone(); + env.extend(&binding_names, captures); + env.safe_eval(t, &*body); + } + Ok(()) + })) + .create_cap(t); + self.lookup_target(target)?.assert(t, language(), &dataspace::Observe { + pattern, + observer, + }); + } + Instruction::Sequence { instructions } => { + for i in instructions { + self.eval(t, i)?; + } + } + Instruction::Let { pattern_template, expr } => { + let (binding_names, pattern) = self.instantiate_pattern(pattern_template)?; + let value = self.eval_expr(t, expr)?; + match pattern.match_value(&value) { + None => Err(bad_instruction( + &format!("Could not match pattern {:?} with value {:?}", + pattern_template, + value)))?, + Some(captures) => { + self.extend(&binding_names, captures); + } + } + } + } + Ok(()) + } + + pub fn eval_expr(&self, t: &mut Activation, e: &Expr) -> io::Result { + match e { + Expr::Template { template } => self.instantiate_value(template), + Expr::Dataspace => Ok(AnyValue::domain(Cap::new(&t.create(Dataspace::new())))), + } + } + + fn instantiate_caveat( + &self, + alternatives: &Vec, + ) -> io::Result { + let mut rewrites = Vec::new(); + for rw in alternatives { + match rw { + RewriteTemplate::Filter { pattern_template } => { + let (_binding_names, pattern) = self.instantiate_pattern(pattern_template)?; + rewrites.push(sturdy::Rewrite { + pattern: embed_pattern(&P::Pattern::DBind(Box::new(P::DBind { pattern }))), + template: sturdy::Template::TRef(Box::new(sturdy::TRef { binding: 0.into() })), + }) + } + RewriteTemplate::Rewrite { pattern_template, template_template } => { + let (binding_names, pattern) = self.instantiate_pattern(pattern_template)?; + rewrites.push(sturdy::Rewrite { + pattern: embed_pattern(&pattern), + template: self.instantiate_template(&binding_names, template_template)?, + }) + } + } + } + if rewrites.len() == 1 { + Ok(sturdy::Caveat::Rewrite(Box::new(rewrites.pop().unwrap()))) + } else { + Ok(sturdy::Caveat::Alts(Box::new(sturdy::Alts { + alternatives: rewrites, + }))) + } + } + + fn instantiate_template( + &self, + binding_names: &Vec, + template: &AnyValue, + ) -> io::Result { + let find_bound = |s: &str| { + binding_names.iter().enumerate().find(|(_i, n)| *n == s).map(|(i, _n)| i) + }; + + Ok(match template.value() { + Value::Boolean(_) | + Value::Float(_) | + Value::Double(_) | + Value::SignedInteger(_) | + Value::String(_) | + Value::ByteString(_) | + Value::Embedded(_) => + tlit(template.clone()), + + Value::Symbol(s) => match analyze(s) { + Symbolic::Binder(_) | Symbolic::Discard => + Err(bad_instruction(&format!( + "Invalid use of wildcard in template: {:?}", template)))?, + Symbolic::Reference(s) => + match find_bound(&s) { + Some(i) => + sturdy::Template::TRef(Box::new(sturdy::TRef { binding: i.into() })), + None => + tlit(self.lookup(&s, "attenuation-template variable")?), + }, + Symbolic::Literal(s) | Symbolic::Bare(s) => + tlit(Value::Symbol(s).wrap()), + }, + + Value::Record(r) => match parse_attenuation(r)? { + Some((base_name, alternatives)) => + match find_bound(&base_name) { + Some(i) => + sturdy::Template::TAttenuate(Box::new(sturdy::TAttenuate { + template: sturdy::Template::TRef(Box::new(sturdy::TRef { + binding: i.into(), + })), + attenuation: sturdy::Attenuation(vec![ + self.instantiate_caveat(&alternatives)?]), + })), + None => + tlit(self.eval_attenuation(base_name, alternatives)?), + }, + None => { + // TODO: properly consolidate constant templates into literals. + match self.instantiate_template(binding_names, r.label())? { + sturdy::Template::Lit(b) => + sturdy::Template::TCompound(Box::new(sturdy::TCompound { + ctor: sturdy::ConstructorSpec::CRec(Box::new(sturdy::CRec { + label: b.value, + arity: r.fields().len().into(), + })), + members: sturdy::TCompoundMembers( + r.fields().iter().enumerate() + .map(|(i, t)| Ok( + (AnyValue::new(i), + self.instantiate_template(binding_names, t)?))) + .collect::>>()?), + })), + _ => Err(bad_instruction("Record template must have literal label"))?, + } + } + }, + Value::Sequence(v) => + sturdy::Template::TCompound(Box::new(sturdy::TCompound { + ctor: sturdy::ConstructorSpec::CArr(Box::new(sturdy::CArr { + arity: v.len().into(), + })), + members: sturdy::TCompoundMembers( + v.iter().enumerate() + .map(|(i, p)| Ok( + (AnyValue::new(i), + self.instantiate_template(binding_names, p)?))) + .collect::>>()?), + })), + Value::Set(_) => + Err(bad_instruction(&format!("Sets not permitted in templates: {:?}", template)))?, + Value::Dictionary(v) => + sturdy::Template::TCompound(Box::new(sturdy::TCompound { + ctor: sturdy::ConstructorSpec::CDict(Box::new(sturdy::CDict)), + members: sturdy::TCompoundMembers( + v.iter() + .map(|(a, b)| Ok((a.clone(), self.instantiate_template(binding_names, b)?))) + .collect::>>()?), + })), + }) + } +} + +fn embed_pattern(p: &P::Pattern) -> sturdy::Pattern { + match p { + P::Pattern::DDiscard(_) => sturdy::Pattern::PDiscard(Box::new(sturdy::PDiscard)), + P::Pattern::DBind(b) => sturdy::Pattern::PBind(Box::new(sturdy::PBind { + pattern: embed_pattern(&b.pattern), + })), + P::Pattern::DLit(b) => sturdy::Pattern::Lit(Box::new(sturdy::Lit { + value: b.value.clone(), + })), + P::Pattern::DCompound(b) => sturdy::Pattern::PCompound(Box::new(match &**b { + P::DCompound::Rec { ctor, members } => + sturdy::PCompound { + ctor: sturdy::ConstructorSpec::CRec(Box::new(sturdy::CRec { + label: ctor.label.clone(), + arity: ctor.arity.clone(), + })), + members: sturdy::PCompoundMembers( + members.iter().map(|(k, v)| (AnyValue::new(k), embed_pattern(v))).collect()), + }, + P::DCompound::Arr { ctor, members } => + sturdy::PCompound { + ctor: sturdy::ConstructorSpec::CArr(Box::new(sturdy::CArr { + arity: ctor.arity.clone(), + })), + members: sturdy::PCompoundMembers( + members.iter().map(|(k, v)| (AnyValue::new(k), embed_pattern(v))).collect()), + }, + P::DCompound::Dict { ctor: _, members } => + sturdy::PCompound { + ctor: sturdy::ConstructorSpec::CDict(Box::new(sturdy::CDict)), + members: sturdy::PCompoundMembers( + members.iter().map(|(k, v)| (k.clone(), embed_pattern(v))).collect()), + }, + })), + } +} + +impl<'t> Parser<'t> { + pub fn new(tokens: &'t [AnyValue]) -> Self { + Parser { + tokens, + errors: Vec::new(), + } + } + + fn peek(&mut self) -> &'t Value { + self.tokens[0].value() + } + + fn shift(&mut self) -> AnyValue { + let v = self.tokens[0].clone(); + self.drop(); + v + } + + fn drop(&mut self) { + self.tokens = &self.tokens[1..]; + } + + fn len(&self) -> usize { + self.tokens.len() + } + + fn ateof(&self) -> bool { + self.len() == 0 + } + + fn error<'a, T: Default, E: Into>>(&mut self, message: E) -> T { + self.errors.push(message.into().into_owned()); + T::default() + } + + pub fn parse(&mut self, target: &str) -> Parsed { + if self.ateof() { + return Parsed::Eof; + } + + if self.peek().is_record() || self.peek().is_dictionary() { + return Parsed::Value(Instruction::Assert { + target: target.to_owned(), + template: self.shift(), + }); + } + + if let Some(tokens) = self.peek().as_sequence() { + self.drop(); + return Parsed::Value(Instruction::Sequence { + instructions: Parser::new(tokens).parse_all(target), + }); + } + + if let Some(s) = self.peek().as_symbol() { + match analyze(s) { + Symbolic::Binder(s) => { + self.drop(); + + if s.len() > 0 { + let m = format!("Invalid use of pattern binder in target: {:?}", self.peek()); + return self.error(m); + } + + if self.ateof() { + return self.error("Missing pattern and instruction in react"); + } + let pattern_template = self.shift(); + + return match self.parse(target) { + Parsed::Eof => + self.error(format!( + "Missing instruction in react with pattern {:?}", + pattern_template)), + Parsed::Skip => + Parsed::Skip, + Parsed::Value(body) => + Parsed::Value(Instruction::React { + target: target.to_owned(), + pattern_template, + body: Box::new(body), + }), + }; + } + Symbolic::Discard => { + self.drop(); + let m = format!("Invalid use of discard in target: {:?}", self.peek()); + return self.error(m); + }, + Symbolic::Reference(s) => { + self.drop(); + if self.ateof() { + let m = format!("Missing instruction after retarget: {:?}", self.peek()); + return self.error(m); + } + return self.parse(&s); + } + Symbolic::Bare(s) => { + if s == "let" { + self.drop(); + if self.len() >= 2 && self.tokens[1].value().as_symbol().map(String::as_str) == Some("=") + { + let pattern_template = self.shift(); + self.drop(); + return match self.parse_expr() { + Some(expr) => + Parsed::Value(Instruction::Let { pattern_template, expr }), + None => Parsed::Skip, + }; + } else { + return self.error("Invalid let statement"); + } + } else { + /* fall through */ + } + } + Symbolic::Literal(_) => { + /* fall through */ + } + } + } + + { + let m = format!("Invalid token: {:?}", self.shift()); + return self.error(m); + } + } + + pub fn parse_all(&mut self, target: &str) -> Vec { + let mut instructions = Vec::new(); + loop { + match self.parse(target) { + Parsed::Value(i) => instructions.push(i), + Parsed::Skip => (), + Parsed::Eof => break, + } + } + instructions + } + + pub fn parse_top(&mut self, target: &str) -> Result, Vec> { + let instructions = self.parse_all(target); + if self.errors.is_empty() { + match instructions.len() { + 0 => Ok(None), + _ => Ok(Some(Instruction::Sequence { instructions })), + } + } else { + Err(std::mem::take(&mut self.errors)) + } + } + + pub fn parse_expr(&mut self) -> Option { + if self.ateof() { + return None; + } + + if self.peek() == &Value::symbol("dataspace") { + self.drop(); + return Some(Expr::Dataspace); + } + + return Some(Expr::Template{ template: self.shift() }); + } +} diff --git a/syndicate-server/src/services/config_watcher.rs b/syndicate-server/src/services/config_watcher.rs index 5ebf50e..ccdc9a8 100644 --- a/syndicate-server/src/services/config_watcher.rs +++ b/syndicate-server/src/services/config_watcher.rs @@ -14,10 +14,7 @@ use std::time::Duration; use syndicate::actor::*; use syndicate::error::Error; -use syndicate::during; use syndicate::enclose; -use syndicate::schemas::dataspace; -use syndicate::schemas::dataspace_patterns as P; use syndicate::supervise::{Supervisor, SupervisorConfiguration}; use syndicate::value::BinarySource; use syndicate::value::IOBinarySource; @@ -25,30 +22,25 @@ use syndicate::value::Map; use syndicate::value::NestedValue; use syndicate::value::NoEmbeddedDomainCodec; use syndicate::value::Reader; -use syndicate::value::Record; -use syndicate::value::Set; -use syndicate::value::Value; use syndicate::value::ViaCodec; -use syndicate::value::signed_integer::SignedInteger; use crate::language::language; use crate::lifecycle; use crate::schemas::internal_services; +use crate::script; use syndicate_macros::during; -pub fn on_demand(t: &mut Activation, config_ds: Arc, root_ds: Arc) { +pub fn on_demand(t: &mut Activation, config_ds: Arc) { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { - Ok(during!( - t, config_ds, language(), , |t| { - Supervisor::start( - t, - syndicate::name!(parent: None, "config", spec = ?spec), - SupervisorConfiguration::default(), - enclose!((config_ds, spec) lifecycle::updater(config_ds, spec)), - enclose!((config_ds, root_ds) move |t| - enclose!((config_ds, root_ds, spec) run(t, config_ds, root_ds, spec)))) - })) + Ok(during!(t, config_ds, language(), , |t| { + Supervisor::start( + t, + syndicate::name!(parent: None, "config", spec = ?spec), + SupervisorConfiguration::default(), + enclose!((config_ds, spec) lifecycle::updater(config_ds, spec)), + enclose!((config_ds) move |t| enclose!((config_ds, spec) run(t, config_ds, spec)))) + })) }); } @@ -56,316 +48,37 @@ fn convert_notify_error(e: notify::Error) -> Error { syndicate::error::error(&format!("Notify error: {:?}", e), AnyValue::new(false)) } -#[derive(Debug, Clone)] -enum Instruction { - Assert { - target: String, - template: AnyValue, - }, - React { - target: String, - pattern_template: AnyValue, - body: Box, - }, - Sequence { - instructions: Vec, - }, -} - -#[derive(Debug)] -enum Symbolic { - Reference(String), - Binder(String), - Discard, - Literal(String), -} - -fn analyze(s: &str) -> Symbolic { - if s == "_" { - Symbolic::Discard - } else if s.starts_with("?") { - Symbolic::Binder(s[1..].to_owned()) - } else if s.starts_with("$") { - Symbolic::Reference(s[1..].to_owned()) - } else if s.starts_with("=") { - Symbolic::Literal(s[1..].to_owned()) - } else { - Symbolic::Literal(s.to_owned()) - } -} - -fn bad_instruction(message: String) -> io::Error { - io::Error::new(io::ErrorKind::InvalidData, message) -} - -fn discard() -> P::Pattern { - P::Pattern::DDiscard(Box::new(P::DDiscard)) -} - -#[derive(Debug)] -struct PatternInstantiator<'e> { - env: &'e Env, - binding_names: Vec, -} - -#[derive(Debug, Clone)] -struct Env(Map); - -impl<'e> PatternInstantiator<'e> { - fn instantiate_pattern(&mut self, template: &AnyValue) -> io::Result { - Ok(match template.value() { - Value::Boolean(_) | - Value::Float(_) | - Value::Double(_) | - Value::SignedInteger(_) | - Value::String(_) | - Value::ByteString(_) | - Value::Embedded(_) => - P::Pattern::DLit(Box::new(P::DLit { value: template.clone() })), - - Value::Symbol(s) => match analyze(s) { - Symbolic::Discard => discard(), - Symbolic::Binder(s) => { - self.binding_names.push(s); - P::Pattern::DBind(Box::new(P::DBind { pattern: discard() })) - } - Symbolic::Reference(s) => - P::Pattern::DLit(Box::new(P::DLit { - value: self.env.0.get(&s) - .ok_or_else(|| bad_instruction( - format!("Undefined pattern-template variable: {:?}", template)))? - .clone(), - })), - Symbolic::Literal(s) => - P::Pattern::DLit(Box::new(P::DLit { - value: Value::Symbol(s).wrap(), - })), - }, - - Value::Record(r) => - P::Pattern::DCompound(Box::new(P::DCompound::Rec { - ctor: Box::new(P::CRec { - label: r.label().clone(), - arity: r.fields().len().into(), - }), - members: r.fields().iter().enumerate() - .map(|(i, p)| Ok((i.into(), self.instantiate_pattern(p)?))) - .filter(|e| discard() != e.as_ref().unwrap().1) - .collect::>>()?, - })), - Value::Sequence(v) => - P::Pattern::DCompound(Box::new(P::DCompound::Arr { - ctor: Box::new(P::CArr { - arity: v.len().into(), - }), - members: v.iter().enumerate() - .map(|(i, p)| Ok((i.into(), self.instantiate_pattern(p)?))) - .filter(|e| discard() != e.as_ref().unwrap().1) - .collect::>>()?, - })), - Value::Set(_) => - Err(bad_instruction(format!("Sets not permitted in patterns: {:?}", template)))?, - Value::Dictionary(v) => - P::Pattern::DCompound(Box::new(P::DCompound::Dict { - ctor: Box::new(P::CDict), - members: v.iter() - .map(|(a, b)| Ok((a.clone(), self.instantiate_pattern(b)?))) - .collect::>>()?, - })), - }) - } -} - -impl Env { - fn lookup_target(&self, target: &String) -> io::Result> { - Ok(self.0.get(target) - .ok_or_else(|| bad_instruction(format!("Undefined target variable: {:?}", target)))? - .value().to_embedded()?.clone()) - } - - fn instantiate_value(&self, template: &AnyValue) -> io::Result { - Ok(match template.value() { - Value::Boolean(_) | - Value::Float(_) | - Value::Double(_) | - Value::SignedInteger(_) | - Value::String(_) | - Value::ByteString(_) | - Value::Embedded(_) => - template.clone(), - - Value::Symbol(s) => match analyze(s) { - Symbolic::Binder(_) | Symbolic::Discard => - Err(bad_instruction( - format!("Invalid use of wildcard in template: {:?}", template)))?, - Symbolic::Reference(s) => - self.0.get(&s).ok_or_else(|| bad_instruction( - format!("Undefined template variable: {:?}", template)))?.clone(), - Symbolic::Literal(s) => - Value::Symbol(s).wrap(), - }, - - Value::Record(r) => - Value::Record(Record(r.fields_vec().iter().map(|a| self.instantiate_value(a)) - .collect::, _>>()?)).wrap(), - Value::Sequence(v) => - Value::Sequence(v.iter().map(|a| self.instantiate_value(a)) - .collect::, _>>()?).wrap(), - Value::Set(v) => - Value::Set(v.iter().map(|a| self.instantiate_value(a)) - .collect::, _>>()?).wrap(), - Value::Dictionary(v) => - Value::Dictionary(v.iter().map(|(a,b)| Ok((self.instantiate_value(a)?, - self.instantiate_value(b)?))) - .collect::>>()?).wrap(), - }) - } -} - -impl Instruction { - fn eval(&self, t: &mut Activation, env: &Env) -> io::Result<()> { - match self { - Instruction::Assert { target, template } => { - env.lookup_target(target)?.assert(t, &(), &env.instantiate_value(template)?); - } - Instruction::React { target, pattern_template, body } => { - let mut inst = PatternInstantiator { - env, - binding_names: Vec::new(), - }; - let pattern = inst.instantiate_pattern(pattern_template)?; - let binding_names = inst.binding_names; - let observer = during::entity(env.clone()) - .on_asserted_facet(enclose!( - (binding_names, body) move |env, t, captures: AnyValue| { - if let Some(captures) = captures.value_owned().into_sequence() { - let mut env = env.clone(); - for (k, v) in binding_names.iter().zip(captures) { - env.0.insert(k.clone(), v); - } - body.eval(t, &env)?; - } - Ok(()) - })) - .create_cap(t); - env.lookup_target(target)?.assert(t, language(), &dataspace::Observe { - pattern, - observer, - }); - } - Instruction::Sequence { instructions } => { - for i in instructions { - i.eval(t, env)?; - } - } - } - Ok(()) - } - - fn parse<'t>(target: &str, tokens: &'t [AnyValue]) -> io::Result> { - if tokens.len() == 0 { - return Ok(None); - } - - if tokens[0].value().is_record() || tokens[0].value().is_dictionary() { - return Ok(Some((Instruction::Assert { - target: target.to_owned(), - template: tokens[0].clone(), - }, &tokens[1..]))); - } - - if let Some(tokens) = tokens[0].value().as_sequence() { - return Ok(Some((Instruction::Sequence { - instructions: Instruction::parse_all(target, tokens)?, - }, &tokens[1..]))); - } - - if let Some(s) = tokens[0].value().as_symbol() { - match analyze(s) { - Symbolic::Binder(s) => - if s.len() == 0 { - if tokens.len() == 1 { - Err(bad_instruction(format!("Missing pattern and instruction in react")))?; - } else { - let pattern_template = tokens[1].clone(); - match Instruction::parse(target, &tokens[2..])? { - None => - Err(bad_instruction(format!("Missing instruction in react with pattern {:?}", tokens[1])))?, - Some((body, tokens)) => - return Ok(Some((Instruction::React { - target: target.to_owned(), - pattern_template, - body: Box::new(body), - }, tokens))), - } - } - } else { - Err(bad_instruction(format!("Invalid use of pattern binder in target: {:?}", tokens[0])))?; - }, - Symbolic::Discard => - Err(bad_instruction(format!("Invalid use of discard in target: {:?}", tokens[0])))?, - Symbolic::Reference(s) => - if tokens.len() == 1 { - Err(bad_instruction(format!("Missing instruction after retarget: {:?}", tokens[0])))?; - } else if tokens[1].value().is_symbol() { - Err(bad_instruction(format!("Two retargets in a row: {:?} {:?}", tokens[0], tokens[1])))?; - } else { - return Instruction::parse(&s, &tokens[1..]); - }, - Symbolic::Literal(_) => - /* fall through */ (), - } - } - - Err(bad_instruction(format!("Invalid token: {:?}", tokens[0])))? - } - - fn parse_all(target: &str, mut tokens: &[AnyValue]) -> io::Result> { - let mut instructions = Vec::new(); - while let Some((i, more)) = Instruction::parse(target, tokens)? { - instructions.push(i); - tokens = more; - } - Ok(instructions) - } -} - fn process_existing_file( t: &mut Activation, - config_ds: &Arc, - root_ds: &Arc, - path: &PathBuf, + mut env: script::Env, ) -> io::Result> { - let tokens: Vec = IOBinarySource::new(fs::File::open(path)?) + let tokens: Vec = IOBinarySource::new(fs::File::open(&env.path)?) .text::(ViaCodec::new(NoEmbeddedDomainCodec)) .configured(true) .collect::, _>>()?; - let instructions = Instruction::parse_all("config", &tokens)?; - if instructions.is_empty() { - Ok(None) - } else { - let mut env = Map::new(); - env.insert("config".to_owned(), AnyValue::domain(config_ds.clone())); - env.insert("root".to_owned(), AnyValue::domain(root_ds.clone())); - match t.facet(|t| Ok(Instruction::Sequence { instructions }.eval(t, &Env(env))?)) { - Ok(facet_id) => Ok(Some(facet_id)), - Err(error) => { - tracing::error!(?path, ?error); - Ok(None) + match script::Parser::new(&tokens).parse_top("config") { + Ok(Some(i)) => Ok(Some(t.facet(|t| { + env.safe_eval(t, &i); + Ok(()) + }).expect("Successful facet startup"))), + Ok(None) => Ok(None), + Err(errors) => { + for e in errors { + tracing::error!(path = ?env.path, message = %e); } + Err(io::Error::new(io::ErrorKind::InvalidData, + format!("Parse of {:?} failed", env.path))) } } } fn process_path( t: &mut Activation, - config_ds: &Arc, - root_ds: &Arc, - path: &PathBuf, + env: script::Env, ) -> io::Result> { - match fs::metadata(path) { + match fs::metadata(&env.path) { Ok(md) => if md.is_file() { - process_existing_file(t, config_ds, root_ds, path) + process_existing_file(t, env) } else { Ok(None) } @@ -387,23 +100,23 @@ fn is_hidden(path: &PathBuf) -> bool { fn scan_file( t: &mut Activation, path_state: &mut Map, - config_ds: &Arc, - root_ds: &Arc, - path: &PathBuf, + env: script::Env, ) -> bool { - if is_hidden(path) { + let path = env.path.clone(); + if is_hidden(&path) { return true; } - tracing::info!("scan_file: {:?}", path); - match process_path(t, config_ds, root_ds, path) { + tracing::trace!("scan_file: scanning {:?}", &path); + match process_path(t, env) { Ok(maybe_facet_id) => { if let Some(facet_id) = maybe_facet_id { - path_state.insert(path.clone(), facet_id); + tracing::info!("scan_file: processed {:?}", &path); + path_state.insert(path, facet_id); } true }, Err(e) => { - tracing::warn!("scan_file: {:?}: {:?}", path, e); + tracing::debug!("scan_file: {:?}: {:?}", &path, e); false } } @@ -413,53 +126,53 @@ fn initial_scan( t: &mut Activation, path_state: &mut Map, config_ds: &Arc, - root_ds: &Arc, - path: &PathBuf, + env: script::Env, ) { - if is_hidden(path) { + if is_hidden(&env.path) { return; } - match fs::metadata(path) { + match fs::metadata(&env.path) { Ok(md) => if md.is_file() { - scan_file(t, path_state, config_ds, root_ds, path); + scan_file(t, path_state, env); } else { - match fs::read_dir(path) { + match fs::read_dir(&env.path) { Ok(entries) => for er in entries { match er { - Ok(e) => initial_scan(t, path_state, config_ds, root_ds, &e.path()), - Err(e) => tracing::warn!( - "initial_scan: transient during scan of {:?}: {:?}", path, e), + Ok(e) => + initial_scan(t, path_state, config_ds, env.clone_with_path(e.path())), + Err(e) => + tracing::warn!( + "initial_scan: transient during scan of {:?}: {:?}", &env.path, e), } } - Err(e) => tracing::warn!("initial_scan: enumerating {:?}: {:?}", path, e), + Err(e) => tracing::warn!("initial_scan: enumerating {:?}: {:?}", &env.path, e), } }, - Err(e) => tracing::warn!("initial_scan: `stat`ing {:?}: {:?}", path, e), + Err(e) => tracing::warn!("initial_scan: `stat`ing {:?}: {:?}", &env.path, e), } } fn run( t: &mut Activation, config_ds: Arc, - root_ds: Arc, spec: internal_services::ConfigWatcher, ) -> ActorResult { let path = fs::canonicalize(spec.path.clone())?; + let env = script::Env::new(path, spec.env.0.clone()); - tracing::info!("watching {:?}", &path); + tracing::info!("watching {:?}", &env.path); let (tx, rx) = channel(); let mut watcher = watcher(tx, Duration::from_millis(100)).map_err(convert_notify_error)?; - watcher.watch(&path, RecursiveMode::Recursive).map_err(convert_notify_error)?; + watcher.watch(&env.path, RecursiveMode::Recursive).map_err(convert_notify_error)?; let facet = t.facet.clone(); thread::spawn(move || { let mut path_state: Map = Map::new(); { - let root_path = path.clone().into(); facet.activate(Account::new(syndicate::name!("initial_scan")), |t| { - initial_scan(t, &mut path_state, &config_ds, &root_ds, &root_path); + initial_scan(t, &mut path_state, &config_ds, env.clone()); config_ds.assert(t, language(), &lifecycle::ready(&spec)); Ok(()) }).unwrap(); @@ -471,7 +184,8 @@ fn run( let mut to_stop = Vec::new(); for path in paths.into_iter() { let maybe_facet_id = path_state.remove(&path); - let new_content_ok = scan_file(t, &mut path_state, &config_ds, &root_ds, &path); + let new_content_ok = + scan_file(t, &mut path_state, env.clone_with_path(path.clone())); if let Some(old_facet_id) = maybe_facet_id { if new_content_ok { to_stop.push(old_facet_id); diff --git a/syndicate-server/src/services/daemon.rs b/syndicate-server/src/services/daemon.rs index 847fdb1..a7a135a 100644 --- a/syndicate-server/src/services/daemon.rs +++ b/syndicate-server/src/services/daemon.rs @@ -222,18 +222,23 @@ impl DaemonInstance { Err(_) => AnyValue::bytestring(buf), }; let now = AnyValue::new(chrono::Utc::now().to_rfc3339()); - facet.activate(Account::new(tracing::Span::current()), - enclose!((pid, service, kind) |t| { - log_ds.message(t, &(), &syndicate_macros::template!( - "")); - Ok(()) - }))?; + if facet.activate( + Account::new(tracing::Span::current()), + enclose!((pid, service, kind) |t| { + log_ds.message(t, &(), &syndicate_macros::template!( + "")); + Ok(()) + })).is_err() + { + break; + } } + Ok(LinkedTaskTermination::Normal) }); Ok(()) }); diff --git a/syndicate-server/src/services/tcp_relay_listener.rs b/syndicate-server/src/services/tcp_relay_listener.rs index 4013c45..55438d8 100644 --- a/syndicate-server/src/services/tcp_relay_listener.rs +++ b/syndicate-server/src/services/tcp_relay_listener.rs @@ -14,7 +14,7 @@ use crate::schemas::internal_services::TcpRelayListener; use syndicate_macros::during; -pub fn on_demand(t: &mut Activation, ds: Arc, gateway: Arc) { +pub fn on_demand(t: &mut Activation, ds: Arc) { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { Ok(during!(t, ds, language(), , |t| { Supervisor::start( @@ -22,13 +22,12 @@ pub fn on_demand(t: &mut Activation, ds: Arc, gateway: Arc) { syndicate::name!(parent: None, "relay", addr = ?spec), SupervisorConfiguration::default(), enclose!((ds, spec) lifecycle::updater(ds, spec)), - enclose!((ds, gateway) move |t| - enclose!((ds, gateway, spec) run(t, ds, gateway, spec)))) + enclose!((ds) move |t| enclose!((ds, spec) run(t, ds, spec)))) })) }); } -fn run(t: &mut Activation, ds: Arc, gateway: Arc, spec: TcpRelayListener) -> ActorResult { +fn run(t: &mut Activation, ds: Arc, spec: TcpRelayListener) -> ActorResult { let host = spec.addr.host.clone(); let port = u16::try_from(&spec.addr.port).map_err(|_| "Invalid TCP port number")?; let parent_span = tracing::Span::current(); @@ -43,15 +42,16 @@ fn run(t: &mut Activation, ds: Arc, gateway: Arc, spec: TcpRelayListen })?; loop { let (stream, addr) = listener.accept().await?; + let gatekeeper = spec.gatekeeper.clone(); Actor::new().boot( syndicate::name!(parent: parent_span.clone(), "conn"), - enclose!((gateway) move |t| Ok(t.linked_task(tracing::Span::current(), { + move |t| Ok(t.linked_task(tracing::Span::current(), { let facet = t.facet.clone(); async move { - detect_protocol(facet, stream, gateway, addr).await?; + detect_protocol(facet, stream, gatekeeper, addr).await?; Ok(LinkedTaskTermination::KeepFacet) } - })))); + }))); } }); Ok(()) diff --git a/syndicate-server/src/services/unix_relay_listener.rs b/syndicate-server/src/services/unix_relay_listener.rs index c188a33..66cd59c 100644 --- a/syndicate-server/src/services/unix_relay_listener.rs +++ b/syndicate-server/src/services/unix_relay_listener.rs @@ -18,7 +18,7 @@ use crate::schemas::internal_services::UnixRelayListener; use syndicate_macros::during; -pub fn on_demand(t: &mut Activation, ds: Arc, gateway: Arc) { +pub fn on_demand(t: &mut Activation, ds: Arc) { t.spawn(syndicate::name!("on_demand", module = module_path!()), move |t| { Ok(during!(t, ds, language(), , |t| { Supervisor::start( @@ -26,13 +26,12 @@ pub fn on_demand(t: &mut Activation, ds: Arc, gateway: Arc) { syndicate::name!(parent: None, "relay", addr = ?spec), SupervisorConfiguration::default(), enclose!((ds, spec) lifecycle::updater(ds, spec)), - enclose!((ds, gateway) move |t| - enclose!((ds, gateway, spec) run(t, ds, gateway, spec)))) + enclose!((ds) move |t| enclose!((ds, spec) run(t, ds, spec)))) })) }); } -fn run(t: &mut Activation, ds: Arc, gateway: Arc, spec: UnixRelayListener) -> ActorResult { +fn run(t: &mut Activation, ds: Arc, spec: UnixRelayListener) -> ActorResult { let path_str = spec.addr.path.clone(); let parent_span = tracing::Span::current(); let facet = t.facet.clone(); @@ -46,24 +45,23 @@ fn run(t: &mut Activation, ds: Arc, gateway: Arc, spec: UnixRelayListe loop { let (stream, _addr) = listener.accept().await?; let peer = stream.peer_cred()?; + let gatekeeper = spec.gatekeeper.clone(); Actor::new().boot( syndicate::name!(parent: parent_span.clone(), "conn", pid = ?peer.pid().unwrap_or(-1), uid = peer.uid()), - enclose!((gateway) |t| Ok(t.linked_task( - tracing::Span::current(), - { - let facet = t.facet.clone(); - async move { - tracing::info!(protocol = %"unix"); - let (i, o) = stream.into_split(); - run_connection(facet, - relay::Input::Bytes(Box::pin(i)), - relay::Output::Bytes(Box::pin(o)), - gateway)?; - Ok(LinkedTaskTermination::KeepFacet) - } - })))); + |t| Ok(t.linked_task(tracing::Span::current(), { + let facet = t.facet.clone(); + async move { + tracing::info!(protocol = %"unix"); + let (i, o) = stream.into_split(); + run_connection(facet, + relay::Input::Bytes(Box::pin(i)), + relay::Output::Bytes(Box::pin(o)), + gatekeeper)?; + Ok(LinkedTaskTermination::KeepFacet) + } + }))); } }); Ok(()) diff --git a/syndicate/src/pattern.rs b/syndicate/src/pattern.rs index 88cf165..5c14c76 100644 --- a/syndicate/src/pattern.rs +++ b/syndicate/src/pattern.rs @@ -3,6 +3,7 @@ use crate::schemas::dataspace_patterns::*; use preserves::value::NestedValue; use std::convert::TryFrom; +use std::convert::TryInto; #[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq)] pub enum PathStep { @@ -25,6 +26,10 @@ pub struct PatternAnalysis { pub capture_paths: Paths, } +struct PatternMatcher { + captures: Vec, +} + impl PatternAnalysis { pub fn new(p: &Pattern) -> Self { let mut analyzer = Analyzer { @@ -78,3 +83,81 @@ impl Analyzer { } } } + +impl Pattern { + pub fn match_value(&self, value: &N) -> Option> { + let mut matcher = PatternMatcher::new(); + if matcher.run(self, value) { + Some(matcher.captures) + } else { + None + } + } +} + +impl PatternMatcher { + fn new() -> Self { + PatternMatcher { + captures: Vec::new(), + } + } + + fn run(&mut self, pattern: &Pattern, value: &N) -> bool { + match pattern { + Pattern::DDiscard(_) => true, + Pattern::DBind(b) => { + self.captures.push(value.clone()); + self.run(&b.pattern, value) + } + Pattern::DLit(b) => value == &b.value, + Pattern::DCompound(b) => match &**b { + DCompound::Rec { ctor, members } => { + let arity = (&ctor.arity).try_into().expect("reasonable arity"); + match value.value().as_record(Some(arity)) { + None => false, + Some(r) => { + for (i, p) in members.iter() { + let i: usize = i.try_into().expect("reasonable index"); + if !self.run(p, &r.fields()[i]) { + return false; + } + } + true + } + } + } + DCompound::Arr { ctor, members } => { + let arity: usize = (&ctor.arity).try_into().expect("reasonable arity"); + match value.value().as_sequence() { + None => false, + Some(vs) => { + if vs.len() != arity { + return false; + } + for (i, p) in members.iter() { + let i: usize = i.try_into().expect("reasonable index"); + if !self.run(p, &vs[i]) { + return false; + } + } + true + } + } + } + DCompound::Dict { ctor: _, members } => { + match value.value().as_dictionary() { + None => false, + Some(entries) => { + for (k, p) in members.iter() { + if !entries.get(k).map(|v| self.run(p, v)).unwrap_or(false) { + return false; + } + } + true + } + } + } + } + } + } +}