From 581886835aa5d7f6a6161c3ed90212716b0c77a4 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 10 Apr 2024 17:03:09 +0200 Subject: [PATCH] New dataspace pattern implementation; update HTTP server --- syndicate-macros/src/lib.rs | 36 ++-- syndicate-macros/src/pat.rs | 35 +-- syndicate-rs.code-workspace | 15 ++ syndicate-schema-plugin/src/pattern_plugin.rs | 35 +-- syndicate-server/src/http.rs | 87 +++----- syndicate-server/src/script/mod.rs | 72 ++++--- syndicate-server/src/services/http_router.rs | 73 ++++--- .../src/services/tcp_relay_listener.rs | 49 +++-- syndicate/benches/bench_dataspace.rs | 44 ++-- syndicate/src/pattern.rs | 200 ++++++++++-------- syndicate/src/skeleton.rs | 123 +++++------ 11 files changed, 406 insertions(+), 363 deletions(-) create mode 100644 syndicate-rs.code-workspace diff --git a/syndicate-macros/src/lib.rs b/syndicate-macros/src/lib.rs index 586fbc1..c075b39 100644 --- a/syndicate-macros/src/lib.rs +++ b/syndicate-macros/src/lib.rs @@ -36,7 +36,7 @@ enum SymbolVariant<'a> { fn compile_sequence_members(vs: &[IOValue]) -> Vec { vs.iter().enumerate().map(|(i, f)| { let p = compile_pattern(f); - quote!((#i .into(), #p)) + quote!((syndicate::value::Value::from(#i).wrap(), #p)) }).collect::>() } @@ -151,16 +151,14 @@ fn compile_pattern(v: &IOValue) -> TokenStream { #[allow(non_snake_case)] let V_: TokenStream = quote!(syndicate::value); #[allow(non_snake_case)] - let MapFromIterator_: TokenStream = quote!(<#V_::Map<_, _> as std::iter::FromIterator<_>>::from_iter); + let MapFrom_: TokenStream = quote!(<#V_::Map<_, _>>::from); match v.value() { Value::Symbol(s) => match analyze_symbol(&s, true) { SymbolVariant::Binder(_) => - quote!(#P_::Pattern::DBind(Box::new(#P_::DBind { - pattern: #P_::Pattern::DDiscard(Box::new(#P_::DDiscard)) - }))), + quote!(#P_::Pattern::Bind{ pattern: Box::new(#P_::Pattern::Discard) }), SymbolVariant::Discard => - quote!(#P_::Pattern::DDiscard(Box::new(#P_::DDiscard))), + quote!(#P_::Pattern::Discard), SymbolVariant::Substitution(s) => lit(Ident::new(s, Span::call_site())), SymbolVariant::Normal(_) => @@ -172,9 +170,7 @@ fn compile_pattern(v: &IOValue) -> TokenStream { Some(label) => if label.starts_with("$") && r.arity() == 1 { let nested = compile_pattern(&r.fields()[0]); - quote!(#P_::Pattern::DBind(Box::new(#P_::DBind { - pattern: #nested - }))) + quote!(#P_::Pattern::Bind{ pattern: Box::new(#nested) }) } else { let label_stx = if label.starts_with("=") { let id = Ident::new(&label[1..], Span::call_site()); @@ -183,18 +179,19 @@ fn compile_pattern(v: &IOValue) -> TokenStream { quote!(#V_::Value::symbol(#label).wrap()) }; let members = compile_sequence_members(r.fields()); - quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Rec { - label: #label_stx, - fields: vec![#(#members),*], - }))) + quote!(#P_::Pattern::Group { + type_: Box::new(#P_::GroupType::Rec { label: #label_stx }), + entries: #MapFrom_([#(#members),*]), + }) } } } Value::Sequence(vs) => { let members = compile_sequence_members(vs); - quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Arr { - items: vec![#(#members),*], - }))) + quote!(#P_::Pattern::Group { + type_: Box::new(#P_::GroupType::Arr), + entries: #MapFrom_([#(#members),*]), + }) } Value::Set(_) => panic!("Cannot match sets in patterns"), @@ -204,9 +201,10 @@ fn compile_pattern(v: &IOValue) -> TokenStream { let v = compile_pattern(v); quote!((#k, #v)) }).collect::>(); - quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Dict { - entries: #MapFromIterator_(vec![#(#members),*]) - }))) + quote!(#P_::Pattern::Group { + type_: Box::new(#P_::GroupType::Dict), + entries: #MapFrom_([#(#members),*]), + }) } _ => lit(ValueCompiler::for_patterns().compile(v)), } diff --git a/syndicate-macros/src/pat.rs b/syndicate-macros/src/pat.rs index fc73f34..9d9580c 100644 --- a/syndicate-macros/src/pat.rs +++ b/syndicate-macros/src/pat.rs @@ -15,10 +15,9 @@ pub fn lit(e: T) -> TokenStream2 { } fn compile_sequence_members(stxs: &Vec) -> Result, &'static str> { - stxs.iter().map(|stx| { - // let p = to_pattern_expr(stx)?; - // Ok(quote!(#p)) - to_pattern_expr(stx) + stxs.iter().enumerate().map(|(i, stx)| { + let p = to_pattern_expr(stx)?; + Ok(quote!((syndicate::value::Value::from(#i).wrap(), #p))) }).collect() } @@ -28,7 +27,7 @@ pub fn to_pattern_expr(stx: &Stx) -> Result { #[allow(non_snake_case)] let V_: TokenStream2 = quote!(syndicate::value); #[allow(non_snake_case)] - let MapFromIterator_: TokenStream2 = quote!(<#V_::Map<_, _> as std::iter::FromIterator<_>>::from_iter); + let MapFrom_: TokenStream2 = quote!(<#V_::Map<_, _>>::from); match stx { Stx::Atom(v) => @@ -41,26 +40,27 @@ pub fn to_pattern_expr(stx: &Stx) -> Result { None => to_pattern_expr(&Stx::Discard)?, } }; - Ok(quote!(#P_::Pattern::DBind(Box::new(#P_::DBind { pattern: #inner_pat_expr })))) + Ok(quote!(#P_::Pattern::Bind { pattern: Box::new(#inner_pat_expr) })) } Stx::Subst(e) => Ok(lit(e)), Stx::Discard => - Ok(quote!(#P_::Pattern::DDiscard(Box::new(#P_::DDiscard)))), + Ok(quote!(#P_::Pattern::Discard)), Stx::Rec(l, fs) => { let label = to_value_expr(&*l)?; let members = compile_sequence_members(fs)?; - Ok(quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Rec { - label: #label, - fields: vec![#(#members),*], - })))) + Ok(quote!(#P_::Pattern::Group { + type_: Box::new(#P_::GroupType::Rec { label: #label }), + entries: #MapFrom_([#(#members),*]), + })) }, Stx::Seq(stxs) => { let members = compile_sequence_members(stxs)?; - Ok(quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Arr { - items: vec![#(#members),*], - })))) + Ok(quote!(#P_::Pattern::Group { + type_: Box::new(#P_::GroupType::Arr), + entries: #MapFrom_([#(#members),*]), + })) } Stx::Set(_stxs) => Err("Set literals not supported in patterns"), @@ -70,9 +70,10 @@ pub fn to_pattern_expr(stx: &Stx) -> Result { let v = to_pattern_expr(v)?; Ok(quote!((#k, #v))) }).collect::, &'static str>>()?; - Ok(quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Dict { - entries: #MapFromIterator_(vec![#(#members),*]) - })))) + Ok(quote!(#P_::Pattern::Group { + type_: Box::new(#P_::GroupType::Dict), + entries: #MapFrom_([#(#members),*]) + })) } } } diff --git a/syndicate-rs.code-workspace b/syndicate-rs.code-workspace new file mode 100644 index 0000000..1c4a034 --- /dev/null +++ b/syndicate-rs.code-workspace @@ -0,0 +1,15 @@ +{ + "folders": [ + { + "path": "." + }, + { + "path": "../syndicate-protocols" + } + ], + "settings": { + "files.exclude": { + "target": true + } + } +} \ No newline at end of file diff --git a/syndicate-schema-plugin/src/pattern_plugin.rs b/syndicate-schema-plugin/src/pattern_plugin.rs index b67eaba..355e12d 100644 --- a/syndicate-schema-plugin/src/pattern_plugin.rs +++ b/syndicate-schema-plugin/src/pattern_plugin.rs @@ -61,7 +61,7 @@ impl Plugin for PatternPlugin { } fn discard() -> P::Pattern { - P::Pattern::DDiscard(Box::new(P::DDiscard)) + P::Pattern::Discard } trait WildcardPattern { @@ -94,33 +94,34 @@ fn from_io(v: &IOValue) -> Option { impl WildcardPattern for CompoundPattern { fn wc(&self, s: &mut WalkState) -> Option { match self { - CompoundPattern::Tuple { patterns } => - Some(P::Pattern::DCompound(Box::new(P::DCompound::Arr { - items: patterns.iter() - .map(|p| unname(p).wc(s)) - .collect::>>()?, - }))), - CompoundPattern::TuplePrefix { .. } => - Some(discard()), + CompoundPattern::Tuple { patterns } | + CompoundPattern::TuplePrefix { fixed: patterns, .. }=> + Some(P::Pattern::Group { + type_: Box::new(P::GroupType::Arr), + entries: patterns.iter().enumerate() + .map(|(i, p)| Some((P::_Any::new(i), unname(p).wc(s)?))) + .collect::>>()?, + }), CompoundPattern::Dict { entries } => - Some(P::Pattern::DCompound(Box::new(P::DCompound::Dict { + Some(P::Pattern::Group { + type_: Box::new(P::GroupType::Dict), entries: Map::from_iter( entries.0.iter() .map(|(k, p)| Some((from_io(k)?, unname_simple(p).wc(s)?))) .filter(|e| discard() != e.as_ref().unwrap().1) .collect::>>()? .into_iter()), - }))), + }), CompoundPattern::Rec { label, fields } => match (unname(label), unname(fields)) { (Pattern::SimplePattern(label), Pattern::CompoundPattern(fields)) => match (*label, *fields) { (SimplePattern::Lit { value }, CompoundPattern::Tuple { patterns }) => - Some(P::Pattern::DCompound(Box::new(P::DCompound::Rec { - label: from_io(&value)?, - fields: patterns.iter() - .map(|p| unname(p).wc(s)) - .collect::>>()?, - }))), + Some(P::Pattern::Group{ + type_: Box::new(P::GroupType::Rec { label: from_io(&value)? }), + entries: patterns.iter().enumerate() + .map(|(i, p)| Some((P::_Any::new(i), unname(p).wc(s)?))) + .collect::>>()?, + }), _ => None, }, _ => None, diff --git a/syndicate-server/src/http.rs b/syndicate-server/src/http.rs index 5386ffe..3604660 100644 --- a/syndicate-server/src/http.rs +++ b/syndicate-server/src/http.rs @@ -10,7 +10,6 @@ use hyper::header::HeaderValue; use syndicate::actor::*; use syndicate::error::Error; -use syndicate::relay::Mutex; use syndicate::trace; use syndicate::value::Map; use syndicate::value::NestedValue; @@ -34,29 +33,25 @@ pub fn empty_response(code: StatusCode) -> Response { type ChunkItem = Result>; struct ResponseCollector { - framing_handle: Option, - context_handle: Arc>>, tx_res: Option<(oneshot::Sender>, Response)>, body_tx: Option>, } impl ResponseCollector { - fn new(tx: oneshot::Sender>, context_handle: Arc>>) -> Self { + fn new(tx: oneshot::Sender>) -> Self { let (body_tx, body_rx) = unbounded_channel(); let body_stream: Box + Send> = Box::new(UnboundedReceiverStream::new(body_rx)); let mut res = Response::new(body_stream.into()); - *res.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + *res.status_mut() = StatusCode::OK; ResponseCollector { - framing_handle: None, - context_handle, tx_res: Some((tx, res)), body_tx: Some(body_tx), } } fn with_res) -> ActorResult>(&mut self, f: F) -> ActorResult { - if let ResponseCollector { tx_res: Some((_, res)), .. } = self { + if let Some((_, res)) = &mut self.tx_res { f(res)?; } Ok(()) @@ -84,39 +79,13 @@ impl ResponseCollector { fn finish(&mut self, t: &mut Activation) -> ActorResult { self.deliver_res(); self.body_tx = None; - if let Some(h) = self.context_handle.lock().take() { - t.retract(h); - } + t.stop(); Ok(()) } } impl Entity for ResponseCollector { - fn assert(&mut self, _t: &mut Activation, assertion: http::HttpResponse, handle: Handle) -> ActorResult { - match assertion { - http::HttpResponse::Processing => { - self.framing_handle = Some(handle); - self.with_res(|r| { - *r.status_mut() = StatusCode::OK; - Ok(()) - }) - } - _ => Err(format!("Unexpected assertion {:?}", assertion))?, - } - } - - fn retract(&mut self, t: &mut Activation, handle: Handle) -> ActorResult { - if self.framing_handle == Some(handle) { - self.finish(t)?; - } - Ok(()) - } - fn message(&mut self, t: &mut Activation, message: http::HttpResponse) -> ActorResult { - if self.framing_handle.is_none() { - self.finish(t)?; - Err("Attempt to reply before has been asserted")?; - } match message { http::HttpResponse::Status { code, .. } => self.with_res(|r| { *r.status_mut() = StatusCode::from_u16( @@ -128,8 +97,13 @@ impl Entity for ResponseCollector { HeaderValue::from_str(value.as_str())?); Ok(()) }), - http::HttpResponse::Body { chunk } => self.add_chunk(*chunk), - _ => Err(format!("Unexpected message {:?}", message))?, + http::HttpResponse::Chunk { chunk } => { + self.add_chunk(*chunk) + } + http::HttpResponse::Done { chunk } => { + self.add_chunk(*chunk)?; + self.finish(t) + } } } } @@ -142,11 +116,11 @@ pub async fn serve( port: u16, ) -> Result, Error> { let host = match req.headers().get("host").and_then(|v| v.to_str().ok()) { - None => return Ok(empty_response(StatusCode::BAD_REQUEST)), - Some(h) => match h.rsplit_once(':') { + None => http::RequestHost::Absent, + Some(h) => http::RequestHost::Present(match h.rsplit_once(':') { None => h.to_string(), Some((h, _port)) => h.to_string(), - } + }) }; let uri = req.uri(); @@ -193,23 +167,22 @@ pub async fn serve( let (tx, rx) = oneshot::channel(); facet.activate(&account, Some(trace::TurnCause::external("http")), |t| { - let sreq = http::HttpRequest { - sequence_number: NEXT_SEQ.fetch_add(1, Ordering::Relaxed).into(), - host, - port: port.into(), - method: req.method().to_string().to_lowercase(), - path, - headers: http::Headers(headers), - query, - body, - }; - tracing::debug!(?sreq); - let context_handle: Arc>> = Arc::new(Mutex::new(None)); - let srep = Cap::guard(&language().syndicate, t.create(ResponseCollector::new( - tx, - Arc::clone(&context_handle)))); - *(context_handle.lock()) = httpd.assert( - t, language(), &http::HttpContext { req: sreq, res: srep }); + t.facet(move |t| { + let sreq = http::HttpRequest { + sequence_number: NEXT_SEQ.fetch_add(1, Ordering::Relaxed).into(), + host, + port: port.into(), + method: req.method().to_string().to_lowercase(), + path, + headers: http::Headers(headers), + query, + body, + }; + tracing::debug!(?sreq); + let srep = Cap::guard(&language().syndicate, t.create(ResponseCollector::new(tx))); + httpd.assert(t, language(), &http::HttpContext { req: sreq, res: srep }); + Ok(()) + })?; Ok(()) }); diff --git a/syndicate-server/src/script/mod.rs b/syndicate-server/src/script/mod.rs index 390983c..765f34c 100644 --- a/syndicate-server/src/script/mod.rs +++ b/syndicate-server/src/script/mod.rs @@ -9,7 +9,7 @@ use syndicate::actor::*; use syndicate::dataspace::Dataspace; use syndicate::during; use syndicate::enclose; -use syndicate::pattern::{lift_literal, drop_literal}; +use syndicate::pattern::{lift_literal, drop_literal, pattern_seq_from_dictionary}; use syndicate::schemas::dataspace; use syndicate::schemas::dataspace_patterns as P; use syndicate::schemas::sturdy; @@ -173,7 +173,7 @@ fn bad_instruction(message: &str) -> io::Error { } fn discard() -> P::Pattern { - P::Pattern::DDiscard(Box::new(P::DDiscard)) + P::Pattern::Discard } fn dlit(value: AnyValue) -> P::Pattern { @@ -272,7 +272,7 @@ impl<'env> PatternInstantiator<'env> { Symbolic::Discard => discard(), Symbolic::Binder(s) => { self.binding_names.push(s); - P::Pattern::DBind(Box::new(P::DBind { pattern: discard() })) + P::Pattern::Bind { pattern: Box::new(discard()) } } Symbolic::Reference(s) => dlit(self.env.lookup(&s, "pattern-template variable")?.clone()), @@ -287,43 +287,47 @@ impl<'env> PatternInstantiator<'env> { Some(pat) => pat, None => { let label = self.instantiate_pattern(r.label())?; - let fields = r.fields().iter().map(|p| self.instantiate_pattern(p)) - .collect::>>()?; - P::Pattern::DCompound(Box::new(P::DCompound::Rec { - label: drop_literal(&label) - .ok_or(bad_instruction("Record pattern must have literal label"))?, - fields, - })) + let entries = r.fields().iter().enumerate() + .map(|(i, p)| Ok((AnyValue::new(i), self.instantiate_pattern(p)?))) + .collect::>>()?; + P::Pattern::Group { + type_: Box::new(P::GroupType::Rec { + label: drop_literal(&label) + .ok_or(bad_instruction("Record pattern must have literal label"))?, + }), + entries, + } } } }, Value::Sequence(v) => - P::Pattern::DCompound(Box::new(P::DCompound::Arr { - items: v.iter() - .map(|p| self.instantiate_pattern(p)) - .collect::>>()?, - })), + P::Pattern::Group { + type_: Box::new(P::GroupType::Arr), + entries: v.iter().enumerate() + .map(|(i, p)| Ok((AnyValue::new(i), self.instantiate_pattern(p)?))) + .collect::>>()?, + }, Value::Set(_) => Err(bad_instruction(&format!("Sets not permitted in patterns: {:?}", template)))?, Value::Dictionary(v) => - P::Pattern::DCompound(Box::new(P::DCompound::Dict { + P::Pattern::Group { + type_: Box::new(P::GroupType::Dict), entries: v.iter() .map(|(a, b)| Ok((a.clone(), self.instantiate_pattern(b)?))) .collect::>>()?, - })), + }, }) } fn maybe_binder_with_pattern(&mut self, r: &Record) -> io::Result> { match r.label().value().as_symbol().map(|s| analyze(&s)) { - Some(Symbolic::Binder(formal)) => if r.fields().len() == 1 { + Some(Symbolic::Binder(formal)) if r.fields().len() == 1 => { let pattern = self.instantiate_pattern(&r.fields()[0])?; self.binding_names.push(formal); - return Ok(Some(P::Pattern::DBind(Box::new(P::DBind { pattern })))); + Ok(Some(P::Pattern::Bind { pattern: Box::new(pattern) })) }, - _ => (), + _ => Ok(None), } - Ok(None) } } @@ -553,7 +557,7 @@ impl Env { RewriteTemplate::Accept { pattern_template } => { let (_binding_names, pattern) = self.instantiate_pattern(pattern_template)?; Ok(sturdy::Rewrite { - pattern: embed_pattern(&P::Pattern::DBind(Box::new(P::DBind { pattern }))), + pattern: embed_pattern(&P::Pattern::Bind { pattern: Box::new(pattern) }), template: sturdy::Template::TRef(Box::new(sturdy::TRef { binding: 0.into() })), }) } @@ -674,24 +678,26 @@ impl Env { fn embed_pattern(p: &P::Pattern) -> sturdy::Pattern { match p { - P::Pattern::DDiscard(_) => sturdy::Pattern::PDiscard(Box::new(sturdy::PDiscard)), - P::Pattern::DBind(b) => sturdy::Pattern::PBind(Box::new(sturdy::PBind { - pattern: embed_pattern(&b.pattern), + P::Pattern::Discard => sturdy::Pattern::PDiscard(Box::new(sturdy::PDiscard)), + P::Pattern::Bind { pattern } => sturdy::Pattern::PBind(Box::new(sturdy::PBind { + pattern: embed_pattern(&**pattern), })), - P::Pattern::DLit(b) => sturdy::Pattern::Lit(Box::new(sturdy::Lit { - value: language().unparse(&b.value), + P::Pattern::Lit { value } => sturdy::Pattern::Lit(Box::new(sturdy::Lit { + value: language().unparse(&**value), })), - P::Pattern::DCompound(b) => sturdy::Pattern::PCompound(Box::new(match &**b { - P::DCompound::Rec { label, fields } => + P::Pattern::Group { type_, entries } => sturdy::Pattern::PCompound(Box::new(match &**type_ { + P::GroupType::Rec { label } => sturdy::PCompound::Rec { label: label.clone(), - fields: fields.iter().map(embed_pattern).collect(), + fields: pattern_seq_from_dictionary(entries).expect("correct field entries") + .into_iter().map(embed_pattern).collect(), }, - P::DCompound::Arr { items } => + P::GroupType::Arr => sturdy::PCompound::Arr { - items: items.iter().map(embed_pattern).collect(), + items: pattern_seq_from_dictionary(entries).expect("correct element entries") + .into_iter().map(embed_pattern).collect(), }, - P::DCompound::Dict { entries } => + P::GroupType::Dict => sturdy::PCompound::Dict { entries: entries.iter().map(|(k, v)| (k.clone(), embed_pattern(v))).collect(), }, diff --git a/syndicate-server/src/services/http_router.rs b/syndicate-server/src/services/http_router.rs index 21367cf..bc89bd7 100644 --- a/syndicate-server/src/services/http_router.rs +++ b/syndicate-server/src/services/http_router.rs @@ -10,7 +10,6 @@ use syndicate::error::Error; use syndicate::preserves::rec; use syndicate::preserves::value::Map; use syndicate::preserves::value::NestedValue; -use syndicate::preserves::value::Set; use syndicate::schemas::http; use syndicate::value::signed_integer::SignedInteger; @@ -56,10 +55,22 @@ pub fn on_demand(t: &mut Activation, ds: Arc) { }); } -type MethodTable = Map>>; +#[derive(Debug, Clone)] +struct ActiveHandler { + cap: Arc, + terminated: Arc>, +} +type MethodTable = Map>; type HostTable = Map>; type RoutingTable = Map; +fn request_host(value: &http::RequestHost) -> Option { + match value { + http::RequestHost::Present(h) => Some(h.to_owned()), + http::RequestHost::Absent => None, + } +} + fn run(t: &mut Activation, ds: Arc, spec: HttpRouter) -> ActorResult { ds.assert(t, language(), &lifecycle::started(&spec)); ds.assert(t, language(), &lifecycle::ready(&spec)); @@ -77,22 +88,30 @@ fn run(t: &mut Activation, ds: Arc, spec: HttpRouter) -> ActorResult { let host = language().parse::(&host)?; let path = language().parse::(&path)?; let method = language().parse::(&method)?; - let handler = handler.value().to_embedded()?; + let handler_cap = handler.value().to_embedded()?.clone(); + let handler_terminated = t.named_field("handler-terminated", false); t.get_mut(&routes) .entry(port.clone()).or_default() .entry(host.clone()).or_default() .entry(path.clone()).or_default() .entry(method.clone()).or_default() - .insert(handler.clone()); - t.on_stop(enclose!((routes, handler, method, path, host, port) move |t| { + .push(ActiveHandler { + cap: handler_cap.clone(), + terminated: handler_terminated, + }); + t.on_stop(enclose!((routes, method, path, host, port) move |t| { tracing::debug!("-HTTP binding {:?} {:?} {:?} {:?} {:?}", host, port, method, path, handler); let port_map = t.get_mut(&routes); let host_map = port_map.entry(port.clone()).or_default(); let path_map = host_map.entry(host.clone()).or_default(); let method_map = path_map.entry(path.clone()).or_default(); - let handler_set = method_map.entry(method.clone()).or_default(); - handler_set.remove(&handler); - if handler_set.is_empty() { + let handler_vec = method_map.entry(method.clone()).or_default(); + let handler = { + let i = handler_vec.iter().position(|a| a.cap == handler_cap) + .expect("Expected an index of an active handler to remove"); + handler_vec.swap_remove(i) + }; + if handler_vec.is_empty() { method_map.remove(&method); } if method_map.is_empty() { @@ -104,6 +123,7 @@ fn run(t: &mut Activation, ds: Arc, spec: HttpRouter) -> ActorResult { if host_map.is_empty() { port_map.remove(&port); } + *t.get_mut(&handler.terminated) = true; Ok(()) })); Ok(()) @@ -124,7 +144,7 @@ fn run(t: &mut Activation, ds: Arc, spec: HttpRouter) -> ActorResult { None => return send_empty(t, res, 404, "Not found"), }; - let methods = match try_hostname(host_map, http::HostPattern::Host(req.host.clone()), &req.path)? { + let methods = match request_host(&req.host).and_then(|h| try_hostname(host_map, http::HostPattern::Host(h), &req.path).transpose()).transpose()? { Some(methods) => methods, None => match try_hostname(host_map, http::HostPattern::Any, &req.path)? { Some(methods) => methods, @@ -141,13 +161,11 @@ fn run(t: &mut Activation, ds: Arc, spec: HttpRouter) -> ActorResult { http::MethodPattern::Specific(m) => m.to_uppercase(), http::MethodPattern::Any => unreachable!(), }).collect::>().join(", "); - let h = res.assert(t, language(), &http::HttpResponse::Processing); res.message(t, language(), &http::HttpResponse::Status { code: 405.into(), message: "Method Not Allowed".into() }); res.message(t, language(), &http::HttpResponse::Header { name: "allow".into(), value: allowed }); - if let Some(h) = h { t.retract(h); } - return Ok(()) + return send_done(t, res); } } }; @@ -155,22 +173,33 @@ fn run(t: &mut Activation, ds: Arc, spec: HttpRouter) -> ActorResult { if handlers.len() > 1 { tracing::warn!(?req, "Too many handlers available"); } - let handler = handlers.first().expect("Nonempty handler set").clone(); - tracing::trace!("Handler for {:?} is {:?}", &req, &handler); - handler.assert(t, language(), &http::HttpContext { req, res: res.clone() }); + let ActiveHandler { cap, terminated } = handlers.first().expect("Nonempty handler set").clone(); + tracing::trace!("Handler for {:?} is {:?}", &req, &cap); + t.dataflow(enclose!((terminated, req, res) move |t| { + if *t.get(&terminated) { + tracing::trace!("Handler for {:?} terminated", &req); + send_empty(t, &res, 500, "Internal Server Error")?; + } + Ok(()) + }))?; + + cap.assert(t, language(), &http::HttpContext { req, res: res.clone() }); Ok(()) }); Ok(()) } +fn send_done(t: &mut Activation, res: &Arc) -> ActorResult { + res.message(t, language(), &http::HttpResponse::Done { + chunk: Box::new(http::Chunk::Bytes(vec![])) }); + Ok(()) +} fn send_empty(t: &mut Activation, res: &Arc, code: u16, message: &str) -> ActorResult { - let h = res.assert(t, language(), &http::HttpResponse::Processing); res.message(t, language(), &http::HttpResponse::Status { code: code.into(), message: message.into() }); - if let Some(h) = h { t.retract(h); } - return Ok(()) + send_done(t, res) } fn path_pattern_matches(path_pat: &http::PathPattern, path: &Vec) -> bool { @@ -268,13 +297,11 @@ impl HttpStaticFileServer { Ok(mut fh) => { if fh.metadata().is_ok_and(|m| m.is_dir()) { drop(fh); - let h = res.assert(t, language(), &http::HttpResponse::Processing); res.message(t, language(), &http::HttpResponse::Status { code: 301.into(), message: "Moved permanently".into() }); res.message(t, language(), &http::HttpResponse::Header { name: "location".into(), value: format!("/{}/", req.path.join("/")) }); - if let Some(h) = h { t.retract(h); } - return Ok(()) + return send_done(t, res); } else { let mut buf = Vec::new(); fh.read_to_end(&mut buf)?; @@ -287,16 +314,14 @@ impl HttpStaticFileServer { } }; - let h = res.assert(t, language(), &http::HttpResponse::Processing); res.message(t, language(), &http::HttpResponse::Status { code: 200.into(), message: "OK".into() }); if let Some(mime_type) = mime_type { res.message(t, language(), &http::HttpResponse::Header { name: "content-type".into(), value: mime_type.to_owned() }); } - res.message(t, language(), &http::HttpResponse::Body { + res.message(t, language(), &http::HttpResponse::Done { chunk: Box::new(http::Chunk::Bytes(body)) }); - if let Some(h) = h { t.retract(h); } Ok(()) } diff --git a/syndicate-server/src/services/tcp_relay_listener.rs b/syndicate-server/src/services/tcp_relay_listener.rs index 96334cf..9609a31 100644 --- a/syndicate-server/src/services/tcp_relay_listener.rs +++ b/syndicate-server/src/services/tcp_relay_listener.rs @@ -15,24 +15,20 @@ use tokio::net::TcpListener; use crate::language::language; use crate::lifecycle; use crate::protocol::detect_protocol; -use crate::schemas::internal_services::{TcpWithHttp, TcpWithoutHttp, TcpRelayListener}; +use crate::schemas::internal_services::TcpWithoutHttp; use syndicate_macros::during; pub fn on_demand(t: &mut Activation, ds: Arc) { t.spawn(Some(AnyValue::symbol("tcp_relay_listener")), move |t| { - enclose!((ds) during!(t, ds, language(), >, |t: &mut Activation| { - spec.httpd.assert(t, language(), &syndicate::schemas::http::HttpListener { port: spec.addr.port.clone() }); - run_supervisor(t, ds.clone(), TcpRelayListener::TcpWithHttp(Box::new(spec))) - })); enclose!((ds) during!(t, ds, language(), >, |t| { - run_supervisor(t, ds.clone(), TcpRelayListener::TcpWithoutHttp(Box::new(spec))) + run_supervisor(t, ds.clone(), spec) })); Ok(()) }); } -fn run_supervisor(t: &mut Activation, ds: Arc, spec: TcpRelayListener) -> ActorResult { +fn run_supervisor(t: &mut Activation, ds: Arc, spec: TcpWithoutHttp) -> ActorResult { Supervisor::start( t, Some(rec![AnyValue::symbol("relay"), language().unparse(&spec)]), @@ -41,18 +37,32 @@ fn run_supervisor(t: &mut Activation, ds: Arc, spec: TcpRelayListener) -> A enclose!((ds) move |t| enclose!((ds, spec) run(t, ds, spec)))) } -fn run(t: &mut Activation, ds: Arc, spec: TcpRelayListener) -> ActorResult { +fn run(t: &mut Activation, ds: Arc, spec: TcpWithoutHttp) -> ActorResult { lifecycle::terminate_on_service_restart(t, &ds, &spec); - let (addr, gatekeeper, httpd) = match spec.clone() { - TcpRelayListener::TcpWithHttp(b) => { - let TcpWithHttp { addr, gatekeeper, httpd } = *b; - (addr, gatekeeper, Some(httpd)) - } - TcpRelayListener::TcpWithoutHttp(b) => { - let TcpWithoutHttp { addr, gatekeeper } = *b; - (addr, gatekeeper, None) - } - }; + + let httpd = t.named_field("httpd", None::>); + + { + let ad = spec.addr.clone(); + let ad2 = ad.clone(); + let gk = spec.gatekeeper.clone(); + enclose!((ds, httpd) during!(t, ds, language(), + >, |t: &mut Activation| { + if let Some(h) = h.value().as_embedded().cloned() { + h.assert(t, language(), &syndicate::schemas::http::HttpListener { port: ad2.port.clone() }); + *t.get_mut(&httpd) = Some(h.clone()); + t.on_stop(enclose!((httpd) move |t| { + let f = t.get_mut(&httpd); + if *f == Some(h.clone()) { *f = None; } + Ok(()) + })); + } + Ok(()) + })); + } + + let TcpWithoutHttp { addr, gatekeeper } = spec.clone(); + let host = addr.host.clone(); let port = u16::try_from(&addr.port).map_err(|_| "Invalid TCP port number")?; let facet = t.facet_ref(); @@ -83,6 +93,7 @@ fn run(t: &mut Activation, ds: Arc, spec: TcpRelayListener) -> ActorResult let account = Account::new(name.clone(), trace_collector.clone()); if !facet.activate( &account, cause, enclose!((trace_collector, httpd) move |t| { + let httpd = t.get(&httpd).clone(); t.spawn(name, move |t| { Ok(t.linked_task(None, { let facet = t.facet_ref(); @@ -91,7 +102,7 @@ fn run(t: &mut Activation, ds: Arc, spec: TcpRelayListener) -> ActorResult facet, stream, gatekeeper, - httpd.map(|r| r.clone()), + httpd, addr, port).await?; Ok(LinkedTaskTermination::KeepFacet) diff --git a/syndicate/benches/bench_dataspace.rs b/syndicate/benches/bench_dataspace.rs index be79996..16fffa8 100644 --- a/syndicate/benches/bench_dataspace.rs +++ b/syndicate/benches/bench_dataspace.rs @@ -11,6 +11,7 @@ use syndicate::during::entity; use syndicate::dataspace::Dataspace; use syndicate::schemas::dataspace::Observe; use syndicate::schemas::dataspace_patterns as p; +use syndicate::value::Map; use syndicate::value::NestedValue; use syndicate::value::Value; @@ -88,11 +89,11 @@ pub fn bench_pub(c: &mut Criterion) { .create_cap(t); ds.assert(t, language(), &Observe { - pattern: p::Pattern::DBind(Box::new(p::DBind { - pattern: p::Pattern::DLit(Box::new(p::DLit { - value: p::AnyAtom::Symbol("consumer".to_owned()), - })), - })), + pattern: p::Pattern::Bind { + pattern: Box::new(p::Pattern::Lit { + value: Box::new(p::AnyAtom::Symbol("consumer".to_owned())), + }), + }, observer: shutdown, }); @@ -110,24 +111,27 @@ pub fn bench_pub(c: &mut Criterion) { ds.assert(t, &(), &AnyValue::symbol("consumer")); ds.assert(t, language(), &Observe { - pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { - label: AnyValue::symbol("Says"), - fields: vec![ - p::Pattern::DLit(Box::new(p::DLit { - value: p::AnyAtom::String("bench_pub".to_owned()), - })), - p::Pattern::DBind(Box::new(p::DBind { - pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)), - })), - ]})), + pattern: p::Pattern::Group { + type_: Box::new(p::GroupType::Rec { + label: AnyValue::symbol("Says"), + }), + entries: Map::from([ + (p::_Any::new(0), p::Pattern::Lit { + value: Box::new(p::AnyAtom::String("bench_pub".to_owned())), + }), + (p::_Any::new(1), p::Pattern::Bind { + pattern: Box::new(p::Pattern::Discard), + }), + ]), + }, observer: receiver, }); ds.assert(t, language(), &Observe { - pattern: p::Pattern::DBind(Box::new(p::DBind { - pattern: p::Pattern::DLit(Box::new(p::DLit { - value: p::AnyAtom::Bool(true), - })), - })), + pattern: p::Pattern::Bind { + pattern: Box::new(p::Pattern::Lit { + value: Box::new(p::AnyAtom::Bool(true)), + }), + }, observer: shutdown, }); diff --git a/syndicate/src/pattern.rs b/syndicate/src/pattern.rs index 2c22387..59e5c24 100644 --- a/syndicate/src/pattern.rs +++ b/syndicate/src/pattern.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::schemas::dataspace_patterns::*; use super::language; @@ -8,23 +10,25 @@ use preserves::value::Record; use preserves::value::Value; use preserves_schema::Codec; -#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq)] -pub enum PathStep { - Index(usize), - Key(_Any), -} - +pub type PathStep = _Any; pub type Path = Vec; pub type Paths = Vec; +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct ConstantPositions { + pub with_values: Paths, + pub required_to_exist: Paths, +} + struct Analyzer { pub const_paths: Paths, pub const_values: Vec<_Any>, + pub checked_paths: Paths, pub capture_paths: Paths, } pub struct PatternAnalysis { - pub const_paths: Paths, + pub const_positions: Arc, pub const_values: _Any, pub capture_paths: Paths, } @@ -38,11 +42,15 @@ impl PatternAnalysis { let mut analyzer = Analyzer { const_paths: Vec::new(), const_values: Vec::new(), + checked_paths: Vec::new(), capture_paths: Vec::new(), }; analyzer.walk(&mut Vec::new(), p); PatternAnalysis { - const_paths: analyzer.const_paths, + const_positions: Arc::new(ConstantPositions { + with_values: analyzer.const_paths, + required_to_exist: analyzer.checked_paths, + }), const_values: _Any::new(analyzer.const_values), capture_paths: analyzer.capture_paths, } @@ -58,34 +66,21 @@ impl Analyzer { fn walk(&mut self, path: &mut Path, p: &Pattern) { match p { - Pattern::DCompound(b) => match &**b { - DCompound::Rec { fields, .. } => { - for (i, p) in fields.iter().enumerate() { - self.walk_step(path, PathStep::Index(i), p); - } - } - DCompound::Arr { items, .. } => { - for (i, p) in items.iter().enumerate() { - self.walk_step(path, PathStep::Index(i), p); - } - } - DCompound::Dict { entries, .. } => { - for (k, p) in entries { - self.walk_step(path, PathStep::Key(k.clone()), p); - } + Pattern::Group { entries, .. } => { + for (k, p) in entries { + self.walk_step(path, k.clone(), p) } } - Pattern::DBind(b) => { - let DBind { pattern, .. } = &**b; + Pattern::Bind { pattern } => { self.capture_paths.push(path.clone()); - self.walk(path, pattern) + self.walk(path, &**pattern); } - Pattern::DDiscard(_) => - (), - Pattern::DLit(b) => { - let DLit { value } = &**b; + Pattern::Discard => { + self.checked_paths.push(path.clone()); + } + Pattern::Lit { value } => { self.const_paths.push(path.clone()); - self.const_values.push(language().unparse(value)); + self.const_values.push(language().unparse(&**value)); } } } @@ -109,52 +104,47 @@ impl PatternMatcher { } } + fn run_seq<'a, F: 'a + Fn(usize) -> &'a _Any>(&mut self, entries: &Map<_Any, Pattern<_Any>>, values: F) -> bool { + for (k, p) in entries { + match k.value().as_usize() { + None => return false, + Some(i) => if !self.run(p, values(i)) { + return false; + } + } + } + true + } + fn run(&mut self, pattern: &Pattern<_Any>, value: &_Any) -> bool { match pattern { - Pattern::DDiscard(_) => true, - Pattern::DBind(b) => { + Pattern::Discard => true, + Pattern::Bind { pattern } => { self.captures.push(value.clone()); - self.run(&b.pattern, value) + self.run(&**pattern, value) } - Pattern::DLit(b) => value == &language().unparse(&b.value), - Pattern::DCompound(b) => match &**b { - DCompound::Rec { label, fields } => { - match value.value().as_record(Some(fields.len())) { + Pattern::Lit { value: expected } => value == &language().unparse(&**expected), + Pattern::Group { type_, entries } => match &**type_ { + GroupType::Rec { label } => { + match value.value().as_record(None) { None => false, - Some(r) => { - if r.label() != label { - return false; - } - for (i, p) in fields.iter().enumerate() { - if !self.run(p, &r.fields()[i]) { - return false; - } - } - true - } + Some(r) => + r.label() == label && + self.run_seq(entries, |i| &r.fields()[i]) } } - DCompound::Arr { items } => { + GroupType::Arr => { match value.value().as_sequence() { None => false, - Some(vs) => { - if vs.len() != items.len() { - return false; - } - for (i, p) in items.iter().enumerate() { - if !self.run(p, &vs[i]) { - return false; - } - } - true - } + Some(vs) => + self.run_seq(entries, |i| &vs[i]) } } - DCompound::Dict { entries: expected_entries } => { + GroupType::Dict => { match value.value().as_dictionary() { None => false, Some(actual_entries) => { - for (k, p) in expected_entries.iter() { + for (k, p) in entries { if !actual_entries.get(k).map(|v| self.run(p, v)).unwrap_or(false) { return false; } @@ -170,42 +160,68 @@ impl PatternMatcher { pub fn lift_literal(v: &_Any) -> Pattern { match v.value() { - Value::Record(r) => Pattern::DCompound(Box::new(DCompound::Rec { - label: r.label().clone(), - fields: r.fields().iter().map(lift_literal).collect(), - })), - Value::Sequence(items) => Pattern::DCompound(Box::new(DCompound::Arr { - items: items.iter().map(lift_literal).collect(), - })), + Value::Record(r) => Pattern::Group { + type_: Box::new(GroupType::Rec { label: r.label().clone() }), + entries: r.fields().iter().enumerate() + .map(|(i, v)| (_Any::new(i), lift_literal(v))) + .collect(), + }, + Value::Sequence(items) => Pattern::Group { + type_: Box::new(GroupType::Arr), + entries: items.iter().enumerate() + .map(|(i, v)| (_Any::new(i), lift_literal(v))) + .collect(), + }, Value::Set(_members) => panic!("Cannot express literal set in pattern"), - Value::Dictionary(entries) => Pattern::DCompound(Box::new(DCompound::Dict { - entries: entries.iter().map(|(k, v)| (k.clone(), lift_literal(v))).collect(), - })), - _other => Pattern::DLit(Box::new(DLit { - value: language().parse(v).expect("Non-compound datum can be converted to AnyAtom"), - })), + Value::Dictionary(entries) => Pattern::Group { + type_: Box::new(GroupType::Dict), + entries: entries.iter() + .map(|(k, v)| (k.clone(), lift_literal(v))) + .collect(), + }, + _other => Pattern::Lit { + value: Box::new(language().parse(v).expect("Non-compound datum can be converted to AnyAtom")), + }, } } +const DISCARD: Pattern = Pattern::Discard; + +pub fn pattern_seq_from_dictionary(entries: &Map<_Any, Pattern>) -> Option> { + let mut max_k: Option = None; + for k in entries.keys() { + max_k = max_k.max(Some(k.value().as_usize()?)); + } + let mut seq = vec![]; + if let Some(max_k) = max_k { + seq.reserve(max_k + 1); + for i in 0..=max_k { + seq.push(entries.get(&_Any::new(i)).unwrap_or(&DISCARD)); + } + } + return Some(seq); +} + +fn drop_literal_entries_seq(mut seq: Vec<_Any>, entries: &Map<_Any, Pattern>) -> Option> { + for p in pattern_seq_from_dictionary(entries)?.into_iter() { + seq.push(drop_literal(p)?); + } + Some(seq) +} + pub fn drop_literal(p: &Pattern) -> Option<_Any> { match p { - Pattern::DCompound(b) => match &**b { - DCompound::Rec { label, fields } => { - let mut r = vec![label.clone()]; - for f in fields.iter() { - r.push(drop_literal(f)?); - } - Some(Value::Record(Record(r)).wrap()) - } - DCompound::Arr { items } => - Some(Value::Sequence(items.iter().map(drop_literal) - .collect::>>()?).wrap()), - DCompound::Dict { entries } => - Some(Value::Dictionary(entries.iter() - .map(|(k, p)| Some((k.clone(), drop_literal(p)?))) - .collect::>>()?).wrap()), + Pattern::Group { type_, entries } => match &**type_ { + GroupType::Rec { label } => + Some(Value::Record(Record(drop_literal_entries_seq(vec![label.clone()], entries)?)).wrap()), + GroupType::Arr => + Some(Value::Sequence(drop_literal_entries_seq(vec![], entries)?).wrap()), + GroupType::Dict => + Some(Value::Dictionary(entries.iter() + .map(|(k, p)| Some((k.clone(), drop_literal(p)?))) + .collect::>>()?).wrap()), }, - Pattern::DLit(b) => Some(language().unparse(&b.value)), + Pattern::Lit { value } => Some(language().unparse(&**value)), _ => None, } } diff --git a/syndicate/src/skeleton.rs b/syndicate/src/skeleton.rs index e86ae57..f142ce7 100644 --- a/syndicate/src/skeleton.rs +++ b/syndicate/src/skeleton.rs @@ -16,19 +16,12 @@ use crate::actor::Activation; use crate::actor::Handle; use crate::actor::Cap; use crate::schemas::dataspace_patterns as ds; -use crate::pattern::{self, PathStep, Path, Paths}; +use crate::pattern::{self, ConstantPositions, PathStep, Path, Paths}; type Bag = bag::BTreeBag; type Captures = AnyValue; -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] -enum Guard { - Rec(AnyValue, usize), - Seq(usize), - Map, -} - /// Index of assertions and [`Observe`rs][crate::schemas::dataspace::Observe]. /// /// Generally speaking, you will not need to use this structure; @@ -44,13 +37,13 @@ pub struct Index { #[derive(Debug)] struct Node { continuation: Continuation, - edges: Map>, + edges: Map>, } #[derive(Debug)] struct Continuation { cached_assertions: Set, - leaf_map: Map>, + leaf_map: Map, Map>, } #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] @@ -205,7 +198,7 @@ impl Node { } fn extend(&mut self, pat: &ds::Pattern) -> &mut Continuation { - let (_pop_count, final_node) = self.extend_walk(&mut Vec::new(), 0, PathStep::Index(0), pat); + let (_pop_count, final_node) = self.extend_walk(&mut Vec::new(), 0, PathStep::new(0), pat); &mut final_node.continuation } @@ -216,23 +209,13 @@ impl Node { step: PathStep, pat: &ds::Pattern, ) -> (usize, &mut Node) { - let (guard, members): (Guard, Vec<(PathStep, &ds::Pattern)>) = match pat { - ds::Pattern::DCompound(b) => match &**b { - ds::DCompound::Arr { items } => - (Guard::Seq(items.len()), - items.iter().enumerate().map(|(i, p)| (PathStep::Index(i), p)).collect()), - ds::DCompound::Rec { label, fields } => - (Guard::Rec(label.clone(), fields.len()), - fields.iter().enumerate().map(|(i, p)| (PathStep::Index(i), p)).collect()), - ds::DCompound::Dict { entries, .. } => - (Guard::Map, - entries.iter().map(|(k, p)| (PathStep::Key(k.clone()), p)).collect()), - } - ds::Pattern::DBind(b) => { - let ds::DBind { pattern, .. } = &**b; - return self.extend_walk(path, pop_count, step, pattern); - } - ds::Pattern::DDiscard(_) | ds::Pattern::DLit(_) => + let (guard, members): (ds::GroupType, Vec<(PathStep, &ds::Pattern)>) = match pat { + ds::Pattern::Group { type_, entries } => + ((&**type_).clone(), + entries.iter().map(|(k, p)| (k.clone(), p)).collect()), + ds::Pattern::Bind { pattern } => + return self.extend_walk(path, pop_count, step, &**pattern), + ds::Pattern::Discard | ds::Pattern::Lit { .. } => return (pop_count, self), }; @@ -336,41 +319,46 @@ where FCont: FnMut(&mut Continuation, &AnyValue) -> (), fn continuation(&mut self, c: &mut Continuation) { (self.m_cont)(c, self.outer_value); - let mut empty_const_paths = Vec::new(); - for (const_paths, const_val_map) in &mut c.leaf_map { - if let Some(const_vals) = project_paths(self.outer_value, const_paths) { - let leaf_opt = if self.create_leaf_if_absent { - Some(const_val_map.entry(const_vals.clone()).or_insert_with(Leaf::new)) - } else { - const_val_map.get_mut(&const_vals) - }; - if let Some(leaf) = leaf_opt { - (self.m_leaf)(leaf, self.outer_value); - for (capture_paths, endpoints) in &mut leaf.endpoints_map { - if let Some(cs) = project_paths(self.outer_value, &capture_paths) { - (self.m_endpoints)(endpoints, cs); - } + let mut empty_const_positions = Vec::new(); + for (const_positions, const_val_map) in &mut c.leaf_map { + if project_paths(self.outer_value, &const_positions.required_to_exist).is_none() { + continue; + } + let const_vals = match project_paths(self.outer_value, &const_positions.with_values) { + Some(vs) => vs, + None => continue, + }; + let leaf_opt = if self.create_leaf_if_absent { + Some(const_val_map.entry(const_vals.clone()).or_insert_with(Leaf::new)) + } else { + const_val_map.get_mut(&const_vals) + }; + if let Some(leaf) = leaf_opt { + (self.m_leaf)(leaf, self.outer_value); + for (capture_paths, endpoints) in &mut leaf.endpoints_map { + if let Some(cs) = project_paths(self.outer_value, &capture_paths) { + (self.m_endpoints)(endpoints, cs); } - if leaf.is_empty() { - const_val_map.remove(&const_vals); - if const_val_map.is_empty() { - empty_const_paths.push(const_paths.clone()); - } + } + if leaf.is_empty() { + const_val_map.remove(&const_vals); + if const_val_map.is_empty() { + empty_const_positions.push(const_positions.clone()); } } } } - for const_paths in empty_const_paths { - c.leaf_map.remove(&const_paths); + for const_positions in empty_const_positions { + c.leaf_map.remove(&const_positions); } } } -fn class_of(v: &AnyValue) -> Option { +fn class_of(v: &AnyValue) -> Option { match v.value() { - Value::Sequence(vs) => Some(Guard::Seq(vs.len())), - Value::Record(r) => Some(Guard::Rec(r.label().clone(), r.arity())), - Value::Dictionary(_) => Some(Guard::Map), + Value::Sequence(_) => Some(ds::GroupType::Arr), + Value::Record(r) => Some(ds::GroupType::Rec { label: r.label().clone() }), + Value::Dictionary(_) => Some(ds::GroupType::Dict), _ => None, } } @@ -398,15 +386,17 @@ fn project_paths<'a>(v: &'a AnyValue, ps: &Paths) -> Option { } fn step<'a>(v: &'a AnyValue, s: &PathStep) -> Option<&'a AnyValue> { - match (v.value(), s) { - (Value::Sequence(vs), PathStep::Index(i)) => - if *i < vs.len() { Some(&vs[*i]) } else { None }, - (Value::Record(r), PathStep::Index(i)) => - if *i < r.arity() { Some(&r.fields()[*i]) } else { None }, - (Value::Dictionary(m), PathStep::Key(k)) => - m.get(k), - _ => - None, + match v.value() { + Value::Sequence(vs) => { + let i = s.value().as_usize()?; + if i < vs.len() { Some(&vs[i]) } else { None } + } + Value::Record(r) => { + let i = s.value().as_usize()?; + if i < r.arity() { Some(&r.fields()[i]) } else { None } + } + Value::Dictionary(m) => m.get(s), + _ => None, } } @@ -423,11 +413,14 @@ impl Continuation { ) { let cached_assertions = &self.cached_assertions; let const_val_map = - self.leaf_map.entry(analysis.const_paths.clone()).or_insert_with({ + self.leaf_map.entry(analysis.const_positions.clone()).or_insert_with({ || { let mut cvm = Map::new(); for a in cached_assertions { - if let Some(key) = project_paths(a, &analysis.const_paths) { + if project_paths(a, &analysis.const_positions.required_to_exist).is_none() { + continue; + } + if let Some(key) = project_paths(a, &analysis.const_positions.with_values) { cvm.entry(key).or_insert_with(Leaf::new) .cached_assertions.insert(a.clone()); } @@ -462,7 +455,7 @@ impl Continuation { observer: &Arc, ) { if let Entry::Occupied(mut const_val_map_entry) - = self.leaf_map.entry(analysis.const_paths) + = self.leaf_map.entry(analysis.const_positions) { let const_val_map = const_val_map_entry.get_mut(); if let Entry::Occupied(mut leaf_entry)