New dataspace pattern implementation; update HTTP server

This commit is contained in:
Tony Garnock-Jones 2024-04-10 17:03:09 +02:00
parent dcb1aec142
commit 581886835a
11 changed files with 406 additions and 363 deletions

View File

@ -36,7 +36,7 @@ enum SymbolVariant<'a> {
fn compile_sequence_members(vs: &[IOValue]) -> Vec<TokenStream> { fn compile_sequence_members(vs: &[IOValue]) -> Vec<TokenStream> {
vs.iter().enumerate().map(|(i, f)| { vs.iter().enumerate().map(|(i, f)| {
let p = compile_pattern(f); let p = compile_pattern(f);
quote!((#i .into(), #p)) quote!((syndicate::value::Value::from(#i).wrap(), #p))
}).collect::<Vec<_>>() }).collect::<Vec<_>>()
} }
@ -151,16 +151,14 @@ fn compile_pattern(v: &IOValue) -> TokenStream {
#[allow(non_snake_case)] #[allow(non_snake_case)]
let V_: TokenStream = quote!(syndicate::value); let V_: TokenStream = quote!(syndicate::value);
#[allow(non_snake_case)] #[allow(non_snake_case)]
let MapFromIterator_: TokenStream = quote!(<#V_::Map<_, _> as std::iter::FromIterator<_>>::from_iter); let MapFrom_: TokenStream = quote!(<#V_::Map<_, _>>::from);
match v.value() { match v.value() {
Value::Symbol(s) => match analyze_symbol(&s, true) { Value::Symbol(s) => match analyze_symbol(&s, true) {
SymbolVariant::Binder(_) => SymbolVariant::Binder(_) =>
quote!(#P_::Pattern::DBind(Box::new(#P_::DBind { quote!(#P_::Pattern::Bind{ pattern: Box::new(#P_::Pattern::Discard) }),
pattern: #P_::Pattern::DDiscard(Box::new(#P_::DDiscard))
}))),
SymbolVariant::Discard => SymbolVariant::Discard =>
quote!(#P_::Pattern::DDiscard(Box::new(#P_::DDiscard))), quote!(#P_::Pattern::Discard),
SymbolVariant::Substitution(s) => SymbolVariant::Substitution(s) =>
lit(Ident::new(s, Span::call_site())), lit(Ident::new(s, Span::call_site())),
SymbolVariant::Normal(_) => SymbolVariant::Normal(_) =>
@ -172,9 +170,7 @@ fn compile_pattern(v: &IOValue) -> TokenStream {
Some(label) => Some(label) =>
if label.starts_with("$") && r.arity() == 1 { if label.starts_with("$") && r.arity() == 1 {
let nested = compile_pattern(&r.fields()[0]); let nested = compile_pattern(&r.fields()[0]);
quote!(#P_::Pattern::DBind(Box::new(#P_::DBind { quote!(#P_::Pattern::Bind{ pattern: Box::new(#nested) })
pattern: #nested
})))
} else { } else {
let label_stx = if label.starts_with("=") { let label_stx = if label.starts_with("=") {
let id = Ident::new(&label[1..], Span::call_site()); let id = Ident::new(&label[1..], Span::call_site());
@ -183,18 +179,19 @@ fn compile_pattern(v: &IOValue) -> TokenStream {
quote!(#V_::Value::symbol(#label).wrap()) quote!(#V_::Value::symbol(#label).wrap())
}; };
let members = compile_sequence_members(r.fields()); let members = compile_sequence_members(r.fields());
quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Rec { quote!(#P_::Pattern::Group {
label: #label_stx, type_: Box::new(#P_::GroupType::Rec { label: #label_stx }),
fields: vec![#(#members),*], entries: #MapFrom_([#(#members),*]),
}))) })
} }
} }
} }
Value::Sequence(vs) => { Value::Sequence(vs) => {
let members = compile_sequence_members(vs); let members = compile_sequence_members(vs);
quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Arr { quote!(#P_::Pattern::Group {
items: vec![#(#members),*], type_: Box::new(#P_::GroupType::Arr),
}))) entries: #MapFrom_([#(#members),*]),
})
} }
Value::Set(_) => Value::Set(_) =>
panic!("Cannot match sets in patterns"), panic!("Cannot match sets in patterns"),
@ -204,9 +201,10 @@ fn compile_pattern(v: &IOValue) -> TokenStream {
let v = compile_pattern(v); let v = compile_pattern(v);
quote!((#k, #v)) quote!((#k, #v))
}).collect::<Vec<_>>(); }).collect::<Vec<_>>();
quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Dict { quote!(#P_::Pattern::Group {
entries: #MapFromIterator_(vec![#(#members),*]) type_: Box::new(#P_::GroupType::Dict),
}))) entries: #MapFrom_([#(#members),*]),
})
} }
_ => lit(ValueCompiler::for_patterns().compile(v)), _ => lit(ValueCompiler::for_patterns().compile(v)),
} }

View File

@ -15,10 +15,9 @@ pub fn lit<T: ToTokens>(e: T) -> TokenStream2 {
} }
fn compile_sequence_members(stxs: &Vec<Stx>) -> Result<Vec<TokenStream2>, &'static str> { fn compile_sequence_members(stxs: &Vec<Stx>) -> Result<Vec<TokenStream2>, &'static str> {
stxs.iter().map(|stx| { stxs.iter().enumerate().map(|(i, stx)| {
// let p = to_pattern_expr(stx)?; let p = to_pattern_expr(stx)?;
// Ok(quote!(#p)) Ok(quote!((syndicate::value::Value::from(#i).wrap(), #p)))
to_pattern_expr(stx)
}).collect() }).collect()
} }
@ -28,7 +27,7 @@ pub fn to_pattern_expr(stx: &Stx) -> Result<TokenStream2, &'static str> {
#[allow(non_snake_case)] #[allow(non_snake_case)]
let V_: TokenStream2 = quote!(syndicate::value); let V_: TokenStream2 = quote!(syndicate::value);
#[allow(non_snake_case)] #[allow(non_snake_case)]
let MapFromIterator_: TokenStream2 = quote!(<#V_::Map<_, _> as std::iter::FromIterator<_>>::from_iter); let MapFrom_: TokenStream2 = quote!(<#V_::Map<_, _>>::from);
match stx { match stx {
Stx::Atom(v) => Stx::Atom(v) =>
@ -41,26 +40,27 @@ pub fn to_pattern_expr(stx: &Stx) -> Result<TokenStream2, &'static str> {
None => to_pattern_expr(&Stx::Discard)?, None => to_pattern_expr(&Stx::Discard)?,
} }
}; };
Ok(quote!(#P_::Pattern::DBind(Box::new(#P_::DBind { pattern: #inner_pat_expr })))) Ok(quote!(#P_::Pattern::Bind { pattern: Box::new(#inner_pat_expr) }))
} }
Stx::Subst(e) => Stx::Subst(e) =>
Ok(lit(e)), Ok(lit(e)),
Stx::Discard => Stx::Discard =>
Ok(quote!(#P_::Pattern::DDiscard(Box::new(#P_::DDiscard)))), Ok(quote!(#P_::Pattern::Discard)),
Stx::Rec(l, fs) => { Stx::Rec(l, fs) => {
let label = to_value_expr(&*l)?; let label = to_value_expr(&*l)?;
let members = compile_sequence_members(fs)?; let members = compile_sequence_members(fs)?;
Ok(quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Rec { Ok(quote!(#P_::Pattern::Group {
label: #label, type_: Box::new(#P_::GroupType::Rec { label: #label }),
fields: vec![#(#members),*], entries: #MapFrom_([#(#members),*]),
})))) }))
}, },
Stx::Seq(stxs) => { Stx::Seq(stxs) => {
let members = compile_sequence_members(stxs)?; let members = compile_sequence_members(stxs)?;
Ok(quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Arr { Ok(quote!(#P_::Pattern::Group {
items: vec![#(#members),*], type_: Box::new(#P_::GroupType::Arr),
})))) entries: #MapFrom_([#(#members),*]),
}))
} }
Stx::Set(_stxs) => Stx::Set(_stxs) =>
Err("Set literals not supported in patterns"), Err("Set literals not supported in patterns"),
@ -70,9 +70,10 @@ pub fn to_pattern_expr(stx: &Stx) -> Result<TokenStream2, &'static str> {
let v = to_pattern_expr(v)?; let v = to_pattern_expr(v)?;
Ok(quote!((#k, #v))) Ok(quote!((#k, #v)))
}).collect::<Result<Vec<_>, &'static str>>()?; }).collect::<Result<Vec<_>, &'static str>>()?;
Ok(quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Dict { Ok(quote!(#P_::Pattern::Group {
entries: #MapFromIterator_(vec![#(#members),*]) type_: Box::new(#P_::GroupType::Dict),
})))) entries: #MapFrom_([#(#members),*])
}))
} }
} }
} }

View File

@ -0,0 +1,15 @@
{
"folders": [
{
"path": "."
},
{
"path": "../syndicate-protocols"
}
],
"settings": {
"files.exclude": {
"target": true
}
}
}

View File

@ -61,7 +61,7 @@ impl Plugin for PatternPlugin {
} }
fn discard() -> P::Pattern { fn discard() -> P::Pattern {
P::Pattern::DDiscard(Box::new(P::DDiscard)) P::Pattern::Discard
} }
trait WildcardPattern { trait WildcardPattern {
@ -94,33 +94,34 @@ fn from_io(v: &IOValue) -> Option<P::_Any> {
impl WildcardPattern for CompoundPattern { impl WildcardPattern for CompoundPattern {
fn wc(&self, s: &mut WalkState) -> Option<P::Pattern> { fn wc(&self, s: &mut WalkState) -> Option<P::Pattern> {
match self { match self {
CompoundPattern::Tuple { patterns } => CompoundPattern::Tuple { patterns } |
Some(P::Pattern::DCompound(Box::new(P::DCompound::Arr { CompoundPattern::TuplePrefix { fixed: patterns, .. }=>
items: patterns.iter() Some(P::Pattern::Group {
.map(|p| unname(p).wc(s)) type_: Box::new(P::GroupType::Arr),
.collect::<Option<Vec<P::Pattern>>>()?, entries: patterns.iter().enumerate()
}))), .map(|(i, p)| Some((P::_Any::new(i), unname(p).wc(s)?)))
CompoundPattern::TuplePrefix { .. } => .collect::<Option<Map<P::_Any, P::Pattern>>>()?,
Some(discard()), }),
CompoundPattern::Dict { entries } => CompoundPattern::Dict { entries } =>
Some(P::Pattern::DCompound(Box::new(P::DCompound::Dict { Some(P::Pattern::Group {
type_: Box::new(P::GroupType::Dict),
entries: Map::from_iter( entries: Map::from_iter(
entries.0.iter() entries.0.iter()
.map(|(k, p)| Some((from_io(k)?, unname_simple(p).wc(s)?))) .map(|(k, p)| Some((from_io(k)?, unname_simple(p).wc(s)?)))
.filter(|e| discard() != e.as_ref().unwrap().1) .filter(|e| discard() != e.as_ref().unwrap().1)
.collect::<Option<Vec<(P::_Any, P::Pattern)>>>()? .collect::<Option<Vec<(P::_Any, P::Pattern)>>>()?
.into_iter()), .into_iter()),
}))), }),
CompoundPattern::Rec { label, fields } => match (unname(label), unname(fields)) { CompoundPattern::Rec { label, fields } => match (unname(label), unname(fields)) {
(Pattern::SimplePattern(label), Pattern::CompoundPattern(fields)) => (Pattern::SimplePattern(label), Pattern::CompoundPattern(fields)) =>
match (*label, *fields) { match (*label, *fields) {
(SimplePattern::Lit { value }, CompoundPattern::Tuple { patterns }) => (SimplePattern::Lit { value }, CompoundPattern::Tuple { patterns }) =>
Some(P::Pattern::DCompound(Box::new(P::DCompound::Rec { Some(P::Pattern::Group{
label: from_io(&value)?, type_: Box::new(P::GroupType::Rec { label: from_io(&value)? }),
fields: patterns.iter() entries: patterns.iter().enumerate()
.map(|p| unname(p).wc(s)) .map(|(i, p)| Some((P::_Any::new(i), unname(p).wc(s)?)))
.collect::<Option<Vec<P::Pattern>>>()?, .collect::<Option<Map<P::_Any, P::Pattern>>>()?,
}))), }),
_ => None, _ => None,
}, },
_ => None, _ => None,

View File

@ -10,7 +10,6 @@ use hyper::header::HeaderValue;
use syndicate::actor::*; use syndicate::actor::*;
use syndicate::error::Error; use syndicate::error::Error;
use syndicate::relay::Mutex;
use syndicate::trace; use syndicate::trace;
use syndicate::value::Map; use syndicate::value::Map;
use syndicate::value::NestedValue; use syndicate::value::NestedValue;
@ -34,29 +33,25 @@ pub fn empty_response(code: StatusCode) -> Response<Body> {
type ChunkItem = Result<body::Bytes, Box<dyn std::error::Error + Send + Sync>>; type ChunkItem = Result<body::Bytes, Box<dyn std::error::Error + Send + Sync>>;
struct ResponseCollector { struct ResponseCollector {
framing_handle: Option<Handle>,
context_handle: Arc<Mutex<Option<Handle>>>,
tx_res: Option<(oneshot::Sender<Response<Body>>, Response<Body>)>, tx_res: Option<(oneshot::Sender<Response<Body>>, Response<Body>)>,
body_tx: Option<UnboundedSender<ChunkItem>>, body_tx: Option<UnboundedSender<ChunkItem>>,
} }
impl ResponseCollector { impl ResponseCollector {
fn new(tx: oneshot::Sender<Response<Body>>, context_handle: Arc<Mutex<Option<Handle>>>) -> Self { fn new(tx: oneshot::Sender<Response<Body>>) -> Self {
let (body_tx, body_rx) = unbounded_channel(); let (body_tx, body_rx) = unbounded_channel();
let body_stream: Box<dyn futures::Stream<Item = ChunkItem> + Send> = let body_stream: Box<dyn futures::Stream<Item = ChunkItem> + Send> =
Box::new(UnboundedReceiverStream::new(body_rx)); Box::new(UnboundedReceiverStream::new(body_rx));
let mut res = Response::new(body_stream.into()); let mut res = Response::new(body_stream.into());
*res.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; *res.status_mut() = StatusCode::OK;
ResponseCollector { ResponseCollector {
framing_handle: None,
context_handle,
tx_res: Some((tx, res)), tx_res: Some((tx, res)),
body_tx: Some(body_tx), body_tx: Some(body_tx),
} }
} }
fn with_res<F: FnOnce(&mut Response<Body>) -> ActorResult>(&mut self, f: F) -> ActorResult { fn with_res<F: FnOnce(&mut Response<Body>) -> ActorResult>(&mut self, f: F) -> ActorResult {
if let ResponseCollector { tx_res: Some((_, res)), .. } = self { if let Some((_, res)) = &mut self.tx_res {
f(res)?; f(res)?;
} }
Ok(()) Ok(())
@ -84,39 +79,13 @@ impl ResponseCollector {
fn finish(&mut self, t: &mut Activation) -> ActorResult { fn finish(&mut self, t: &mut Activation) -> ActorResult {
self.deliver_res(); self.deliver_res();
self.body_tx = None; self.body_tx = None;
if let Some(h) = self.context_handle.lock().take() { t.stop();
t.retract(h);
}
Ok(()) Ok(())
} }
} }
impl Entity<http::HttpResponse> for ResponseCollector { impl Entity<http::HttpResponse> for ResponseCollector {
fn assert(&mut self, _t: &mut Activation, assertion: http::HttpResponse, handle: Handle) -> ActorResult {
match assertion {
http::HttpResponse::Processing => {
self.framing_handle = Some(handle);
self.with_res(|r| {
*r.status_mut() = StatusCode::OK;
Ok(())
})
}
_ => Err(format!("Unexpected assertion {:?}", assertion))?,
}
}
fn retract(&mut self, t: &mut Activation, handle: Handle) -> ActorResult {
if self.framing_handle == Some(handle) {
self.finish(t)?;
}
Ok(())
}
fn message(&mut self, t: &mut Activation, message: http::HttpResponse) -> ActorResult { fn message(&mut self, t: &mut Activation, message: http::HttpResponse) -> ActorResult {
if self.framing_handle.is_none() {
self.finish(t)?;
Err("Attempt to reply before <processing> has been asserted")?;
}
match message { match message {
http::HttpResponse::Status { code, .. } => self.with_res(|r| { http::HttpResponse::Status { code, .. } => self.with_res(|r| {
*r.status_mut() = StatusCode::from_u16( *r.status_mut() = StatusCode::from_u16(
@ -128,8 +97,13 @@ impl Entity<http::HttpResponse> for ResponseCollector {
HeaderValue::from_str(value.as_str())?); HeaderValue::from_str(value.as_str())?);
Ok(()) Ok(())
}), }),
http::HttpResponse::Body { chunk } => self.add_chunk(*chunk), http::HttpResponse::Chunk { chunk } => {
_ => Err(format!("Unexpected message {:?}", message))?, self.add_chunk(*chunk)
}
http::HttpResponse::Done { chunk } => {
self.add_chunk(*chunk)?;
self.finish(t)
}
} }
} }
} }
@ -142,11 +116,11 @@ pub async fn serve(
port: u16, port: u16,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let host = match req.headers().get("host").and_then(|v| v.to_str().ok()) { let host = match req.headers().get("host").and_then(|v| v.to_str().ok()) {
None => return Ok(empty_response(StatusCode::BAD_REQUEST)), None => http::RequestHost::Absent,
Some(h) => match h.rsplit_once(':') { Some(h) => http::RequestHost::Present(match h.rsplit_once(':') {
None => h.to_string(), None => h.to_string(),
Some((h, _port)) => h.to_string(), Some((h, _port)) => h.to_string(),
} })
}; };
let uri = req.uri(); let uri = req.uri();
@ -193,23 +167,22 @@ pub async fn serve(
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
facet.activate(&account, Some(trace::TurnCause::external("http")), |t| { facet.activate(&account, Some(trace::TurnCause::external("http")), |t| {
let sreq = http::HttpRequest { t.facet(move |t| {
sequence_number: NEXT_SEQ.fetch_add(1, Ordering::Relaxed).into(), let sreq = http::HttpRequest {
host, sequence_number: NEXT_SEQ.fetch_add(1, Ordering::Relaxed).into(),
port: port.into(), host,
method: req.method().to_string().to_lowercase(), port: port.into(),
path, method: req.method().to_string().to_lowercase(),
headers: http::Headers(headers), path,
query, headers: http::Headers(headers),
body, query,
}; body,
tracing::debug!(?sreq); };
let context_handle: Arc<Mutex<Option<Handle>>> = Arc::new(Mutex::new(None)); tracing::debug!(?sreq);
let srep = Cap::guard(&language().syndicate, t.create(ResponseCollector::new( let srep = Cap::guard(&language().syndicate, t.create(ResponseCollector::new(tx)));
tx, httpd.assert(t, language(), &http::HttpContext { req: sreq, res: srep });
Arc::clone(&context_handle)))); Ok(())
*(context_handle.lock()) = httpd.assert( })?;
t, language(), &http::HttpContext { req: sreq, res: srep });
Ok(()) Ok(())
}); });

View File

@ -9,7 +9,7 @@ use syndicate::actor::*;
use syndicate::dataspace::Dataspace; use syndicate::dataspace::Dataspace;
use syndicate::during; use syndicate::during;
use syndicate::enclose; use syndicate::enclose;
use syndicate::pattern::{lift_literal, drop_literal}; use syndicate::pattern::{lift_literal, drop_literal, pattern_seq_from_dictionary};
use syndicate::schemas::dataspace; use syndicate::schemas::dataspace;
use syndicate::schemas::dataspace_patterns as P; use syndicate::schemas::dataspace_patterns as P;
use syndicate::schemas::sturdy; use syndicate::schemas::sturdy;
@ -173,7 +173,7 @@ fn bad_instruction(message: &str) -> io::Error {
} }
fn discard() -> P::Pattern { fn discard() -> P::Pattern {
P::Pattern::DDiscard(Box::new(P::DDiscard)) P::Pattern::Discard
} }
fn dlit(value: AnyValue) -> P::Pattern { fn dlit(value: AnyValue) -> P::Pattern {
@ -272,7 +272,7 @@ impl<'env> PatternInstantiator<'env> {
Symbolic::Discard => discard(), Symbolic::Discard => discard(),
Symbolic::Binder(s) => { Symbolic::Binder(s) => {
self.binding_names.push(s); self.binding_names.push(s);
P::Pattern::DBind(Box::new(P::DBind { pattern: discard() })) P::Pattern::Bind { pattern: Box::new(discard()) }
} }
Symbolic::Reference(s) => Symbolic::Reference(s) =>
dlit(self.env.lookup(&s, "pattern-template variable")?.clone()), dlit(self.env.lookup(&s, "pattern-template variable")?.clone()),
@ -287,43 +287,47 @@ impl<'env> PatternInstantiator<'env> {
Some(pat) => pat, Some(pat) => pat,
None => { None => {
let label = self.instantiate_pattern(r.label())?; let label = self.instantiate_pattern(r.label())?;
let fields = r.fields().iter().map(|p| self.instantiate_pattern(p)) let entries = r.fields().iter().enumerate()
.collect::<io::Result<Vec<P::Pattern>>>()?; .map(|(i, p)| Ok((AnyValue::new(i), self.instantiate_pattern(p)?)))
P::Pattern::DCompound(Box::new(P::DCompound::Rec { .collect::<io::Result<Map<AnyValue, P::Pattern>>>()?;
label: drop_literal(&label) P::Pattern::Group {
.ok_or(bad_instruction("Record pattern must have literal label"))?, type_: Box::new(P::GroupType::Rec {
fields, label: drop_literal(&label)
})) .ok_or(bad_instruction("Record pattern must have literal label"))?,
}),
entries,
}
} }
} }
}, },
Value::Sequence(v) => Value::Sequence(v) =>
P::Pattern::DCompound(Box::new(P::DCompound::Arr { P::Pattern::Group {
items: v.iter() type_: Box::new(P::GroupType::Arr),
.map(|p| self.instantiate_pattern(p)) entries: v.iter().enumerate()
.collect::<io::Result<Vec<P::Pattern>>>()?, .map(|(i, p)| Ok((AnyValue::new(i), self.instantiate_pattern(p)?)))
})), .collect::<io::Result<Map<AnyValue, P::Pattern>>>()?,
},
Value::Set(_) => Value::Set(_) =>
Err(bad_instruction(&format!("Sets not permitted in patterns: {:?}", template)))?, Err(bad_instruction(&format!("Sets not permitted in patterns: {:?}", template)))?,
Value::Dictionary(v) => Value::Dictionary(v) =>
P::Pattern::DCompound(Box::new(P::DCompound::Dict { P::Pattern::Group {
type_: Box::new(P::GroupType::Dict),
entries: v.iter() entries: v.iter()
.map(|(a, b)| Ok((a.clone(), self.instantiate_pattern(b)?))) .map(|(a, b)| Ok((a.clone(), self.instantiate_pattern(b)?)))
.collect::<io::Result<Map<AnyValue, P::Pattern>>>()?, .collect::<io::Result<Map<AnyValue, P::Pattern>>>()?,
})), },
}) })
} }
fn maybe_binder_with_pattern(&mut self, r: &Record<AnyValue>) -> io::Result<Option<P::Pattern>> { fn maybe_binder_with_pattern(&mut self, r: &Record<AnyValue>) -> io::Result<Option<P::Pattern>> {
match r.label().value().as_symbol().map(|s| analyze(&s)) { match r.label().value().as_symbol().map(|s| analyze(&s)) {
Some(Symbolic::Binder(formal)) => if r.fields().len() == 1 { Some(Symbolic::Binder(formal)) if r.fields().len() == 1 => {
let pattern = self.instantiate_pattern(&r.fields()[0])?; let pattern = self.instantiate_pattern(&r.fields()[0])?;
self.binding_names.push(formal); self.binding_names.push(formal);
return Ok(Some(P::Pattern::DBind(Box::new(P::DBind { pattern })))); Ok(Some(P::Pattern::Bind { pattern: Box::new(pattern) }))
}, },
_ => (), _ => Ok(None),
} }
Ok(None)
} }
} }
@ -553,7 +557,7 @@ impl Env {
RewriteTemplate::Accept { pattern_template } => { RewriteTemplate::Accept { pattern_template } => {
let (_binding_names, pattern) = self.instantiate_pattern(pattern_template)?; let (_binding_names, pattern) = self.instantiate_pattern(pattern_template)?;
Ok(sturdy::Rewrite { Ok(sturdy::Rewrite {
pattern: embed_pattern(&P::Pattern::DBind(Box::new(P::DBind { pattern }))), pattern: embed_pattern(&P::Pattern::Bind { pattern: Box::new(pattern) }),
template: sturdy::Template::TRef(Box::new(sturdy::TRef { binding: 0.into() })), template: sturdy::Template::TRef(Box::new(sturdy::TRef { binding: 0.into() })),
}) })
} }
@ -674,24 +678,26 @@ impl Env {
fn embed_pattern(p: &P::Pattern) -> sturdy::Pattern { fn embed_pattern(p: &P::Pattern) -> sturdy::Pattern {
match p { match p {
P::Pattern::DDiscard(_) => sturdy::Pattern::PDiscard(Box::new(sturdy::PDiscard)), P::Pattern::Discard => sturdy::Pattern::PDiscard(Box::new(sturdy::PDiscard)),
P::Pattern::DBind(b) => sturdy::Pattern::PBind(Box::new(sturdy::PBind { P::Pattern::Bind { pattern } => sturdy::Pattern::PBind(Box::new(sturdy::PBind {
pattern: embed_pattern(&b.pattern), pattern: embed_pattern(&**pattern),
})), })),
P::Pattern::DLit(b) => sturdy::Pattern::Lit(Box::new(sturdy::Lit { P::Pattern::Lit { value } => sturdy::Pattern::Lit(Box::new(sturdy::Lit {
value: language().unparse(&b.value), value: language().unparse(&**value),
})), })),
P::Pattern::DCompound(b) => sturdy::Pattern::PCompound(Box::new(match &**b { P::Pattern::Group { type_, entries } => sturdy::Pattern::PCompound(Box::new(match &**type_ {
P::DCompound::Rec { label, fields } => P::GroupType::Rec { label } =>
sturdy::PCompound::Rec { sturdy::PCompound::Rec {
label: label.clone(), label: label.clone(),
fields: fields.iter().map(embed_pattern).collect(), fields: pattern_seq_from_dictionary(entries).expect("correct field entries")
.into_iter().map(embed_pattern).collect(),
}, },
P::DCompound::Arr { items } => P::GroupType::Arr =>
sturdy::PCompound::Arr { sturdy::PCompound::Arr {
items: items.iter().map(embed_pattern).collect(), items: pattern_seq_from_dictionary(entries).expect("correct element entries")
.into_iter().map(embed_pattern).collect(),
}, },
P::DCompound::Dict { entries } => P::GroupType::Dict =>
sturdy::PCompound::Dict { sturdy::PCompound::Dict {
entries: entries.iter().map(|(k, v)| (k.clone(), embed_pattern(v))).collect(), entries: entries.iter().map(|(k, v)| (k.clone(), embed_pattern(v))).collect(),
}, },

View File

@ -10,7 +10,6 @@ use syndicate::error::Error;
use syndicate::preserves::rec; use syndicate::preserves::rec;
use syndicate::preserves::value::Map; use syndicate::preserves::value::Map;
use syndicate::preserves::value::NestedValue; use syndicate::preserves::value::NestedValue;
use syndicate::preserves::value::Set;
use syndicate::schemas::http; use syndicate::schemas::http;
use syndicate::value::signed_integer::SignedInteger; use syndicate::value::signed_integer::SignedInteger;
@ -56,10 +55,22 @@ pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
}); });
} }
type MethodTable = Map<http::MethodPattern, Set<Arc<Cap>>>; #[derive(Debug, Clone)]
struct ActiveHandler {
cap: Arc<Cap>,
terminated: Arc<Field<bool>>,
}
type MethodTable = Map<http::MethodPattern, Vec<ActiveHandler>>;
type HostTable = Map<http::HostPattern, Map<http::PathPattern, MethodTable>>; type HostTable = Map<http::HostPattern, Map<http::PathPattern, MethodTable>>;
type RoutingTable = Map<SignedInteger, HostTable>; type RoutingTable = Map<SignedInteger, HostTable>;
fn request_host(value: &http::RequestHost) -> Option<String> {
match value {
http::RequestHost::Present(h) => Some(h.to_owned()),
http::RequestHost::Absent => None,
}
}
fn run(t: &mut Activation, ds: Arc<Cap>, spec: HttpRouter) -> ActorResult { fn run(t: &mut Activation, ds: Arc<Cap>, spec: HttpRouter) -> ActorResult {
ds.assert(t, language(), &lifecycle::started(&spec)); ds.assert(t, language(), &lifecycle::started(&spec));
ds.assert(t, language(), &lifecycle::ready(&spec)); ds.assert(t, language(), &lifecycle::ready(&spec));
@ -77,22 +88,30 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: HttpRouter) -> ActorResult {
let host = language().parse::<http::HostPattern>(&host)?; let host = language().parse::<http::HostPattern>(&host)?;
let path = language().parse::<http::PathPattern>(&path)?; let path = language().parse::<http::PathPattern>(&path)?;
let method = language().parse::<http::MethodPattern>(&method)?; let method = language().parse::<http::MethodPattern>(&method)?;
let handler = handler.value().to_embedded()?; let handler_cap = handler.value().to_embedded()?.clone();
let handler_terminated = t.named_field("handler-terminated", false);
t.get_mut(&routes) t.get_mut(&routes)
.entry(port.clone()).or_default() .entry(port.clone()).or_default()
.entry(host.clone()).or_default() .entry(host.clone()).or_default()
.entry(path.clone()).or_default() .entry(path.clone()).or_default()
.entry(method.clone()).or_default() .entry(method.clone()).or_default()
.insert(handler.clone()); .push(ActiveHandler {
t.on_stop(enclose!((routes, handler, method, path, host, port) move |t| { cap: handler_cap.clone(),
terminated: handler_terminated,
});
t.on_stop(enclose!((routes, method, path, host, port) move |t| {
tracing::debug!("-HTTP binding {:?} {:?} {:?} {:?} {:?}", host, port, method, path, handler); tracing::debug!("-HTTP binding {:?} {:?} {:?} {:?} {:?}", host, port, method, path, handler);
let port_map = t.get_mut(&routes); let port_map = t.get_mut(&routes);
let host_map = port_map.entry(port.clone()).or_default(); let host_map = port_map.entry(port.clone()).or_default();
let path_map = host_map.entry(host.clone()).or_default(); let path_map = host_map.entry(host.clone()).or_default();
let method_map = path_map.entry(path.clone()).or_default(); let method_map = path_map.entry(path.clone()).or_default();
let handler_set = method_map.entry(method.clone()).or_default(); let handler_vec = method_map.entry(method.clone()).or_default();
handler_set.remove(&handler); let handler = {
if handler_set.is_empty() { let i = handler_vec.iter().position(|a| a.cap == handler_cap)
.expect("Expected an index of an active handler to remove");
handler_vec.swap_remove(i)
};
if handler_vec.is_empty() {
method_map.remove(&method); method_map.remove(&method);
} }
if method_map.is_empty() { if method_map.is_empty() {
@ -104,6 +123,7 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: HttpRouter) -> ActorResult {
if host_map.is_empty() { if host_map.is_empty() {
port_map.remove(&port); port_map.remove(&port);
} }
*t.get_mut(&handler.terminated) = true;
Ok(()) Ok(())
})); }));
Ok(()) Ok(())
@ -124,7 +144,7 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: HttpRouter) -> ActorResult {
None => return send_empty(t, res, 404, "Not found"), None => return send_empty(t, res, 404, "Not found"),
}; };
let methods = match try_hostname(host_map, http::HostPattern::Host(req.host.clone()), &req.path)? { let methods = match request_host(&req.host).and_then(|h| try_hostname(host_map, http::HostPattern::Host(h), &req.path).transpose()).transpose()? {
Some(methods) => methods, Some(methods) => methods,
None => match try_hostname(host_map, http::HostPattern::Any, &req.path)? { None => match try_hostname(host_map, http::HostPattern::Any, &req.path)? {
Some(methods) => methods, Some(methods) => methods,
@ -141,13 +161,11 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: HttpRouter) -> ActorResult {
http::MethodPattern::Specific(m) => m.to_uppercase(), http::MethodPattern::Specific(m) => m.to_uppercase(),
http::MethodPattern::Any => unreachable!(), http::MethodPattern::Any => unreachable!(),
}).collect::<Vec<String>>().join(", "); }).collect::<Vec<String>>().join(", ");
let h = res.assert(t, language(), &http::HttpResponse::Processing);
res.message(t, language(), &http::HttpResponse::Status { res.message(t, language(), &http::HttpResponse::Status {
code: 405.into(), message: "Method Not Allowed".into() }); code: 405.into(), message: "Method Not Allowed".into() });
res.message(t, language(), &http::HttpResponse::Header { res.message(t, language(), &http::HttpResponse::Header {
name: "allow".into(), value: allowed }); name: "allow".into(), value: allowed });
if let Some(h) = h { t.retract(h); } return send_done(t, res);
return Ok(())
} }
} }
}; };
@ -155,22 +173,33 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: HttpRouter) -> ActorResult {
if handlers.len() > 1 { if handlers.len() > 1 {
tracing::warn!(?req, "Too many handlers available"); tracing::warn!(?req, "Too many handlers available");
} }
let handler = handlers.first().expect("Nonempty handler set").clone(); let ActiveHandler { cap, terminated } = handlers.first().expect("Nonempty handler set").clone();
tracing::trace!("Handler for {:?} is {:?}", &req, &handler); tracing::trace!("Handler for {:?} is {:?}", &req, &cap);
handler.assert(t, language(), &http::HttpContext { req, res: res.clone() });
t.dataflow(enclose!((terminated, req, res) move |t| {
if *t.get(&terminated) {
tracing::trace!("Handler for {:?} terminated", &req);
send_empty(t, &res, 500, "Internal Server Error")?;
}
Ok(())
}))?;
cap.assert(t, language(), &http::HttpContext { req, res: res.clone() });
Ok(()) Ok(())
}); });
Ok(()) Ok(())
} }
fn send_done(t: &mut Activation, res: &Arc<Cap>) -> ActorResult {
res.message(t, language(), &http::HttpResponse::Done {
chunk: Box::new(http::Chunk::Bytes(vec![])) });
Ok(())
}
fn send_empty(t: &mut Activation, res: &Arc<Cap>, code: u16, message: &str) -> ActorResult { fn send_empty(t: &mut Activation, res: &Arc<Cap>, code: u16, message: &str) -> ActorResult {
let h = res.assert(t, language(), &http::HttpResponse::Processing);
res.message(t, language(), &http::HttpResponse::Status { res.message(t, language(), &http::HttpResponse::Status {
code: code.into(), message: message.into() }); code: code.into(), message: message.into() });
if let Some(h) = h { t.retract(h); } send_done(t, res)
return Ok(())
} }
fn path_pattern_matches(path_pat: &http::PathPattern, path: &Vec<String>) -> bool { fn path_pattern_matches(path_pat: &http::PathPattern, path: &Vec<String>) -> bool {
@ -268,13 +297,11 @@ impl HttpStaticFileServer {
Ok(mut fh) => { Ok(mut fh) => {
if fh.metadata().is_ok_and(|m| m.is_dir()) { if fh.metadata().is_ok_and(|m| m.is_dir()) {
drop(fh); drop(fh);
let h = res.assert(t, language(), &http::HttpResponse::Processing);
res.message(t, language(), &http::HttpResponse::Status { res.message(t, language(), &http::HttpResponse::Status {
code: 301.into(), message: "Moved permanently".into() }); code: 301.into(), message: "Moved permanently".into() });
res.message(t, language(), &http::HttpResponse::Header { res.message(t, language(), &http::HttpResponse::Header {
name: "location".into(), value: format!("/{}/", req.path.join("/")) }); name: "location".into(), value: format!("/{}/", req.path.join("/")) });
if let Some(h) = h { t.retract(h); } return send_done(t, res);
return Ok(())
} else { } else {
let mut buf = Vec::new(); let mut buf = Vec::new();
fh.read_to_end(&mut buf)?; fh.read_to_end(&mut buf)?;
@ -287,16 +314,14 @@ impl HttpStaticFileServer {
} }
}; };
let h = res.assert(t, language(), &http::HttpResponse::Processing);
res.message(t, language(), &http::HttpResponse::Status { res.message(t, language(), &http::HttpResponse::Status {
code: 200.into(), message: "OK".into() }); code: 200.into(), message: "OK".into() });
if let Some(mime_type) = mime_type { if let Some(mime_type) = mime_type {
res.message(t, language(), &http::HttpResponse::Header { res.message(t, language(), &http::HttpResponse::Header {
name: "content-type".into(), value: mime_type.to_owned() }); name: "content-type".into(), value: mime_type.to_owned() });
} }
res.message(t, language(), &http::HttpResponse::Body { res.message(t, language(), &http::HttpResponse::Done {
chunk: Box::new(http::Chunk::Bytes(body)) }); chunk: Box::new(http::Chunk::Bytes(body)) });
if let Some(h) = h { t.retract(h); }
Ok(()) Ok(())
} }

View File

@ -15,24 +15,20 @@ use tokio::net::TcpListener;
use crate::language::language; use crate::language::language;
use crate::lifecycle; use crate::lifecycle;
use crate::protocol::detect_protocol; use crate::protocol::detect_protocol;
use crate::schemas::internal_services::{TcpWithHttp, TcpWithoutHttp, TcpRelayListener}; use crate::schemas::internal_services::TcpWithoutHttp;
use syndicate_macros::during; use syndicate_macros::during;
pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) { pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
t.spawn(Some(AnyValue::symbol("tcp_relay_listener")), move |t| { t.spawn(Some(AnyValue::symbol("tcp_relay_listener")), move |t| {
enclose!((ds) during!(t, ds, language(), <run-service $spec: TcpWithHttp::<AnyValue>>, |t: &mut Activation| {
spec.httpd.assert(t, language(), &syndicate::schemas::http::HttpListener { port: spec.addr.port.clone() });
run_supervisor(t, ds.clone(), TcpRelayListener::TcpWithHttp(Box::new(spec)))
}));
enclose!((ds) during!(t, ds, language(), <run-service $spec: TcpWithoutHttp::<AnyValue>>, |t| { enclose!((ds) during!(t, ds, language(), <run-service $spec: TcpWithoutHttp::<AnyValue>>, |t| {
run_supervisor(t, ds.clone(), TcpRelayListener::TcpWithoutHttp(Box::new(spec))) run_supervisor(t, ds.clone(), spec)
})); }));
Ok(()) Ok(())
}); });
} }
fn run_supervisor(t: &mut Activation, ds: Arc<Cap>, spec: TcpRelayListener) -> ActorResult { fn run_supervisor(t: &mut Activation, ds: Arc<Cap>, spec: TcpWithoutHttp) -> ActorResult {
Supervisor::start( Supervisor::start(
t, t,
Some(rec![AnyValue::symbol("relay"), language().unparse(&spec)]), Some(rec![AnyValue::symbol("relay"), language().unparse(&spec)]),
@ -41,18 +37,32 @@ fn run_supervisor(t: &mut Activation, ds: Arc<Cap>, spec: TcpRelayListener) -> A
enclose!((ds) move |t| enclose!((ds, spec) run(t, ds, spec)))) enclose!((ds) move |t| enclose!((ds, spec) run(t, ds, spec))))
} }
fn run(t: &mut Activation, ds: Arc<Cap>, spec: TcpRelayListener) -> ActorResult { fn run(t: &mut Activation, ds: Arc<Cap>, spec: TcpWithoutHttp) -> ActorResult {
lifecycle::terminate_on_service_restart(t, &ds, &spec); lifecycle::terminate_on_service_restart(t, &ds, &spec);
let (addr, gatekeeper, httpd) = match spec.clone() {
TcpRelayListener::TcpWithHttp(b) => { let httpd = t.named_field("httpd", None::<Arc<Cap>>);
let TcpWithHttp { addr, gatekeeper, httpd } = *b;
(addr, gatekeeper, Some(httpd)) {
} let ad = spec.addr.clone();
TcpRelayListener::TcpWithoutHttp(b) => { let ad2 = ad.clone();
let TcpWithoutHttp { addr, gatekeeper } = *b; let gk = spec.gatekeeper.clone();
(addr, gatekeeper, None) enclose!((ds, httpd) during!(t, ds, language(),
} <run-service <relay-listener #(&language().unparse(&ad)) #(&AnyValue::domain(gk)) $h>>, |t: &mut Activation| {
}; if let Some(h) = h.value().as_embedded().cloned() {
h.assert(t, language(), &syndicate::schemas::http::HttpListener { port: ad2.port.clone() });
*t.get_mut(&httpd) = Some(h.clone());
t.on_stop(enclose!((httpd) move |t| {
let f = t.get_mut(&httpd);
if *f == Some(h.clone()) { *f = None; }
Ok(())
}));
}
Ok(())
}));
}
let TcpWithoutHttp { addr, gatekeeper } = spec.clone();
let host = addr.host.clone(); let host = addr.host.clone();
let port = u16::try_from(&addr.port).map_err(|_| "Invalid TCP port number")?; let port = u16::try_from(&addr.port).map_err(|_| "Invalid TCP port number")?;
let facet = t.facet_ref(); let facet = t.facet_ref();
@ -83,6 +93,7 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: TcpRelayListener) -> ActorResult
let account = Account::new(name.clone(), trace_collector.clone()); let account = Account::new(name.clone(), trace_collector.clone());
if !facet.activate( if !facet.activate(
&account, cause, enclose!((trace_collector, httpd) move |t| { &account, cause, enclose!((trace_collector, httpd) move |t| {
let httpd = t.get(&httpd).clone();
t.spawn(name, move |t| { t.spawn(name, move |t| {
Ok(t.linked_task(None, { Ok(t.linked_task(None, {
let facet = t.facet_ref(); let facet = t.facet_ref();
@ -91,7 +102,7 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: TcpRelayListener) -> ActorResult
facet, facet,
stream, stream,
gatekeeper, gatekeeper,
httpd.map(|r| r.clone()), httpd,
addr, addr,
port).await?; port).await?;
Ok(LinkedTaskTermination::KeepFacet) Ok(LinkedTaskTermination::KeepFacet)

View File

@ -11,6 +11,7 @@ use syndicate::during::entity;
use syndicate::dataspace::Dataspace; use syndicate::dataspace::Dataspace;
use syndicate::schemas::dataspace::Observe; use syndicate::schemas::dataspace::Observe;
use syndicate::schemas::dataspace_patterns as p; use syndicate::schemas::dataspace_patterns as p;
use syndicate::value::Map;
use syndicate::value::NestedValue; use syndicate::value::NestedValue;
use syndicate::value::Value; use syndicate::value::Value;
@ -88,11 +89,11 @@ pub fn bench_pub(c: &mut Criterion) {
.create_cap(t); .create_cap(t);
ds.assert(t, language(), &Observe { ds.assert(t, language(), &Observe {
pattern: p::Pattern::DBind(Box::new(p::DBind { pattern: p::Pattern::Bind {
pattern: p::Pattern::DLit(Box::new(p::DLit { pattern: Box::new(p::Pattern::Lit {
value: p::AnyAtom::Symbol("consumer".to_owned()), value: Box::new(p::AnyAtom::Symbol("consumer".to_owned())),
})), }),
})), },
observer: shutdown, observer: shutdown,
}); });
@ -110,24 +111,27 @@ pub fn bench_pub(c: &mut Criterion) {
ds.assert(t, &(), &AnyValue::symbol("consumer")); ds.assert(t, &(), &AnyValue::symbol("consumer"));
ds.assert(t, language(), &Observe { ds.assert(t, language(), &Observe {
pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec { pattern: p::Pattern::Group {
label: AnyValue::symbol("Says"), type_: Box::new(p::GroupType::Rec {
fields: vec![ label: AnyValue::symbol("Says"),
p::Pattern::DLit(Box::new(p::DLit { }),
value: p::AnyAtom::String("bench_pub".to_owned()), entries: Map::from([
})), (p::_Any::new(0), p::Pattern::Lit {
p::Pattern::DBind(Box::new(p::DBind { value: Box::new(p::AnyAtom::String("bench_pub".to_owned())),
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)), }),
})), (p::_Any::new(1), p::Pattern::Bind {
]})), pattern: Box::new(p::Pattern::Discard),
}),
]),
},
observer: receiver, observer: receiver,
}); });
ds.assert(t, language(), &Observe { ds.assert(t, language(), &Observe {
pattern: p::Pattern::DBind(Box::new(p::DBind { pattern: p::Pattern::Bind {
pattern: p::Pattern::DLit(Box::new(p::DLit { pattern: Box::new(p::Pattern::Lit {
value: p::AnyAtom::Bool(true), value: Box::new(p::AnyAtom::Bool(true)),
})), }),
})), },
observer: shutdown, observer: shutdown,
}); });

View File

@ -1,3 +1,5 @@
use std::sync::Arc;
use crate::schemas::dataspace_patterns::*; use crate::schemas::dataspace_patterns::*;
use super::language; use super::language;
@ -8,23 +10,25 @@ use preserves::value::Record;
use preserves::value::Value; use preserves::value::Value;
use preserves_schema::Codec; use preserves_schema::Codec;
#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq)] pub type PathStep = _Any;
pub enum PathStep {
Index(usize),
Key(_Any),
}
pub type Path = Vec<PathStep>; pub type Path = Vec<PathStep>;
pub type Paths = Vec<Path>; pub type Paths = Vec<Path>;
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct ConstantPositions {
pub with_values: Paths,
pub required_to_exist: Paths,
}
struct Analyzer { struct Analyzer {
pub const_paths: Paths, pub const_paths: Paths,
pub const_values: Vec<_Any>, pub const_values: Vec<_Any>,
pub checked_paths: Paths,
pub capture_paths: Paths, pub capture_paths: Paths,
} }
pub struct PatternAnalysis { pub struct PatternAnalysis {
pub const_paths: Paths, pub const_positions: Arc<ConstantPositions>,
pub const_values: _Any, pub const_values: _Any,
pub capture_paths: Paths, pub capture_paths: Paths,
} }
@ -38,11 +42,15 @@ impl PatternAnalysis {
let mut analyzer = Analyzer { let mut analyzer = Analyzer {
const_paths: Vec::new(), const_paths: Vec::new(),
const_values: Vec::new(), const_values: Vec::new(),
checked_paths: Vec::new(),
capture_paths: Vec::new(), capture_paths: Vec::new(),
}; };
analyzer.walk(&mut Vec::new(), p); analyzer.walk(&mut Vec::new(), p);
PatternAnalysis { PatternAnalysis {
const_paths: analyzer.const_paths, const_positions: Arc::new(ConstantPositions {
with_values: analyzer.const_paths,
required_to_exist: analyzer.checked_paths,
}),
const_values: _Any::new(analyzer.const_values), const_values: _Any::new(analyzer.const_values),
capture_paths: analyzer.capture_paths, capture_paths: analyzer.capture_paths,
} }
@ -58,34 +66,21 @@ impl Analyzer {
fn walk(&mut self, path: &mut Path, p: &Pattern) { fn walk(&mut self, path: &mut Path, p: &Pattern) {
match p { match p {
Pattern::DCompound(b) => match &**b { Pattern::Group { entries, .. } => {
DCompound::Rec { fields, .. } => { for (k, p) in entries {
for (i, p) in fields.iter().enumerate() { self.walk_step(path, k.clone(), p)
self.walk_step(path, PathStep::Index(i), p);
}
}
DCompound::Arr { items, .. } => {
for (i, p) in items.iter().enumerate() {
self.walk_step(path, PathStep::Index(i), p);
}
}
DCompound::Dict { entries, .. } => {
for (k, p) in entries {
self.walk_step(path, PathStep::Key(k.clone()), p);
}
} }
} }
Pattern::DBind(b) => { Pattern::Bind { pattern } => {
let DBind { pattern, .. } = &**b;
self.capture_paths.push(path.clone()); self.capture_paths.push(path.clone());
self.walk(path, pattern) self.walk(path, &**pattern);
} }
Pattern::DDiscard(_) => Pattern::Discard => {
(), self.checked_paths.push(path.clone());
Pattern::DLit(b) => { }
let DLit { value } = &**b; Pattern::Lit { value } => {
self.const_paths.push(path.clone()); self.const_paths.push(path.clone());
self.const_values.push(language().unparse(value)); self.const_values.push(language().unparse(&**value));
} }
} }
} }
@ -109,52 +104,47 @@ impl PatternMatcher {
} }
} }
fn run_seq<'a, F: 'a + Fn(usize) -> &'a _Any>(&mut self, entries: &Map<_Any, Pattern<_Any>>, values: F) -> bool {
for (k, p) in entries {
match k.value().as_usize() {
None => return false,
Some(i) => if !self.run(p, values(i)) {
return false;
}
}
}
true
}
fn run(&mut self, pattern: &Pattern<_Any>, value: &_Any) -> bool { fn run(&mut self, pattern: &Pattern<_Any>, value: &_Any) -> bool {
match pattern { match pattern {
Pattern::DDiscard(_) => true, Pattern::Discard => true,
Pattern::DBind(b) => { Pattern::Bind { pattern } => {
self.captures.push(value.clone()); self.captures.push(value.clone());
self.run(&b.pattern, value) self.run(&**pattern, value)
} }
Pattern::DLit(b) => value == &language().unparse(&b.value), Pattern::Lit { value: expected } => value == &language().unparse(&**expected),
Pattern::DCompound(b) => match &**b { Pattern::Group { type_, entries } => match &**type_ {
DCompound::Rec { label, fields } => { GroupType::Rec { label } => {
match value.value().as_record(Some(fields.len())) { match value.value().as_record(None) {
None => false, None => false,
Some(r) => { Some(r) =>
if r.label() != label { r.label() == label &&
return false; self.run_seq(entries, |i| &r.fields()[i])
}
for (i, p) in fields.iter().enumerate() {
if !self.run(p, &r.fields()[i]) {
return false;
}
}
true
}
} }
} }
DCompound::Arr { items } => { GroupType::Arr => {
match value.value().as_sequence() { match value.value().as_sequence() {
None => false, None => false,
Some(vs) => { Some(vs) =>
if vs.len() != items.len() { self.run_seq(entries, |i| &vs[i])
return false;
}
for (i, p) in items.iter().enumerate() {
if !self.run(p, &vs[i]) {
return false;
}
}
true
}
} }
} }
DCompound::Dict { entries: expected_entries } => { GroupType::Dict => {
match value.value().as_dictionary() { match value.value().as_dictionary() {
None => false, None => false,
Some(actual_entries) => { Some(actual_entries) => {
for (k, p) in expected_entries.iter() { for (k, p) in entries {
if !actual_entries.get(k).map(|v| self.run(p, v)).unwrap_or(false) { if !actual_entries.get(k).map(|v| self.run(p, v)).unwrap_or(false) {
return false; return false;
} }
@ -170,42 +160,68 @@ impl PatternMatcher {
pub fn lift_literal(v: &_Any) -> Pattern { pub fn lift_literal(v: &_Any) -> Pattern {
match v.value() { match v.value() {
Value::Record(r) => Pattern::DCompound(Box::new(DCompound::Rec { Value::Record(r) => Pattern::Group {
label: r.label().clone(), type_: Box::new(GroupType::Rec { label: r.label().clone() }),
fields: r.fields().iter().map(lift_literal).collect(), entries: r.fields().iter().enumerate()
})), .map(|(i, v)| (_Any::new(i), lift_literal(v)))
Value::Sequence(items) => Pattern::DCompound(Box::new(DCompound::Arr { .collect(),
items: items.iter().map(lift_literal).collect(), },
})), Value::Sequence(items) => Pattern::Group {
type_: Box::new(GroupType::Arr),
entries: items.iter().enumerate()
.map(|(i, v)| (_Any::new(i), lift_literal(v)))
.collect(),
},
Value::Set(_members) => panic!("Cannot express literal set in pattern"), Value::Set(_members) => panic!("Cannot express literal set in pattern"),
Value::Dictionary(entries) => Pattern::DCompound(Box::new(DCompound::Dict { Value::Dictionary(entries) => Pattern::Group {
entries: entries.iter().map(|(k, v)| (k.clone(), lift_literal(v))).collect(), type_: Box::new(GroupType::Dict),
})), entries: entries.iter()
_other => Pattern::DLit(Box::new(DLit { .map(|(k, v)| (k.clone(), lift_literal(v)))
value: language().parse(v).expect("Non-compound datum can be converted to AnyAtom"), .collect(),
})), },
_other => Pattern::Lit {
value: Box::new(language().parse(v).expect("Non-compound datum can be converted to AnyAtom")),
},
} }
} }
const DISCARD: Pattern = Pattern::Discard;
pub fn pattern_seq_from_dictionary(entries: &Map<_Any, Pattern>) -> Option<Vec<&Pattern>> {
let mut max_k: Option<usize> = None;
for k in entries.keys() {
max_k = max_k.max(Some(k.value().as_usize()?));
}
let mut seq = vec![];
if let Some(max_k) = max_k {
seq.reserve(max_k + 1);
for i in 0..=max_k {
seq.push(entries.get(&_Any::new(i)).unwrap_or(&DISCARD));
}
}
return Some(seq);
}
fn drop_literal_entries_seq(mut seq: Vec<_Any>, entries: &Map<_Any, Pattern>) -> Option<Vec<_Any>> {
for p in pattern_seq_from_dictionary(entries)?.into_iter() {
seq.push(drop_literal(p)?);
}
Some(seq)
}
pub fn drop_literal(p: &Pattern) -> Option<_Any> { pub fn drop_literal(p: &Pattern) -> Option<_Any> {
match p { match p {
Pattern::DCompound(b) => match &**b { Pattern::Group { type_, entries } => match &**type_ {
DCompound::Rec { label, fields } => { GroupType::Rec { label } =>
let mut r = vec![label.clone()]; Some(Value::Record(Record(drop_literal_entries_seq(vec![label.clone()], entries)?)).wrap()),
for f in fields.iter() { GroupType::Arr =>
r.push(drop_literal(f)?); Some(Value::Sequence(drop_literal_entries_seq(vec![], entries)?).wrap()),
} GroupType::Dict =>
Some(Value::Record(Record(r)).wrap()) Some(Value::Dictionary(entries.iter()
} .map(|(k, p)| Some((k.clone(), drop_literal(p)?)))
DCompound::Arr { items } => .collect::<Option<Map<_Any, _Any>>>()?).wrap()),
Some(Value::Sequence(items.iter().map(drop_literal)
.collect::<Option<Vec<_Any>>>()?).wrap()),
DCompound::Dict { entries } =>
Some(Value::Dictionary(entries.iter()
.map(|(k, p)| Some((k.clone(), drop_literal(p)?)))
.collect::<Option<Map<_Any, _Any>>>()?).wrap()),
}, },
Pattern::DLit(b) => Some(language().unparse(&b.value)), Pattern::Lit { value } => Some(language().unparse(&**value)),
_ => None, _ => None,
} }
} }

View File

@ -16,19 +16,12 @@ use crate::actor::Activation;
use crate::actor::Handle; use crate::actor::Handle;
use crate::actor::Cap; use crate::actor::Cap;
use crate::schemas::dataspace_patterns as ds; use crate::schemas::dataspace_patterns as ds;
use crate::pattern::{self, PathStep, Path, Paths}; use crate::pattern::{self, ConstantPositions, PathStep, Path, Paths};
type Bag<A> = bag::BTreeBag<A>; type Bag<A> = bag::BTreeBag<A>;
type Captures = AnyValue; type Captures = AnyValue;
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
enum Guard {
Rec(AnyValue, usize),
Seq(usize),
Map,
}
/// Index of assertions and [`Observe`rs][crate::schemas::dataspace::Observe]. /// Index of assertions and [`Observe`rs][crate::schemas::dataspace::Observe].
/// ///
/// Generally speaking, you will not need to use this structure; /// Generally speaking, you will not need to use this structure;
@ -44,13 +37,13 @@ pub struct Index {
#[derive(Debug)] #[derive(Debug)]
struct Node { struct Node {
continuation: Continuation, continuation: Continuation,
edges: Map<Selector, Map<Guard, Node>>, edges: Map<Selector, Map<ds::GroupType, Node>>,
} }
#[derive(Debug)] #[derive(Debug)]
struct Continuation { struct Continuation {
cached_assertions: Set<AnyValue>, cached_assertions: Set<AnyValue>,
leaf_map: Map<Paths, Map<Captures, Leaf>>, leaf_map: Map<Arc<ConstantPositions>, Map<Captures, Leaf>>,
} }
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
@ -205,7 +198,7 @@ impl Node {
} }
fn extend(&mut self, pat: &ds::Pattern) -> &mut Continuation { fn extend(&mut self, pat: &ds::Pattern) -> &mut Continuation {
let (_pop_count, final_node) = self.extend_walk(&mut Vec::new(), 0, PathStep::Index(0), pat); let (_pop_count, final_node) = self.extend_walk(&mut Vec::new(), 0, PathStep::new(0), pat);
&mut final_node.continuation &mut final_node.continuation
} }
@ -216,23 +209,13 @@ impl Node {
step: PathStep, step: PathStep,
pat: &ds::Pattern, pat: &ds::Pattern,
) -> (usize, &mut Node) { ) -> (usize, &mut Node) {
let (guard, members): (Guard, Vec<(PathStep, &ds::Pattern)>) = match pat { let (guard, members): (ds::GroupType, Vec<(PathStep, &ds::Pattern)>) = match pat {
ds::Pattern::DCompound(b) => match &**b { ds::Pattern::Group { type_, entries } =>
ds::DCompound::Arr { items } => ((&**type_).clone(),
(Guard::Seq(items.len()), entries.iter().map(|(k, p)| (k.clone(), p)).collect()),
items.iter().enumerate().map(|(i, p)| (PathStep::Index(i), p)).collect()), ds::Pattern::Bind { pattern } =>
ds::DCompound::Rec { label, fields } => return self.extend_walk(path, pop_count, step, &**pattern),
(Guard::Rec(label.clone(), fields.len()), ds::Pattern::Discard | ds::Pattern::Lit { .. } =>
fields.iter().enumerate().map(|(i, p)| (PathStep::Index(i), p)).collect()),
ds::DCompound::Dict { entries, .. } =>
(Guard::Map,
entries.iter().map(|(k, p)| (PathStep::Key(k.clone()), p)).collect()),
}
ds::Pattern::DBind(b) => {
let ds::DBind { pattern, .. } = &**b;
return self.extend_walk(path, pop_count, step, pattern);
}
ds::Pattern::DDiscard(_) | ds::Pattern::DLit(_) =>
return (pop_count, self), return (pop_count, self),
}; };
@ -336,41 +319,46 @@ where FCont: FnMut(&mut Continuation, &AnyValue) -> (),
fn continuation(&mut self, c: &mut Continuation) { fn continuation(&mut self, c: &mut Continuation) {
(self.m_cont)(c, self.outer_value); (self.m_cont)(c, self.outer_value);
let mut empty_const_paths = Vec::new(); let mut empty_const_positions = Vec::new();
for (const_paths, const_val_map) in &mut c.leaf_map { for (const_positions, const_val_map) in &mut c.leaf_map {
if let Some(const_vals) = project_paths(self.outer_value, const_paths) { if project_paths(self.outer_value, &const_positions.required_to_exist).is_none() {
let leaf_opt = if self.create_leaf_if_absent { continue;
Some(const_val_map.entry(const_vals.clone()).or_insert_with(Leaf::new)) }
} else { let const_vals = match project_paths(self.outer_value, &const_positions.with_values) {
const_val_map.get_mut(&const_vals) Some(vs) => vs,
}; None => continue,
if let Some(leaf) = leaf_opt { };
(self.m_leaf)(leaf, self.outer_value); let leaf_opt = if self.create_leaf_if_absent {
for (capture_paths, endpoints) in &mut leaf.endpoints_map { Some(const_val_map.entry(const_vals.clone()).or_insert_with(Leaf::new))
if let Some(cs) = project_paths(self.outer_value, &capture_paths) { } else {
(self.m_endpoints)(endpoints, cs); const_val_map.get_mut(&const_vals)
} };
if let Some(leaf) = leaf_opt {
(self.m_leaf)(leaf, self.outer_value);
for (capture_paths, endpoints) in &mut leaf.endpoints_map {
if let Some(cs) = project_paths(self.outer_value, &capture_paths) {
(self.m_endpoints)(endpoints, cs);
} }
if leaf.is_empty() { }
const_val_map.remove(&const_vals); if leaf.is_empty() {
if const_val_map.is_empty() { const_val_map.remove(&const_vals);
empty_const_paths.push(const_paths.clone()); if const_val_map.is_empty() {
} empty_const_positions.push(const_positions.clone());
} }
} }
} }
} }
for const_paths in empty_const_paths { for const_positions in empty_const_positions {
c.leaf_map.remove(&const_paths); c.leaf_map.remove(&const_positions);
} }
} }
} }
fn class_of(v: &AnyValue) -> Option<Guard> { fn class_of(v: &AnyValue) -> Option<ds::GroupType> {
match v.value() { match v.value() {
Value::Sequence(vs) => Some(Guard::Seq(vs.len())), Value::Sequence(_) => Some(ds::GroupType::Arr),
Value::Record(r) => Some(Guard::Rec(r.label().clone(), r.arity())), Value::Record(r) => Some(ds::GroupType::Rec { label: r.label().clone() }),
Value::Dictionary(_) => Some(Guard::Map), Value::Dictionary(_) => Some(ds::GroupType::Dict),
_ => None, _ => None,
} }
} }
@ -398,15 +386,17 @@ fn project_paths<'a>(v: &'a AnyValue, ps: &Paths) -> Option<Captures> {
} }
fn step<'a>(v: &'a AnyValue, s: &PathStep) -> Option<&'a AnyValue> { fn step<'a>(v: &'a AnyValue, s: &PathStep) -> Option<&'a AnyValue> {
match (v.value(), s) { match v.value() {
(Value::Sequence(vs), PathStep::Index(i)) => Value::Sequence(vs) => {
if *i < vs.len() { Some(&vs[*i]) } else { None }, let i = s.value().as_usize()?;
(Value::Record(r), PathStep::Index(i)) => if i < vs.len() { Some(&vs[i]) } else { None }
if *i < r.arity() { Some(&r.fields()[*i]) } else { None }, }
(Value::Dictionary(m), PathStep::Key(k)) => Value::Record(r) => {
m.get(k), let i = s.value().as_usize()?;
_ => if i < r.arity() { Some(&r.fields()[i]) } else { None }
None, }
Value::Dictionary(m) => m.get(s),
_ => None,
} }
} }
@ -423,11 +413,14 @@ impl Continuation {
) { ) {
let cached_assertions = &self.cached_assertions; let cached_assertions = &self.cached_assertions;
let const_val_map = let const_val_map =
self.leaf_map.entry(analysis.const_paths.clone()).or_insert_with({ self.leaf_map.entry(analysis.const_positions.clone()).or_insert_with({
|| { || {
let mut cvm = Map::new(); let mut cvm = Map::new();
for a in cached_assertions { for a in cached_assertions {
if let Some(key) = project_paths(a, &analysis.const_paths) { if project_paths(a, &analysis.const_positions.required_to_exist).is_none() {
continue;
}
if let Some(key) = project_paths(a, &analysis.const_positions.with_values) {
cvm.entry(key).or_insert_with(Leaf::new) cvm.entry(key).or_insert_with(Leaf::new)
.cached_assertions.insert(a.clone()); .cached_assertions.insert(a.clone());
} }
@ -462,7 +455,7 @@ impl Continuation {
observer: &Arc<Cap>, observer: &Arc<Cap>,
) { ) {
if let Entry::Occupied(mut const_val_map_entry) if let Entry::Occupied(mut const_val_map_entry)
= self.leaf_map.entry(analysis.const_paths) = self.leaf_map.entry(analysis.const_positions)
{ {
let const_val_map = const_val_map_entry.get_mut(); let const_val_map = const_val_map_entry.get_mut();
if let Entry::Occupied(mut leaf_entry) if let Entry::Occupied(mut leaf_entry)