Compare commits
73 Commits
syndicate-
...
main
Author | SHA1 | Date |
---|---|---|
Tony Garnock-Jones | 6468e16790 | |
Tony Garnock-Jones | 65101e900e | |
Tony Garnock-Jones | 581886835a | |
Tony Garnock-Jones | dcb1aec142 | |
Tony Garnock-Jones | c0239cf322 | |
Tony Garnock-Jones | 9cc4175f24 | |
Tony Garnock-Jones | 70f42dd931 | |
Tony Garnock-Jones | ef1ebe6412 | |
Tony Garnock-Jones | deec008c66 | |
Tony Garnock-Jones | 008671d0b2 | |
Tony Garnock-Jones | 9fcf22e1b5 | |
Tony Garnock-Jones | ca18ca08df | |
Tony Garnock-Jones | 40ca168eac | |
Tony Garnock-Jones | 5a73e8d4c3 | |
Tony Garnock-Jones | 91b26001d8 | |
Tony Garnock-Jones | b83b39515d | |
Tony Garnock-Jones | d9fa6362af | |
Tony Garnock-Jones | 94598a574b | |
Tony Garnock-Jones | 80ad0914ed | |
Tony Garnock-Jones | bdb0cc1023 | |
Tony Garnock-Jones | 710ff91a64 | |
Tony Garnock-Jones | d3748a286b | |
Tony Garnock-Jones | a56aec2c30 | |
Tony Garnock-Jones | 0c06ae9601 | |
Tony Garnock-Jones | 1f0c9d2883 | |
Tony Garnock-Jones | 615830f799 | |
Tony Garnock-Jones | 3c44768a72 | |
Tony Garnock-Jones | 04bb8c2f23 | |
Tony Garnock-Jones | 9084c1781e | |
Tony Garnock-Jones | 8a817fcb4f | |
Tony Garnock-Jones | 2ed2b38edc | |
Tony Garnock-Jones | 5090625f47 | |
Tony Garnock-Jones | a7ede65bad | |
Tony Garnock-Jones | c59e044695 | |
Tony Garnock-Jones | ef98217a3a | |
Tony Garnock-Jones | bf0d47f1b7 | |
Tony Garnock-Jones | fef41f39eb | |
Tony Garnock-Jones | 0b72b4029b | |
Tony Garnock-Jones | 40a239c9eb | |
Tony Garnock-Jones | 55456621d4 | |
Tony Garnock-Jones | 7797a3cd09 | |
Tony Garnock-Jones | eb9d9bed0f | |
Tony Garnock-Jones | b96c469ef5 | |
Tony Garnock-Jones | 34f611f4fe | |
Tony Garnock-Jones | 58c24c30c4 | |
Tony Garnock-Jones | fa990bc042 | |
Tony Garnock-Jones | 060ba36d2e | |
Tony Garnock-Jones | ecd5e87823 | |
Tony Garnock-Jones | a401e5fcd1 | |
Tony Garnock-Jones | 5db05b2df2 | |
Tony Garnock-Jones | f4a4b4d595 | |
Tony Garnock-Jones | b7d4bd4b58 | |
Tony Garnock-Jones | 41cf85f865 | |
Tony Garnock-Jones | 4fcb14d63e | |
Tony Garnock-Jones | b4f355aa0d | |
Tony Garnock-Jones | 5a431b2060 | |
Tony Garnock-Jones | 1ff222b291 | |
Tony Garnock-Jones | e501d0f76a | |
Tony Garnock-Jones | 2e65d31d5d | |
Tony Garnock-Jones | 852f0f4722 | |
Tony Garnock-Jones | 9850c73993 | |
Tony Garnock-Jones | 9864ce0ec8 | |
Tony Garnock-Jones | 19b1e84e43 | |
Tony Garnock-Jones | 3649cc1237 | |
Tony Garnock-Jones | 0f2d9239f9 | |
Tony Garnock-Jones | 0514f11d0f | |
Tony Garnock-Jones | 12428bbdf6 | |
Tony Garnock-Jones | 5dd68e87c1 | |
Tony Garnock-Jones | e2a32b891d | |
Tony Garnock-Jones | 461ac034f8 | |
Tony Garnock-Jones | 19cbceda7a | |
Tony Garnock-Jones | 97876335ba | |
Tony Garnock-Jones | d7b330e6dd |
File diff suppressed because it is too large
Load Diff
14
Makefile
14
Makefile
|
@ -32,7 +32,7 @@ pull-protocols:
|
|||
static: static-x86_64
|
||||
|
||||
static-%:
|
||||
cross build --target $*-unknown-linux-musl --features vendored-openssl,jemalloc
|
||||
CARGO_TARGET_DIR=target/target.$* cross build --target $*-unknown-linux-musl --features vendored-openssl,jemalloc
|
||||
|
||||
###########################################################################
|
||||
|
||||
|
@ -54,18 +54,18 @@ static-%:
|
|||
x86_64-binary: x86_64-binary-release
|
||||
|
||||
x86_64-binary-release:
|
||||
cross build --target x86_64-unknown-linux-musl --release --all-targets --features vendored-openssl,jemalloc
|
||||
CARGO_TARGET_DIR=target/target.x86_64 cross build --target x86_64-unknown-linux-musl --release --all-targets --features vendored-openssl,jemalloc
|
||||
|
||||
x86_64-binary-debug:
|
||||
cross build --target x86_64-unknown-linux-musl --all-targets --features vendored-openssl
|
||||
CARGO_TARGET_DIR=target/target.x86_64 cross build --target x86_64-unknown-linux-musl --all-targets --features vendored-openssl
|
||||
|
||||
armv7-binary: armv7-binary-release
|
||||
|
||||
armv7-binary-release:
|
||||
cross build --target=armv7-unknown-linux-musleabihf --release --all-targets --features vendored-openssl
|
||||
CARGO_TARGET_DIR=target/target.armv7 cross build --target=armv7-unknown-linux-musleabihf --release --all-targets --features vendored-openssl
|
||||
|
||||
armv7-binary-debug:
|
||||
cross build --target=armv7-unknown-linux-musleabihf --all-targets --features vendored-openssl
|
||||
CARGO_TARGET_DIR=target/target.armv7 cross build --target=armv7-unknown-linux-musleabihf --all-targets --features vendored-openssl
|
||||
|
||||
# As of 2023-05-12 (and probably earlier!) this is no longer required with current Rust nightlies
|
||||
# # Hack to workaround https://github.com/rust-embedded/cross/issues/598
|
||||
|
@ -74,7 +74,7 @@ armv7-binary-debug:
|
|||
aarch64-binary: aarch64-binary-release
|
||||
|
||||
aarch64-binary-release:
|
||||
cross build --target=aarch64-unknown-linux-musl --release --all-targets --features vendored-openssl,jemalloc
|
||||
CARGO_TARGET_DIR=target/target.aarch64 cross build --target=aarch64-unknown-linux-musl --release --all-targets --features vendored-openssl,jemalloc
|
||||
|
||||
aarch64-binary-debug:
|
||||
cross build --target=aarch64-unknown-linux-musl --all-targets --features vendored-openssl
|
||||
CARGO_TARGET_DIR=target/target.aarch64 cross build --target=aarch64-unknown-linux-musl --all-targets --features vendored-openssl
|
||||
|
|
|
@ -1,2 +1,7 @@
|
|||
#!/bin/sh
|
||||
make -C ../syndicate-server binary && exec taskset -c 0,1 ../target/release/syndicate-server -c benchmark-config.pr "$@"
|
||||
TASKSET='taskset -c 0,1'
|
||||
if [ $(uname -s) = 'Darwin' ]
|
||||
then
|
||||
TASKSET=
|
||||
fi
|
||||
make -C ../syndicate-server binary && exec $TASKSET ../target/release/syndicate-server -c benchmark-config.pr "$@"
|
||||
|
|
|
@ -10,7 +10,6 @@ all:
|
|||
|
||||
clean:
|
||||
rm -f syndicate-server.*
|
||||
rm -rf $(patsubst %,target.%,$(ARCHITECTURES))
|
||||
-podman images -q $(U)/$(I) | sort -u | xargs podman rmi -f
|
||||
|
||||
image: $(SERVERS)
|
||||
|
@ -34,5 +33,5 @@ push-only:
|
|||
podman manifest push $(U)/$(I):latest
|
||||
|
||||
syndicate-server.%:
|
||||
make -C .. CARGO_TARGET_DIR=docker/target.$* $$(./alpine-architecture $*)-binary-release
|
||||
cp -a target.$*/$$(./alpine-architecture $*)-unknown-linux-musl*/release/syndicate-server $@
|
||||
make -C .. $$(./alpine-architecture $*)-binary-release
|
||||
cp -a ../target/target.$$(./alpine-architecture $*)/$$(./alpine-architecture $*)-unknown-linux-musl*/release/syndicate-server $@
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "syndicate-macros"
|
||||
version = "0.25.0"
|
||||
version = "0.32.0"
|
||||
authors = ["Tony Garnock-Jones <tonyg@leastfixedpoint.com>"]
|
||||
edition = "2018"
|
||||
|
||||
|
@ -13,7 +13,7 @@ license = "Apache-2.0"
|
|||
proc-macro = true
|
||||
|
||||
[dependencies]
|
||||
syndicate = { path = "../syndicate", version = "0.31.0"}
|
||||
syndicate = { path = "../syndicate", version = "0.40.0"}
|
||||
|
||||
proc-macro2 = { version = "^1.0", features = ["span-locations"] }
|
||||
quote = "^1.0"
|
||||
|
|
|
@ -5,11 +5,11 @@ use std::sync::Arc;
|
|||
#[derive(Debug)]
|
||||
enum Instruction {
|
||||
SetPeer(Arc<Ref<Instruction>>),
|
||||
HandleMessage(u32),
|
||||
HandleMessage(u64),
|
||||
}
|
||||
|
||||
struct Forwarder {
|
||||
n_rounds: u32,
|
||||
hop_limit: u64,
|
||||
supervisor: Arc<Ref<Instruction>>,
|
||||
peer: Option<Arc<Ref<Instruction>>>,
|
||||
}
|
||||
|
@ -31,7 +31,7 @@ impl Entity<Instruction> for Forwarder {
|
|||
self.peer = Some(r);
|
||||
}
|
||||
Instruction::HandleMessage(n) => {
|
||||
let target = if n >= self.n_rounds { &self.supervisor } else { self.peer.as_ref().expect("peer") };
|
||||
let target = if n >= self.hop_limit { &self.supervisor } else { self.peer.as_ref().expect("peer") };
|
||||
turn.message(target, Instruction::HandleMessage(n + 1));
|
||||
}
|
||||
}
|
||||
|
@ -40,8 +40,8 @@ impl Entity<Instruction> for Forwarder {
|
|||
}
|
||||
|
||||
struct Supervisor {
|
||||
n_actors: u32,
|
||||
n_rounds: u32,
|
||||
latency_mode: bool,
|
||||
total_transfers: u64,
|
||||
remaining_to_receive: u32,
|
||||
start_time: Option<std::time::Instant>,
|
||||
}
|
||||
|
@ -58,11 +58,11 @@ impl Entity<Instruction> for Supervisor {
|
|||
if self.remaining_to_receive == 0 {
|
||||
let stop_time = std::time::Instant::now();
|
||||
let duration = stop_time - self.start_time.unwrap();
|
||||
let n_messages: u64 = self.n_actors as u64 * self.n_rounds as u64;
|
||||
tracing::info!("Stop after {:?}; {:?} messages, so {:?} Hz",
|
||||
tracing::info!("Stop after {:?}; {:?} messages, so {:?} Hz ({} mode)",
|
||||
duration,
|
||||
n_messages,
|
||||
(1000.0 * n_messages as f64) / duration.as_millis() as f64);
|
||||
self.total_transfers,
|
||||
(1000.0 * self.total_transfers as f64) / duration.as_millis() as f64,
|
||||
if self.latency_mode { "latency" } else { "throughput" });
|
||||
turn.stop_root();
|
||||
}
|
||||
},
|
||||
|
@ -78,12 +78,24 @@ async fn main() -> ActorResult {
|
|||
let args: Vec<String> = env::args().collect();
|
||||
let n_actors: u32 = args.get(1).unwrap_or(&"1000000".to_string()).parse()?;
|
||||
let n_rounds: u32 = args.get(2).unwrap_or(&"200".to_string()).parse()?;
|
||||
let latency_mode: bool = match args.get(3).unwrap_or(&"throughput".to_string()).as_str() {
|
||||
"latency" => true,
|
||||
"throughput" => false,
|
||||
_other => return Err("Invalid throughput/latency mode".into()),
|
||||
};
|
||||
tracing::info!("Will run {:?} actors for {:?} rounds", n_actors, n_rounds);
|
||||
|
||||
let total_transfers: u64 = n_actors as u64 * n_rounds as u64;
|
||||
let (hop_limit, injection_count) = if latency_mode {
|
||||
(total_transfers, 1)
|
||||
} else {
|
||||
(n_rounds as u64, n_actors)
|
||||
};
|
||||
|
||||
let me = t.create(Supervisor {
|
||||
n_actors,
|
||||
n_rounds,
|
||||
remaining_to_receive: n_actors,
|
||||
latency_mode,
|
||||
total_transfers,
|
||||
remaining_to_receive: injection_count,
|
||||
start_time: None,
|
||||
});
|
||||
|
||||
|
@ -93,7 +105,7 @@ async fn main() -> ActorResult {
|
|||
forwarders.push(
|
||||
t.spawn_for_entity(None, true, Box::new(
|
||||
Forwarder {
|
||||
n_rounds,
|
||||
hop_limit,
|
||||
supervisor: me.clone(),
|
||||
peer: forwarders.last().cloned(),
|
||||
}))
|
||||
|
@ -103,8 +115,13 @@ async fn main() -> ActorResult {
|
|||
t.later(move |t| {
|
||||
t.message(&me, Instruction::SetPeer(me.clone()));
|
||||
t.later(move |t| {
|
||||
let mut injected: u32 = 0;
|
||||
for f in forwarders.into_iter() {
|
||||
if injected >= injection_count {
|
||||
break;
|
||||
}
|
||||
t.message(&f, Instruction::HandleMessage(0));
|
||||
injected += 1;
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
|
|
|
@ -0,0 +1,175 @@
|
|||
use std::env;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
|
||||
|
||||
type Ref<T> = UnboundedSender<Box<T>>;
|
||||
|
||||
#[derive(Debug)]
|
||||
enum Instruction {
|
||||
SetPeer(Arc<Ref<Instruction>>),
|
||||
HandleMessage(u64),
|
||||
}
|
||||
|
||||
struct Forwarder {
|
||||
hop_limit: u64,
|
||||
supervisor: Arc<Ref<Instruction>>,
|
||||
peer: Option<Arc<Ref<Instruction>>>,
|
||||
}
|
||||
|
||||
impl Drop for Forwarder {
|
||||
fn drop(&mut self) {
|
||||
let r = self.peer.take();
|
||||
let _ = tokio::spawn(async move {
|
||||
drop(r);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
enum Action { Continue, Stop }
|
||||
|
||||
trait Actor<T> {
|
||||
fn message(&mut self, message: T) -> Action;
|
||||
}
|
||||
|
||||
fn send<T: std::marker::Send + 'static>(ch: &Arc<Ref<T>>, message: T) -> () {
|
||||
match ch.send(Box::new(message)) {
|
||||
Ok(()) => (),
|
||||
Err(v) => panic!("Aiee! Could not send {:?}", v),
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn<T: std::marker::Send + 'static, R: Actor<T> + std::marker::Send + 'static>(rt: Option<Arc<AtomicU64>>, mut ac: R) -> Arc<Ref<T>> {
|
||||
let (tx, mut rx) = unbounded_channel::<Box<T>>();
|
||||
if let Some(ref c) = rt {
|
||||
c.fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
None => break,
|
||||
Some(message) => {
|
||||
match ac.message(*message) {
|
||||
Action::Continue => continue,
|
||||
Action::Stop => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(c) = rt {
|
||||
c.fetch_sub(1, Ordering::SeqCst);
|
||||
}
|
||||
});
|
||||
Arc::new(tx)
|
||||
}
|
||||
|
||||
impl Actor<Instruction> for Forwarder {
|
||||
fn message(&mut self, message: Instruction) -> Action {
|
||||
match message {
|
||||
Instruction::SetPeer(r) => {
|
||||
tracing::info!("Setting peer {:?}", r);
|
||||
self.peer = Some(r);
|
||||
}
|
||||
Instruction::HandleMessage(n) => {
|
||||
let target = if n >= self.hop_limit { &self.supervisor } else { self.peer.as_ref().expect("peer") };
|
||||
send(target, Instruction::HandleMessage(n + 1));
|
||||
}
|
||||
}
|
||||
Action::Continue
|
||||
}
|
||||
}
|
||||
|
||||
struct Supervisor {
|
||||
latency_mode: bool,
|
||||
total_transfers: u64,
|
||||
remaining_to_receive: u32,
|
||||
start_time: Option<std::time::Instant>,
|
||||
}
|
||||
|
||||
impl Actor<Instruction> for Supervisor {
|
||||
fn message(&mut self, message: Instruction) -> Action {
|
||||
match message {
|
||||
Instruction::SetPeer(_) => {
|
||||
tracing::info!("Start");
|
||||
self.start_time = Some(std::time::Instant::now());
|
||||
},
|
||||
Instruction::HandleMessage(_n) => {
|
||||
self.remaining_to_receive -= 1;
|
||||
if self.remaining_to_receive == 0 {
|
||||
let stop_time = std::time::Instant::now();
|
||||
let duration = stop_time - self.start_time.unwrap();
|
||||
tracing::info!("Stop after {:?}; {:?} messages, so {:?} Hz ({} mode)",
|
||||
duration,
|
||||
self.total_transfers,
|
||||
(1000.0 * self.total_transfers as f64) / duration.as_millis() as f64,
|
||||
if self.latency_mode { "latency" } else { "throughput" });
|
||||
return Action::Stop;
|
||||
}
|
||||
},
|
||||
}
|
||||
Action::Continue
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>> {
|
||||
syndicate::convenient_logging()?;
|
||||
|
||||
let args: Vec<String> = env::args().collect();
|
||||
let n_actors: u32 = args.get(1).unwrap_or(&"1000000".to_string()).parse()?;
|
||||
let n_rounds: u32 = args.get(2).unwrap_or(&"200".to_string()).parse()?;
|
||||
let latency_mode: bool = match args.get(3).unwrap_or(&"throughput".to_string()).as_str() {
|
||||
"latency" => true,
|
||||
"throughput" => false,
|
||||
_other => return Err("Invalid throughput/latency mode".into()),
|
||||
};
|
||||
tracing::info!("Will run {:?} actors for {:?} rounds", n_actors, n_rounds);
|
||||
|
||||
let count = Arc::new(AtomicU64::new(0));
|
||||
|
||||
let total_transfers: u64 = n_actors as u64 * n_rounds as u64;
|
||||
let (hop_limit, injection_count) = if latency_mode {
|
||||
(total_transfers, 1)
|
||||
} else {
|
||||
(n_rounds as u64, n_actors)
|
||||
};
|
||||
|
||||
let me = spawn(Some(count.clone()), Supervisor {
|
||||
latency_mode,
|
||||
total_transfers,
|
||||
remaining_to_receive: injection_count,
|
||||
start_time: None,
|
||||
});
|
||||
|
||||
let mut forwarders: Vec<Arc<Ref<Instruction>>> = Vec::new();
|
||||
for _i in 0 .. n_actors {
|
||||
if _i % 10000 == 0 { tracing::info!("Actor {:?}", _i); }
|
||||
forwarders.push(spawn(None, Forwarder {
|
||||
hop_limit,
|
||||
supervisor: me.clone(),
|
||||
peer: forwarders.last().cloned(),
|
||||
}));
|
||||
}
|
||||
send(&forwarders[0], Instruction::SetPeer(forwarders.last().expect("an entity").clone()));
|
||||
send(&me, Instruction::SetPeer(me.clone()));
|
||||
|
||||
let mut injected: u32 = 0;
|
||||
for f in forwarders.into_iter() {
|
||||
if injected >= injection_count {
|
||||
break;
|
||||
}
|
||||
send(&f, Instruction::HandleMessage(0));
|
||||
injected += 1;
|
||||
}
|
||||
|
||||
loop {
|
||||
if count.load(Ordering::SeqCst) == 0 {
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -27,6 +27,7 @@ use pat::lit;
|
|||
|
||||
enum SymbolVariant<'a> {
|
||||
Normal(&'a str),
|
||||
#[allow(dead_code)] // otherwise we get 'warning: field `0` is never read'
|
||||
Binder(&'a str),
|
||||
Substitution(&'a str),
|
||||
Discard,
|
||||
|
@ -35,7 +36,7 @@ enum SymbolVariant<'a> {
|
|||
fn compile_sequence_members(vs: &[IOValue]) -> Vec<TokenStream> {
|
||||
vs.iter().enumerate().map(|(i, f)| {
|
||||
let p = compile_pattern(f);
|
||||
quote!((#i .into(), #p))
|
||||
quote!((syndicate::value::Value::from(#i).wrap(), #p))
|
||||
}).collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
|
@ -79,10 +80,6 @@ impl ValueCompiler {
|
|||
match v.value() {
|
||||
Value::Boolean(b) =>
|
||||
quote!(#V_::Value::from(#b).wrap()),
|
||||
Value::Float(f) => {
|
||||
let f = f.0;
|
||||
quote!(#V_::Value::from(#f).wrap())
|
||||
}
|
||||
Value::Double(d) => {
|
||||
let d = d.0;
|
||||
quote!(#V_::Value::from(#d).wrap())
|
||||
|
@ -154,16 +151,14 @@ fn compile_pattern(v: &IOValue) -> TokenStream {
|
|||
#[allow(non_snake_case)]
|
||||
let V_: TokenStream = quote!(syndicate::value);
|
||||
#[allow(non_snake_case)]
|
||||
let MapFromIterator_: TokenStream = quote!(<#V_::Map<_, _> as std::iter::FromIterator<_>>::from_iter);
|
||||
let MapFrom_: TokenStream = quote!(<#V_::Map<_, _>>::from);
|
||||
|
||||
match v.value() {
|
||||
Value::Symbol(s) => match analyze_symbol(&s, true) {
|
||||
SymbolVariant::Binder(_) =>
|
||||
quote!(#P_::Pattern::DBind(Box::new(#P_::DBind {
|
||||
pattern: #P_::Pattern::DDiscard(Box::new(#P_::DDiscard))
|
||||
}))),
|
||||
quote!(#P_::Pattern::Bind{ pattern: Box::new(#P_::Pattern::Discard) }),
|
||||
SymbolVariant::Discard =>
|
||||
quote!(#P_::Pattern::DDiscard(Box::new(#P_::DDiscard))),
|
||||
quote!(#P_::Pattern::Discard),
|
||||
SymbolVariant::Substitution(s) =>
|
||||
lit(Ident::new(s, Span::call_site())),
|
||||
SymbolVariant::Normal(_) =>
|
||||
|
@ -175,9 +170,7 @@ fn compile_pattern(v: &IOValue) -> TokenStream {
|
|||
Some(label) =>
|
||||
if label.starts_with("$") && r.arity() == 1 {
|
||||
let nested = compile_pattern(&r.fields()[0]);
|
||||
quote!(#P_::Pattern::DBind(Box::new(#P_::DBind {
|
||||
pattern: #nested
|
||||
})))
|
||||
quote!(#P_::Pattern::Bind{ pattern: Box::new(#nested) })
|
||||
} else {
|
||||
let label_stx = if label.starts_with("=") {
|
||||
let id = Ident::new(&label[1..], Span::call_site());
|
||||
|
@ -186,18 +179,19 @@ fn compile_pattern(v: &IOValue) -> TokenStream {
|
|||
quote!(#V_::Value::symbol(#label).wrap())
|
||||
};
|
||||
let members = compile_sequence_members(r.fields());
|
||||
quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Rec {
|
||||
label: #label_stx,
|
||||
fields: vec![#(#members),*],
|
||||
})))
|
||||
quote!(#P_::Pattern::Group {
|
||||
type_: Box::new(#P_::GroupType::Rec { label: #label_stx }),
|
||||
entries: #MapFrom_([#(#members),*]),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
Value::Sequence(vs) => {
|
||||
let members = compile_sequence_members(vs);
|
||||
quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Arr {
|
||||
items: vec![#(#members),*],
|
||||
})))
|
||||
quote!(#P_::Pattern::Group {
|
||||
type_: Box::new(#P_::GroupType::Arr),
|
||||
entries: #MapFrom_([#(#members),*]),
|
||||
})
|
||||
}
|
||||
Value::Set(_) =>
|
||||
panic!("Cannot match sets in patterns"),
|
||||
|
@ -207,9 +201,10 @@ fn compile_pattern(v: &IOValue) -> TokenStream {
|
|||
let v = compile_pattern(v);
|
||||
quote!((#k, #v))
|
||||
}).collect::<Vec<_>>();
|
||||
quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Dict {
|
||||
entries: #MapFromIterator_(vec![#(#members),*])
|
||||
})))
|
||||
quote!(#P_::Pattern::Group {
|
||||
type_: Box::new(#P_::GroupType::Dict),
|
||||
entries: #MapFrom_([#(#members),*]),
|
||||
})
|
||||
}
|
||||
_ => lit(ValueCompiler::for_patterns().compile(v)),
|
||||
}
|
||||
|
|
|
@ -15,10 +15,9 @@ pub fn lit<T: ToTokens>(e: T) -> TokenStream2 {
|
|||
}
|
||||
|
||||
fn compile_sequence_members(stxs: &Vec<Stx>) -> Result<Vec<TokenStream2>, &'static str> {
|
||||
stxs.iter().map(|stx| {
|
||||
// let p = to_pattern_expr(stx)?;
|
||||
// Ok(quote!(#p))
|
||||
to_pattern_expr(stx)
|
||||
stxs.iter().enumerate().map(|(i, stx)| {
|
||||
let p = to_pattern_expr(stx)?;
|
||||
Ok(quote!((syndicate::value::Value::from(#i).wrap(), #p)))
|
||||
}).collect()
|
||||
}
|
||||
|
||||
|
@ -28,7 +27,7 @@ pub fn to_pattern_expr(stx: &Stx) -> Result<TokenStream2, &'static str> {
|
|||
#[allow(non_snake_case)]
|
||||
let V_: TokenStream2 = quote!(syndicate::value);
|
||||
#[allow(non_snake_case)]
|
||||
let MapFromIterator_: TokenStream2 = quote!(<#V_::Map<_, _> as std::iter::FromIterator<_>>::from_iter);
|
||||
let MapFrom_: TokenStream2 = quote!(<#V_::Map<_, _>>::from);
|
||||
|
||||
match stx {
|
||||
Stx::Atom(v) =>
|
||||
|
@ -41,26 +40,27 @@ pub fn to_pattern_expr(stx: &Stx) -> Result<TokenStream2, &'static str> {
|
|||
None => to_pattern_expr(&Stx::Discard)?,
|
||||
}
|
||||
};
|
||||
Ok(quote!(#P_::Pattern::DBind(Box::new(#P_::DBind { pattern: #inner_pat_expr }))))
|
||||
Ok(quote!(#P_::Pattern::Bind { pattern: Box::new(#inner_pat_expr) }))
|
||||
}
|
||||
Stx::Subst(e) =>
|
||||
Ok(lit(e)),
|
||||
Stx::Discard =>
|
||||
Ok(quote!(#P_::Pattern::DDiscard(Box::new(#P_::DDiscard)))),
|
||||
Ok(quote!(#P_::Pattern::Discard)),
|
||||
|
||||
Stx::Rec(l, fs) => {
|
||||
let label = to_value_expr(&*l)?;
|
||||
let members = compile_sequence_members(fs)?;
|
||||
Ok(quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Rec {
|
||||
label: #label,
|
||||
fields: vec![#(#members),*],
|
||||
}))))
|
||||
Ok(quote!(#P_::Pattern::Group {
|
||||
type_: Box::new(#P_::GroupType::Rec { label: #label }),
|
||||
entries: #MapFrom_([#(#members),*]),
|
||||
}))
|
||||
},
|
||||
Stx::Seq(stxs) => {
|
||||
let members = compile_sequence_members(stxs)?;
|
||||
Ok(quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Arr {
|
||||
items: vec![#(#members),*],
|
||||
}))))
|
||||
Ok(quote!(#P_::Pattern::Group {
|
||||
type_: Box::new(#P_::GroupType::Arr),
|
||||
entries: #MapFrom_([#(#members),*]),
|
||||
}))
|
||||
}
|
||||
Stx::Set(_stxs) =>
|
||||
Err("Set literals not supported in patterns"),
|
||||
|
@ -70,9 +70,10 @@ pub fn to_pattern_expr(stx: &Stx) -> Result<TokenStream2, &'static str> {
|
|||
let v = to_pattern_expr(v)?;
|
||||
Ok(quote!((#k, #v)))
|
||||
}).collect::<Result<Vec<_>, &'static str>>()?;
|
||||
Ok(quote!(#P_::Pattern::DCompound(Box::new(#P_::DCompound::Dict {
|
||||
entries: #MapFromIterator_(vec![#(#members),*])
|
||||
}))))
|
||||
Ok(quote!(#P_::Pattern::Group {
|
||||
type_: Box::new(#P_::GroupType::Dict),
|
||||
entries: #MapFrom_([#(#members),*])
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,7 +15,6 @@ use syn::parse::Parser;
|
|||
use syn::parse::ParseStream;
|
||||
use syn::parse_str;
|
||||
|
||||
use syndicate::value::Float;
|
||||
use syndicate::value::Double;
|
||||
use syndicate::value::IOValue;
|
||||
use syndicate::value::NestedValue;
|
||||
|
@ -266,7 +265,7 @@ fn parse1(c: Cursor) -> Result<(Stx, Cursor)> {
|
|||
IOValue::new(i.base10_parse::<i128>()?)
|
||||
}
|
||||
Lit::Float(f) => if f.suffix() == "f32" {
|
||||
IOValue::new(&Float(f.base10_parse::<f32>()?))
|
||||
IOValue::new(&Double(f.base10_parse::<f32>()? as f64))
|
||||
} else {
|
||||
IOValue::new(&Double(f.base10_parse::<f64>()?))
|
||||
}
|
||||
|
|
|
@ -50,10 +50,6 @@ pub fn value_to_value_expr(v: &IOValue) -> TokenStream2 {
|
|||
match v.value() {
|
||||
Value::Boolean(b) =>
|
||||
quote!(#V_::Value::from(#b).wrap()),
|
||||
Value::Float(f) => {
|
||||
let f = f.0;
|
||||
quote!(#V_::Value::from(#f).wrap())
|
||||
}
|
||||
Value::Double(d) => {
|
||||
let d = d.0;
|
||||
quote!(#V_::Value::from(#d).wrap())
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
{
|
||||
"folders": [
|
||||
{
|
||||
"path": "."
|
||||
},
|
||||
{
|
||||
"path": "../syndicate-protocols"
|
||||
}
|
||||
],
|
||||
"settings": {
|
||||
"files.exclude": {
|
||||
"target": true
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "syndicate-schema-plugin"
|
||||
version = "0.2.1"
|
||||
version = "0.9.0"
|
||||
authors = ["Tony Garnock-Jones <tonyg@leastfixedpoint.com>"]
|
||||
edition = "2018"
|
||||
|
||||
|
@ -12,8 +12,8 @@ license = "Apache-2.0"
|
|||
[lib]
|
||||
|
||||
[dependencies]
|
||||
preserves-schema = "5.992"
|
||||
syndicate = { path = "../syndicate", version = "0.31.0"}
|
||||
preserves-schema = "5.995"
|
||||
syndicate = { path = "../syndicate", version = "0.40.0"}
|
||||
|
||||
[package.metadata.workspaces]
|
||||
independent = true
|
||||
|
|
|
@ -1,12 +1,11 @@
|
|||
use preserves_schema::*;
|
||||
use preserves_schema::compiler::*;
|
||||
use preserves_schema::compiler::context::ModuleContext;
|
||||
use preserves_schema::compiler::types::definition_type;
|
||||
use preserves_schema::compiler::types::Purpose;
|
||||
use preserves_schema::gen::schema::*;
|
||||
use preserves_schema::syntax::block::escape_string;
|
||||
use preserves_schema::syntax::block::constructors::*;
|
||||
use preserves_schema::compiler::names;
|
||||
use preserves_schema::compiler::types::definition_type;
|
||||
use preserves_schema::compiler::types::Purpose;
|
||||
|
||||
use std::iter::FromIterator;
|
||||
|
||||
|
@ -62,7 +61,7 @@ impl Plugin for PatternPlugin {
|
|||
}
|
||||
|
||||
fn discard() -> P::Pattern {
|
||||
P::Pattern::DDiscard(Box::new(P::DDiscard))
|
||||
P::Pattern::Discard
|
||||
}
|
||||
|
||||
trait WildcardPattern {
|
||||
|
@ -95,33 +94,34 @@ fn from_io(v: &IOValue) -> Option<P::_Any> {
|
|||
impl WildcardPattern for CompoundPattern {
|
||||
fn wc(&self, s: &mut WalkState) -> Option<P::Pattern> {
|
||||
match self {
|
||||
CompoundPattern::Tuple { patterns } =>
|
||||
Some(P::Pattern::DCompound(Box::new(P::DCompound::Arr {
|
||||
items: patterns.iter()
|
||||
.map(|p| unname(p).wc(s))
|
||||
.collect::<Option<Vec<P::Pattern>>>()?,
|
||||
}))),
|
||||
CompoundPattern::TuplePrefix { .. } =>
|
||||
Some(discard()),
|
||||
CompoundPattern::Tuple { patterns } |
|
||||
CompoundPattern::TuplePrefix { fixed: patterns, .. }=>
|
||||
Some(P::Pattern::Group {
|
||||
type_: Box::new(P::GroupType::Arr),
|
||||
entries: patterns.iter().enumerate()
|
||||
.map(|(i, p)| Some((P::_Any::new(i), unname(p).wc(s)?)))
|
||||
.collect::<Option<Map<P::_Any, P::Pattern>>>()?,
|
||||
}),
|
||||
CompoundPattern::Dict { entries } =>
|
||||
Some(P::Pattern::DCompound(Box::new(P::DCompound::Dict {
|
||||
Some(P::Pattern::Group {
|
||||
type_: Box::new(P::GroupType::Dict),
|
||||
entries: Map::from_iter(
|
||||
entries.0.iter()
|
||||
.map(|(k, p)| Some((from_io(k)?, unname_simple(p).wc(s)?)))
|
||||
.filter(|e| discard() != e.as_ref().unwrap().1)
|
||||
.collect::<Option<Vec<(P::_Any, P::Pattern)>>>()?
|
||||
.into_iter()),
|
||||
}))),
|
||||
}),
|
||||
CompoundPattern::Rec { label, fields } => match (unname(label), unname(fields)) {
|
||||
(Pattern::SimplePattern(label), Pattern::CompoundPattern(fields)) =>
|
||||
match (*label, *fields) {
|
||||
(SimplePattern::Lit { value }, CompoundPattern::Tuple { patterns }) =>
|
||||
Some(P::Pattern::DCompound(Box::new(P::DCompound::Rec {
|
||||
label: from_io(&value)?,
|
||||
fields: patterns.iter()
|
||||
.map(|p| unname(p).wc(s))
|
||||
.collect::<Option<Vec<P::Pattern>>>()?,
|
||||
}))),
|
||||
Some(P::Pattern::Group{
|
||||
type_: Box::new(P::GroupType::Rec { label: from_io(&value)? }),
|
||||
entries: patterns.iter().enumerate()
|
||||
.map(|(i, p)| Some((P::_Any::new(i), unname(p).wc(s)?)))
|
||||
.collect::<Option<Map<P::_Any, P::Pattern>>>()?,
|
||||
}),
|
||||
_ => None,
|
||||
},
|
||||
_ => None,
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "syndicate-server"
|
||||
version = "0.36.1"
|
||||
version = "0.45.0"
|
||||
authors = ["Tony Garnock-Jones <tonyg@leastfixedpoint.com>"]
|
||||
edition = "2018"
|
||||
|
||||
|
@ -13,14 +13,14 @@ license = "Apache-2.0"
|
|||
jemalloc = ["dep:tikv-jemallocator"]
|
||||
|
||||
[build-dependencies]
|
||||
preserves-schema = "5.992"
|
||||
syndicate = { path = "../syndicate", version = "0.31.0"}
|
||||
syndicate-schema-plugin = { path = "../syndicate-schema-plugin", version = "0.2.0"}
|
||||
preserves-schema = "5.995"
|
||||
syndicate = { path = "../syndicate", version = "0.40.0"}
|
||||
syndicate-schema-plugin = { path = "../syndicate-schema-plugin", version = "0.9.0"}
|
||||
|
||||
[dependencies]
|
||||
preserves-schema = "5.992"
|
||||
syndicate = { path = "../syndicate", version = "0.31.0"}
|
||||
syndicate-macros = { path = "../syndicate-macros", version = "0.25.0"}
|
||||
preserves-schema = "5.995"
|
||||
syndicate = { path = "../syndicate", version = "0.40.0"}
|
||||
syndicate-macros = { path = "../syndicate-macros", version = "0.32.0"}
|
||||
|
||||
chrono = "0.4"
|
||||
futures = "0.3"
|
||||
|
|
|
@ -177,7 +177,7 @@ async fn main() -> ActorResult {
|
|||
})?;
|
||||
|
||||
if let PingPongMode::Ping(c) = &config.mode {
|
||||
let facet = t.facet.clone();
|
||||
let facet = t.facet_ref();
|
||||
let turn_count = c.turn_count;
|
||||
let action_count = c.action_count;
|
||||
let account = Arc::clone(t.account());
|
||||
|
|
|
@ -28,7 +28,7 @@ async fn main() -> ActorResult {
|
|||
let (i, o) = TcpStream::connect("127.0.0.1:9001").await?.into_split();
|
||||
Actor::top(None, |t| {
|
||||
relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| {
|
||||
let facet = t.facet.clone();
|
||||
let facet = t.facet_ref();
|
||||
let padding = AnyValue::new(&vec![0u8; config.bytes_padding][..]);
|
||||
let action_count = config.action_count;
|
||||
let account = Account::new(None, None);
|
||||
|
|
|
@ -22,7 +22,7 @@ async fn main() -> ActorResult {
|
|||
let (i, o) = TcpStream::connect("127.0.0.1:9001").await?.into_split();
|
||||
Actor::top(None, |t| {
|
||||
relay::connect_stream(t, i, o, false, sturdyref, (), move |_state, t, ds| {
|
||||
let facet = t.facet.clone();
|
||||
let facet = t.facet_ref();
|
||||
let account = Account::new(None, None);
|
||||
t.linked_task(Some(AnyValue::symbol("sender")), async move {
|
||||
let presence = rec![AnyValue::symbol("Present"), AnyValue::new(std::process::id())];
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
´³bundle·µ³
documentation„´³schema·³version°³definitions·³Url´³orµµ±present´³dict·³url´³named³url´³atom³String„„„„„µ±invalid´³dict·³url´³named³url³any„„„„µ±absent´³dict·„„„„„³IOList´³orµµ±bytes´³atom³
|
||||
´³bundle·µ³control„´³schema·³version°³definitions·³
|
||||
ExitServer´³rec´³lit³exit„´³tupleµ´³named³code´³atom³
SignedInteger„„„„„„³embeddedType€„„µ³
documentation„´³schema·³version°³definitions·³Url´³orµµ±present´³dict·³url´³named³url´³atom³String„„„„„µ±invalid´³dict·³url´³named³url³any„„„„µ±absent´³dict·„„„„„³IOList´³orµµ±bytes´³atom³
|
||||
ByteString„„µ±string´³atom³String„„µ±nested´³seqof´³refµ„³IOList„„„„„³Metadata´³rec´³lit³metadata„´³tupleµ´³named³object³any„´³named³info´³dictof´³atom³Symbol„³any„„„„„³Description´³orµµ±present´³dict·³description´³named³description´³refµ„³IOList„„„„„µ±invalid´³dict·³description´³named³description³any„„„„µ±absent´³dict·„„„„„„³embeddedType€„„µ³externalServices„´³schema·³version°³definitions·³Process´³orµµ±simple´³refµ„³CommandLine„„µ±full´³refµ„³FullProcess„„„„³Service´³refµ„³
DaemonService„³ClearEnv´³orµµ±present´³dict·³clearEnv´³named³clearEnv´³atom³Boolean„„„„„µ±invalid´³dict·³clearEnv´³named³clearEnv³any„„„„µ±absent´³dict·„„„„„³EnvValue´³orµµ±set´³atom³String„„µ±remove´³lit€„„µ±invalid³any„„„³Protocol´³orµµ±none´³lit³none„„µ±binarySyndicate´³lit³application/syndicate„„µ±
textSyndicate´³lit³text/syndicate„„„„³
|
||||
ProcessDir´³orµµ±present´³dict·³dir´³named³dir´³atom³String„„„„„µ±invalid´³dict·³dir´³named³dir³any„„„„µ±absent´³dict·„„„„„³
|
||||
ProcessEnv´³orµµ±present´³dict·³env´³named³env´³dictof´³refµ„³EnvVariable„´³refµ„³EnvValue„„„„„„µ±invalid´³dict·³env´³named³env³any„„„„µ±absent´³dict·„„„„„³CommandLine´³orµµ±shell´³atom³String„„µ±full´³refµ„³FullCommandLine„„„„³EnvVariable´³orµµ±string´³atom³String„„µ±symbol´³atom³Symbol„„µ±invalid³any„„„³FullProcess´³andµ´³dict·³argv´³named³argv´³refµ„³CommandLine„„„„´³named³env´³refµ„³
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
version 1 .
|
||||
|
||||
# Messages and assertions relating to the `$control` entity enabled in syndicate-server when
|
||||
# the `--control` flag is supplied.
|
||||
#
|
||||
# For example, placing the following into `control-config.pr` and starting the server with
|
||||
# `syndicate-server --control -c control-config.pr` will result in the server exiting with
|
||||
# exit code 2:
|
||||
#
|
||||
# $control ! <exit 2>
|
||||
|
||||
ExitServer = <exit @code int> .
|
|
@ -1,18 +1,18 @@
|
|||
version 1 .
|
||||
embeddedType EntityRef.Cap .
|
||||
|
||||
Gatekeeper = <gatekeeper @bindspace #!gatekeeper.Bind> .
|
||||
Gatekeeper = <gatekeeper @bindspace #:gatekeeper.Bind> .
|
||||
|
||||
DebtReporter = <debt-reporter @intervalSeconds double>.
|
||||
|
||||
TcpRelayListener = TcpWithoutHttp / TcpWithHttp .
|
||||
TcpWithoutHttp = <relay-listener @addr TransportAddress.Tcp @gatekeeper #!gatekeeper.Resolve> .
|
||||
TcpWithHttp = <relay-listener @addr TransportAddress.Tcp @gatekeeper #!gatekeeper.Resolve @httpd #!http.HttpContext> .
|
||||
TcpWithoutHttp = <relay-listener @addr TransportAddress.Tcp @gatekeeper #:gatekeeper.Resolve> .
|
||||
TcpWithHttp = <relay-listener @addr TransportAddress.Tcp @gatekeeper #:gatekeeper.Resolve @httpd #:http.HttpContext> .
|
||||
|
||||
UnixRelayListener = <relay-listener @addr TransportAddress.Unix @gatekeeper #!gatekeeper.Resolve> .
|
||||
UnixRelayListener = <relay-listener @addr TransportAddress.Unix @gatekeeper #:gatekeeper.Resolve> .
|
||||
ConfigWatcher = <config-watcher @path string @env ConfigEnv>.
|
||||
|
||||
ConfigEnv = { symbol: any ...:... }.
|
||||
|
||||
HttpRouter = <http-router @httpd #!any> .
|
||||
HttpRouter = <http-router @httpd #:any> .
|
||||
HttpStaticFileServer = <http-static-files @dir string @pathPrefixElements int> .
|
||||
|
|
|
@ -15,7 +15,7 @@ use std::convert::TryInto;
|
|||
use std::sync::Arc;
|
||||
|
||||
use syndicate::actor::*;
|
||||
use syndicate::during::DuringResult;
|
||||
use syndicate::enclose;
|
||||
use syndicate::value::NestedValue;
|
||||
use syndicate::schemas::dataspace;
|
||||
use syndicate::schemas::gatekeeper;
|
||||
|
@ -102,49 +102,78 @@ pub fn handle_binds(t: &mut Activation, ds: &Arc<Cap>) -> ActorResult {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn eventually_retract<E>(h: Option<Handle>) -> DuringResult<E> {
|
||||
if let Some(h) = h {
|
||||
Ok(Some(Box::new(move |_state, t: &mut Activation| Ok(t.retract(h)))))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_resolves(
|
||||
pub fn facet_handle_resolve(
|
||||
ds: &mut Arc<Cap>,
|
||||
t: &mut Activation,
|
||||
a: gatekeeper::Resolve,
|
||||
) -> DuringResult<Arc<Cap>> {
|
||||
) -> ActorResult {
|
||||
let mut detail: &'static str = "unsupported";
|
||||
|
||||
if a.step.step_type == sturdy_step_type() {
|
||||
detail = "invalid";
|
||||
if let Ok(s) = language().parse::<sturdy::SturdyStepDetail>(&a.step.detail) {
|
||||
return handle_resolve_sturdyref(ds, t, sturdy::SturdyRef { parameters: s.0 }, a.observer);
|
||||
t.facet(|t| {
|
||||
let f = handle_direct_resolution(ds, t, a.clone())?;
|
||||
await_bind_sturdyref(ds, t, sturdy::SturdyRef { parameters: s.0 }, a.observer, f)
|
||||
})?;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
if a.step.step_type == noise_step_type() {
|
||||
detail = "invalid";
|
||||
if let Ok(s) = language().parse::<noise::NoiseStepDetail<AnyValue>>(&a.step.detail) {
|
||||
return handle_resolve_noise(ds, t, s.0.0, a.observer);
|
||||
t.facet(|t| {
|
||||
let f = handle_direct_resolution(ds, t, a.clone())?;
|
||||
await_bind_noise(ds, t, s.0.0, a.observer, f)
|
||||
})?;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
eventually_retract(ds.assert(t, language(), &gatekeeper::Rejected {
|
||||
a.observer.assert(t, language(), &gatekeeper::Rejected {
|
||||
detail: AnyValue::symbol(detail),
|
||||
}))
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_resolve_sturdyref(
|
||||
fn handle_direct_resolution(
|
||||
ds: &mut Arc<Cap>,
|
||||
t: &mut Activation,
|
||||
a: gatekeeper::Resolve,
|
||||
) -> Result<FacetId, ActorError> {
|
||||
let outer_facet = t.facet_id();
|
||||
t.facet(move |t| {
|
||||
let handler = syndicate::entity(a.observer)
|
||||
.on_asserted(move |observer, t, a: AnyValue| {
|
||||
t.stop_facet_and_continue(outer_facet, Some(
|
||||
enclose!((observer, a) move |t: &mut Activation| {
|
||||
observer.assert(t, language(), &a);
|
||||
Ok(())
|
||||
})))?;
|
||||
Ok(None)
|
||||
})
|
||||
.create_cap(t);
|
||||
ds.assert(t, language(), &gatekeeper::Resolve {
|
||||
step: a.step.clone(),
|
||||
observer: handler,
|
||||
});
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn await_bind_sturdyref(
|
||||
ds: &mut Arc<Cap>,
|
||||
t: &mut Activation,
|
||||
sturdyref: sturdy::SturdyRef,
|
||||
observer: Arc<Cap>,
|
||||
) -> DuringResult<Arc<Cap>> {
|
||||
direct_resolution_facet: FacetId,
|
||||
) -> ActorResult {
|
||||
let queried_oid = sturdyref.parameters.oid.clone();
|
||||
let handler = syndicate::entity(observer)
|
||||
.on_asserted(move |observer, t, a: AnyValue| {
|
||||
t.stop_facet(direct_resolution_facet);
|
||||
let bindings = a.value().to_sequence()?;
|
||||
let key = bindings[0].value().to_bytestring()?;
|
||||
let unattenuated_target = bindings[1].value().to_embedded()?;
|
||||
|
@ -152,27 +181,29 @@ fn handle_resolve_sturdyref(
|
|||
Err(e) => {
|
||||
tracing::warn!(sturdyref = ?language().unparse(&sturdyref),
|
||||
"sturdyref failed validation: {}", e);
|
||||
eventually_retract(observer.assert(t, language(), &gatekeeper::Resolved::Rejected(
|
||||
observer.assert(t, language(), &gatekeeper::Resolved::Rejected(
|
||||
Box::new(gatekeeper::Rejected {
|
||||
detail: AnyValue::symbol("sturdyref-failed-validation"),
|
||||
}))))
|
||||
})));
|
||||
},
|
||||
Ok(target) => {
|
||||
tracing::trace!(sturdyref = ?language().unparse(&sturdyref),
|
||||
?target,
|
||||
"sturdyref resolved");
|
||||
eventually_retract(observer.assert(t, language(), &gatekeeper::Resolved::Accepted {
|
||||
observer.assert(t, language(), &gatekeeper::Resolved::Accepted {
|
||||
responder_session: target,
|
||||
}))
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
})
|
||||
.create_cap(t);
|
||||
eventually_retract(ds.assert(t, language(), &dataspace::Observe {
|
||||
ds.assert(t, language(), &dataspace::Observe {
|
||||
// TODO: codegen plugin to generate pattern constructors
|
||||
pattern: pattern!{<bind <ref { oid: #(&queried_oid), key: $ }> $ _>},
|
||||
observer: handler,
|
||||
}))
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct ValidatedNoiseSpec {
|
||||
|
@ -232,52 +263,80 @@ fn validate_noise_spec(
|
|||
})
|
||||
}
|
||||
|
||||
fn handle_resolve_noise(
|
||||
fn await_bind_noise(
|
||||
ds: &mut Arc<Cap>,
|
||||
t: &mut Activation,
|
||||
service_selector: AnyValue,
|
||||
initiator_session: Arc<Cap>,
|
||||
) -> DuringResult<Arc<Cap>> {
|
||||
observer: Arc<Cap>,
|
||||
direct_resolution_facet: FacetId,
|
||||
) -> ActorResult {
|
||||
let handler = syndicate::entity(())
|
||||
.on_asserted_facet(move |_state, t, a: AnyValue| {
|
||||
let initiator_session = Arc::clone(&initiator_session);
|
||||
t.stop_facet(direct_resolution_facet);
|
||||
let observer = Arc::clone(&observer);
|
||||
t.spawn_link(None, move |t| {
|
||||
let bindings = a.value().to_sequence()?;
|
||||
let spec = validate_noise_spec(language().parse(&bindings[0])?)?;
|
||||
let service = bindings[1].value().to_embedded()?;
|
||||
run_noise_responder(t, spec, initiator_session, Arc::clone(service))
|
||||
run_noise_responder(t, spec, observer, Arc::clone(service))
|
||||
});
|
||||
Ok(())
|
||||
})
|
||||
.create_cap(t);
|
||||
eventually_retract(ds.assert(t, language(), &dataspace::Observe {
|
||||
ds.assert(t, language(), &dataspace::Observe {
|
||||
// TODO: codegen plugin to generate pattern constructors
|
||||
pattern: pattern!{
|
||||
<bind <noise $spec:NoiseServiceSpec{ { service: #(&service_selector) } }> $service _>
|
||||
},
|
||||
observer: handler,
|
||||
}))
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct ResponderDetails {
|
||||
initiator_session: Arc<Cap>,
|
||||
service: Arc<Cap>,
|
||||
}
|
||||
|
||||
struct ResponderTransport {
|
||||
relay_input: Arc<Mutex<Option<TunnelRelay>>>,
|
||||
c_recv: CipherState<ChaCha20Poly1305>
|
||||
}
|
||||
type HandshakeState = noise_protocol::HandshakeState<X25519, ChaCha20Poly1305, Blake2s>;
|
||||
|
||||
enum ResponderState {
|
||||
Handshake(ResponderDetails, noise_protocol::HandshakeState<X25519, ChaCha20Poly1305, Blake2s>),
|
||||
Transport(ResponderTransport),
|
||||
Invalid, // used during state transitions
|
||||
Introduction {
|
||||
service: Arc<Cap>,
|
||||
hs: HandshakeState,
|
||||
},
|
||||
Handshake {
|
||||
initiator_session: Arc<Cap>,
|
||||
service: Arc<Cap>,
|
||||
hs: HandshakeState,
|
||||
},
|
||||
Transport {
|
||||
relay_input: Arc<Mutex<Option<TunnelRelay>>>,
|
||||
c_recv: CipherState<ChaCha20Poly1305>,
|
||||
},
|
||||
}
|
||||
|
||||
impl Entity<noise::Packet> for ResponderState {
|
||||
fn message(&mut self, t: &mut Activation, p: noise::Packet) -> ActorResult {
|
||||
impl Entity<noise::SessionItem> for ResponderState {
|
||||
fn assert(&mut self, _t: &mut Activation, item: noise::SessionItem, _handle: Handle) -> ActorResult {
|
||||
let initiator_session = match item {
|
||||
noise::SessionItem::Initiator(i_box) => i_box.initiator_session,
|
||||
noise::SessionItem::Packet(_) => Err("Unexpected Packet assertion")?,
|
||||
};
|
||||
match std::mem::replace(self, ResponderState::Invalid) {
|
||||
ResponderState::Introduction { service, hs } => {
|
||||
*self = ResponderState::Handshake { initiator_session, service, hs };
|
||||
Ok(())
|
||||
}
|
||||
_ =>
|
||||
Err("Received second Initiator")?,
|
||||
}
|
||||
}
|
||||
|
||||
fn message(&mut self, t: &mut Activation, item: noise::SessionItem) -> ActorResult {
|
||||
let p = match item {
|
||||
noise::SessionItem::Initiator(_) => Err("Unexpected Initiator message")?,
|
||||
noise::SessionItem::Packet(p_box) => *p_box,
|
||||
};
|
||||
match self {
|
||||
ResponderState::Handshake(details, hs) => match p {
|
||||
ResponderState::Invalid | ResponderState::Introduction { .. } =>
|
||||
Err("Received Packet in invalid ResponderState")?,
|
||||
ResponderState::Handshake { initiator_session, service, hs } => match p {
|
||||
noise::Packet::Complete(bs) => {
|
||||
if bs.len() < hs.get_next_message_overhead() {
|
||||
Err("Invalid handshake message for pattern")?;
|
||||
|
@ -288,16 +347,15 @@ impl Entity<noise::Packet> for ResponderState {
|
|||
hs.read_message(&bs, &mut [])?;
|
||||
let mut reply = vec![0u8; hs.get_next_message_overhead()];
|
||||
hs.write_message(&[], &mut reply[..])?;
|
||||
details.initiator_session.message(t, language(), &noise::Packet::Complete(reply.into()));
|
||||
initiator_session.message(t, language(), &noise::Packet::Complete(reply.into()));
|
||||
if hs.completed() {
|
||||
let (c_recv, mut c_send) = hs.get_ciphers();
|
||||
let (_, relay_input, mut relay_output) =
|
||||
TunnelRelay::_run(t, Some(Arc::clone(&details.service)), None, false);
|
||||
TunnelRelay::_run(t, Some(Arc::clone(service)), None, false);
|
||||
let trace_collector = t.trace_collector();
|
||||
let transport = ResponderTransport { relay_input, c_recv };
|
||||
let initiator_session = Arc::clone(&details.initiator_session);
|
||||
let initiator_session = Arc::clone(initiator_session);
|
||||
let relay_output_name = Some(AnyValue::symbol("relay_output"));
|
||||
let transport_facet = t.facet.clone();
|
||||
let transport_facet = t.facet_ref();
|
||||
t.linked_task(relay_output_name.clone(), async move {
|
||||
let account = Account::new(relay_output_name, trace_collector);
|
||||
let cause = TurnCause::external("relay_output");
|
||||
|
@ -326,25 +384,25 @@ impl Entity<noise::Packet> for ResponderState {
|
|||
}
|
||||
Ok(LinkedTaskTermination::Normal)
|
||||
});
|
||||
*self = ResponderState::Transport(transport);
|
||||
*self = ResponderState::Transport { relay_input, c_recv };
|
||||
}
|
||||
}
|
||||
_ => Err("Fragmented handshake is not allowed")?,
|
||||
},
|
||||
ResponderState::Transport(transport) => {
|
||||
ResponderState::Transport { relay_input, c_recv } => {
|
||||
let bs = match p {
|
||||
noise::Packet::Complete(bs) =>
|
||||
transport.c_recv.decrypt_vec(&bs[..]).map_err(|_| "Cannot decrypt packet")?,
|
||||
c_recv.decrypt_vec(&bs[..]).map_err(|_| "Cannot decrypt packet")?,
|
||||
noise::Packet::Fragmented(pieces) => {
|
||||
let mut result = Vec::with_capacity(1024);
|
||||
for piece in pieces {
|
||||
result.extend(transport.c_recv.decrypt_vec(&piece[..])
|
||||
result.extend(c_recv.decrypt_vec(&piece[..])
|
||||
.map_err(|_| "Cannot decrypt packet fragment")?);
|
||||
}
|
||||
result
|
||||
}
|
||||
};
|
||||
let mut g = transport.relay_input.lock();
|
||||
let mut g = relay_input.lock();
|
||||
let tr = g.as_mut().expect("initialized");
|
||||
tr.handle_inbound_datagram(t, &bs[..])?;
|
||||
}
|
||||
|
@ -412,7 +470,7 @@ fn lookup_pattern(name: &str) -> Option<HandshakePattern> {
|
|||
fn run_noise_responder(
|
||||
t: &mut Activation,
|
||||
spec: ValidatedNoiseSpec,
|
||||
initiator_session: Arc<Cap>,
|
||||
observer: Arc<Cap>,
|
||||
service: Arc<Cap>,
|
||||
) -> ActorResult {
|
||||
let hs = {
|
||||
|
@ -435,13 +493,8 @@ fn run_noise_responder(
|
|||
hs
|
||||
};
|
||||
|
||||
let details = ResponderDetails {
|
||||
initiator_session: initiator_session.clone(),
|
||||
service,
|
||||
};
|
||||
|
||||
let responder_session =
|
||||
Cap::guard(crate::Language::arc(), t.create(ResponderState::Handshake(details, hs)));
|
||||
initiator_session.assert(t, language(), &gatekeeper::Resolved::Accepted { responder_session });
|
||||
Cap::guard(crate::Language::arc(), t.create(ResponderState::Introduction{ service, hs }));
|
||||
observer.assert(t, language(), &gatekeeper::Resolved::Accepted { responder_session });
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -32,13 +32,9 @@ pub fn empty_response(code: StatusCode) -> Response<Body> {
|
|||
|
||||
type ChunkItem = Result<body::Bytes, Box<dyn std::error::Error + Send + Sync>>;
|
||||
|
||||
enum ResponseCollector {
|
||||
Pending {
|
||||
tx: oneshot::Sender<Response<Body>>,
|
||||
body_tx: UnboundedSender<ChunkItem>,
|
||||
res: Response<Body>,
|
||||
},
|
||||
Done
|
||||
struct ResponseCollector {
|
||||
tx_res: Option<(oneshot::Sender<Response<Body>>, Response<Body>)>,
|
||||
body_tx: Option<UnboundedSender<ChunkItem>>,
|
||||
}
|
||||
|
||||
impl ResponseCollector {
|
||||
|
@ -46,43 +42,50 @@ impl ResponseCollector {
|
|||
let (body_tx, body_rx) = unbounded_channel();
|
||||
let body_stream: Box<dyn futures::Stream<Item = ChunkItem> + Send> =
|
||||
Box::new(UnboundedReceiverStream::new(body_rx));
|
||||
ResponseCollector::Pending {
|
||||
tx,
|
||||
body_tx,
|
||||
res: Response::new(body_stream.into()),
|
||||
let mut res = Response::new(body_stream.into());
|
||||
*res.status_mut() = StatusCode::OK;
|
||||
ResponseCollector {
|
||||
tx_res: Some((tx, res)),
|
||||
body_tx: Some(body_tx),
|
||||
}
|
||||
}
|
||||
|
||||
fn with_res<F: FnOnce(&mut Response<Body>) -> ActorResult>(&mut self, f: F) -> ActorResult {
|
||||
if let ResponseCollector::Pending { res, .. } = self {
|
||||
if let Some((_, res)) = &mut self.tx_res {
|
||||
f(res)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn deliver_res(&mut self) {
|
||||
if let Some((tx, res)) = std::mem::replace(&mut self.tx_res, None) {
|
||||
let _ = tx.send(res);
|
||||
}
|
||||
}
|
||||
|
||||
fn add_chunk(&mut self, value: http::Chunk) -> ActorResult {
|
||||
if let ResponseCollector::Pending { body_tx, .. } = self {
|
||||
self.deliver_res();
|
||||
|
||||
if let Some(body_tx) = self.body_tx.as_mut() {
|
||||
body_tx.send(Ok(match value {
|
||||
http::Chunk::Bytes(bs) => bs.into(),
|
||||
http::Chunk::String(s) => s.as_bytes().to_vec().into(),
|
||||
}))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn finish(&mut self) -> ActorResult {
|
||||
match std::mem::replace(self, ResponseCollector::Done) {
|
||||
ResponseCollector::Pending { tx, res, .. } => {
|
||||
let _ = tx.send(res);
|
||||
}
|
||||
ResponseCollector::Done => (),
|
||||
}
|
||||
fn finish(&mut self, t: &mut Activation) -> ActorResult {
|
||||
self.deliver_res();
|
||||
self.body_tx = None;
|
||||
t.stop();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Entity<http::HttpResponse> for ResponseCollector {
|
||||
fn message(&mut self, _turn: &mut Activation, message: http::HttpResponse) -> ActorResult {
|
||||
fn message(&mut self, t: &mut Activation, message: http::HttpResponse) -> ActorResult {
|
||||
match message {
|
||||
http::HttpResponse::Status { code, .. } => self.with_res(|r| {
|
||||
*r.status_mut() = StatusCode::from_u16(
|
||||
|
@ -94,10 +97,12 @@ impl Entity<http::HttpResponse> for ResponseCollector {
|
|||
HeaderValue::from_str(value.as_str())?);
|
||||
Ok(())
|
||||
}),
|
||||
http::HttpResponse::Chunk { chunk } => self.add_chunk(*chunk),
|
||||
http::HttpResponse::Chunk { chunk } => {
|
||||
self.add_chunk(*chunk)
|
||||
}
|
||||
http::HttpResponse::Done { chunk } => {
|
||||
self.add_chunk(*chunk)?;
|
||||
self.finish()
|
||||
self.finish(t)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -111,11 +116,11 @@ pub async fn serve(
|
|||
port: u16,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
let host = match req.headers().get("host").and_then(|v| v.to_str().ok()) {
|
||||
None => return Ok(empty_response(StatusCode::BAD_REQUEST)),
|
||||
Some(h) => match h.rsplit_once(':') {
|
||||
None => http::RequestHost::Absent,
|
||||
Some(h) => http::RequestHost::Present(match h.rsplit_once(':') {
|
||||
None => h.to_string(),
|
||||
Some((h, _port)) => h.to_string(),
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
let uri = req.uri();
|
||||
|
@ -160,34 +165,29 @@ pub async fn serve(
|
|||
let account = Account::new(Some(AnyValue::symbol("http")), trace_collector);
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let mut req_handle: Option<Handle> = None;
|
||||
|
||||
facet.activate(&account, Some(trace::TurnCause::external("http")), |t| {
|
||||
let sreq = http::HttpRequest {
|
||||
sequence_number: NEXT_SEQ.fetch_add(1, Ordering::Relaxed).into(),
|
||||
host,
|
||||
port: port.into(),
|
||||
method: req.method().to_string().to_lowercase(),
|
||||
path,
|
||||
headers: http::Headers(headers),
|
||||
query,
|
||||
body,
|
||||
};
|
||||
tracing::debug!(?sreq);
|
||||
let srep = Cap::guard(&language().syndicate, t.create(ResponseCollector::new(tx)));
|
||||
req_handle = httpd.assert(t, language(), &http::HttpContext { req: sreq, res: srep });
|
||||
t.facet(move |t| {
|
||||
let sreq = http::HttpRequest {
|
||||
sequence_number: NEXT_SEQ.fetch_add(1, Ordering::Relaxed).into(),
|
||||
host,
|
||||
port: port.into(),
|
||||
method: req.method().to_string().to_lowercase(),
|
||||
path,
|
||||
headers: http::Headers(headers),
|
||||
query,
|
||||
body,
|
||||
};
|
||||
tracing::debug!(?sreq);
|
||||
let srep = Cap::guard(&language().syndicate, t.create(ResponseCollector::new(tx)));
|
||||
httpd.assert(t, language(), &http::HttpContext { req: sreq, res: srep });
|
||||
Ok(())
|
||||
})?;
|
||||
Ok(())
|
||||
});
|
||||
|
||||
let response_result = rx.await;
|
||||
|
||||
facet.activate(&account, Some(trace::TurnCause::external("http")), |t| {
|
||||
if let Some(h) = req_handle {
|
||||
t.retract(h);
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
|
||||
match response_result {
|
||||
Ok(response) => Ok(response),
|
||||
Err(_ /* sender dropped */) => Ok(empty_response(StatusCode::INTERNAL_SERVER_ERROR)),
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
use preserves_schema::Codec;
|
||||
|
||||
use std::convert::TryInto;
|
||||
use std::io;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
@ -61,6 +62,10 @@ struct ServerConfig {
|
|||
|
||||
#[structopt(short = "t", long)]
|
||||
trace_file: Option<PathBuf>,
|
||||
|
||||
/// Enable `$control` entity.
|
||||
#[structopt(long)]
|
||||
control: bool,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
|
@ -115,7 +120,7 @@ async fn main() -> ActorResult {
|
|||
|
||||
let gatekeeper = Cap::guard(Language::arc(), t.create(
|
||||
syndicate::entity(Arc::clone(&server_config_ds))
|
||||
.on_asserted(gatekeeper::handle_resolves)));
|
||||
.on_asserted_facet(gatekeeper::facet_handle_resolve)));
|
||||
gatekeeper::handle_binds(t, &server_config_ds)?;
|
||||
|
||||
let mut env = Map::new();
|
||||
|
@ -123,6 +128,20 @@ async fn main() -> ActorResult {
|
|||
env.insert("log".to_owned(), AnyValue::domain(Arc::clone(&log_ds)));
|
||||
env.insert("gatekeeper".to_owned(), AnyValue::domain(Arc::clone(&gatekeeper)));
|
||||
|
||||
if config.control {
|
||||
env.insert("control".to_owned(), AnyValue::domain(Cap::guard(Language::arc(), t.create(
|
||||
syndicate::entity(())
|
||||
.on_message(|_, _t, m: crate::schemas::control::ExitServer| {
|
||||
tracing::info!("$control received exit request with code {}", m.code);
|
||||
std::process::exit((&m.code).try_into().unwrap_or_else(|_| {
|
||||
tracing::warn!(
|
||||
"exit code {} out-of-range of 32-bit signed integer, using 1 instead",
|
||||
m.code);
|
||||
1
|
||||
}))
|
||||
})))));
|
||||
}
|
||||
|
||||
dependencies::boot(t, Arc::clone(&server_config_ds));
|
||||
services::config_watcher::on_demand(t, Arc::clone(&server_config_ds));
|
||||
services::daemon::on_demand(t, Arc::clone(&server_config_ds), Arc::clone(&log_ds));
|
||||
|
|
|
@ -35,7 +35,7 @@ pub fn run_io_relay(
|
|||
initial_ref: Arc<Cap>,
|
||||
) -> ActorResult {
|
||||
let exit_listener = t.create(ExitListener);
|
||||
t.state.add_exit_hook(&exit_listener);
|
||||
t.add_exit_hook(&exit_listener);
|
||||
relay::TunnelRelay::run(t, i, o, Some(initial_ref), None, false);
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -9,7 +9,7 @@ use syndicate::actor::*;
|
|||
use syndicate::dataspace::Dataspace;
|
||||
use syndicate::during;
|
||||
use syndicate::enclose;
|
||||
use syndicate::pattern::{lift_literal, drop_literal};
|
||||
use syndicate::pattern::{lift_literal, drop_literal, pattern_seq_from_dictionary};
|
||||
use syndicate::schemas::dataspace;
|
||||
use syndicate::schemas::dataspace_patterns as P;
|
||||
use syndicate::schemas::sturdy;
|
||||
|
@ -173,7 +173,7 @@ fn bad_instruction(message: &str) -> io::Error {
|
|||
}
|
||||
|
||||
fn discard() -> P::Pattern {
|
||||
P::Pattern::DDiscard(Box::new(P::DDiscard))
|
||||
P::Pattern::Discard
|
||||
}
|
||||
|
||||
fn dlit(value: AnyValue) -> P::Pattern {
|
||||
|
@ -261,7 +261,6 @@ impl<'env> PatternInstantiator<'env> {
|
|||
fn instantiate_pattern(&mut self, template: &AnyValue) -> io::Result<P::Pattern> {
|
||||
Ok(match template.value() {
|
||||
Value::Boolean(_) |
|
||||
Value::Float(_) |
|
||||
Value::Double(_) |
|
||||
Value::SignedInteger(_) |
|
||||
Value::String(_) |
|
||||
|
@ -273,7 +272,7 @@ impl<'env> PatternInstantiator<'env> {
|
|||
Symbolic::Discard => discard(),
|
||||
Symbolic::Binder(s) => {
|
||||
self.binding_names.push(s);
|
||||
P::Pattern::DBind(Box::new(P::DBind { pattern: discard() }))
|
||||
P::Pattern::Bind { pattern: Box::new(discard()) }
|
||||
}
|
||||
Symbolic::Reference(s) =>
|
||||
dlit(self.env.lookup(&s, "pattern-template variable")?.clone()),
|
||||
|
@ -288,43 +287,47 @@ impl<'env> PatternInstantiator<'env> {
|
|||
Some(pat) => pat,
|
||||
None => {
|
||||
let label = self.instantiate_pattern(r.label())?;
|
||||
let fields = r.fields().iter().map(|p| self.instantiate_pattern(p))
|
||||
.collect::<io::Result<Vec<P::Pattern>>>()?;
|
||||
P::Pattern::DCompound(Box::new(P::DCompound::Rec {
|
||||
label: drop_literal(&label)
|
||||
.ok_or(bad_instruction("Record pattern must have literal label"))?,
|
||||
fields,
|
||||
}))
|
||||
let entries = r.fields().iter().enumerate()
|
||||
.map(|(i, p)| Ok((AnyValue::new(i), self.instantiate_pattern(p)?)))
|
||||
.collect::<io::Result<Map<AnyValue, P::Pattern>>>()?;
|
||||
P::Pattern::Group {
|
||||
type_: Box::new(P::GroupType::Rec {
|
||||
label: drop_literal(&label)
|
||||
.ok_or(bad_instruction("Record pattern must have literal label"))?,
|
||||
}),
|
||||
entries,
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
Value::Sequence(v) =>
|
||||
P::Pattern::DCompound(Box::new(P::DCompound::Arr {
|
||||
items: v.iter()
|
||||
.map(|p| self.instantiate_pattern(p))
|
||||
.collect::<io::Result<Vec<P::Pattern>>>()?,
|
||||
})),
|
||||
P::Pattern::Group {
|
||||
type_: Box::new(P::GroupType::Arr),
|
||||
entries: v.iter().enumerate()
|
||||
.map(|(i, p)| Ok((AnyValue::new(i), self.instantiate_pattern(p)?)))
|
||||
.collect::<io::Result<Map<AnyValue, P::Pattern>>>()?,
|
||||
},
|
||||
Value::Set(_) =>
|
||||
Err(bad_instruction(&format!("Sets not permitted in patterns: {:?}", template)))?,
|
||||
Value::Dictionary(v) =>
|
||||
P::Pattern::DCompound(Box::new(P::DCompound::Dict {
|
||||
P::Pattern::Group {
|
||||
type_: Box::new(P::GroupType::Dict),
|
||||
entries: v.iter()
|
||||
.map(|(a, b)| Ok((a.clone(), self.instantiate_pattern(b)?)))
|
||||
.collect::<io::Result<Map<AnyValue, P::Pattern>>>()?,
|
||||
})),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
fn maybe_binder_with_pattern(&mut self, r: &Record<AnyValue>) -> io::Result<Option<P::Pattern>> {
|
||||
match r.label().value().as_symbol().map(|s| analyze(&s)) {
|
||||
Some(Symbolic::Binder(formal)) => if r.fields().len() == 1 {
|
||||
Some(Symbolic::Binder(formal)) if r.fields().len() == 1 => {
|
||||
let pattern = self.instantiate_pattern(&r.fields()[0])?;
|
||||
self.binding_names.push(formal);
|
||||
return Ok(Some(P::Pattern::DBind(Box::new(P::DBind { pattern }))));
|
||||
Ok(Some(P::Pattern::Bind { pattern: Box::new(pattern) }))
|
||||
},
|
||||
_ => (),
|
||||
_ => Ok(None),
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -372,7 +375,6 @@ impl Env {
|
|||
fn instantiate_value(&self, template: &AnyValue) -> io::Result<AnyValue> {
|
||||
Ok(match template.value() {
|
||||
Value::Boolean(_) |
|
||||
Value::Float(_) |
|
||||
Value::Double(_) |
|
||||
Value::SignedInteger(_) |
|
||||
Value::String(_) |
|
||||
|
@ -555,7 +557,7 @@ impl Env {
|
|||
RewriteTemplate::Accept { pattern_template } => {
|
||||
let (_binding_names, pattern) = self.instantiate_pattern(pattern_template)?;
|
||||
Ok(sturdy::Rewrite {
|
||||
pattern: embed_pattern(&P::Pattern::DBind(Box::new(P::DBind { pattern }))),
|
||||
pattern: embed_pattern(&P::Pattern::Bind { pattern: Box::new(pattern) }),
|
||||
template: sturdy::Template::TRef(Box::new(sturdy::TRef { binding: 0.into() })),
|
||||
})
|
||||
}
|
||||
|
@ -605,7 +607,6 @@ impl Env {
|
|||
|
||||
Ok(match template.value() {
|
||||
Value::Boolean(_) |
|
||||
Value::Float(_) |
|
||||
Value::Double(_) |
|
||||
Value::SignedInteger(_) |
|
||||
Value::String(_) |
|
||||
|
@ -677,24 +678,26 @@ impl Env {
|
|||
|
||||
fn embed_pattern(p: &P::Pattern) -> sturdy::Pattern {
|
||||
match p {
|
||||
P::Pattern::DDiscard(_) => sturdy::Pattern::PDiscard(Box::new(sturdy::PDiscard)),
|
||||
P::Pattern::DBind(b) => sturdy::Pattern::PBind(Box::new(sturdy::PBind {
|
||||
pattern: embed_pattern(&b.pattern),
|
||||
P::Pattern::Discard => sturdy::Pattern::PDiscard(Box::new(sturdy::PDiscard)),
|
||||
P::Pattern::Bind { pattern } => sturdy::Pattern::PBind(Box::new(sturdy::PBind {
|
||||
pattern: embed_pattern(&**pattern),
|
||||
})),
|
||||
P::Pattern::DLit(b) => sturdy::Pattern::Lit(Box::new(sturdy::Lit {
|
||||
value: language().unparse(&b.value),
|
||||
P::Pattern::Lit { value } => sturdy::Pattern::Lit(Box::new(sturdy::Lit {
|
||||
value: language().unparse(&**value),
|
||||
})),
|
||||
P::Pattern::DCompound(b) => sturdy::Pattern::PCompound(Box::new(match &**b {
|
||||
P::DCompound::Rec { label, fields } =>
|
||||
P::Pattern::Group { type_, entries } => sturdy::Pattern::PCompound(Box::new(match &**type_ {
|
||||
P::GroupType::Rec { label } =>
|
||||
sturdy::PCompound::Rec {
|
||||
label: label.clone(),
|
||||
fields: fields.iter().map(embed_pattern).collect(),
|
||||
fields: pattern_seq_from_dictionary(entries).expect("correct field entries")
|
||||
.into_iter().map(embed_pattern).collect(),
|
||||
},
|
||||
P::DCompound::Arr { items } =>
|
||||
P::GroupType::Arr =>
|
||||
sturdy::PCompound::Arr {
|
||||
items: items.iter().map(embed_pattern).collect(),
|
||||
items: pattern_seq_from_dictionary(entries).expect("correct element entries")
|
||||
.into_iter().map(embed_pattern).collect(),
|
||||
},
|
||||
P::DCompound::Dict { entries } =>
|
||||
P::GroupType::Dict =>
|
||||
sturdy::PCompound::Dict {
|
||||
entries: entries.iter().map(|(k, v)| (k.clone(), embed_pattern(v))).collect(),
|
||||
},
|
||||
|
|
|
@ -184,7 +184,7 @@ fn run(
|
|||
let mut watcher = watcher(tx, Duration::from_millis(100)).map_err(convert_notify_error)?;
|
||||
watcher.watch(&env.path, RecursiveMode::Recursive).map_err(convert_notify_error)?;
|
||||
|
||||
let facet = t.facet.clone();
|
||||
let facet = t.facet_ref();
|
||||
let trace_collector = t.trace_collector();
|
||||
let span = tracing::Span::current();
|
||||
thread::spawn(move || {
|
||||
|
|
|
@ -41,7 +41,7 @@ fn supervise_daemon(
|
|||
lifecycle::on_service_restart(t, &config_ds, &spec, enclose!(
|
||||
(config_ds, root_ds, spec) move |t| {
|
||||
tracing::info!(id = ?spec.id, "Terminating to restart");
|
||||
t.stop_facet_and_continue(t.facet.facet_id, Some(
|
||||
t.stop_facet_and_continue(t.facet_id(), Some(
|
||||
enclose!((config_ds, root_ds, spec) move |t: &mut Activation| {
|
||||
supervise_daemon(t, config_ds, root_ds, spec)
|
||||
})))
|
||||
|
@ -176,7 +176,7 @@ impl DaemonInstance {
|
|||
fn handle_exit(self, t: &mut Activation, error_message: Option<String>) -> ActorResult {
|
||||
let delay =
|
||||
std::time::Duration::from_millis(if let None = error_message { 200 } else { 1000 });
|
||||
t.stop_facet_and_continue(t.facet.facet_id, Some(move |t: &mut Activation| {
|
||||
t.stop_facet_and_continue(t.facet_id(), Some(move |t: &mut Activation| {
|
||||
#[derive(Debug)]
|
||||
enum NextStep {
|
||||
SleepAndRestart,
|
||||
|
@ -230,7 +230,7 @@ impl DaemonInstance {
|
|||
kind: &str
|
||||
) -> ActorResult {
|
||||
t.facet(|t| {
|
||||
let facet = t.facet.clone();
|
||||
let facet = t.facet_ref();
|
||||
let log_ds = self.log_ds.clone();
|
||||
let service = self.service.clone();
|
||||
let kind = AnyValue::symbol(kind);
|
||||
|
@ -290,7 +290,7 @@ impl DaemonInstance {
|
|||
let pid = child.id();
|
||||
tracing::debug!(?pid, cmd = ?self.cmd, "started");
|
||||
|
||||
let facet = t.facet.clone();
|
||||
let facet = t.facet_ref();
|
||||
|
||||
if let Some(r) = child.stderr.take() {
|
||||
self.log(t, pid, r, "stderr")?;
|
||||
|
@ -401,7 +401,7 @@ fn run(
|
|||
Ok(config) => {
|
||||
tracing::info!(?config);
|
||||
let config = config.elaborate();
|
||||
let facet = t.facet.clone();
|
||||
let facet = t.facet_ref();
|
||||
t.linked_task(Some(AnyValue::symbol("subprocess")), async move {
|
||||
let mut cmd = config.process.build_command().ok_or("Cannot start daemon process")?;
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
|
|||
|
||||
fn run(t: &mut Activation, ds: Arc<Cap>, spec: Gatekeeper<AnyValue>) -> ActorResult {
|
||||
let resolver = t.create(syndicate::entity(Arc::clone(&spec.bindspace))
|
||||
.on_asserted(gatekeeper::handle_resolves));
|
||||
.on_asserted_facet(gatekeeper::facet_handle_resolve));
|
||||
ds.assert(t, language(), &syndicate::schemas::service::ServiceObject {
|
||||
service_name: language().unparse(&spec),
|
||||
object: AnyValue::domain(Cap::guard(Language::arc(), resolver)),
|
||||
|
|
|
@ -10,7 +10,6 @@ use syndicate::error::Error;
|
|||
use syndicate::preserves::rec;
|
||||
use syndicate::preserves::value::Map;
|
||||
use syndicate::preserves::value::NestedValue;
|
||||
use syndicate::preserves::value::Set;
|
||||
use syndicate::schemas::http;
|
||||
use syndicate::value::signed_integer::SignedInteger;
|
||||
|
||||
|
@ -22,7 +21,7 @@ use crate::schemas::internal_services::HttpStaticFileServer;
|
|||
use syndicate_macros::during;
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref MIME_TABLE: Map<String, String> = load_mime_table("/etc/mime.types").expect("MIME table");
|
||||
pub static ref MIME_TABLE: Map<String, String> = load_mime_table("/etc/mime.types").unwrap_or_default();
|
||||
}
|
||||
|
||||
pub fn load_mime_table(path: &str) -> Result<Map<String, String>, std::io::Error> {
|
||||
|
@ -56,10 +55,22 @@ pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
|
|||
});
|
||||
}
|
||||
|
||||
type MethodTable = Map<http::MethodPattern, Set<Arc<Cap>>>;
|
||||
#[derive(Debug, Clone)]
|
||||
struct ActiveHandler {
|
||||
cap: Arc<Cap>,
|
||||
terminated: Arc<Field<bool>>,
|
||||
}
|
||||
type MethodTable = Map<http::MethodPattern, Vec<ActiveHandler>>;
|
||||
type HostTable = Map<http::HostPattern, Map<http::PathPattern, MethodTable>>;
|
||||
type RoutingTable = Map<SignedInteger, HostTable>;
|
||||
|
||||
fn request_host(value: &http::RequestHost) -> Option<String> {
|
||||
match value {
|
||||
http::RequestHost::Present(h) => Some(h.to_owned()),
|
||||
http::RequestHost::Absent => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn run(t: &mut Activation, ds: Arc<Cap>, spec: HttpRouter) -> ActorResult {
|
||||
ds.assert(t, language(), &lifecycle::started(&spec));
|
||||
ds.assert(t, language(), &lifecycle::ready(&spec));
|
||||
|
@ -72,25 +83,35 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: HttpRouter) -> ActorResult {
|
|||
enclose!((httpd, routes) during!(t, httpd, language(), <http-listener #(&port1)>, enclose!((routes, port) |t: &mut Activation| {
|
||||
let port2 = port.clone();
|
||||
during!(t, httpd, language(), <http-bind $host #(&port2) $method $path $handler>, |t: &mut Activation| {
|
||||
tracing::debug!("+HTTP binding {:?} {:?} {:?} {:?} {:?}", host, port, method, path, handler);
|
||||
let port = port.value().to_signedinteger()?;
|
||||
let host = language().parse::<http::HostPattern>(&host)?;
|
||||
let path = language().parse::<http::PathPattern>(&path)?;
|
||||
let method = language().parse::<http::MethodPattern>(&method)?;
|
||||
let handler = handler.value().to_embedded()?;
|
||||
let handler_cap = handler.value().to_embedded()?.clone();
|
||||
let handler_terminated = t.named_field("handler-terminated", false);
|
||||
t.get_mut(&routes)
|
||||
.entry(port.clone()).or_default()
|
||||
.entry(host.clone()).or_default()
|
||||
.entry(path.clone()).or_default()
|
||||
.entry(method.clone()).or_default()
|
||||
.insert(handler.clone());
|
||||
t.on_stop(enclose!((routes, handler, method, path, host, port) move |t| {
|
||||
.push(ActiveHandler {
|
||||
cap: handler_cap.clone(),
|
||||
terminated: handler_terminated,
|
||||
});
|
||||
t.on_stop(enclose!((routes, method, path, host, port) move |t| {
|
||||
tracing::debug!("-HTTP binding {:?} {:?} {:?} {:?} {:?}", host, port, method, path, handler);
|
||||
let port_map = t.get_mut(&routes);
|
||||
let host_map = port_map.entry(port.clone()).or_default();
|
||||
let path_map = host_map.entry(host.clone()).or_default();
|
||||
let method_map = path_map.entry(path.clone()).or_default();
|
||||
let handler_set = method_map.entry(method.clone()).or_default();
|
||||
handler_set.remove(&handler);
|
||||
if handler_set.is_empty() {
|
||||
let handler_vec = method_map.entry(method.clone()).or_default();
|
||||
let handler = {
|
||||
let i = handler_vec.iter().position(|a| a.cap == handler_cap)
|
||||
.expect("Expected an index of an active handler to remove");
|
||||
handler_vec.swap_remove(i)
|
||||
};
|
||||
if handler_vec.is_empty() {
|
||||
method_map.remove(&method);
|
||||
}
|
||||
if method_map.is_empty() {
|
||||
|
@ -102,6 +123,7 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: HttpRouter) -> ActorResult {
|
|||
if host_map.is_empty() {
|
||||
port_map.remove(&port);
|
||||
}
|
||||
*t.get_mut(&handler.terminated) = true;
|
||||
Ok(())
|
||||
}));
|
||||
Ok(())
|
||||
|
@ -115,12 +137,14 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: HttpRouter) -> ActorResult {
|
|||
let req = match language().parse::<http::HttpRequest>(&req) { Ok(v) => v, Err(_) => return Ok(()) };
|
||||
let res = match res.value().to_embedded() { Ok(v) => v, Err(_) => return Ok(()) };
|
||||
|
||||
tracing::trace!("Looking up handler for {:#?} in {:#?}", &req, &t.get(&routes));
|
||||
|
||||
let host_map = match t.get(&routes).get(&req.port) {
|
||||
Some(host_map) => host_map,
|
||||
None => return send_empty(t, res, 404, "Not found"),
|
||||
};
|
||||
|
||||
let methods = match try_hostname(host_map, http::HostPattern::Host(req.host.clone()), &req.path)? {
|
||||
let methods = match request_host(&req.host).and_then(|h| try_hostname(host_map, http::HostPattern::Host(h), &req.path).transpose()).transpose()? {
|
||||
Some(methods) => methods,
|
||||
None => match try_hostname(host_map, http::HostPattern::Any, &req.path)? {
|
||||
Some(methods) => methods,
|
||||
|
@ -141,9 +165,7 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: HttpRouter) -> ActorResult {
|
|||
code: 405.into(), message: "Method Not Allowed".into() });
|
||||
res.message(t, language(), &http::HttpResponse::Header {
|
||||
name: "allow".into(), value: allowed });
|
||||
res.message(t, language(), &http::HttpResponse::Done {
|
||||
chunk: Box::new(http::Chunk::Bytes(vec![])) });
|
||||
return Ok(())
|
||||
return send_done(t, res);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -151,21 +173,33 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: HttpRouter) -> ActorResult {
|
|||
if handlers.len() > 1 {
|
||||
tracing::warn!(?req, "Too many handlers available");
|
||||
}
|
||||
let handler = handlers.first().expect("Nonempty handler set").clone();
|
||||
handler.assert(t, language(), &http::HttpContext { req, res: res.clone() });
|
||||
let ActiveHandler { cap, terminated } = handlers.first().expect("Nonempty handler set").clone();
|
||||
tracing::trace!("Handler for {:?} is {:?}", &req, &cap);
|
||||
|
||||
t.dataflow(enclose!((terminated, req, res) move |t| {
|
||||
if *t.get(&terminated) {
|
||||
tracing::trace!("Handler for {:?} terminated", &req);
|
||||
send_empty(t, &res, 500, "Internal Server Error")?;
|
||||
}
|
||||
Ok(())
|
||||
}))?;
|
||||
|
||||
cap.assert(t, language(), &http::HttpContext { req, res: res.clone() });
|
||||
Ok(())
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn send_done(t: &mut Activation, res: &Arc<Cap>) -> ActorResult {
|
||||
res.message(t, language(), &http::HttpResponse::Done {
|
||||
chunk: Box::new(http::Chunk::Bytes(vec![])) });
|
||||
Ok(())
|
||||
}
|
||||
fn send_empty(t: &mut Activation, res: &Arc<Cap>, code: u16, message: &str) -> ActorResult {
|
||||
res.message(t, language(), &http::HttpResponse::Status {
|
||||
code: code.into(), message: message.into() });
|
||||
res.message(t, language(), &http::HttpResponse::Done {
|
||||
chunk: Box::new(http::Chunk::Bytes(vec![])) });
|
||||
return Ok(())
|
||||
send_done(t, res)
|
||||
}
|
||||
|
||||
fn path_pattern_matches(path_pat: &http::PathPattern, path: &Vec<String>) -> bool {
|
||||
|
@ -187,7 +221,10 @@ fn path_pattern_matches(path_pat: &http::PathPattern, path: &Vec<String>) -> boo
|
|||
http::PathPatternElement::Rest => return true,
|
||||
}
|
||||
}
|
||||
true
|
||||
match path_iter.next() {
|
||||
Some(_more) => false,
|
||||
None => true,
|
||||
}
|
||||
}
|
||||
|
||||
fn try_hostname<'table>(
|
||||
|
@ -199,6 +236,7 @@ fn try_hostname<'table>(
|
|||
None => Ok(None),
|
||||
Some(path_table) => {
|
||||
for (path_pat, method_table) in path_table.iter() {
|
||||
tracing::trace!("Checking path {:?} against pattern {:?}", &path, &path_pat);
|
||||
if path_pattern_matches(path_pat, path) {
|
||||
return Ok(Some(method_table));
|
||||
}
|
||||
|
@ -263,9 +301,7 @@ impl HttpStaticFileServer {
|
|||
code: 301.into(), message: "Moved permanently".into() });
|
||||
res.message(t, language(), &http::HttpResponse::Header {
|
||||
name: "location".into(), value: format!("/{}/", req.path.join("/")) });
|
||||
res.message(t, language(), &http::HttpResponse::Done {
|
||||
chunk: Box::new(http::Chunk::Bytes(vec![])) });
|
||||
return Ok(())
|
||||
return send_done(t, res);
|
||||
} else {
|
||||
let mut buf = Vec::new();
|
||||
fh.read_to_end(&mut buf)?;
|
||||
|
@ -286,6 +322,7 @@ impl HttpStaticFileServer {
|
|||
}
|
||||
res.message(t, language(), &http::HttpResponse::Done {
|
||||
chunk: Box::new(http::Chunk::Bytes(body)) });
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,24 +15,20 @@ use tokio::net::TcpListener;
|
|||
use crate::language::language;
|
||||
use crate::lifecycle;
|
||||
use crate::protocol::detect_protocol;
|
||||
use crate::schemas::internal_services::{TcpWithHttp, TcpWithoutHttp, TcpRelayListener};
|
||||
use crate::schemas::internal_services::TcpWithoutHttp;
|
||||
|
||||
use syndicate_macros::during;
|
||||
|
||||
pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
|
||||
t.spawn(Some(AnyValue::symbol("tcp_relay_listener")), move |t| {
|
||||
enclose!((ds) during!(t, ds, language(), <run-service $spec: TcpWithHttp::<AnyValue>>, |t: &mut Activation| {
|
||||
spec.httpd.assert(t, language(), &syndicate::schemas::http::HttpListener { port: spec.addr.port.clone() });
|
||||
run_supervisor(t, ds.clone(), TcpRelayListener::TcpWithHttp(Box::new(spec)))
|
||||
}));
|
||||
enclose!((ds) during!(t, ds, language(), <run-service $spec: TcpWithoutHttp::<AnyValue>>, |t| {
|
||||
run_supervisor(t, ds.clone(), TcpRelayListener::TcpWithoutHttp(Box::new(spec)))
|
||||
run_supervisor(t, ds.clone(), spec)
|
||||
}));
|
||||
Ok(())
|
||||
});
|
||||
}
|
||||
|
||||
fn run_supervisor(t: &mut Activation, ds: Arc<Cap>, spec: TcpRelayListener) -> ActorResult {
|
||||
fn run_supervisor(t: &mut Activation, ds: Arc<Cap>, spec: TcpWithoutHttp) -> ActorResult {
|
||||
Supervisor::start(
|
||||
t,
|
||||
Some(rec![AnyValue::symbol("relay"), language().unparse(&spec)]),
|
||||
|
@ -41,21 +37,35 @@ fn run_supervisor(t: &mut Activation, ds: Arc<Cap>, spec: TcpRelayListener) -> A
|
|||
enclose!((ds) move |t| enclose!((ds, spec) run(t, ds, spec))))
|
||||
}
|
||||
|
||||
fn run(t: &mut Activation, ds: Arc<Cap>, spec: TcpRelayListener) -> ActorResult {
|
||||
fn run(t: &mut Activation, ds: Arc<Cap>, spec: TcpWithoutHttp) -> ActorResult {
|
||||
lifecycle::terminate_on_service_restart(t, &ds, &spec);
|
||||
let (addr, gatekeeper, httpd) = match spec.clone() {
|
||||
TcpRelayListener::TcpWithHttp(b) => {
|
||||
let TcpWithHttp { addr, gatekeeper, httpd } = *b;
|
||||
(addr, gatekeeper, Some(httpd))
|
||||
}
|
||||
TcpRelayListener::TcpWithoutHttp(b) => {
|
||||
let TcpWithoutHttp { addr, gatekeeper } = *b;
|
||||
(addr, gatekeeper, None)
|
||||
}
|
||||
};
|
||||
|
||||
let httpd = t.named_field("httpd", None::<Arc<Cap>>);
|
||||
|
||||
{
|
||||
let ad = spec.addr.clone();
|
||||
let ad2 = ad.clone();
|
||||
let gk = spec.gatekeeper.clone();
|
||||
enclose!((ds, httpd) during!(t, ds, language(),
|
||||
<run-service <relay-listener #(&language().unparse(&ad)) #(&AnyValue::domain(gk)) $h>>, |t: &mut Activation| {
|
||||
if let Some(h) = h.value().as_embedded().cloned() {
|
||||
h.assert(t, language(), &syndicate::schemas::http::HttpListener { port: ad2.port.clone() });
|
||||
*t.get_mut(&httpd) = Some(h.clone());
|
||||
t.on_stop(enclose!((httpd) move |t| {
|
||||
let f = t.get_mut(&httpd);
|
||||
if *f == Some(h.clone()) { *f = None; }
|
||||
Ok(())
|
||||
}));
|
||||
}
|
||||
Ok(())
|
||||
}));
|
||||
}
|
||||
|
||||
let TcpWithoutHttp { addr, gatekeeper } = spec.clone();
|
||||
|
||||
let host = addr.host.clone();
|
||||
let port = u16::try_from(&addr.port).map_err(|_| "Invalid TCP port number")?;
|
||||
let facet = t.facet.clone();
|
||||
let facet = t.facet_ref();
|
||||
let trace_collector = t.trace_collector();
|
||||
t.linked_task(Some(AnyValue::symbol("listener")), async move {
|
||||
let listen_addr = format!("{}:{}", host, port);
|
||||
|
@ -83,15 +93,16 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: TcpRelayListener) -> ActorResult
|
|||
let account = Account::new(name.clone(), trace_collector.clone());
|
||||
if !facet.activate(
|
||||
&account, cause, enclose!((trace_collector, httpd) move |t| {
|
||||
let httpd = t.get(&httpd).clone();
|
||||
t.spawn(name, move |t| {
|
||||
Ok(t.linked_task(None, {
|
||||
let facet = t.facet.clone();
|
||||
let facet = t.facet_ref();
|
||||
async move {
|
||||
detect_protocol(trace_collector,
|
||||
facet,
|
||||
stream,
|
||||
gatekeeper,
|
||||
httpd.map(|r| r.clone()),
|
||||
httpd,
|
||||
addr,
|
||||
port).await?;
|
||||
Ok(LinkedTaskTermination::KeepFacet)
|
||||
|
|
|
@ -39,7 +39,7 @@ pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
|
|||
fn run(t: &mut Activation, ds: Arc<Cap>, spec: UnixRelayListener) -> ActorResult {
|
||||
lifecycle::terminate_on_service_restart(t, &ds, &spec);
|
||||
let path_str = spec.addr.path.clone();
|
||||
let facet = t.facet.clone();
|
||||
let facet = t.facet_ref();
|
||||
let trace_collector = t.trace_collector();
|
||||
t.linked_task(Some(AnyValue::symbol("listener")), async move {
|
||||
let listener = bind_unix_listener(&PathBuf::from(path_str)).await?;
|
||||
|
@ -71,7 +71,7 @@ fn run(t: &mut Activation, ds: Arc<Cap>, spec: UnixRelayListener) -> ActorResult
|
|||
&account, cause, enclose!((trace_collector) move |t| {
|
||||
t.spawn(name, |t| {
|
||||
Ok(t.linked_task(None, {
|
||||
let facet = t.facet.clone();
|
||||
let facet = t.facet_ref();
|
||||
async move {
|
||||
tracing::info!(protocol = %"unix");
|
||||
let (i, o) = stream.into_split();
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "syndicate-tools"
|
||||
version = "0.10.0"
|
||||
version = "0.18.0"
|
||||
authors = ["Tony Garnock-Jones <tonyg@leastfixedpoint.com>"]
|
||||
edition = "2018"
|
||||
|
||||
|
@ -10,8 +10,8 @@ repository = "https://git.syndicate-lang.org/syndicate-lang/syndicate-rs"
|
|||
license = "Apache-2.0"
|
||||
|
||||
[dependencies]
|
||||
preserves = "4.992"
|
||||
syndicate = { path = "../syndicate", version = "0.31.0"}
|
||||
preserves = "4.995"
|
||||
syndicate = { path = "../syndicate", version = "0.40.0"}
|
||||
|
||||
clap = { version = "^4.0", features = ["derive"] }
|
||||
clap_complete = "^4.0"
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "syndicate"
|
||||
version = "0.31.1"
|
||||
version = "0.40.0"
|
||||
authors = ["Tony Garnock-Jones <tonyg@leastfixedpoint.com>"]
|
||||
edition = "2018"
|
||||
|
||||
|
@ -13,13 +13,13 @@ license = "Apache-2.0"
|
|||
vendored-openssl = ["openssl/vendored"]
|
||||
|
||||
[build-dependencies]
|
||||
preserves-schema = "5.992"
|
||||
preserves-schema = "5.995"
|
||||
|
||||
[dependencies]
|
||||
preserves = "4.992"
|
||||
preserves-schema = "5.992"
|
||||
preserves = "4.995"
|
||||
preserves-schema = "5.995"
|
||||
|
||||
tokio = { version = "1.10", features = ["io-util", "macros", "rt", "rt-multi-thread", "time"] }
|
||||
tokio = { version = "1.10", features = ["io-std", "io-util", "macros", "rt", "rt-multi-thread", "time"] }
|
||||
tokio-util = "0.6"
|
||||
bytes = "1.0"
|
||||
|
||||
|
|
|
@ -11,6 +11,7 @@ use syndicate::during::entity;
|
|||
use syndicate::dataspace::Dataspace;
|
||||
use syndicate::schemas::dataspace::Observe;
|
||||
use syndicate::schemas::dataspace_patterns as p;
|
||||
use syndicate::value::Map;
|
||||
use syndicate::value::NestedValue;
|
||||
use syndicate::value::Value;
|
||||
|
||||
|
@ -88,11 +89,11 @@ pub fn bench_pub(c: &mut Criterion) {
|
|||
.create_cap(t);
|
||||
|
||||
ds.assert(t, language(), &Observe {
|
||||
pattern: p::Pattern::DBind(Box::new(p::DBind {
|
||||
pattern: p::Pattern::DLit(Box::new(p::DLit {
|
||||
value: p::AnyAtom::Symbol("consumer".to_owned()),
|
||||
})),
|
||||
})),
|
||||
pattern: p::Pattern::Bind {
|
||||
pattern: Box::new(p::Pattern::Lit {
|
||||
value: Box::new(p::AnyAtom::Symbol("consumer".to_owned())),
|
||||
}),
|
||||
},
|
||||
observer: shutdown,
|
||||
});
|
||||
|
||||
|
@ -110,24 +111,27 @@ pub fn bench_pub(c: &mut Criterion) {
|
|||
|
||||
ds.assert(t, &(), &AnyValue::symbol("consumer"));
|
||||
ds.assert(t, language(), &Observe {
|
||||
pattern: p::Pattern::DCompound(Box::new(p::DCompound::Rec {
|
||||
label: AnyValue::symbol("Says"),
|
||||
fields: vec![
|
||||
p::Pattern::DLit(Box::new(p::DLit {
|
||||
value: p::AnyAtom::String("bench_pub".to_owned()),
|
||||
})),
|
||||
p::Pattern::DBind(Box::new(p::DBind {
|
||||
pattern: p::Pattern::DDiscard(Box::new(p::DDiscard)),
|
||||
})),
|
||||
]})),
|
||||
pattern: p::Pattern::Group {
|
||||
type_: Box::new(p::GroupType::Rec {
|
||||
label: AnyValue::symbol("Says"),
|
||||
}),
|
||||
entries: Map::from([
|
||||
(p::_Any::new(0), p::Pattern::Lit {
|
||||
value: Box::new(p::AnyAtom::String("bench_pub".to_owned())),
|
||||
}),
|
||||
(p::_Any::new(1), p::Pattern::Bind {
|
||||
pattern: Box::new(p::Pattern::Discard),
|
||||
}),
|
||||
]),
|
||||
},
|
||||
observer: receiver,
|
||||
});
|
||||
ds.assert(t, language(), &Observe {
|
||||
pattern: p::Pattern::DBind(Box::new(p::DBind {
|
||||
pattern: p::Pattern::DLit(Box::new(p::DLit {
|
||||
value: p::AnyAtom::Bool(true),
|
||||
})),
|
||||
})),
|
||||
pattern: p::Pattern::Bind {
|
||||
pattern: Box::new(p::Pattern::Lit {
|
||||
value: Box::new(p::AnyAtom::Bool(true)),
|
||||
}),
|
||||
},
|
||||
observer: shutdown,
|
||||
});
|
||||
|
||||
|
|
|
@ -2,15 +2,15 @@
|
|||
tcp-remote„´³tupleµ´³named³host´³atom³String„„´³named³port´³atom³
SignedInteger„„„„„³TcpPeerInfo´³rec´³lit³tcp-peer„´³tupleµ´³named³handle´³embedded³any„„´³named³local´³refµ„³TcpLocal„„´³named³remote´³refµ„³ TcpRemote„„„„„„³embeddedType´³refµ³ EntityRef„³Cap„„„µ³http„´³schema·³version°³definitions·³Chunk´³orµµ±string´³atom³String„„µ±bytes´³atom³
|
||||
ByteString„„„„³Headers´³dictof´³atom³Symbol„´³atom³String„„³MimeType´³atom³Symbol„³
|
||||
QueryValue´³orµµ±string´³atom³String„„µ±file´³rec´³lit³file„´³tupleµ´³named³filename´³atom³String„„´³named³headers´³refµ„³Headers„„´³named³body´³atom³
|
||||
ByteString„„„„„„„„³HostPattern´³orµµ±host´³atom³String„„µ±any´³lit€„„„„³HttpBinding´³rec´³lit³ http-bind„´³tupleµ´³named³host´³refµ„³HostPattern„„´³named³port´³atom³
SignedInteger„„´³named³method´³refµ„³
MethodPattern„„´³named³path´³refµ„³PathPattern„„´³named³handler´³embedded´³refµ„³HttpRequest„„„„„„³HttpContext´³rec´³lit³request„´³tupleµ´³named³req´³refµ„³HttpRequest„„´³named³res´³embedded´³refµ„³HttpResponse„„„„„„³HttpRequest´³rec´³lit³http-request„´³tupleµ´³named³sequenceNumber´³atom³
SignedInteger„„´³named³host´³atom³String„„´³named³port´³atom³
SignedInteger„„´³named³method´³atom³Symbol„„´³named³path´³seqof´³atom³String„„„´³named³headers´³refµ„³Headers„„´³named³query´³dictof´³atom³Symbol„´³seqof´³refµ„³
|
||||
ByteString„„„„„„„„³HostPattern´³orµµ±host´³atom³String„„µ±any´³lit€„„„„³HttpBinding´³rec´³lit³ http-bind„´³tupleµ´³named³host´³refµ„³HostPattern„„´³named³port´³atom³
SignedInteger„„´³named³method´³refµ„³
MethodPattern„„´³named³path´³refµ„³PathPattern„„´³named³handler´³embedded´³refµ„³HttpRequest„„„„„„³HttpContext´³rec´³lit³request„´³tupleµ´³named³req´³refµ„³HttpRequest„„´³named³res´³embedded´³refµ„³HttpResponse„„„„„„³HttpRequest´³rec´³lit³http-request„´³tupleµ´³named³sequenceNumber´³atom³
SignedInteger„„´³named³host´³refµ„³RequestHost„„´³named³port´³atom³
SignedInteger„„´³named³method´³atom³Symbol„„´³named³path´³seqof´³atom³String„„„´³named³headers´³refµ„³Headers„„´³named³query´³dictof´³atom³Symbol„´³seqof´³refµ„³
|
||||
QueryValue„„„„´³named³body´³refµ„³RequestBody„„„„„³HttpService´³rec´³lit³http-service„´³tupleµ´³named³host´³refµ„³HostPattern„„´³named³port´³atom³
SignedInteger„„´³named³method´³refµ„³
MethodPattern„„´³named³path´³refµ„³PathPattern„„„„„³PathPattern´³seqof´³refµ„³PathPatternElement„„³RequestBody´³orµµ±present´³atom³
|
||||
ByteString„„µ±absent´³lit€„„„„³HttpListener´³rec´³lit³
http-listener„´³tupleµ´³named³port´³atom³
SignedInteger„„„„„³HttpResponse´³orµµ±status´³rec´³lit³status„´³tupleµ´³named³code´³atom³
SignedInteger„„´³named³message´³atom³String„„„„„„µ±header´³rec´³lit³header„´³tupleµ´³named³name´³atom³Symbol„„´³named³value´³atom³String„„„„„„µ±chunk´³rec´³lit³chunk„´³tupleµ´³named³chunk´³refµ„³Chunk„„„„„„µ±done´³rec´³lit³done„´³tupleµ´³named³chunk´³refµ„³Chunk„„„„„„„„³
MethodPattern´³orµµ±any´³lit€„„µ±specific´³atom³Symbol„„„„³PathPatternElement´³orµµ±label´³atom³String„„µ±wildcard´³lit³_„„µ±rest´³lit³...„„„„„³embeddedType€„„µ³noise„´³schema·³version°³definitions·³Packet´³orµµ±complete´³atom³
|
||||
ByteString„„µ±absent´³lit€„„„„³RequestHost´³orµµ±present´³atom³String„„µ±absent´³lit€„„„„³HttpListener´³rec´³lit³
http-listener„´³tupleµ´³named³port´³atom³
SignedInteger„„„„„³HttpResponse´³orµµ±status´³rec´³lit³status„´³tupleµ´³named³code´³atom³
SignedInteger„„´³named³message´³atom³String„„„„„„µ±header´³rec´³lit³header„´³tupleµ´³named³name´³atom³Symbol„„´³named³value´³atom³String„„„„„„µ±chunk´³rec´³lit³chunk„´³tupleµ´³named³chunk´³refµ„³Chunk„„„„„„µ±done´³rec´³lit³done„´³tupleµ´³named³chunk´³refµ„³Chunk„„„„„„„„³
MethodPattern´³orµµ±any´³lit€„„µ±specific´³atom³Symbol„„„„³PathPatternElement´³orµµ±label´³atom³String„„µ±wildcard´³lit³_„„µ±rest´³lit³...„„„„„³embeddedType€„„µ³noise„´³schema·³version°³definitions·³Packet´³orµµ±complete´³atom³
|
||||
ByteString„„µ±
|
||||
fragmented´³seqof´³atom³
|
||||
ByteString„„„„„³ NoiseSpec´³andµ´³dict·³key´³named³key´³atom³
|
||||
ByteString„„³service´³named³service´³refµ„³ServiceSelector„„„„´³named³protocol´³refµ„³
NoiseProtocol„„´³named³
preSharedKeys´³refµ„³NoisePreSharedKeys„„„„³
NoiseProtocol´³orµµ±present´³dict·³protocol´³named³protocol´³atom³String„„„„„µ±invalid´³dict·³protocol´³named³protocol³any„„„„µ±absent´³dict·„„„„„³
NoiseStepType´³lit³noise„³SecretKeyField´³orµµ±present´³dict·³ secretKey´³named³ secretKey´³atom³
|
||||
ByteString„„„„„³ Initiator´³rec´³lit³ initiator„´³tupleµ´³named³initiatorSession´³embedded´³refµ„³Packet„„„„„„³ NoiseSpec´³andµ´³dict·³key´³named³key´³atom³
|
||||
ByteString„„³service´³named³service´³refµ„³ServiceSelector„„„„´³named³protocol´³refµ„³
NoiseProtocol„„´³named³
preSharedKeys´³refµ„³NoisePreSharedKeys„„„„³SessionItem´³orµµ± Initiator´³refµ„³ Initiator„„µ±Packet´³refµ„³Packet„„„„³
NoiseProtocol´³orµµ±present´³dict·³protocol´³named³protocol´³atom³String„„„„„µ±invalid´³dict·³protocol´³named³protocol³any„„„„µ±absent´³dict·„„„„„³
NoiseStepType´³lit³noise„³SecretKeyField´³orµµ±present´³dict·³ secretKey´³named³ secretKey´³atom³
|
||||
ByteString„„„„„µ±invalid´³dict·³ secretKey´³named³ secretKey³any„„„„µ±absent´³dict·„„„„„³DefaultProtocol´³lit±!Noise_NK_25519_ChaChaPoly_BLAKE2s„³NoiseStepDetail´³refµ„³ServiceSelector„³ServiceSelector³any³NoiseServiceSpec´³andµ´³named³base´³refµ„³ NoiseSpec„„´³named³ secretKey´³refµ„³SecretKeyField„„„„³NoisePreSharedKeys´³orµµ±present´³dict·³
preSharedKeys´³named³
preSharedKeys´³seqof´³atom³
|
||||
ByteString„„„„„„µ±invalid´³dict·³
preSharedKeys´³named³
preSharedKeys³any„„„„µ±absent´³dict·„„„„„³NoisePathStepDetail´³refµ„³ NoiseSpec„³NoiseDescriptionDetail´³refµ„³NoiseServiceSpec„„³embeddedType€„„µ³timer„´³schema·³version°³definitions·³SetTimer´³rec´³lit³ set-timer„´³tupleµ´³named³label³any„´³named³seconds´³atom³Double„„´³named³kind´³refµ„³ TimerKind„„„„„³ LaterThan´³rec´³lit³
|
||||
ByteString„„„„„„µ±invalid´³dict·³
preSharedKeys´³named³
preSharedKeys³any„„„„µ±absent´³dict·„„„„„³NoisePathStepDetail´³refµ„³ NoiseSpec„³NoiseDescriptionDetail´³refµ„³NoiseServiceSpec„„³embeddedType´³refµ³ EntityRef„³Cap„„„µ³timer„´³schema·³version°³definitions·³SetTimer´³rec´³lit³ set-timer„´³tupleµ´³named³label³any„´³named³seconds´³atom³Double„„´³named³kind´³refµ„³ TimerKind„„„„„³ LaterThan´³rec´³lit³
|
||||
later-than„´³tupleµ´³named³seconds´³atom³Double„„„„„³ TimerKind´³orµµ±relative´³lit³relative„„µ±absolute´³lit³absolute„„µ±clear´³lit³clear„„„„³TimerExpired´³rec´³lit³
timer-expired„´³tupleµ´³named³label³any„´³named³seconds´³atom³Double„„„„„„³embeddedType€„„µ³trace„´³schema·³version°³definitions·³Oid³any³Name´³orµµ± anonymous´³rec´³lit³ anonymous„´³tupleµ„„„„µ±named´³rec´³lit³named„´³tupleµ´³named³name³any„„„„„„„³Target´³rec´³lit³entity„´³tupleµ´³named³actor´³refµ„³ActorId„„´³named³facet´³refµ„³FacetId„„´³named³oid´³refµ„³Oid„„„„„³TaskId³any³TurnId³any³ActorId³any³FacetId³any³ TurnCause´³orµµ±turn´³rec´³lit³ caused-by„´³tupleµ´³named³id´³refµ„³TurnId„„„„„„µ±cleanup´³rec´³lit³cleanup„´³tupleµ„„„„µ±linkedTaskRelease´³rec´³lit³linked-task-release„´³tupleµ´³named³id´³refµ„³TaskId„„´³named³reason´³refµ„³LinkedTaskReleaseReason„„„„„„µ±periodicActivation´³rec´³lit³periodic-activation„´³tupleµ´³named³period´³atom³Double„„„„„„µ±delay´³rec´³lit³delay„´³tupleµ´³named³causingTurn´³refµ„³TurnId„„´³named³amount´³atom³Double„„„„„„µ±external´³rec´³lit³external„´³tupleµ´³named³description³any„„„„„„„³ TurnEvent´³orµµ±assert´³rec´³lit³assert„´³tupleµ´³named³ assertion´³refµ„³AssertionDescription„„´³named³handle´³refµ³protocol„³Handle„„„„„„µ±retract´³rec´³lit³retract„´³tupleµ´³named³handle´³refµ³protocol„³Handle„„„„„„µ±message´³rec´³lit³message„´³tupleµ´³named³body´³refµ„³AssertionDescription„„„„„„µ±sync´³rec´³lit³sync„´³tupleµ´³named³peer´³refµ„³Target„„„„„„µ± breakLink´³rec´³lit³
|
||||
break-link„´³tupleµ´³named³source´³refµ„³ActorId„„´³named³handle´³refµ³protocol„³Handle„„„„„„„„³
|
||||
ExitStatus´³orµµ±ok´³lit³ok„„µ±Error´³refµ³protocol„³Error„„„„³
|
||||
|
@ -18,9 +18,13 @@ TraceEntry
|
|||
ExitStatus„„„„„„„„³FacetStopReason´³orµµ±explicitAction´³lit³explicit-action„„µ±inert´³lit³inert„„µ±parentStopping´³lit³parent-stopping„„µ±
actorStopping´³lit³actor-stopping„„„„³TurnDescription´³rec´³lit³turn„´³tupleµ´³named³id´³refµ„³TurnId„„´³named³cause´³refµ„³ TurnCause„„´³named³actions´³seqof´³refµ„³ActionDescription„„„„„„³ActionDescription´³orµµ±dequeue´³rec´³lit³dequeue„´³tupleµ´³named³event´³refµ„³TargetedTurnEvent„„„„„„µ±enqueue´³rec´³lit³enqueue„´³tupleµ´³named³event´³refµ„³TargetedTurnEvent„„„„„„µ±dequeueInternal´³rec´³lit³dequeue-internal„´³tupleµ´³named³event´³refµ„³TargetedTurnEvent„„„„„„µ±enqueueInternal´³rec´³lit³enqueue-internal„´³tupleµ´³named³event´³refµ„³TargetedTurnEvent„„„„„„µ±spawn´³rec´³lit³spawn„´³tupleµ´³named³link´³atom³Boolean„„´³named³id´³refµ„³ActorId„„„„„„µ±link´³rec´³lit³link„´³tupleµ´³named³parentActor´³refµ„³ActorId„„´³named³
childToParent´³refµ³protocol„³Handle„„´³named³
|
||||
childActor´³refµ„³ActorId„„´³named³
parentToChild´³refµ³protocol„³Handle„„„„„„µ±
|
||||
facetStart´³rec´³lit³facet-start„´³tupleµ´³named³path´³seqof´³refµ„³FacetId„„„„„„„µ± facetStop´³rec´³lit³
|
||||
facet-stop„´³tupleµ´³named³path´³seqof´³refµ„³FacetId„„„´³named³reason´³refµ„³FacetStopReason„„„„„„µ±linkedTaskStart´³rec´³lit³linked-task-start„´³tupleµ´³named³taskName´³refµ„³Name„„´³named³id´³refµ„³TaskId„„„„„„„„³TargetedTurnEvent´³rec´³lit³event„´³tupleµ´³named³target´³refµ„³Target„„´³named³detail´³refµ„³ TurnEvent„„„„„³AssertionDescription´³orµµ±value´³rec´³lit³value„´³tupleµ´³named³value³any„„„„„µ±opaque´³rec´³lit³opaque„´³tupleµ´³named³description³any„„„„„„„³LinkedTaskReleaseReason´³orµµ± cancelled´³lit³ cancelled„„µ±normal´³lit³normal„„„„„³embeddedType´³refµ³ EntityRef„³Cap„„„µ³stream„´³schema·³version°³definitions·³Mode´³orµµ±bytes´³lit³bytes„„µ±lines´³refµ„³LineMode„„µ±packet´³rec´³lit³packet„´³tupleµ´³named³size´³atom³
SignedInteger„„„„„„µ±object´³rec´³lit³object„´³tupleµ´³named³description³any„„„„„„„³Sink´³orµµ±source´³rec´³lit³source„´³tupleµ´³named³
|
||||
facet-stop„´³tupleµ´³named³path´³seqof´³refµ„³FacetId„„„´³named³reason´³refµ„³FacetStopReason„„„„„„µ±linkedTaskStart´³rec´³lit³linked-task-start„´³tupleµ´³named³taskName´³refµ„³Name„„´³named³id´³refµ„³TaskId„„„„„„„„³TargetedTurnEvent´³rec´³lit³event„´³tupleµ´³named³target´³refµ„³Target„„´³named³detail´³refµ„³ TurnEvent„„„„„³AssertionDescription´³orµµ±value´³rec´³lit³value„´³tupleµ´³named³value³any„„„„„µ±opaque´³rec´³lit³opaque„´³tupleµ´³named³description³any„„„„„„„³LinkedTaskReleaseReason´³orµµ± cancelled´³lit³ cancelled„„µ±normal´³lit³normal„„„„„³embeddedType´³refµ³ EntityRef„³Cap„„„µ³stdenv„´³schema·³version°³definitions·³
StandardRoute´³orµµ±standard´³tuplePrefixµ´³named³
|
||||
transports´³seqof´³refµ„³StandardTransport„„„´³named³key´³atom³
|
||||
ByteString„„´³named³service³any„´³named³sig´³atom³
|
||||
ByteString„„´³named³oid³any„„´³named³caveats´³seqof´³refµ³sturdy„³Caveat„„„„„µ±general´³refµ³
|
||||
gatekeeper„³Route„„„„³StandardTransport´³orµµ±wsUrl´³atom³String„„µ±other³any„„„„³embeddedType€„„µ³stream„´³schema·³version°³definitions·³Mode´³orµµ±bytes´³lit³bytes„„µ±lines´³refµ„³LineMode„„µ±packet´³rec´³lit³packet„´³tupleµ´³named³size´³atom³
SignedInteger„„„„„„µ±object´³rec´³lit³object„´³tupleµ´³named³description³any„„„„„„„³Sink´³orµµ±source´³rec´³lit³source„´³tupleµ´³named³
|
||||
controller´³embedded´³refµ„³Source„„„„„„„µ±StreamError´³refµ„³StreamError„„µ±data´³rec´³lit³data„´³tupleµ´³named³payload³any„´³named³mode´³refµ„³Mode„„„„„„µ±eof´³rec´³lit³eof„´³tupleµ„„„„„„³Source´³orµµ±sink´³rec´³lit³sink„´³tupleµ´³named³
|
||||
controller´³embedded´³refµ„³Sink„„„„„„„µ±StreamError´³refµ„³StreamError„„µ±credit´³rec´³lit³credit„´³tupleµ´³named³amount´³refµ„³CreditAmount„„´³named³mode´³refµ„³Mode„„„„„„„„³LineMode´³orµµ±lf´³lit³lf„„µ±crlf´³lit³crlf„„„„³StreamError´³rec´³lit³error„´³tupleµ´³named³message´³atom³String„„„„„³CreditAmount´³orµµ±count´³atom³
SignedInteger„„µ± unbounded´³lit³ unbounded„„„„³StreamConnection´³rec´³lit³stream-connection„´³tupleµ´³named³source´³embedded´³refµ„³Source„„„´³named³sink´³embedded´³refµ„³Sink„„„´³named³spec³any„„„„³StreamListenerError´³rec´³lit³stream-listener-error„´³tupleµ´³named³spec³any„´³named³message´³atom³String„„„„„³StreamListenerReady´³rec´³lit³stream-listener-ready„´³tupleµ´³named³spec³any„„„„„³embeddedType´³refµ³ EntityRef„³Cap„„„µ³sturdy„´³schema·³version°³definitions·³Lit´³rec´³lit³lit„´³tupleµ´³named³value³any„„„„³Oid´³atom³
SignedInteger„³Alts´³rec´³lit³or„´³tupleµ´³named³alternatives´³seqof´³refµ„³Rewrite„„„„„„³PAnd´³rec´³lit³and„´³tupleµ´³named³patterns´³seqof´³refµ„³Pattern„„„„„„³PNot´³rec´³lit³not„´³tupleµ´³named³pattern´³refµ„³Pattern„„„„„³TRef´³rec´³lit³ref„´³tupleµ´³named³binding´³atom³
SignedInteger„„„„„³PAtom´³orµµ±Boolean´³lit³Boolean„„µ±Float´³lit³Float„„µ±Double´³lit³Double„„µ±
SignedInteger´³lit³
SignedInteger„„µ±String´³lit³String„„µ±
|
||||
controller´³embedded´³refµ„³Sink„„„„„„„µ±StreamError´³refµ„³StreamError„„µ±credit´³rec´³lit³credit„´³tupleµ´³named³amount´³refµ„³CreditAmount„„´³named³mode´³refµ„³Mode„„„„„„„„³LineMode´³orµµ±lf´³lit³lf„„µ±crlf´³lit³crlf„„„„³StreamError´³rec´³lit³error„´³tupleµ´³named³message´³atom³String„„„„„³CreditAmount´³orµµ±count´³atom³
SignedInteger„„µ± unbounded´³lit³ unbounded„„„„³StreamConnection´³rec´³lit³stream-connection„´³tupleµ´³named³source´³embedded´³refµ„³Source„„„´³named³sink´³embedded´³refµ„³Sink„„„´³named³spec³any„„„„³StreamListenerError´³rec´³lit³stream-listener-error„´³tupleµ´³named³spec³any„´³named³message´³atom³String„„„„„³StreamListenerReady´³rec´³lit³stream-listener-ready„´³tupleµ´³named³spec³any„„„„„³embeddedType´³refµ³ EntityRef„³Cap„„„µ³sturdy„´³schema·³version°³definitions·³Lit´³rec´³lit³lit„´³tupleµ´³named³value³any„„„„³Oid´³atom³
SignedInteger„³Alts´³rec´³lit³or„´³tupleµ´³named³alternatives´³seqof´³refµ„³Rewrite„„„„„„³PAnd´³rec´³lit³and„´³tupleµ´³named³patterns´³seqof´³refµ„³Pattern„„„„„„³PNot´³rec´³lit³not„´³tupleµ´³named³pattern´³refµ„³Pattern„„„„„³TRef´³rec´³lit³ref„´³tupleµ´³named³binding´³atom³
SignedInteger„„„„„³PAtom´³orµµ±Boolean´³lit³Boolean„„µ±Double´³lit³Double„„µ±
SignedInteger´³lit³
SignedInteger„„µ±String´³lit³String„„µ±
|
||||
ByteString´³lit³
|
||||
ByteString„„µ±Symbol´³lit³Symbol„„„„³PBind´³rec´³lit³bind„´³tupleµ´³named³pattern´³refµ„³Pattern„„„„„³Caveat´³orµµ±Rewrite´³refµ„³Rewrite„„µ±Alts´³refµ„³Alts„„µ±Reject´³refµ„³Reject„„µ±unknown³any„„„³Reject´³rec´³lit³reject„´³tupleµ´³named³pattern´³refµ„³Pattern„„„„„³Pattern´³orµµ±PDiscard´³refµ„³PDiscard„„µ±PAtom´³refµ„³PAtom„„µ± PEmbedded´³refµ„³ PEmbedded„„µ±PBind´³refµ„³PBind„„µ±PAnd´³refµ„³PAnd„„µ±PNot´³refµ„³PNot„„µ±Lit´³refµ„³Lit„„µ± PCompound´³refµ„³ PCompound„„„„³Rewrite´³rec´³lit³rewrite„´³tupleµ´³named³pattern´³refµ„³Pattern„„´³named³template´³refµ„³Template„„„„„³WireRef´³orµµ±mine´³tupleµ´³lit° |