Compare commits

...

49 Commits

Author SHA1 Message Date
Tony Garnock-Jones 6468e16790 Bump preserves dep 2024-04-12 19:57:23 +02:00
Tony Garnock-Jones 65101e900e Release independent packages
syndicate@0.40.0
syndicate-macros@0.32.0
syndicate-schema-plugin@0.9.0
syndicate-server@0.45.0
syndicate-tools@0.18.0

Generated by cargo-workspaces
2024-04-10 17:04:25 +02:00
Tony Garnock-Jones 581886835a New dataspace pattern implementation; update HTTP server 2024-04-10 17:03:09 +02:00
Tony Garnock-Jones dcb1aec142 Merge latest changes from the syndicate-protocols repository 2024-04-10 15:43:06 +02:00
Tony Garnock-Jones c0239cf322 And with that we are almost back where we started with http.prs! 2024-04-10 15:16:35 +02:00
Tony Garnock-Jones 9cc4175f24 Cope with HTTP/1.0's optional Host header 2024-04-10 14:54:19 +02:00
Tony Garnock-Jones 70f42dd931 Another revision of http.prs 2024-04-10 14:31:27 +02:00
Tony Garnock-Jones ef1ebe6412 Sigh. <done> turns out to be a good idea in addition to <processing> 2024-04-10 13:24:25 +02:00
Tony Garnock-Jones deec008c66 No taskset on osx 2024-04-10 11:07:22 +02:00
Tony Garnock-Jones 008671d0b2 Bump deps incl preserves-schema for a keyword-avoiding fix 2024-04-09 22:41:58 +02:00
Tony Garnock-Jones 9fcf22e1b5 Merge latest changes from the syndicate-protocols repository 2024-04-09 15:16:46 +02:00
Tony Garnock-Jones ca18ca08df Alternative representation of dataspacePatterns 2024-04-09 09:15:21 +02:00
Tony Garnock-Jones 40ca168eac Repair typo 2024-04-09 09:13:51 +02:00
Tony Garnock-Jones 5a73e8d4c3 Alter dataspacePatterns language to make rec and arr more like dict 2024-04-04 16:31:09 +02:00
Tony Garnock-Jones 91b26001d8 There isn't an /etc/mime.types on OSX 2024-04-03 22:32:54 +02:00
Tony Garnock-Jones b83b39515d Release independent packages
syndicate@0.39.0
syndicate-macros@0.31.0
syndicate-schema-plugin@0.8.0
syndicate-server@0.44.0
syndicate-tools@0.17.0

Generated by cargo-workspaces
2024-04-01 16:53:42 +02:00
Tony Garnock-Jones d9fa6362af Merge latest changes from the syndicate-protocols repository 2024-04-01 16:52:57 +02:00
Tony Garnock-Jones 94598a574b Update HTTP service protocol 2024-04-01 16:52:24 +02:00
Tony Garnock-Jones 80ad0914ed Revise http protocol 2024-04-01 16:52:24 +02:00
Tony Garnock-Jones bdb0cc1023 Repair severe error in turn rollback 2024-04-01 16:52:24 +02:00
Tony Garnock-Jones 710ff91a64 Revise http protocol 2024-04-01 15:56:07 +02:00
Tony Garnock-Jones d3748a286b Release independent packages
syndicate-server@0.43.1

Generated by cargo-workspaces
2024-04-01 15:08:11 +02:00
Tony Garnock-Jones a56aec2c30 Tweak tracing in http_router 2024-04-01 15:01:33 +02:00
Tony Garnock-Jones 0c06ae9601 Repair path matching where no explicit PathPatternElement::Rest is present 2024-04-01 14:58:55 +02:00
Tony Garnock-Jones 1f0c9d2883 Dep bump 2024-03-30 11:36:42 +01:00
Tony Garnock-Jones 615830f799 Release independent packages
syndicate@0.38.0

Generated by cargo-workspaces
2024-03-30 11:02:01 +01:00
Tony Garnock-Jones 3c44768a72 Convenience syndicate::relay::stdio_service 2024-03-30 11:00:22 +01:00
Tony Garnock-Jones 04bb8c2f23 Release independent packages
syndicate@0.37.1

Generated by cargo-workspaces
2024-03-29 10:23:40 +01:00
Tony Garnock-Jones 9084c1781e Repair nested-panic situation 2024-03-29 10:23:21 +01:00
Tony Garnock-Jones 8a817fcb4f Release independent packages
syndicate@0.37.0
syndicate-macros@0.30.0
syndicate-schema-plugin@0.7.0
syndicate-server@0.43.0
syndicate-tools@0.16.0

Generated by cargo-workspaces
2024-03-28 16:33:56 +01:00
Tony Garnock-Jones 2ed2b38edc Repair noise session introduction 2024-03-28 16:32:46 +01:00
Tony Garnock-Jones 5090625f47 Bump deps 2024-03-28 15:50:36 +01:00
Tony Garnock-Jones a7ede65bad Merge latest changes from the syndicate-protocols repository 2024-03-28 15:50:12 +01:00
Tony Garnock-Jones c59e044695 Set embeddedType for noise 2024-03-28 15:49:48 +01:00
Tony Garnock-Jones ef98217a3a Merge latest changes from the syndicate-protocols repository 2024-03-28 15:17:37 +01:00
Tony Garnock-Jones bf0d47f1b7 Repair noise protocol 2024-03-28 15:17:28 +01:00
Tony Garnock-Jones fef41f39eb Release independent packages
syndicate@0.36.1

Generated by cargo-workspaces
2024-03-22 20:51:30 +01:00
Tony Garnock-Jones 0b72b4029b Repair reimported, attenuated references. 2024-03-22 20:51:02 +01:00
Tony Garnock-Jones 40a239c9eb Release independent packages
syndicate-server@0.42.0

Generated by cargo-workspaces
2024-03-22 11:24:21 +01:00
Tony Garnock-Jones 55456621d4 Handle refinement to gatekeeper protocol allowing JIT binding and/or direct rejection 2024-03-22 11:22:58 +01:00
Tony Garnock-Jones 7797a3cd09 Updated description of gatekeeper protocol 2024-03-22 10:11:57 +01:00
Tony Garnock-Jones eb9d9bed0f Generalize target-stompling-avoidance originally only for docker 2024-03-08 10:59:45 +01:00
Tony Garnock-Jones b96c469ef5 Put release profile settings back the way they should be 2024-03-08 10:51:04 +01:00
Tony Garnock-Jones 34f611f4fe Release independent packages
syndicate@0.36.0
syndicate-macros@0.29.0
syndicate-schema-plugin@0.6.0
syndicate-server@0.41.0
syndicate-tools@0.15.0

Generated by cargo-workspaces
2024-03-08 10:48:11 +01:00
Tony Garnock-Jones 58c24c30c4 Update Preserves to 0.995 2024-03-08 10:47:52 +01:00
Tony Garnock-Jones fa990bc042 Implement a $control entity, a message <exit n>, and a --control command-line flag. 2024-03-07 09:27:58 +01:00
Tony Garnock-Jones 060ba36d2e Release independent packages
syndicate-macros@0.28.1

Generated by cargo-workspaces
2024-03-04 10:15:51 +01:00
Tony Garnock-Jones ecd5e87823 Bump deps 2024-03-04 10:15:36 +01:00
Tony Garnock-Jones a401e5fcd1 A little fairer 2024-03-04 10:11:17 +01:00
34 changed files with 1012 additions and 701 deletions

Cargo.lock (generated): 396 changed lines. File diff suppressed because it is too large.

View File

@ -20,8 +20,8 @@ members = [
# preserves-schema = { path = "localdev/preserves/implementations/rust/preserves-schema" }
[profile.release]
# strip = true
debug = true
strip = true
# debug = true
# lto = true
[profile.bench]

View File

@ -32,7 +32,7 @@ pull-protocols:
static: static-x86_64
static-%:
cross build --target $*-unknown-linux-musl --features vendored-openssl,jemalloc
CARGO_TARGET_DIR=target/target.$* cross build --target $*-unknown-linux-musl --features vendored-openssl,jemalloc
###########################################################################
@ -54,18 +54,18 @@ static-%:
x86_64-binary: x86_64-binary-release
x86_64-binary-release:
cross build --target x86_64-unknown-linux-musl --release --all-targets --features vendored-openssl,jemalloc
CARGO_TARGET_DIR=target/target.x86_64 cross build --target x86_64-unknown-linux-musl --release --all-targets --features vendored-openssl,jemalloc
x86_64-binary-debug:
cross build --target x86_64-unknown-linux-musl --all-targets --features vendored-openssl
CARGO_TARGET_DIR=target/target.x86_64 cross build --target x86_64-unknown-linux-musl --all-targets --features vendored-openssl
armv7-binary: armv7-binary-release
armv7-binary-release:
cross build --target=armv7-unknown-linux-musleabihf --release --all-targets --features vendored-openssl
CARGO_TARGET_DIR=target/target.armv7 cross build --target=armv7-unknown-linux-musleabihf --release --all-targets --features vendored-openssl
armv7-binary-debug:
cross build --target=armv7-unknown-linux-musleabihf --all-targets --features vendored-openssl
CARGO_TARGET_DIR=target/target.armv7 cross build --target=armv7-unknown-linux-musleabihf --all-targets --features vendored-openssl
# As of 2023-05-12 (and probably earlier!) this is no longer required with current Rust nightlies
# # Hack to workaround https://github.com/rust-embedded/cross/issues/598
@ -74,7 +74,7 @@ armv7-binary-debug:
aarch64-binary: aarch64-binary-release
aarch64-binary-release:
cross build --target=aarch64-unknown-linux-musl --release --all-targets --features vendored-openssl,jemalloc
CARGO_TARGET_DIR=target/target.aarch64 cross build --target=aarch64-unknown-linux-musl --release --all-targets --features vendored-openssl,jemalloc
aarch64-binary-debug:
cross build --target=aarch64-unknown-linux-musl --all-targets --features vendored-openssl
CARGO_TARGET_DIR=target/target.aarch64 cross build --target=aarch64-unknown-linux-musl --all-targets --features vendored-openssl

View File

@ -1,2 +1,7 @@
#!/bin/sh
make -C ../syndicate-server binary && exec taskset -c 0,1 ../target/release/syndicate-server -c benchmark-config.pr "$@"
TASKSET='taskset -c 0,1'
if [ $(uname -s) = 'Darwin' ]
then
TASKSET=
fi
make -C ../syndicate-server binary && exec $TASKSET ../target/release/syndicate-server -c benchmark-config.pr "$@"

View File

@ -10,7 +10,6 @@ all:
clean:
rm -f syndicate-server.*
rm -rf $(patsubst %,target.%,$(ARCHITECTURES))
-podman images -q $(U)/$(I) | sort -u | xargs podman rmi -f
image: $(SERVERS)
@ -34,5 +33,5 @@ push-only:
podman manifest push $(U)/$(I):latest
syndicate-server.%:
make -C .. CARGO_TARGET_DIR=docker/target.$* $$(./alpine-architecture $*)-binary-release
cp -a target.$*/$$(./alpine-architecture $*)-unknown-linux-musl*/release/syndicate-server $@
make -C .. $$(./alpine-architecture $*)-binary-release
cp -a ../target/target.$$(./alpine-architecture $*)/$$(./alpine-architecture $*)-unknown-linux-musl*/release/syndicate-server $@

View File

@ -1,6 +1,6 @@
[package]
name = "syndicate-macros"
version = "0.28.0"
version = "0.32.0"
authors = ["Tony Garnock-Jones <tonyg@leastfixedpoint.com>"]
edition = "2018"
@ -13,7 +13,7 @@ license = "Apache-2.0"
proc-macro = true
[dependencies]
syndicate = { path = "../syndicate", version = "0.35.0"}
syndicate = { path = "../syndicate", version = "0.40.0"}
proc-macro2 = { version = "^1.0", features = ["span-locations"] }
quote = "^1.0"

View File

@ -5,7 +5,7 @@ use std::sync::atomic::Ordering;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
type Ref<T> = UnboundedSender<T>;
type Ref<T> = UnboundedSender<Box<T>>;
#[derive(Debug)]
enum Instruction {
@ -35,14 +35,14 @@ trait Actor<T> {
}
fn send<T: std::marker::Send + 'static>(ch: &Arc<Ref<T>>, message: T) -> () {
match ch.send(message) {
match ch.send(Box::new(message)) {
Ok(()) => (),
Err(v) => panic!("Aiee! Could not send {:?}", v),
}
}
fn spawn<T: std::marker::Send + 'static, R: Actor<T> + std::marker::Send + 'static>(rt: Option<Arc<AtomicU64>>, mut ac: R) -> Arc<Ref<T>> {
let (tx, mut rx) = unbounded_channel();
let (tx, mut rx) = unbounded_channel::<Box<T>>();
if let Some(ref c) = rt {
c.fetch_add(1, Ordering::SeqCst);
}
@ -51,7 +51,7 @@ fn spawn<T: std::marker::Send + 'static, R: Actor<T> + std::marker::Send + 'stat
match rx.recv().await {
None => break,
Some(message) => {
match ac.message(message) {
match ac.message(*message) {
Action::Continue => continue,
Action::Stop => break,
}

View File

@ -36,7 +36,7 @@ enum SymbolVariant<'a> {
fn compile_sequence_members(vs: &[IOValue]) -> Vec<TokenStream> {
vs.iter().enumerate().map(|(i, f)| {
let p = compile_pattern(f);
quote!((#i .into(), #p))
quote!((syndicate::value::Value::from(#i).wrap(), #p))
}).collect::<Vec<_>>()
}
@ -151,16 +151,14 @@ fn compile_pattern(v: &IOValue) -> TokenStream {
#[allow(non_snake_case)]
let V_: TokenStream = quote!(syndicate::value);
#[allow(non_snake_case)]
let MapFromIterator_: TokenStream = quote!(<#V_::Map<_, _> as std::iter::FromIterator<_>>::from_iter);
let MapFrom_: TokenStream = quote!(<#V_::Map<_, _>>::from);
match v.value() {
Value::Symbol(s) => match analyze_symbol(&s, true) {
SymbolVariant::Binder(_) =>
quote!(#P_::Pattern::DBind(Box::new(#P_::DBind {
pattern: #P_::Pattern::DDiscard(Box::new(#P_::DDiscard))
}))),
quote!(#P_::Pattern::Bind{ pattern: Box::new(#P_::Pattern::Discard) }),
SymbolVariant::Discard =>
quote!(#P_::Pattern::DDiscard(Box::new(#P_::DDiscard))),
quote!(#P_::Pattern::Discard),
SymbolVariant::Substitution(s) =>
lit(Ident::new(s, Span::call_site())),
SymbolVariant::Normal(_) =>
@ -172,9 +170,7 @@ fn compile_pattern(v: &IOValue) -> TokenStream {
Some(label) =>
if label.starts_with("$") && r.arity() == 1 {
let nested = compile_pattern(&r.fields()[0]);
quote!(#P_::Pattern::DBind(Box::new(#P_::DBind {
pattern: #nested
})))
quote!(#P_::Pattern::Bind{ pattern: Box::new(#nested) })
} else {
let label_stx = if label.starts_with("=") {
let id = Ident::new(&label[1..], Span::call_site());
@ -183,18 +179,19 @@ fn compile_pattern(v: &IOValue) -> TokenStream {
quote!(#V_::Value::symbol(#label).wrap())
};
let members = compile_sequence_members(r.fields());
quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Rec {
label: #label_stx,
fields: vec![#(#members),*],
})))
quote!(#P_::Pattern::Group {
type_: Box::new(#P_::GroupType::Rec { label: #label_stx }),
entries: #MapFrom_([#(#members),*]),
})
}
}
}
Value::Sequence(vs) => {
let members = compile_sequence_members(vs);
quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Arr {
items: vec![#(#members),*],
})))
quote!(#P_::Pattern::Group {
type_: Box::new(#P_::GroupType::Arr),
entries: #MapFrom_([#(#members),*]),
})
}
Value::Set(_) =>
panic!("Cannot match sets in patterns"),
@ -204,9 +201,10 @@ fn compile_pattern(v: &IOValue) -> TokenStream {
let v = compile_pattern(v);
quote!((#k, #v))
}).collect::<Vec<_>>();
quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Dict {
entries: #MapFromIterator_(vec![#(#members),*])
})))
quote!(#P_::Pattern::Group {
type_: Box::new(#P_::GroupType::Dict),
entries: #MapFrom_([#(#members),*]),
})
}
_ => lit(ValueCompiler::for_patterns().compile(v)),
}

View File

@ -15,10 +15,9 @@ pub fn lit<T: ToTokens>(e: T) -> TokenStream2 {
}
fn compile_sequence_members(stxs: &Vec<Stx>) -> Result<Vec<TokenStream2>, &'static str> {
stxs.iter().map(|stx| {
// let p = to_pattern_expr(stx)?;
// Ok(quote!(#p))
to_pattern_expr(stx)
stxs.iter().enumerate().map(|(i, stx)| {
let p = to_pattern_expr(stx)?;
Ok(quote!((syndicate::value::Value::from(#i).wrap(), #p)))
}).collect()
}
@ -28,7 +27,7 @@ pub fn to_pattern_expr(stx: &Stx) -> Result<TokenStream2, &'static str> {
#[allow(non_snake_case)]
let V_: TokenStream2 = quote!(syndicate::value);
#[allow(non_snake_case)]
let MapFromIterator_: TokenStream2 = quote!(<#V_::Map<_, _> as std::iter::FromIterator<_>>::from_iter);
let MapFrom_: TokenStream2 = quote!(<#V_::Map<_, _>>::from);
match stx {
Stx::Atom(v) =>
@ -41,26 +40,27 @@ pub fn to_pattern_expr(stx: &Stx) -> Result<TokenStream2, &'static str> {
None => to_pattern_expr(&Stx::Discard)?,
}
};
Ok(quote!(#P_::Pattern::DBind(Box::new(#P_::DBind { pattern: #inner_pat_expr }))))
Ok(quote!(#P_::Pattern::Bind { pattern: Box::new(#inner_pat_expr) }))
}
Stx::Subst(e) =>
Ok(lit(e)),
Stx::Discard =>
Ok(quote!(#P_::Pattern::DDiscard(Box::new(#P_::DDiscard)))),
Ok(quote!(#P_::Pattern::Discard)),
Stx::Rec(l, fs) => {
let label = to_value_expr(&*l)?;
let members = compile_sequence_members(fs)?;
Ok(quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Rec {
label: #label,
fields: vec![#(#members),*],
}))))
Ok(quote!(#P_::Pattern::Group {
type_: Box::new(#P_::GroupType::Rec { label: #label }),
entries: #MapFrom_([#(#members),*]),
}))
},
Stx::Seq(stxs) => {
let members = compile_sequence_members(stxs)?;
Ok(quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Arr {
items: vec![#(#members),*],
}))))
Ok(quote!(#P_::Pattern::Group {
type_: Box::new(#P_::GroupType::Arr),
entries: #MapFrom_([#(#members),*]),
}))
}
Stx::Set(_stxs) =>
Err("Set literals not supported in patterns"),
@ -70,9 +70,10 @@ pub fn to_pattern_expr(stx: &Stx) -> Result<TokenStream2, &'static str> {
let v = to_pattern_expr(v)?;
Ok(quote!((#k, #v)))
}).collect::<Result<Vec<_>, &'static str>>()?;
Ok(quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Dict {
entries: #MapFromIterator_(vec![#(#members),*])
}))))
Ok(quote!(#P_::Pattern::Group {
type_: Box::new(#P_::GroupType::Dict),
entries: #MapFrom_([#(#members),*])
}))
}
}
}

View File

@ -0,0 +1,15 @@
{
"folders": [
{
"path": "."
},
{
"path": "../syndicate-protocols"
}
],
"settings": {
"files.exclude": {
"target": true
}
}
}

View File

@ -1,6 +1,6 @@
[package]
name = "syndicate-schema-plugin"
version = "0.5.0"
version = "0.9.0"
authors = ["Tony Garnock-Jones <tonyg@leastfixedpoint.com>"]
edition = "2018"
@ -12,8 +12,8 @@ license = "Apache-2.0"
[lib]
[dependencies]
preserves-schema = "5.994"
syndicate = { path = "../syndicate", version = "0.35.0"}
preserves-schema = "5.995"
syndicate = { path = "../syndicate", version = "0.40.0"}
[package.metadata.workspaces]
independent = true

View File

@ -61,7 +61,7 @@ impl Plugin for PatternPlugin {
}
fn discard() -> P::Pattern {
P::Pattern::DDiscard(Box::new(P::DDiscard))
P::Pattern::Discard
}
trait WildcardPattern {
@ -94,33 +94,34 @@ fn from_io(v: &IOValue) -> Option<P::_Any> {
impl WildcardPattern for CompoundPattern {
fn wc(&self, s: &mut WalkState) -> Option<P::Pattern> {
match self {
CompoundPattern::Tuple { patterns } =>
Some(P::Pattern::DCompound(Box::new(P::DCompound::Arr {
items: patterns.iter()
.map(|p| unname(p).wc(s))
.collect::<Option<Vec<P::Pattern>>>()?,
}))),
CompoundPattern::TuplePrefix { .. } =>
Some(discard()),
CompoundPattern::Tuple { patterns } |
CompoundPattern::TuplePrefix { fixed: patterns, .. }=>
Some(P::Pattern::Group {
type_: Box::new(P::GroupType::Arr),
entries: patterns.iter().enumerate()
.map(|(i, p)| Some((P::_Any::new(i), unname(p).wc(s)?)))
.collect::<Option<Map<P::_Any, P::Pattern>>>()?,
}),
CompoundPattern::Dict { entries } =>
Some(P::Pattern::DCompound(Box::new(P::DCompound::Dict {
Some(P::Pattern::Group {
type_: Box::new(P::GroupType::Dict),
entries: Map::from_iter(
entries.0.iter()
.map(|(k, p)| Some((from_io(k)?, unname_simple(p).wc(s)?)))
.filter(|e| discard() != e.as_ref().unwrap().1)
.collect::<Option<Vec<(P::_Any, P::Pattern)>>>()?
.into_iter()),
}))),
}),
CompoundPattern::Rec { label, fields } => match (unname(label), unname(fields)) {
(Pattern::SimplePattern(label), Pattern::CompoundPattern(fields)) =>
match (*label, *fields) {
(SimplePattern::Lit { value }, CompoundPattern::Tuple { patterns }) =>
Some(P::Pattern::DCompound(Box::new(P::DCompound::Rec {
label: from_io(&value)?,
fields: patterns.iter()
.map(|p| unname(p).wc(s))
.collect::<Option<Vec<P::Pattern>>>()?,
}))),
Some(P::Pattern::Group{
type_: Box::new(P::GroupType::Rec { label: from_io(&value)? }),
entries: patterns.iter().enumerate()
.map(|(i, p)| Some((P::_Any::new(i), unname(p).wc(s)?)))
.collect::<Option<Map<P::_Any, P::Pattern>>>()?,
}),
_ => None,
},
_ => None,

View File

@ -1,6 +1,6 @@
[package]
name = "syndicate-server"
version = "0.40.0"
version = "0.45.0"
authors = ["Tony Garnock-Jones <tonyg@leastfixedpoint.com>"]
edition = "2018"
@ -13,14 +13,14 @@ license = "Apache-2.0"
jemalloc = ["dep:tikv-jemallocator"]
[build-dependencies]
preserves-schema = "5.994"
syndicate = { path = "../syndicate", version = "0.35.0"}
syndicate-schema-plugin = { path = "../syndicate-schema-plugin", version = "0.5.0"}
preserves-schema = "5.995"
syndicate = { path = "../syndicate", version = "0.40.0"}
syndicate-schema-plugin = { path = "../syndicate-schema-plugin", version = "0.9.0"}
[dependencies]
preserves-schema = "5.994"
syndicate = { path = "../syndicate", version = "0.35.0"}
syndicate-macros = { path = "../syndicate-macros", version = "0.28.0"}
preserves-schema = "5.995"
syndicate = { path = "../syndicate", version = "0.40.0"}
syndicate-macros = { path = "../syndicate-macros", version = "0.32.0"}
chrono = "0.4"
futures = "0.3"

View File

@ -1,4 +1,5 @@
´³bundle·µ³ documentation„´³schema·³version°³ definitions·³Url´³orµµ±present´³dict·³url´³named³url´³atom³String„„„„„µ±invalid´³dict·³url´³named³url³any„„„„µ±absent´³dict·„„„„„³IOList´³orµµ±bytes´³atom³
´³bundle·µ³control„´³schema·³version°³ definitions·³
ExitServer´³rec´³lit³exit„´³tupleµ´³named³code´³atom³ SignedInteger„„„„„„³ embeddedType€„„µ³ documentation„´³schema·³version°³ definitions·³Url´³orµµ±present´³dict·³url´³named³url´³atom³String„„„„„µ±invalid´³dict·³url´³named³url³any„„„„µ±absent´³dict·„„„„„³IOList´³orµµ±bytes´³atom³
ByteString„„µ±string´³atom³String„„µ±nested´³seqof´³refµ„³IOList„„„„„³Metadata´³rec´³lit³metadata„´³tupleµ´³named³object³any„´³named³info´³dictof´³atom³Symbol„³any„„„„„³ Description´³orµµ±present´³dict·³ description´³named³ description´³refµ„³IOList„„„„„µ±invalid´³dict·³ description´³named³ description³any„„„„µ±absent´³dict·„„„„„„³ embeddedType€„„µ³externalServices„´³schema·³version°³ definitions·³Process´³orµµ±simple´³refµ„³ CommandLine„„µ±full´³refµ„³ FullProcess„„„„³Service´³refµ„³ DaemonService„³ClearEnv´³orµµ±present´³dict·³clearEnv´³named³clearEnv´³atom³Boolean„„„„„µ±invalid´³dict·³clearEnv´³named³clearEnv³any„„„„µ±absent´³dict·„„„„„³EnvValue´³orµµ±set´³atom³String„„µ±remove´³lit€„„µ±invalid³any„„„³Protocol´³orµµ±none´³lit³none„„µ±binarySyndicate´³lit³application/syndicate„„µ± textSyndicate´³lit³text/syndicate„„„„³
ProcessDir´³orµµ±present´³dict·³dir´³named³dir´³atom³String„„„„„µ±invalid´³dict·³dir´³named³dir³any„„„„µ±absent´³dict·„„„„„³
ProcessEnv´³orµµ±present´³dict·³env´³named³env´³dictof´³refµ„³ EnvVariable„´³refµ„³EnvValue„„„„„„µ±invalid´³dict·³env´³named³env³any„„„„µ±absent´³dict·„„„„„³ CommandLine´³orµµ±shell´³atom³String„„µ±full´³refµ„³FullCommandLine„„„„³ EnvVariable´³orµµ±string´³atom³String„„µ±symbol´³atom³Symbol„„µ±invalid³any„„„³ FullProcess´³andµ´³dict·³argv´³named³argv´³refµ„³ CommandLine„„„„´³named³env´³refµ„³

View File

@ -0,0 +1,12 @@
version 1 .
# Messages and assertions relating to the `$control` entity enabled in syndicate-server when
# the `--control` flag is supplied.
#
# For example, placing the following into `control-config.pr` and starting the server with
# `syndicate-server --control -c control-config.pr` will result in the server exiting with
# exit code 2:
#
# $control ! <exit 2>
ExitServer = <exit @code int> .

View File

@ -15,7 +15,7 @@ use std::convert::TryInto;
use std::sync::Arc;
use syndicate::actor::*;
use syndicate::during::DuringResult;
use syndicate::enclose;
use syndicate::value::NestedValue;
use syndicate::schemas::dataspace;
use syndicate::schemas::gatekeeper;
@ -102,49 +102,78 @@ pub fn handle_binds(t: &mut Activation, ds: &Arc<Cap>) -> ActorResult {
Ok(())
}
fn eventually_retract<E>(h: Option<Handle>) -> DuringResult<E> {
if let Some(h) = h {
Ok(Some(Box::new(move |_state, t: &mut Activation| Ok(t.retract(h)))))
} else {
Ok(None)
}
}
pub fn handle_resolves(
pub fn facet_handle_resolve(
ds: &mut Arc<Cap>,
t: &mut Activation,
a: gatekeeper::Resolve,
) -> DuringResult<Arc<Cap>> {
) -> ActorResult {
let mut detail: &'static str = "unsupported";
if a.step.step_type == sturdy_step_type() {
detail = "invalid";
if let Ok(s) = language().parse::<sturdy::SturdyStepDetail>(&a.step.detail) {
return handle_resolve_sturdyref(ds, t, sturdy::SturdyRef { parameters: s.0 }, a.observer);
t.facet(|t| {
let f = handle_direct_resolution(ds, t, a.clone())?;
await_bind_sturdyref(ds, t, sturdy::SturdyRef { parameters: s.0 }, a.observer, f)
})?;
return Ok(());
}
}
if a.step.step_type == noise_step_type() {
detail = "invalid";
if let Ok(s) = language().parse::<noise::NoiseStepDetail<AnyValue>>(&a.step.detail) {
return handle_resolve_noise(ds, t, s.0.0, a.observer);
t.facet(|t| {
let f = handle_direct_resolution(ds, t, a.clone())?;
await_bind_noise(ds, t, s.0.0, a.observer, f)
})?;
return Ok(());
}
}
eventually_retract(ds.assert(t, language(), &gatekeeper::Rejected {
a.observer.assert(t, language(), &gatekeeper::Rejected {
detail: AnyValue::symbol(detail),
}))
});
Ok(())
}
fn handle_resolve_sturdyref(
fn handle_direct_resolution(
ds: &mut Arc<Cap>,
t: &mut Activation,
a: gatekeeper::Resolve,
) -> Result<FacetId, ActorError> {
let outer_facet = t.facet_id();
t.facet(move |t| {
let handler = syndicate::entity(a.observer)
.on_asserted(move |observer, t, a: AnyValue| {
t.stop_facet_and_continue(outer_facet, Some(
enclose!((observer, a) move |t: &mut Activation| {
observer.assert(t, language(), &a);
Ok(())
})))?;
Ok(None)
})
.create_cap(t);
ds.assert(t, language(), &gatekeeper::Resolve {
step: a.step.clone(),
observer: handler,
});
Ok(())
})
}
fn await_bind_sturdyref(
ds: &mut Arc<Cap>,
t: &mut Activation,
sturdyref: sturdy::SturdyRef,
observer: Arc<Cap>,
) -> DuringResult<Arc<Cap>> {
direct_resolution_facet: FacetId,
) -> ActorResult {
let queried_oid = sturdyref.parameters.oid.clone();
let handler = syndicate::entity(observer)
.on_asserted(move |observer, t, a: AnyValue| {
t.stop_facet(direct_resolution_facet);
let bindings = a.value().to_sequence()?;
let key = bindings[0].value().to_bytestring()?;
let unattenuated_target = bindings[1].value().to_embedded()?;
@ -152,27 +181,29 @@ fn handle_resolve_sturdyref(
Err(e) => {
tracing::warn!(sturdyref = ?language().unparse(&sturdyref),
"sturdyref failed validation: {}", e);
eventually_retract(observer.assert(t, language(), &gatekeeper::Resolved::Rejected(
observer.assert(t, language(), &gatekeeper::Resolved::Rejected(
Box::new(gatekeeper::Rejected {
detail: AnyValue::symbol("sturdyref-failed-validation"),
}))))
})));
},
Ok(target) => {
tracing::trace!(sturdyref = ?language().unparse(&sturdyref),
?target,
"sturdyref resolved");
eventually_retract(observer.assert(t, language(), &gatekeeper::Resolved::Accepted {
observer.assert(t, language(), &gatekeeper::Resolved::Accepted {
responder_session: target,
}))
});
}
}
Ok(None)
})
.create_cap(t);
eventually_retract(ds.assert(t, language(), &dataspace::Observe {
ds.assert(t, language(), &dataspace::Observe {
// TODO: codegen plugin to generate pattern constructors
pattern: pattern!{<bind <ref { oid: #(&queried_oid), key: $ }> $ _>},
observer: handler,
}))
});
Ok(())
}
struct ValidatedNoiseSpec {
@ -232,52 +263,80 @@ fn validate_noise_spec(
})
}
fn handle_resolve_noise(
fn await_bind_noise(
ds: &mut Arc<Cap>,
t: &mut Activation,
service_selector: AnyValue,
initiator_session: Arc<Cap>,
) -> DuringResult<Arc<Cap>> {
observer: Arc<Cap>,
direct_resolution_facet: FacetId,
) -> ActorResult {
let handler = syndicate::entity(())
.on_asserted_facet(move |_state, t, a: AnyValue| {
let initiator_session = Arc::clone(&initiator_session);
t.stop_facet(direct_resolution_facet);
let observer = Arc::clone(&observer);
t.spawn_link(None, move |t| {
let bindings = a.value().to_sequence()?;
let spec = validate_noise_spec(language().parse(&bindings[0])?)?;
let service = bindings[1].value().to_embedded()?;
run_noise_responder(t, spec, initiator_session, Arc::clone(service))
run_noise_responder(t, spec, observer, Arc::clone(service))
});
Ok(())
})
.create_cap(t);
eventually_retract(ds.assert(t, language(), &dataspace::Observe {
ds.assert(t, language(), &dataspace::Observe {
// TODO: codegen plugin to generate pattern constructors
pattern: pattern!{
<bind <noise $spec:NoiseServiceSpec{ { service: #(&service_selector) } }> $service _>
},
observer: handler,
}))
});
Ok(())
}
struct ResponderDetails {
initiator_session: Arc<Cap>,
service: Arc<Cap>,
}
struct ResponderTransport {
relay_input: Arc<Mutex<Option<TunnelRelay>>>,
c_recv: CipherState<ChaCha20Poly1305>
}
type HandshakeState = noise_protocol::HandshakeState<X25519, ChaCha20Poly1305, Blake2s>;
enum ResponderState {
Handshake(ResponderDetails, noise_protocol::HandshakeState<X25519, ChaCha20Poly1305, Blake2s>),
Transport(ResponderTransport),
Invalid, // used during state transitions
Introduction {
service: Arc<Cap>,
hs: HandshakeState,
},
Handshake {
initiator_session: Arc<Cap>,
service: Arc<Cap>,
hs: HandshakeState,
},
Transport {
relay_input: Arc<Mutex<Option<TunnelRelay>>>,
c_recv: CipherState<ChaCha20Poly1305>,
},
}
impl Entity<noise::Packet> for ResponderState {
fn message(&mut self, t: &mut Activation, p: noise::Packet) -> ActorResult {
impl Entity<noise::SessionItem> for ResponderState {
fn assert(&mut self, _t: &mut Activation, item: noise::SessionItem, _handle: Handle) -> ActorResult {
let initiator_session = match item {
noise::SessionItem::Initiator(i_box) => i_box.initiator_session,
noise::SessionItem::Packet(_) => Err("Unexpected Packet assertion")?,
};
match std::mem::replace(self, ResponderState::Invalid) {
ResponderState::Introduction { service, hs } => {
*self = ResponderState::Handshake { initiator_session, service, hs };
Ok(())
}
_ =>
Err("Received second Initiator")?,
}
}
fn message(&mut self, t: &mut Activation, item: noise::SessionItem) -> ActorResult {
let p = match item {
noise::SessionItem::Initiator(_) => Err("Unexpected Initiator message")?,
noise::SessionItem::Packet(p_box) => *p_box,
};
match self {
ResponderState::Handshake(details, hs) => match p {
ResponderState::Invalid | ResponderState::Introduction { .. } =>
Err("Received Packet in invalid ResponderState")?,
ResponderState::Handshake { initiator_session, service, hs } => match p {
noise::Packet::Complete(bs) => {
if bs.len() < hs.get_next_message_overhead() {
Err("Invalid handshake message for pattern")?;
@ -288,14 +347,13 @@ impl Entity<noise::Packet> for ResponderState {
hs.read_message(&bs, &mut [])?;
let mut reply = vec![0u8; hs.get_next_message_overhead()];
hs.write_message(&[], &mut reply[..])?;
details.initiator_session.message(t, language(), &noise::Packet::Complete(reply.into()));
initiator_session.message(t, language(), &noise::Packet::Complete(reply.into()));
if hs.completed() {
let (c_recv, mut c_send) = hs.get_ciphers();
let (_, relay_input, mut relay_output) =
TunnelRelay::_run(t, Some(Arc::clone(&details.service)), None, false);
TunnelRelay::_run(t, Some(Arc::clone(service)), None, false);
let trace_collector = t.trace_collector();
let transport = ResponderTransport { relay_input, c_recv };
let initiator_session = Arc::clone(&details.initiator_session);
let initiator_session = Arc::clone(initiator_session);
let relay_output_name = Some(AnyValue::symbol("relay_output"));
let transport_facet = t.facet_ref();
t.linked_task(relay_output_name.clone(), async move {
@ -326,25 +384,25 @@ impl Entity<noise::Packet> for ResponderState {
}
Ok(LinkedTaskTermination::Normal)
});
*self = ResponderState::Transport(transport);
*self = ResponderState::Transport { relay_input, c_recv };
}
}
_ => Err("Fragmented handshake is not allowed")?,
},
ResponderState::Transport(transport) => {
ResponderState::Transport { relay_input, c_recv } => {
let bs = match p {
noise::Packet::Complete(bs) =>
transport.c_recv.decrypt_vec(&bs[..]).map_err(|_| "Cannot decrypt packet")?,
c_recv.decrypt_vec(&bs[..]).map_err(|_| "Cannot decrypt packet")?,
noise::Packet::Fragmented(pieces) => {
let mut result = Vec::with_capacity(1024);
for piece in pieces {
result.extend(transport.c_recv.decrypt_vec(&piece[..])
result.extend(c_recv.decrypt_vec(&piece[..])
.map_err(|_| "Cannot decrypt packet fragment")?);
}
result
}
};
let mut g = transport.relay_input.lock();
let mut g = relay_input.lock();
let tr = g.as_mut().expect("initialized");
tr.handle_inbound_datagram(t, &bs[..])?;
}
@ -412,7 +470,7 @@ fn lookup_pattern(name: &str) -> Option<HandshakePattern> {
fn run_noise_responder(
t: &mut Activation,
spec: ValidatedNoiseSpec,
initiator_session: Arc<Cap>,
observer: Arc<Cap>,
service: Arc<Cap>,
) -> ActorResult {
let hs = {
@ -435,13 +493,8 @@ fn run_noise_responder(
hs
};
let details = ResponderDetails {
initiator_session: initiator_session.clone(),
service,
};
let responder_session =
Cap::guard(crate::Language::arc(), t.create(ResponderState::Handshake(details, hs)));
initiator_session.assert(t, language(), &gatekeeper::Resolved::Accepted { responder_session });
Cap::guard(crate::Language::arc(), t.create(ResponderState::Introduction{ service, hs }));
observer.assert(t, language(), &gatekeeper::Resolved::Accepted { responder_session });
Ok(())
}

View File

@ -32,13 +32,9 @@ pub fn empty_response(code: StatusCode) -> Response<Body> {
type ChunkItem = Result<body::Bytes, Box<dyn std::error::Error + Send + Sync>>;
enum ResponseCollector {
Pending {
tx: oneshot::Sender<Response<Body>>,
body_tx: UnboundedSender<ChunkItem>,
res: Response<Body>,
},
Done
struct ResponseCollector {
tx_res: Option<(oneshot::Sender<Response<Body>>, Response<Body>)>,
body_tx: Option<UnboundedSender<ChunkItem>>,
}
impl ResponseCollector {
@ -46,43 +42,50 @@ impl ResponseCollector {
let (body_tx, body_rx) = unbounded_channel();
let body_stream: Box<dyn futures::Stream<Item = ChunkItem> + Send> =
Box::new(UnboundedReceiverStream::new(body_rx));
ResponseCollector::Pending {
tx,
body_tx,
res: Response::new(body_stream.into()),
let mut res = Response::new(body_stream.into());
*res.status_mut() = StatusCode::OK;
ResponseCollector {
tx_res: Some((tx, res)),
body_tx: Some(body_tx),
}
}
fn with_res<F: FnOnce(&mut Response<Body>) -> ActorResult>(&mut self, f: F) -> ActorResult {
if let ResponseCollector::Pending { res, .. } = self {
if let Some((_, res)) = &mut self.tx_res {
f(res)?;
}
Ok(())
}
fn deliver_res(&mut self) {
if let Some((tx, res)) = std::mem::replace(&mut self.tx_res, None) {
let _ = tx.send(res);
}
}
fn add_chunk(&mut self, value: http::Chunk) -> ActorResult {
if let ResponseCollector::Pending { body_tx, .. } = self {
self.deliver_res();
if let Some(body_tx) = self.body_tx.as_mut() {
body_tx.send(Ok(match value {
http::Chunk::Bytes(bs) => bs.into(),
http::Chunk::String(s) => s.as_bytes().to_vec().into(),
}))?;
}
Ok(())
}
fn finish(&mut self) -> ActorResult {
match std::mem::replace(self, ResponseCollector::Done) {
ResponseCollector::Pending { tx, res, .. } => {
let _ = tx.send(res);
}
ResponseCollector::Done => (),
}
fn finish(&mut self, t: &mut Activation) -> ActorResult {
self.deliver_res();
self.body_tx = None;
t.stop();
Ok(())
}
}
impl Entity<http::HttpResponse> for ResponseCollector {
fn message(&mut self, _turn: &mut Activation, message: http::HttpResponse) -> ActorResult {
fn message(&mut self, t: &mut Activation, message: http::HttpResponse) -> ActorResult {
match message {
http::HttpResponse::Status { code, .. } => self.with_res(|r| {
*r.status_mut() = StatusCode::from_u16(
@ -94,10 +97,12 @@ impl Entity<http::HttpResponse> for ResponseCollector {
HeaderValue::from_str(value.as_str())?);
Ok(())
}),
http::HttpResponse::Chunk { chunk } => self.add_chunk(*chunk),
http::HttpResponse::Chunk { chunk } => {
self.add_chunk(*chunk)
}
http::HttpResponse::Done { chunk } => {
self.add_chunk(*chunk)?;
self.finish()
self.finish(t)
}
}
}
@ -111,11 +116,11 @@ pub async fn serve(
port: u16,
) -> Result<Response<Body>, Error> {
let host = match req.headers().get("host").and_then(|v| v.to_str().ok()) {
None => return Ok(empty_response(StatusCode::BAD_REQUEST)),
Some(h) => match h.rsplit_once(':') {
None => http::RequestHost::Absent,
Some(h) => http::RequestHost::Present(match h.rsplit_once(':') {
None => h.to_string(),
Some((h, _port)) => h.to_string(),
}
})
};
let uri = req.uri();
@ -160,34 +165,29 @@ pub async fn serve(
let account = Account::new(Some(AnyValue::symbol("http")), trace_collector);
let (tx, rx) = oneshot::channel();
let mut req_handle: Option<Handle> = None;
facet.activate(&account, Some(trace::TurnCause::external("http")), |t| {
let sreq = http::HttpRequest {
sequence_number: NEXT_SEQ.fetch_add(1, Ordering::Relaxed).into(),
host,
port: port.into(),
method: req.method().to_string().to_lowercase(),
path,
headers: http::Headers(headers),
query,
body,
};
tracing::debug!(?sreq);
let srep = Cap::guard(&language().syndicate, t.create(ResponseCollector::new(tx)));
req_handle = httpd.assert(t, language(), &http::HttpContext { req: sreq, res: srep });
t.facet(move |t| {
let sreq = http::HttpRequest {
sequence_number: NEXT_SEQ.fetch_add(1, Ordering::Relaxed).into(),
host,
port: port.into(),
method: req.method().to_string().to_lowercase(),
path,
headers: http::Headers(headers),
query,
body,
};
tracing::debug!(?sreq);
let srep = Cap::guard(&language().syndicate, t.create(ResponseCollector::new(tx)));
httpd.assert(t, language(), &http::HttpContext { req: sreq, res: srep });
Ok(())
})?;
Ok(())
});
let response_result = rx.await;
facet.activate(&account, Some(trace::TurnCause::external("http")), |t| {
if let Some(h) = req_handle {
t.retract(h);
}
Ok(())
});
match response_result {
Ok(response) => Ok(response),
Err(_ /* sender dropped */) => Ok(empty_response(StatusCode::INTERNAL_SERVER_ERROR)),
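For reference, a minimal sketch (hypothetical handler code inside the syndicate-server crate, not part of this diff) of the message sequence the ResponseCollector above consumes, using the same http::HttpResponse constructors that send_empty/send_done use later in this changeset: a Status, any Header and Chunk messages, then Done.

use std::sync::Arc;

use syndicate::actor::*;
use syndicate::schemas::http;

use crate::language::language; // as imported elsewhere in syndicate-server

// Hypothetical helper (sketch only): emits the sequence ResponseCollector
// expects. Done delivers the buffered response head, closes the body
// stream, and stops the collector's facet.
fn reply_ok(t: &mut Activation, res: &Arc<Cap>, body: &str) -> ActorResult {
    res.message(t, language(), &http::HttpResponse::Status {
        code: 200.into(), message: "OK".into() });
    res.message(t, language(), &http::HttpResponse::Header {
        name: "content-type".into(), value: "text/plain".into() });
    res.message(t, language(), &http::HttpResponse::Done {
        chunk: Box::new(http::Chunk::String(body.to_owned())) });
    Ok(())
}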

View File

@ -1,5 +1,6 @@
use preserves_schema::Codec;
use std::convert::TryInto;
use std::io;
use std::path::PathBuf;
use std::sync::Arc;
@ -61,6 +62,10 @@ struct ServerConfig {
#[structopt(short = "t", long)]
trace_file: Option<PathBuf>,
/// Enable `$control` entity.
#[structopt(long)]
control: bool,
}
#[tokio::main]
@ -115,7 +120,7 @@ async fn main() -> ActorResult {
let gatekeeper = Cap::guard(Language::arc(), t.create(
syndicate::entity(Arc::clone(&server_config_ds))
.on_asserted(gatekeeper::handle_resolves)));
.on_asserted_facet(gatekeeper::facet_handle_resolve)));
gatekeeper::handle_binds(t, &server_config_ds)?;
let mut env = Map::new();
@ -123,6 +128,20 @@ async fn main() -> ActorResult {
env.insert("log".to_owned(), AnyValue::domain(Arc::clone(&log_ds)));
env.insert("gatekeeper".to_owned(), AnyValue::domain(Arc::clone(&gatekeeper)));
if config.control {
env.insert("control".to_owned(), AnyValue::domain(Cap::guard(Language::arc(), t.create(
syndicate::entity(())
.on_message(|_, _t, m: crate::schemas::control::ExitServer| {
tracing::info!("$control received exit request with code {}", m.code);
std::process::exit((&m.code).try_into().unwrap_or_else(|_| {
tracing::warn!(
"exit code {} out-of-range of 32-bit signed integer, using 1 instead",
m.code);
1
}))
})))));
}
dependencies::boot(t, Arc::clone(&server_config_ds));
services::config_watcher::on_demand(t, Arc::clone(&server_config_ds));
services::daemon::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&log_ds));

View File

@ -9,7 +9,7 @@ use syndicate::actor::*;
use syndicate::dataspace::Dataspace;
use syndicate::during;
use syndicate::enclose;
use syndicate::pattern::{lift_literal, drop_literal};
use syndicate::pattern::{lift_literal, drop_literal, pattern_seq_from_dictionary};
use syndicate::schemas::dataspace;
use syndicate::schemas::dataspace_patterns as P;
use syndicate::schemas::sturdy;
@ -173,7 +173,7 @@ fn bad_instruction(message: &str) -> io::Error {
}
fn discard() -> P::Pattern {
P::Pattern::DDiscard(Box::new(P::DDiscard))
P::Pattern::Discard
}
fn dlit(value: AnyValue) -> P::Pattern {
@ -272,7 +272,7 @@ impl<'env> PatternInstantiator<'env> {
Symbolic::Discard => discard(),
Symbolic::Binder(s) => {
self.binding_names.push(s);
P::Pattern::DBind(Box::new(P::DBind { pattern: discard() }))
P::Pattern::Bind { pattern: Box::new(discard()) }
}
Symbolic::Reference(s) =>
dlit(self.env.lookup(&s, "pattern-template variable")?.clone()),
@ -287,43 +287,47 @@ impl<'env> PatternInstantiator<'env> {
Some(pat) => pat,
None => {
let label = self.instantiate_pattern(r.label())?;
let fields = r.fields().iter().map(|p| self.instantiate_pattern(p))
.collect::<io::Result<Vec<P::Pattern>>>()?;
P::Pattern::DCompound(Box::new(P::DCompound::Rec {
label: drop_literal(&label)
.ok_or(bad_instruction("Record pattern must have literal label"))?,
fields,
}))
let entries = r.fields().iter().enumerate()
.map(|(i, p)| Ok((AnyValue::new(i), self.instantiate_pattern(p)?)))
.collect::<io::Result<Map<AnyValue, P::Pattern>>>()?;
P::Pattern::Group {
type_: Box::new(P::GroupType::Rec {
label: drop_literal(&label)
.ok_or(bad_instruction("Record pattern must have literal label"))?,
}),
entries,
}
}
}
},
Value::Sequence(v) =>
P::Pattern::DCompound(Box::new(P::DCompound::Arr {
items: v.iter()
.map(|p| self.instantiate_pattern(p))
.collect::<io::Result<Vec<P::Pattern>>>()?,
})),
P::Pattern::Group {
type_: Box::new(P::GroupType::Arr),
entries: v.iter().enumerate()
.map(|(i, p)| Ok((AnyValue::new(i), self.instantiate_pattern(p)?)))
.collect::<io::Result<Map<AnyValue, P::Pattern>>>()?,
},
Value::Set(_) =>
Err(bad_instruction(&format!("Sets not permitted in patterns: {:?}", template)))?,
Value::Dictionary(v) =>
P::Pattern::DCompound(Box::new(P::DCompound::Dict {
P::Pattern::Group {
type_: Box::new(P::GroupType::Dict),
entries: v.iter()
.map(|(a, b)| Ok((a.clone(), self.instantiate_pattern(b)?)))
.collect::<io::Result<Map<AnyValue, P::Pattern>>>()?,
})),
},
})
}
fn maybe_binder_with_pattern(&mut self, r: &Record<AnyValue>) -> io::Result<Option<P::Pattern>> {
match r.label().value().as_symbol().map(|s| analyze(&s)) {
Some(Symbolic::Binder(formal)) => if r.fields().len() == 1 {
Some(Symbolic::Binder(formal)) if r.fields().len() == 1 => {
let pattern = self.instantiate_pattern(&r.fields()[0])?;
self.binding_names.push(formal);
return Ok(Some(P::Pattern::DBind(Box::new(P::DBind { pattern }))));
Ok(Some(P::Pattern::Bind { pattern: Box::new(pattern) }))
},
_ => (),
_ => Ok(None),
}
Ok(None)
}
}
@ -553,7 +557,7 @@ impl Env {
RewriteTemplate::Accept { pattern_template } => {
let (_binding_names, pattern) = self.instantiate_pattern(pattern_template)?;
Ok(sturdy::Rewrite {
pattern: embed_pattern(&P::Pattern::DBind(Box::new(P::DBind { pattern }))),
pattern: embed_pattern(&P::Pattern::Bind { pattern: Box::new(pattern) }),
template: sturdy::Template::TRef(Box::new(sturdy::TRef { binding: 0.into() })),
})
}
@ -674,24 +678,26 @@ impl Env {
fn embed_pattern(p: &P::Pattern) -> sturdy::Pattern {
match p {
P::Pattern::DDiscard(_) => sturdy::Pattern::PDiscard(Box::new(sturdy::PDiscard)),
P::Pattern::DBind(b) => sturdy::Pattern::PBind(Box::new(sturdy::PBind {
pattern: embed_pattern(&b.pattern),
P::Pattern::Discard => sturdy::Pattern::PDiscard(Box::new(sturdy::PDiscard)),
P::Pattern::Bind { pattern } => sturdy::Pattern::PBind(Box::new(sturdy::PBind {
pattern: embed_pattern(&**pattern),
})),
P::Pattern::DLit(b) => sturdy::Pattern::Lit(Box::new(sturdy::Lit {
value: language().unparse(&b.value),
P::Pattern::Lit { value } => sturdy::Pattern::Lit(Box::new(sturdy::Lit {
value: language().unparse(&**value),
})),
P::Pattern::DCompound(b) => sturdy::Pattern::PCompound(Box::new(match &**b {
P::DCompound::Rec { label, fields } =>
P::Pattern::Group { type_, entries } => sturdy::Pattern::PCompound(Box::new(match &**type_ {
P::GroupType::Rec { label } =>
sturdy::PCompound::Rec {
label: label.clone(),
fields: fields.iter().map(embed_pattern).collect(),
fields: pattern_seq_from_dictionary(entries).expect("correct field entries")
.into_iter().map(embed_pattern).collect(),
},
P::DCompound::Arr { items } =>
P::GroupType::Arr =>
sturdy::PCompound::Arr {
items: items.iter().map(embed_pattern).collect(),
items: pattern_seq_from_dictionary(entries).expect("correct element entries")
.into_iter().map(embed_pattern).collect(),
},
P::DCompound::Dict { entries } =>
P::GroupType::Dict =>
sturdy::PCompound::Dict {
entries: entries.iter().map(|(k, v)| (k.clone(), embed_pattern(v))).collect(),
},

View File

@ -27,7 +27,7 @@ pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
fn run(t: &mut Activation, ds: Arc<Cap>, spec: Gatekeeper<AnyValue>) -> ActorResult {
let resolver = t.create(syndicate::entity(Arc::clone(&spec.bindspace))
.on_asserted(gatekeeper::handle_resolves));
.on_asserted_facet(gatekeeper::facet_handle_resolve));
ds.assert(t, language(), &syndicate::schemas::service::ServiceObject {
service_name: language().unparse(&spec),
object: AnyValue::domain(Cap::guard(Language::arc(), resolver)),

View File

@ -10,7 +10,6 @@ use syndicate::error::Error;
use syndicate::preserves::rec;
use syndicate::preserves::value::Map;
use syndicate::preserves::value::NestedValue;
use syndicate::preserves::value::Set;
use syndicate::schemas::http;
use syndicate::value::signed_integer::SignedInteger;
@ -22,7 +21,7 @@ use crate::schemas::internal_services::HttpStaticFileServer;
use syndicate_macros::during;
lazy_static::lazy_static! {
pub static ref MIME_TABLE: Map<String, String> = load_mime_table("/etc/mime.types").expect("MIME table");
pub static ref MIME_TABLE: Map<String, String> = load_mime_table("/etc/mime.types").unwrap_or_default();
}
pub fn load_mime_table(path: &str) -> Result<Map<String, String>, std::io::Error> {
@ -56,10 +55,22 @@ pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
});
}
type MethodTable = Map<http::MethodPattern, Set<Arc<Cap>>>;
#[derive(Debug, Clone)]
struct ActiveHandler {
cap: Arc<Cap>,
terminated: Arc<Field<bool>>,
}
type MethodTable = Map<http::MethodPattern, Vec<ActiveHandler>>;
type HostTable = Map<http::HostPattern, Map<http::PathPattern, MethodTable>>;
type RoutingTable = Map<SignedInteger, HostTable>;
fn request_host(value: &http::RequestHost) -> Option<String> {
match value {
http::RequestHost::Present(h) => Some(h.to_owned()),
http::RequestHost::Absent => None,
}
}
fn run(t: &mut Activation, ds: Arc<Cap>, spec: HttpRouter) -> ActorResult {
ds.assert(t, language(), &lifecycle::started(&spec));
ds.assert(t, language(), &lifecycle::ready(&spec));
@ -72,25 +83,35 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: HttpRouter) -> ActorResult {
enclose!((httpd, routes) during!(t, httpd, language(), <http-listener #(&port1)>, enclose!((routes, port) |t: &mut Activation| {
let port2 = port.clone();
during!(t, httpd, language(), <http-bind $host #(&port2) $method $path $handler>, |t: &mut Activation| {
tracing::debug!("+HTTP binding {:?} {:?} {:?} {:?} {:?}", host, port, method, path, handler);
let port = port.value().to_signedinteger()?;
let host = language().parse::<http::HostPattern>(&host)?;
let path = language().parse::<http::PathPattern>(&path)?;
let method = language().parse::<http::MethodPattern>(&method)?;
let handler = handler.value().to_embedded()?;
let handler_cap = handler.value().to_embedded()?.clone();
let handler_terminated = t.named_field("handler-terminated", false);
t.get_mut(&routes)
.entry(port.clone()).or_default()
.entry(host.clone()).or_default()
.entry(path.clone()).or_default()
.entry(method.clone()).or_default()
.insert(handler.clone());
t.on_stop(enclose!((routes, handler, method, path, host, port) move |t| {
.push(ActiveHandler {
cap: handler_cap.clone(),
terminated: handler_terminated,
});
t.on_stop(enclose!((routes, method, path, host, port) move |t| {
tracing::debug!("-HTTP binding {:?} {:?} {:?} {:?} {:?}", host, port, method, path, handler);
let port_map = t.get_mut(&routes);
let host_map = port_map.entry(port.clone()).or_default();
let path_map = host_map.entry(host.clone()).or_default();
let method_map = path_map.entry(path.clone()).or_default();
let handler_set = method_map.entry(method.clone()).or_default();
handler_set.remove(&handler);
if handler_set.is_empty() {
let handler_vec = method_map.entry(method.clone()).or_default();
let handler = {
let i = handler_vec.iter().position(|a| a.cap == handler_cap)
.expect("Expected an index of an active handler to remove");
handler_vec.swap_remove(i)
};
if handler_vec.is_empty() {
method_map.remove(&method);
}
if method_map.is_empty() {
@ -102,6 +123,7 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: HttpRouter) -> ActorResult {
if host_map.is_empty() {
port_map.remove(&port);
}
*t.get_mut(&handler.terminated) = true;
Ok(())
}));
Ok(())
@ -115,12 +137,14 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: HttpRouter) -> ActorResult {
let req = match language().parse::<http::HttpRequest>(&req) { Ok(v) => v, Err(_) => return Ok(()) };
let res = match res.value().to_embedded() { Ok(v) => v, Err(_) => return Ok(()) };
tracing::trace!("Looking up handler for {:#?} in {:#?}", &req, &t.get(&routes));
let host_map = match t.get(&routes).get(&req.port) {
Some(host_map) => host_map,
None => return send_empty(t, res, 404, "Not found"),
};
let methods = match try_hostname(host_map, http::HostPattern::Host(req.host.clone()), &req.path)? {
let methods = match request_host(&req.host).and_then(|h| try_hostname(host_map, http::HostPattern::Host(h), &req.path).transpose()).transpose()? {
Some(methods) => methods,
None => match try_hostname(host_map, http::HostPattern::Any, &req.path)? {
Some(methods) => methods,
@ -141,9 +165,7 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: HttpRouter) -> ActorResult {
code: 405.into(), message: "Method Not Allowed".into() });
res.message(t, language(), &http::HttpResponse::Header {
name: "allow".into(), value: allowed });
res.message(t, language(), &http::HttpResponse::Done {
chunk: Box::new(http::Chunk::Bytes(vec![])) });
return Ok(())
return send_done(t, res);
}
}
};
@ -151,21 +173,33 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: HttpRouter) -> ActorResult {
if handlers.len() > 1 {
tracing::warn!(?req, "Too many handlers available");
}
let handler = handlers.first().expect("Nonempty handler set").clone();
handler.assert(t, language(), &http::HttpContext { req, res: res.clone() });
let ActiveHandler { cap, terminated } = handlers.first().expect("Nonempty handler set").clone();
tracing::trace!("Handler for {:?} is {:?}", &req, &cap);
t.dataflow(enclose!((terminated, req, res) move |t| {
if *t.get(&terminated) {
tracing::trace!("Handler for {:?} terminated", &req);
send_empty(t, &res, 500, "Internal Server Error")?;
}
Ok(())
}))?;
cap.assert(t, language(), &http::HttpContext { req, res: res.clone() });
Ok(())
});
Ok(())
}
fn send_done(t: &mut Activation, res: &Arc<Cap>) -> ActorResult {
res.message(t, language(), &http::HttpResponse::Done {
chunk: Box::new(http::Chunk::Bytes(vec![])) });
Ok(())
}
fn send_empty(t: &mut Activation, res: &Arc<Cap>, code: u16, message: &str) -> ActorResult {
res.message(t, language(), &http::HttpResponse::Status {
code: code.into(), message: message.into() });
res.message(t, language(), &http::HttpResponse::Done {
chunk: Box::new(http::Chunk::Bytes(vec![])) });
return Ok(())
send_done(t, res)
}
fn path_pattern_matches(path_pat: &http::PathPattern, path: &Vec<String>) -> bool {
@ -187,7 +221,10 @@ fn path_pattern_matches(path_pat: &http::PathPattern, path: &Vec<String>) -> boo
http::PathPatternElement::Rest => return true,
}
}
true
match path_iter.next() {
Some(_more) => false,
None => true,
}
}
fn try_hostname<'table>(
@ -199,6 +236,7 @@ fn try_hostname<'table>(
None => Ok(None),
Some(path_table) => {
for (path_pat, method_table) in path_table.iter() {
tracing::trace!("Checking path {:?} against pattern {:?}", &path, &path_pat);
if path_pattern_matches(path_pat, path) {
return Ok(Some(method_table));
}
@ -263,9 +301,7 @@ impl HttpStaticFileServer {
code: 301.into(), message: "Moved permanently".into() });
res.message(t, language(), &http::HttpResponse::Header {
name: "location".into(), value: format!("/{}/", req.path.join("/")) });
res.message(t, language(), &http::HttpResponse::Done {
chunk: Box::new(http::Chunk::Bytes(vec![])) });
return Ok(())
return send_done(t, res);
} else {
let mut buf = Vec::new();
fh.read_to_end(&mut buf)?;
@ -286,6 +322,7 @@ impl HttpStaticFileServer {
}
res.message(t, language(), &http::HttpResponse::Done {
chunk: Box::new(http::Chunk::Bytes(body)) });
Ok(())
}
}
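To illustrate the "Repair path matching where no explicit PathPatternElement::Rest is present" fix above: a hypothetical unit test sketch for the same module, assuming the generated constructors http::PathPattern(Vec<_>), PathPatternElement::Label(String) and PathPatternElement::Rest (named by analogy with HostPattern::Host/Any used earlier in this file).

// Sketch only: without an explicit Rest element, trailing request path
// segments no longer match; with Rest, any remaining segments are accepted.
#[test]
fn path_matching_requires_explicit_rest() {
    let exact = http::PathPattern(vec![
        http::PathPatternElement::Label("index.html".to_owned()),
    ]);
    assert!(path_pattern_matches(&exact, &vec!["index.html".to_owned()]));
    assert!(!path_pattern_matches(&exact, &vec!["index.html".to_owned(), "extra".to_owned()]));

    let prefix = http::PathPattern(vec![
        http::PathPatternElement::Label("assets".to_owned()),
        http::PathPatternElement::Rest,
    ]);
    assert!(path_pattern_matches(&prefix, &vec!["assets".to_owned(), "css".to_owned(), "site.css".to_owned()]));
}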

View File

@ -15,24 +15,20 @@ use tokio::net::TcpListener;
use crate::language::language;
use crate::lifecycle;
use crate::protocol::detect_protocol;
use crate::schemas::internal_services::{TcpWithHttp, TcpWithoutHttp, TcpRelayListener};
use crate::schemas::internal_services::TcpWithoutHttp;
use syndicate_macros::during;
pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
t.spawn(Some(AnyValue::symbol("tcp_relay_listener")), move |t| {
enclose!((ds) during!(t, ds, language(), <run-service $spec: TcpWithHttp::<AnyValue>>, |t: &mut Activation| {
spec.httpd.assert(t, language(), &syndicate::schemas::http::HttpListener { port: spec.addr.port.clone() });
run_supervisor(t, ds.clone(), TcpRelayListener::TcpWithHttp(Box::new(spec)))
}));
enclose!((ds) during!(t, ds, language(), <run-service $spec: TcpWithoutHttp::<AnyValue>>, |t| {
run_supervisor(t, ds.clone(), TcpRelayListener::TcpWithoutHttp(Box::new(spec)))
run_supervisor(t, ds.clone(), spec)
}));
Ok(())
});
}
fn run_supervisor(t: &mut Activation, ds: Arc<Cap>, spec: TcpRelayListener) -> ActorResult {
fn run_supervisor(t: &mut Activation, ds: Arc<Cap>, spec: TcpWithoutHttp) -> ActorResult {
Supervisor::start(
t,
Some(rec![AnyValue::symbol("relay"), language().unparse(&spec)]),
@ -41,18 +37,32 @@ fn run_supervisor(t: &mut Activation, ds: Arc<Cap>, spec: TcpRelayListener) -> A
enclose!((ds) move |t| enclose!((ds, spec) run(t, ds, spec))))
}
fn run(t: &mut Activation, ds: Arc<Cap>, spec: TcpRelayListener) -> ActorResult {
fn run(t: &mut Activation, ds: Arc<Cap>, spec: TcpWithoutHttp) -> ActorResult {
lifecycle::terminate_on_service_restart(t, &ds, &spec);
let (addr, gatekeeper, httpd) = match spec.clone() {
TcpRelayListener::TcpWithHttp(b) => {
let TcpWithHttp { addr, gatekeeper, httpd } = *b;
(addr, gatekeeper, Some(httpd))
}
TcpRelayListener::TcpWithoutHttp(b) => {
let TcpWithoutHttp { addr, gatekeeper } = *b;
(addr, gatekeeper, None)
}
};
let httpd = t.named_field("httpd", None::<Arc<Cap>>);
{
let ad = spec.addr.clone();
let ad2 = ad.clone();
let gk = spec.gatekeeper.clone();
enclose!((ds, httpd) during!(t, ds, language(),
<run-service <relay-listener #(&language().unparse(&ad)) #(&AnyValue::domain(gk)) $h>>, |t: &mut Activation| {
if let Some(h) = h.value().as_embedded().cloned() {
h.assert(t, language(), &syndicate::schemas::http::HttpListener { port: ad2.port.clone() });
*t.get_mut(&httpd) = Some(h.clone());
t.on_stop(enclose!((httpd) move |t| {
let f = t.get_mut(&httpd);
if *f == Some(h.clone()) { *f = None; }
Ok(())
}));
}
Ok(())
}));
}
let TcpWithoutHttp { addr, gatekeeper } = spec.clone();
let host = addr.host.clone();
let port = u16::try_from(&addr.port).map_err(|_| "Invalid TCP port number")?;
let facet = t.facet_ref();
@ -83,6 +93,7 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: TcpRelayListener) -> ActorResult
let account = Account::new(name.clone(), trace_collector.clone());
if !facet.activate(
&account, cause, enclose!((trace_collector, httpd) move |t| {
let httpd = t.get(&httpd).clone();
t.spawn(name, move |t| {
Ok(t.linked_task(None, {
let facet = t.facet_ref();
@ -91,7 +102,7 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: TcpRelayListener) -> ActorResult
facet,
stream,
gatekeeper,
httpd.map(|r| r.clone()),
httpd,
addr,
port).await?;
Ok(LinkedTaskTermination::KeepFacet)

View File

@ -1,6 +1,6 @@
[package]
name = "syndicate-tools"
version = "0.14.0"
version = "0.18.0"
authors = ["Tony Garnock-Jones <tonyg@leastfixedpoint.com>"]
edition = "2018"
@ -10,8 +10,8 @@ repository = "https://git.syndicate-lang.org/syndicate-lang/syndicate-rs"
license = "Apache-2.0"
[dependencies]
preserves = "4.994"
syndicate = { path = "../syndicate", version = "0.35.0"}
preserves = "4.995"
syndicate = { path = "../syndicate", version = "0.40.0"}
clap = { version = "^4.0", features = ["derive"] }
clap_complete = "^4.0"

View File

@ -1,6 +1,6 @@
[package]
name = "syndicate"
version = "0.35.0"
version = "0.40.0"
authors = ["Tony Garnock-Jones <tonyg@leastfixedpoint.com>"]
edition = "2018"
@ -13,13 +13,13 @@ license = "Apache-2.0"
vendored-openssl = ["openssl/vendored"]
[build-dependencies]
preserves-schema = "5.994"
preserves-schema = "5.995"
[dependencies]
preserves = "4.994"
preserves-schema = "5.994"
preserves = "4.995"
preserves-schema = "5.995"
tokio = { version = "1.10", features = ["io-util", "macros", "rt", "rt-multi-thread", "time"] }
tokio = { version = "1.10", features = ["io-std", "io-util", "macros", "rt", "rt-multi-thread", "time"] }
tokio-util = "0.6"
bytes = "1.0"

View File

@ -11,6 +11,7 @@ use syndicate::during::entity;
use syndicate::dataspace::Dataspace;
use syndicate::schemas::dataspace::Observe;
use syndicate::schemas::dataspace_patterns as p;
use syndicate::value::Map;
use syndicate::value::NestedValue;
use syndicate::value::Value;
@ -88,11 +89,11 @@ pub fn bench_pub(c: &mut Criterion) {
.create_cap(t);
ds.assert(t, language(), &Observe {
pattern: p::Pattern::DBind(Box::new(p::DBind {
pattern: p::Pattern::DLit(Box::new(p::DLit {
value: p::AnyAtom::Symbol("consumer".to_owned()),
})),
})),
pattern: p::Pattern::Bind {
pattern: Box::new(p::Pattern::Lit {
value: Box::new(p::AnyAtom::Symbol("consumer".to_owned())),
}),
},
observer: shutdown,
});
@ -110,24 +111,27 @@ pub fn bench_pub(c: &mut Criterion) {
ds.assert(t, &(), &AnyValue::symbol("consumer"));
ds.assert(t, language(), &Observe {
pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec {
label: AnyValue::symbol("Says"),
fields: vec![
p::Pattern::DLit(Box::new(p::DLit {
value: p::AnyAtom::String("bench_pub".to_owned()),
})),
p::Pattern::DBind(Box::new(p::DBind {
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)),
})),
]})),
pattern: p::Pattern::Group {
type_: Box::new(p::GroupType::Rec {
label: AnyValue::symbol("Says"),
}),
entries: Map::from([
(p::_Any::new(0), p::Pattern::Lit {
value: Box::new(p::AnyAtom::String("bench_pub".to_owned())),
}),
(p::_Any::new(1), p::Pattern::Bind {
pattern: Box::new(p::Pattern::Discard),
}),
]),
},
observer: receiver,
});
ds.assert(t, language(), &Observe {
pattern: p::Pattern::DBind(Box::new(p::DBind {
pattern: p::Pattern::DLit(Box::new(p::DLit {
value: p::AnyAtom::Bool(true),
})),
})),
pattern: p::Pattern::Bind {
pattern: Box::new(p::Pattern::Lit {
value: Box::new(p::AnyAtom::Bool(true)),
}),
},
observer: shutdown,
});

View File

[Binary diff of the packed Preserves schema bundle: not human-readable in this rendering. Its changes mirror the textual .prs diffs below: the dataspacePatterns language is rewritten around Pattern (discard / bind / lit / group) and GroupType, the http HttpRequest host field becomes a RequestHost, and the noise schema gains Initiator, SessionItem, and an EntityRef.Cap embeddedType.]

View File

@ -1,15 +1,23 @@
version 1 .
embeddedType EntityRef.Cap .
# Dataspace patterns: a sublanguage of attenuation patterns.
Pattern = DDiscard / DBind / DLit / DCompound .
# Dataspace patterns: *almost* a sublanguage of attenuation patterns.
#
# One key difference is that Dataspace patterns are extensible, in that
# they ignore fields not mentioned in group patterns.
DDiscard = <_>.
DBind = <bind @pattern Pattern>.
DLit = <lit @value AnyAtom>.
DCompound = <rec @label any @fields [Pattern ...]>
/ <arr @items [Pattern ...]>
/ <dict @entries { any: Pattern ...:... }> .
Pattern =
/ @discard <_>
/ <bind @pattern Pattern>
/ <lit @value AnyAtom>
/ <group @type GroupType @entries { any: Pattern ...:... }>
.
GroupType =
/ <rec @label any>
/ <arr>
/ <dict>
.
AnyAtom =
/ @bool bool
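
For concreteness, the record pattern `<Says "bench_pub" $>` used in the benchmark above is spelled roughly as follows against this revised schema. A minimal sketch: the module alias `p`, `_Any::new`, and `Map::from` are taken from the benchmark diff, while the free-standing helper function and its `label` parameter are illustrative only.

use syndicate::schemas::dataspace_patterns as p;
use syndicate::value::Map;

// <Says "bench_pub" $>: a `group` pattern whose GroupType is `rec` with the given label.
// Field positions become integer keys in `entries`; fields not mentioned are ignored.
fn says_pattern(label: p::_Any) -> p::Pattern {
    p::Pattern::Group {
        type_: Box::new(p::GroupType::Rec { label }),
        entries: Map::from([
            (p::_Any::new(0), p::Pattern::Lit {
                value: Box::new(p::AnyAtom::String("bench_pub".to_owned())),
            }),
            (p::_Any::new(1), p::Pattern::Bind {
                pattern: Box::new(p::Pattern::Discard),
            }),
        ]),
    }
}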

View File

@ -13,8 +13,37 @@ Step = <<rec> @stepType symbol [@detail any]> .
# ---------------------------------------------------------------------------
# Protocol at dataspaces *associated* with gatekeeper entities
# Assertion. Gatekeeper will compute an appropriate PathStep from `description` pointing at
# `target`, and will respond with a `Bound` to `observer` (if supplied).
# ## Handling `Resolve` requests
#
# When the gatekeeper entity receives a `Resolve` assertion (call it R1), it
#
# 1. asserts a `Resolve` (call it R2) into its associated dataspace that
# is the same as R1 except it has a different `observer`; and
#
# 2. observes a `Bind` with `description` matching the `step` of R1/R2
# according to `stepType` (e.g. treatment of SturdyStepType is not the
# same as treatment of NoiseStepType).
#
# Normally, an appropriate `Bind` is expected to exist. If the gatekeeper
# sees the `Bind` first, it takes the `target` from it and does whatever
# `stepType` mandates before replying to R1's observer.
#
# However, if a `Resolved` is asserted to R2's observer before a `Bind`
# appears, that resolution is relayed on to R1's observer directly, be it
# positive or negative, and the gatekeeper stops waiting for a `Bind`.
#
# This way, entities can keep an eye out for `Resolve` requests that will
# never complete, and answer `Rejected` to them even when no matching
# `Bind` exists. Entities could also use `Resolve` requests to synthesize a
# `Bind` in a "just-in-time" fashion.
#
# ## General treatment of `Bind` assertions
#
# When the gatekeeper sees a `Bind`, independently of any potential
# `Resolve` requests, it computes an appropriate PathStep from
# `description` pointing at `target`, and responds with a `Bound` to
# `observer` (if supplied).
#
Bind = <bind @description Description @target #:any @observer BindObserver> .
Description = <<rec> @stepType symbol [@detail any]> .
BindObserver = @present #:Bound / @absent #f .

View File

@ -21,7 +21,7 @@ MethodPattern = @any #f / @specific @"Lowercase" symbol .
# Assertion in driver DS
HttpRequest = <http-request
@sequenceNumber int
@host string
@host RequestHost
@port int
@method @"Lowercase" symbol
@path [string ...]
@ -32,13 +32,24 @@ HttpRequest = <http-request
Headers = {@"Lowercase" symbol: string ...:...} .
QueryValue = @string string / <file @filename string @headers Headers @body bytes> .
RequestBody = @present bytes / @absent #f .
RequestHost = @present string / @absent #f .
# Assertion to handler entity
HttpContext = <request @req HttpRequest @res #:HttpResponse> .
# HttpResponse protocol. Delivered to the `res` ref in `HttpContext`.
#
# (status | header)* . chunk* . done
#
# Done triggers completion of the response and retraction of the frame by the peer. If the
# HttpBinding responsible for the request is withdrawn mid-way through a response (i.e. when
# chunked transfer is used and at least one chunk has been sent), the request is abruptly
# closed; if it is withdrawn at any other moment in the lifetime of the request, a 500 Internal
# Server Error is sent to the client.
#
@<TODO "trailers?">
# Messages
HttpResponse =
# Messages.
/ <status @code int @message string>
/ <header @name symbol @value string>
/ <chunk @chunk Chunk>

View File

@ -1,4 +1,5 @@
version 1 .
embeddedType EntityRef.Cap .
# https://noiseprotocol.org/
@ -42,13 +43,30 @@ DefaultProtocol = "Noise_NK_25519_ChaChaPoly_BLAKE2s" .
# sequence is exhausted or not supplied, an all-zeros key is used each time a PSK is needed.
NoisePreSharedKeys = @present { preSharedKeys: [bytes ...] } / @invalid { preSharedKeys: any } / @absent {} .
# Sessions proceed by sending Packets to the initiatorSession and responderSession according to
# the Noise protocol definition. Each Packet represents a complete logical unit of
# ---------------------------------------------------------------------------
# Handshaking and running a session
# 1. initiator asserts <resolve <noise ServiceSelector> #:A> at Gatekeeper
# 2. gatekeeper asserts <accepted #:B> at #:A
# 3. initiator asserts <initiator #:C> at #:B and then sends `Packet`s to #:B
# 4. responder sends `Packet`s to #:C
#
# Sessions begin with introduction of initiator (#:C) and responder (#:B) to each other, and
# then proceed by sending `Packet`s (from #:C) to #:B and (from #:B) to #:C according to
# the Noise protocol definition. Each `Packet` represents a complete logical unit of
# communication; for example, a complete Turn when layering the Syndicate protocol over Noise.
# Note well the restriction on Noise messages: no individual complete packet or packet fragment
# may exceed 65535 bytes (N.B. not 65536!). When `fragmented`, each portion of a Packet is a
# may exceed 65535 bytes (N.B. not 65536!). When `fragmented`, each portion of a `Packet` is a
# complete Noise "transport message"; when `complete`, the whole thing is likewise a complete
# "transport message".
#
# Retraction of the `Initiator` ends the session from the initiator-side; retraction of the
# `<accepted ...>` assertion ends the session from the responder-side.
SessionItem = Initiator / Packet .
# Assertion
Initiator = <initiator @initiatorSession #:Packet> .
# Message
Packet = @complete bytes / @fragmented [bytes ...] .
# When layering Syndicate protocol over noise,

View File

@ -1214,6 +1214,8 @@ impl Activation {
// just drop 'em so they don't run next time
std::mem::take(&mut self.commit_actions);
}
self.single_queue = None;
self.multiple_queues = None;
tracing::trace!("Activation::rollback complete");
}
@ -1510,8 +1512,7 @@ impl Activation {
trace::FacetStopReason::ExplicitAction)
}
/// Arranges for the [`Facet`] named by `facet_id` to be stopped cleanly when `self`
/// commits.
/// Cleanly stops the [`Facet`] named by `facet_id`.
///
/// Equivalent to `self.stop_facet_and_continue(facet_id, None)`, except that the lack of a
/// continuation means that there's no need for this method to return `ActorResult`.
@ -1520,15 +1521,15 @@ impl Activation {
.expect("Non-failing stop_facet_and_continue")
}
/// Arranges for the active facet to be stopped cleanly when `self` commits.
/// Cleanly stops the active facet.
///
/// Equivalent to `self.stop_facet(self.facet.facet_id)`.
/// Equivalent to `self.stop_facet(self.facet_id())`.
pub fn stop(&mut self) {
self.stop_facet(self.facet_id())
}
/// Arranges for the active actor's root facet to be stopped cleanly when `self` commits;
/// this is one way to arrange a clean shutdown of the entire actor.
/// Cleanly stops the active actor's root facet.
/// This is one way to arrange a clean shutdown of the entire actor.
///
/// Equivalent to `self.stop_facet(self.state.root)`.
pub fn stop_root(&mut self) {
@ -1838,7 +1839,16 @@ impl Activation {
) {
match &*exit_status {
ExitStatus::Normal => assert!(self.get_facet(self.root).is_none()),
ExitStatus::Dropped | ExitStatus::Error(_) => (),
ExitStatus::Dropped => {
// If we panicked, facet_id will be Some(_), but leaving it this way as we
// enter take_turn causes a nested panic, so we clear it here.
if self.facet_id.is_some() {
tracing::debug!(actor_id=?self.actor_id, facet_id=?self.facet_id,
"clearing facet_id (we must have panicked mid-Turn)");
self.facet_id = None;
}
}
ExitStatus::Error(_) => (),
}
let cause = Some(trace::TurnCause::Cleanup);
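
The doc-comment changes above re-describe `stop_facet`, `stop`, and `stop_root` as cleanly stopping facets. A minimal usage sketch, assuming only what appears elsewhere in this diff (`Actor::top(None, ...)` and `stop_root`); the tokio `main` scaffolding and the `unwrap`-based error handling are illustrative shortcuts:

use syndicate::actor::*;

#[tokio::main]
async fn main() {
    Actor::top(None, |t| {
        // ... set up facets, entities and assertions here ...
        // Cleanly stop the root facet, shutting down the entire actor
        // (equivalent to t.stop_facet(t.state.root), per the doc comment above).
        t.stop_root();
        Ok(())
    })
    .await
    .unwrap()   // join error
    .unwrap();  // actor error
}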

View File

@ -1,3 +1,5 @@
use std::sync::Arc;
use crate::schemas::dataspace_patterns::*;
use super::language;
@ -8,23 +10,25 @@ use preserves::value::Record;
use preserves::value::Value;
use preserves_schema::Codec;
#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq)]
pub enum PathStep {
Index(usize),
Key(_Any),
}
pub type PathStep = _Any;
pub type Path = Vec<PathStep>;
pub type Paths = Vec<Path>;
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct ConstantPositions {
pub with_values: Paths,
pub required_to_exist: Paths,
}
struct Analyzer {
pub const_paths: Paths,
pub const_values: Vec<_Any>,
pub checked_paths: Paths,
pub capture_paths: Paths,
}
pub struct PatternAnalysis {
pub const_paths: Paths,
pub const_positions: Arc<ConstantPositions>,
pub const_values: _Any,
pub capture_paths: Paths,
}
@ -38,11 +42,15 @@ impl PatternAnalysis {
let mut analyzer = Analyzer {
const_paths: Vec::new(),
const_values: Vec::new(),
checked_paths: Vec::new(),
capture_paths: Vec::new(),
};
analyzer.walk(&mut Vec::new(), p);
PatternAnalysis {
const_paths: analyzer.const_paths,
const_positions: Arc::new(ConstantPositions {
with_values: analyzer.const_paths,
required_to_exist: analyzer.checked_paths,
}),
const_values: _Any::new(analyzer.const_values),
capture_paths: analyzer.capture_paths,
}
@ -58,34 +66,21 @@ impl Analyzer {
fn walk(&mut self, path: &mut Path, p: &Pattern) {
match p {
Pattern::DCompound(b) => match &**b {
DCompound::Rec { fields, .. } => {
for (i, p) in fields.iter().enumerate() {
self.walk_step(path, PathStep::Index(i), p);
}
}
DCompound::Arr { items, .. } => {
for (i, p) in items.iter().enumerate() {
self.walk_step(path, PathStep::Index(i), p);
}
}
DCompound::Dict { entries, .. } => {
for (k, p) in entries {
self.walk_step(path, PathStep::Key(k.clone()), p);
}
Pattern::Group { entries, .. } => {
for (k, p) in entries {
self.walk_step(path, k.clone(), p)
}
}
Pattern::DBind(b) => {
let DBind { pattern, .. } = &**b;
Pattern::Bind { pattern } => {
self.capture_paths.push(path.clone());
self.walk(path, pattern)
self.walk(path, &**pattern);
}
Pattern::DDiscard(_) =>
(),
Pattern::DLit(b) => {
let DLit { value } = &**b;
Pattern::Discard => {
self.checked_paths.push(path.clone());
}
Pattern::Lit { value } => {
self.const_paths.push(path.clone());
self.const_values.push(language().unparse(value));
self.const_values.push(language().unparse(&**value));
}
}
}
@ -109,52 +104,47 @@ impl PatternMatcher {
}
}
fn run_seq<'a, F: 'a + Fn(usize) -> &'a _Any>(&mut self, entries: &Map<_Any, Pattern<_Any>>, values: F) -> bool {
for (k, p) in entries {
match k.value().as_usize() {
None => return false,
Some(i) => if !self.run(p, values(i)) {
return false;
}
}
}
true
}
fn run(&mut self, pattern: &Pattern<_Any>, value: &_Any) -> bool {
match pattern {
Pattern::DDiscard(_) => true,
Pattern::DBind(b) => {
Pattern::Discard => true,
Pattern::Bind { pattern } => {
self.captures.push(value.clone());
self.run(&b.pattern, value)
self.run(&**pattern, value)
}
Pattern::DLit(b) => value == &language().unparse(&b.value),
Pattern::DCompound(b) => match &**b {
DCompound::Rec { label, fields } => {
match value.value().as_record(Some(fields.len())) {
Pattern::Lit { value: expected } => value == &language().unparse(&**expected),
Pattern::Group { type_, entries } => match &**type_ {
GroupType::Rec { label } => {
match value.value().as_record(None) {
None => false,
Some(r) => {
if r.label() != label {
return false;
}
for (i, p) in fields.iter().enumerate() {
if !self.run(p, &r.fields()[i]) {
return false;
}
}
true
}
Some(r) =>
r.label() == label &&
self.run_seq(entries, |i| &r.fields()[i])
}
}
DCompound::Arr { items } => {
GroupType::Arr => {
match value.value().as_sequence() {
None => false,
Some(vs) => {
if vs.len() != items.len() {
return false;
}
for (i, p) in items.iter().enumerate() {
if !self.run(p, &vs[i]) {
return false;
}
}
true
}
Some(vs) =>
self.run_seq(entries, |i| &vs[i])
}
}
DCompound::Dict { entries: expected_entries } => {
GroupType::Dict => {
match value.value().as_dictionary() {
None => false,
Some(actual_entries) => {
for (k, p) in expected_entries.iter() {
for (k, p) in entries {
if !actual_entries.get(k).map(|v| self.run(p, v)).unwrap_or(false) {
return false;
}
@ -170,42 +160,68 @@ impl PatternMatcher {
pub fn lift_literal(v: &_Any) -> Pattern {
match v.value() {
Value::Record(r) => Pattern::DCompound(Box::new(DCompound::Rec {
label: r.label().clone(),
fields: r.fields().iter().map(lift_literal).collect(),
})),
Value::Sequence(items) => Pattern::DCompound(Box::new(DCompound::Arr {
items: items.iter().map(lift_literal).collect(),
})),
Value::Record(r) => Pattern::Group {
type_: Box::new(GroupType::Rec { label: r.label().clone() }),
entries: r.fields().iter().enumerate()
.map(|(i, v)| (_Any::new(i), lift_literal(v)))
.collect(),
},
Value::Sequence(items) => Pattern::Group {
type_: Box::new(GroupType::Arr),
entries: items.iter().enumerate()
.map(|(i, v)| (_Any::new(i), lift_literal(v)))
.collect(),
},
Value::Set(_members) => panic!("Cannot express literal set in pattern"),
Value::Dictionary(entries) => Pattern::DCompound(Box::new(DCompound::Dict {
entries: entries.iter().map(|(k, v)| (k.clone(), lift_literal(v))).collect(),
})),
_other => Pattern::DLit(Box::new(DLit {
value: language().parse(v).expect("Non-compound datum can be converted to AnyAtom"),
})),
Value::Dictionary(entries) => Pattern::Group {
type_: Box::new(GroupType::Dict),
entries: entries.iter()
.map(|(k, v)| (k.clone(), lift_literal(v)))
.collect(),
},
_other => Pattern::Lit {
value: Box::new(language().parse(v).expect("Non-compound datum can be converted to AnyAtom")),
},
}
}
const DISCARD: Pattern = Pattern::Discard;
pub fn pattern_seq_from_dictionary(entries: &Map<_Any, Pattern>) -> Option<Vec<&Pattern>> {
let mut max_k: Option<usize> = None;
for k in entries.keys() {
max_k = max_k.max(Some(k.value().as_usize()?));
}
let mut seq = vec![];
if let Some(max_k) = max_k {
seq.reserve(max_k + 1);
for i in 0..=max_k {
seq.push(entries.get(&_Any::new(i)).unwrap_or(&DISCARD));
}
}
return Some(seq);
}
fn drop_literal_entries_seq(mut seq: Vec<_Any>, entries: &Map<_Any, Pattern>) -> Option<Vec<_Any>> {
for p in pattern_seq_from_dictionary(entries)?.into_iter() {
seq.push(drop_literal(p)?);
}
Some(seq)
}
pub fn drop_literal(p: &Pattern) -> Option<_Any> {
match p {
Pattern::DCompound(b) => match &**b {
DCompound::Rec { label, fields } => {
let mut r = vec![label.clone()];
for f in fields.iter() {
r.push(drop_literal(f)?);
}
Some(Value::Record(Record(r)).wrap())
}
DCompound::Arr { items } =>
Some(Value::Sequence(items.iter().map(drop_literal)
.collect::<Option<Vec<_Any>>>()?).wrap()),
DCompound::Dict { entries } =>
Some(Value::Dictionary(entries.iter()
.map(|(k, p)| Some((k.clone(), drop_literal(p)?)))
.collect::<Option<Map<_Any, _Any>>>()?).wrap()),
Pattern::Group { type_, entries } => match &**type_ {
GroupType::Rec { label } =>
Some(Value::Record(Record(drop_literal_entries_seq(vec![label.clone()], entries)?)).wrap()),
GroupType::Arr =>
Some(Value::Sequence(drop_literal_entries_seq(vec![], entries)?).wrap()),
GroupType::Dict =>
Some(Value::Dictionary(entries.iter()
.map(|(k, p)| Some((k.clone(), drop_literal(p)?)))
.collect::<Option<Map<_Any, _Any>>>()?).wrap()),
},
Pattern::DLit(b) => Some(language().unparse(&b.value)),
Pattern::Lit { value } => Some(language().unparse(&**value)),
_ => None,
}
}
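
As a quick check on the reworked helpers above, `lift_literal` and `drop_literal` should round-trip any literal value that contains no sets. A minimal sketch, assuming these functions are reachable as `syndicate::pattern::{lift_literal, drop_literal}` and that `AnyValue` (the crate's `_Any`) lives in `syndicate::actor`:

use syndicate::actor::AnyValue;
use syndicate::pattern::{drop_literal, lift_literal};
use syndicate::value::NestedValue;

// lift_literal builds a Pattern of Lit/Group nodes from a concrete value;
// drop_literal inverts it, returning None for patterns containing Bind or Discard.
fn round_trips(v: &AnyValue) -> bool {
    drop_literal(&lift_literal(v)).as_ref() == Some(v)
}

fn main() {
    let v = AnyValue::new(vec![AnyValue::symbol("hello"), AnyValue::new(42)]);
    assert!(round_trips(&v));
}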

View File

@ -29,6 +29,7 @@ use preserves::value::Map;
use preserves::value::NestedValue;
use preserves::value::NoEmbeddedDomainCodec;
use preserves::value::PackedWriter;
use preserves::value::Set;
use preserves::value::TextWriter;
use preserves::value::ViaCodec;
use preserves::value::Writer;
@ -76,6 +77,7 @@ struct Membranes {
exported: Membrane,
imported: Membrane,
next_export_oid: usize,
reimported_attenuations: Map<sturdy::Oid, Set<Arc<Cap>>>,
}
pub enum Input {
@ -172,6 +174,11 @@ impl Membrane {
ws
}
fn remove(&mut self, ws: &Arc<WireSymbol>) {
self.oid_map.remove(&ws.oid);
self.ref_map.remove(&ws.obj);
}
fn insert_inert_entity(&mut self, t: &mut Activation, oid: sturdy::Oid) -> Arc<WireSymbol> {
self.insert(oid, Cap::new(&t.inert_entity()))
}
@ -221,7 +228,57 @@ impl std::fmt::Debug for Membrane {
macro_rules! dump_membranes { ($e:expr) => { tracing::trace!("membranes: {:#?}", $e); } }
// macro_rules! dump_membranes { ($e:expr) => { (); } }
/// Main entry point for stdio-based Syndicate services.
pub async fn stdio_service<F>(f: F) -> !
where
F: 'static + Send + FnOnce(&mut Activation) -> Result<Arc<Cap>, ActorError>
{
let result = Actor::top(None, move |t| {
let service = f(t)?;
Ok(TunnelRelay::stdio_service(t, service))
}).await;
// Because we're currently using tokio::io::stdin(), which can prevent shutdown of the
// runtime, this routine uses std::process::exit directly as a special case. It's a
// stopgap: eventually, we'd like to do things Properly, as indicated in the comment
// attached (at the time of writing) to tokio::io::stdin(), which reads in part:
//
// This handle is best used for non-interactive uses, such as when a file
// is piped into the application. For technical reasons, `stdin` is
// implemented by using an ordinary blocking read on a separate thread, and
// it is impossible to cancel that read. This can make shutdown of the
// runtime hang until the user presses enter.
//
// For interactive uses, it is recommended to spawn a thread dedicated to
// user input and use blocking IO directly in that thread.
//
// TODO: Revisit this.
match result {
Ok(Ok(())) => {
std::process::exit(0);
}
Ok(Err(e)) => {
tracing::error!("Main stdio_service actor failed: {}", e);
std::process::exit(1);
},
Err(e) => {
tracing::error!("Join of main stdio_service actor failed: {}", e);
std::process::exit(2);
}
}
}
impl TunnelRelay {
pub fn stdio_service(t: &mut Activation, service: Arc<Cap>) -> () {
TunnelRelay::run(t,
Input::Bytes(Box::pin(tokio::io::stdin())),
Output::Bytes(Box::pin(tokio::io::stdout())),
Some(service),
None,
false);
}
pub fn run(
t: &mut Activation,
i: Input,
@ -259,6 +316,7 @@ impl TunnelRelay {
exported: Membrane::new(WireSymbolSide::Exported),
imported: Membrane::new(WireSymbolSide::Imported),
next_export_oid: 0,
reimported_attenuations: Map::new(),
},
pending_outbound: Vec::new(),
};
@ -550,9 +608,10 @@ impl Membranes {
#[inline]
fn release_one(&mut self, ws: Arc<WireSymbol>) -> bool {
if ws.dec_ref() {
let membrane = self.membrane(ws.side);
membrane.oid_map.remove(&ws.oid);
membrane.ref_map.remove(&ws.obj);
if let WireSymbolSide::Exported = ws.side {
self.reimported_attenuations.remove(&ws.oid);
}
self.membrane(ws.side).remove(&ws);
true
} else {
false
@ -573,34 +632,43 @@ impl Membranes {
src: &'src mut S,
_read_annotations: bool,
) -> io::Result<Arc<Cap>> {
let ws = match sturdy::WireRef::deserialize(&mut src.packed(NoEmbeddedDomainCodec))? {
match sturdy::WireRef::deserialize(&mut src.packed(NoEmbeddedDomainCodec))? {
sturdy::WireRef::Mine{ oid: b } => {
let oid = *b;
self.imported.oid_map.get(&oid).map(Arc::clone)
.unwrap_or_else(|| self.import_oid(t, relay_ref, oid))
let ws = self.imported.oid_map.get(&oid).map(Arc::clone)
.unwrap_or_else(|| self.import_oid(t, relay_ref, oid));
Ok(Arc::clone(&ws.inc_ref().obj))
}
sturdy::WireRef::Yours { oid: b, attenuation } => {
let oid = *b;
let ws = self.exported.oid_map.get(&oid).map(Arc::clone)
.unwrap_or_else(|| self.exported.insert_inert_entity(t, oid.clone()));
if attenuation.is_empty() {
self.exported.oid_map.get(&oid).map(Arc::clone).unwrap_or_else(
|| self.exported.insert_inert_entity(t, oid))
Ok(Arc::clone(&ws.inc_ref().obj))
} else {
match self.exported.oid_map.get(&oid) {
None => self.exported.insert_inert_entity(t, oid),
Some(ws) => {
let attenuated_obj = ws.obj.attenuate(&attenuation)
.map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidInput,
format!("Invalid capability attenuation: {:?}", e))
})?;
self.exported.insert(oid, attenuated_obj)
let attenuated_obj = ws.obj.attenuate(&attenuation)
.map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidInput,
format!("Invalid capability attenuation: {:?}", e))
})?;
ws.inc_ref();
let variations = self.reimported_attenuations.entry(oid).or_default();
match variations.get(&attenuated_obj) {
None => {
variations.insert(Arc::clone(&attenuated_obj));
self.exported.ref_map.insert(Arc::clone(&attenuated_obj), Arc::clone(&ws));
Ok(attenuated_obj)
}
Some(existing) =>
Ok(Arc::clone(existing))
}
}
}
};
Ok(Arc::clone(&ws.inc_ref().obj))
}
}
}
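
The new `stdio_service` convenience wraps the `Actor::top` plus `TunnelRelay::stdio_service` arrangement shown above. A minimal caller-side sketch, serving a deliberately inert placeholder entity just to show the wiring; a real service would return a meaningful `Cap` from the boot closure:

use syndicate::actor::Cap;
use syndicate::relay;

#[tokio::main]
async fn main() {
    // stdio_service never returns: it exits the process when the main actor stops.
    relay::stdio_service(|t| Ok(Cap::new(&t.inert_entity()))).await
}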

View File

@ -16,19 +16,12 @@ use crate::actor::Activation;
use crate::actor::Handle;
use crate::actor::Cap;
use crate::schemas::dataspace_patterns as ds;
use crate::pattern::{self, PathStep, Path, Paths};
use crate::pattern::{self, ConstantPositions, PathStep, Path, Paths};
type Bag<A> = bag::BTreeBag<A>;
type Captures = AnyValue;
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
enum Guard {
Rec(AnyValue, usize),
Seq(usize),
Map,
}
/// Index of assertions and [`Observe`rs][crate::schemas::dataspace::Observe].
///
/// Generally speaking, you will not need to use this structure;
@ -44,13 +37,13 @@ pub struct Index {
#[derive(Debug)]
struct Node {
continuation: Continuation,
edges: Map<Selector, Map<Guard, Node>>,
edges: Map<Selector, Map<ds::GroupType, Node>>,
}
#[derive(Debug)]
struct Continuation {
cached_assertions: Set<AnyValue>,
leaf_map: Map<Paths, Map<Captures, Leaf>>,
leaf_map: Map<Arc<ConstantPositions>, Map<Captures, Leaf>>,
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
@ -205,7 +198,7 @@ impl Node {
}
fn extend(&mut self, pat: &ds::Pattern) -> &mut Continuation {
let (_pop_count, final_node) = self.extend_walk(&mut Vec::new(), 0, PathStep::Index(0), pat);
let (_pop_count, final_node) = self.extend_walk(&mut Vec::new(), 0, PathStep::new(0), pat);
&mut final_node.continuation
}
@ -216,23 +209,13 @@ impl Node {
step: PathStep,
pat: &ds::Pattern,
) -> (usize, &mut Node) {
let (guard, members): (Guard, Vec<(PathStep, &ds::Pattern)>) = match pat {
ds::Pattern::DCompound(b) => match &**b {
ds::DCompound::Arr { items } =>
(Guard::Seq(items.len()),
items.iter().enumerate().map(|(i, p)| (PathStep::Index(i), p)).collect()),
ds::DCompound::Rec { label, fields } =>
(Guard::Rec(label.clone(), fields.len()),
fields.iter().enumerate().map(|(i, p)| (PathStep::Index(i), p)).collect()),
ds::DCompound::Dict { entries, .. } =>
(Guard::Map,
entries.iter().map(|(k, p)| (PathStep::Key(k.clone()), p)).collect()),
}
ds::Pattern::DBind(b) => {
let ds::DBind { pattern, .. } = &**b;
return self.extend_walk(path, pop_count, step, pattern);
}
ds::Pattern::DDiscard(_) | ds::Pattern::DLit(_) =>
let (guard, members): (ds::GroupType, Vec<(PathStep, &ds::Pattern)>) = match pat {
ds::Pattern::Group { type_, entries } =>
((&**type_).clone(),
entries.iter().map(|(k, p)| (k.clone(), p)).collect()),
ds::Pattern::Bind { pattern } =>
return self.extend_walk(path, pop_count, step, &**pattern),
ds::Pattern::Discard | ds::Pattern::Lit { .. } =>
return (pop_count, self),
};
@ -336,41 +319,46 @@ where FCont: FnMut(&mut Continuation, &AnyValue) -> (),
fn continuation(&mut self, c: &mut Continuation) {
(self.m_cont)(c, self.outer_value);
let mut empty_const_paths = Vec::new();
for (const_paths, const_val_map) in &mut c.leaf_map {
if let Some(const_vals) = project_paths(self.outer_value, const_paths) {
let leaf_opt = if self.create_leaf_if_absent {
Some(const_val_map.entry(const_vals.clone()).or_insert_with(Leaf::new))
} else {
const_val_map.get_mut(&const_vals)
};
if let Some(leaf) = leaf_opt {
(self.m_leaf)(leaf, self.outer_value);
for (capture_paths, endpoints) in &mut leaf.endpoints_map {
if let Some(cs) = project_paths(self.outer_value, &capture_paths) {
(self.m_endpoints)(endpoints, cs);
}
let mut empty_const_positions = Vec::new();
for (const_positions, const_val_map) in &mut c.leaf_map {
if project_paths(self.outer_value, &const_positions.required_to_exist).is_none() {
continue;
}
let const_vals = match project_paths(self.outer_value, &const_positions.with_values) {
Some(vs) => vs,
None => continue,
};
let leaf_opt = if self.create_leaf_if_absent {
Some(const_val_map.entry(const_vals.clone()).or_insert_with(Leaf::new))
} else {
const_val_map.get_mut(&const_vals)
};
if let Some(leaf) = leaf_opt {
(self.m_leaf)(leaf, self.outer_value);
for (capture_paths, endpoints) in &mut leaf.endpoints_map {
if let Some(cs) = project_paths(self.outer_value, &capture_paths) {
(self.m_endpoints)(endpoints, cs);
}
if leaf.is_empty() {
const_val_map.remove(&const_vals);
if const_val_map.is_empty() {
empty_const_paths.push(const_paths.clone());
}
}
if leaf.is_empty() {
const_val_map.remove(&const_vals);
if const_val_map.is_empty() {
empty_const_positions.push(const_positions.clone());
}
}
}
}
for const_paths in empty_const_paths {
c.leaf_map.remove(&const_paths);
for const_positions in empty_const_positions {
c.leaf_map.remove(&const_positions);
}
}
}
fn class_of(v: &AnyValue) -> Option<Guard> {
fn class_of(v: &AnyValue) -> Option<ds::GroupType> {
match v.value() {
Value::Sequence(vs) => Some(Guard::Seq(vs.len())),
Value::Record(r) => Some(Guard::Rec(r.label().clone(), r.arity())),
Value::Dictionary(_) => Some(Guard::Map),
Value::Sequence(_) => Some(ds::GroupType::Arr),
Value::Record(r) => Some(ds::GroupType::Rec { label: r.label().clone() }),
Value::Dictionary(_) => Some(ds::GroupType::Dict),
_ => None,
}
}
@ -398,15 +386,17 @@ fn project_paths<'a>(v: &'a AnyValue, ps: &Paths) -> Option<Captures> {
}
fn step<'a>(v: &'a AnyValue, s: &PathStep) -> Option<&'a AnyValue> {
match (v.value(), s) {
(Value::Sequence(vs), PathStep::Index(i)) =>
if *i < vs.len() { Some(&vs[*i]) } else { None },
(Value::Record(r), PathStep::Index(i)) =>
if *i < r.arity() { Some(&r.fields()[*i]) } else { None },
(Value::Dictionary(m), PathStep::Key(k)) =>
m.get(k),
_ =>
None,
match v.value() {
Value::Sequence(vs) => {
let i = s.value().as_usize()?;
if i < vs.len() { Some(&vs[i]) } else { None }
}
Value::Record(r) => {
let i = s.value().as_usize()?;
if i < r.arity() { Some(&r.fields()[i]) } else { None }
}
Value::Dictionary(m) => m.get(s),
_ => None,
}
}
@ -423,11 +413,14 @@ impl Continuation {
) {
let cached_assertions = &self.cached_assertions;
let const_val_map =
self.leaf_map.entry(analysis.const_paths.clone()).or_insert_with({
self.leaf_map.entry(analysis.const_positions.clone()).or_insert_with({
|| {
let mut cvm = Map::new();
for a in cached_assertions {
if let Some(key) = project_paths(a, &analysis.const_paths) {
if project_paths(a, &analysis.const_positions.required_to_exist).is_none() {
continue;
}
if let Some(key) = project_paths(a, &analysis.const_positions.with_values) {
cvm.entry(key).or_insert_with(Leaf::new)
.cached_assertions.insert(a.clone());
}
@ -462,7 +455,7 @@ impl Continuation {
observer: &Arc<Cap>,
) {
if let Entry::Occupied(mut const_val_map_entry)
= self.leaf_map.entry(analysis.const_paths)
= self.leaf_map.entry(analysis.const_positions)
{
let const_val_map = const_val_map_entry.get_mut();
if let Entry::Occupied(mut leaf_entry)