Refactor gatekeeper implementation for new protocols.
This commit is contained in:
parent
9a5d452754
commit
7e8dcef0e2
|
@ -1,3 +1,3 @@
|
|||
let ?root_ds = dataspace
|
||||
<require-service <relay-listener <tcp "0.0.0.0" 9001> $gatekeeper>>
|
||||
<bind "syndicate" #x"" $root_ds>
|
||||
<bind <ref "syndicate" #x""> $root_ds #f>
|
||||
|
|
|
@ -16,15 +16,15 @@ pub fn dirty_resolve(stream: &mut TcpStream, dataspace: &str) -> Result<(), Box<
|
|||
let iolang = Language::<IOValue>::default();
|
||||
|
||||
let sturdyref = sturdy::SturdyRef::from_hex(dataspace)?;
|
||||
let sturdyref = iolang.parse(&syndicate::language().unparse(&sturdyref)
|
||||
.copy_via(&mut |_| Err("no!"))?)?;
|
||||
let sturdyref: IOValue = syndicate::language().unparse(&sturdyref)
|
||||
.copy_via(&mut |_| Err("no!"))?;
|
||||
|
||||
let resolve_turn = P::Turn(vec![
|
||||
P::TurnEvent {
|
||||
oid: P::Oid(0.into()),
|
||||
event: P::Event::Assert(Box::new(P::Assert {
|
||||
assertion: P::Assertion(iolang.unparse(&gatekeeper::Resolve::<IOValue> {
|
||||
sturdyref,
|
||||
step: sturdyref,
|
||||
observer: iolang.unparse(&sturdy::WireRef::Mine {
|
||||
oid: Box::new(sturdy::Oid(0.into())),
|
||||
}),
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
´³bundle·µ³
documentation„´³schema·³version‘³definitions·³Url´³orµµ±present´³dict·³url´³named³url´³atom³String„„„„„µ±invalid´³dict·³url´³named³url³any„„„„µ±absent´³dict·„„„„„³IOList´³orµµ±bytes´³atom³
|
||||
ByteString„„µ±string´³atom³String„„µ±nested´³seqof´³refµ„³IOList„„„„„³Metadata´³rec´³lit³metadata„´³tupleµ´³named³object³any„´³named³info´³dictof´³atom³Symbol„³any„„„„„³Description´³orµµ±present´³dict·³description´³named³description´³refµ„³IOList„„„„„µ±invalid´³dict·³description´³named³description³any„„„„µ±absent´³dict·„„„„„„³embeddedType€„„µ³
gatekeeperMux„´³schema·³version‘³definitions·³API´³orµµ±Resolve´³refµ³
|
||||
gatekeeper„³Resolve„„µ±Connect´³refµ³noise„³Connect„„„„³NoiseService´³rec´³lit³noise„´³tupleµ´³named³spec´³refµ³noise„³ NoiseSpec„„´³named³service´³embedded³any„„„„„³SecretKeyField´³orµµ±present´³dict·³ secretKey´³named³ secretKey´³atom³
|
||||
ByteString„„„„„µ±invalid´³dict·³ secretKey´³named³ secretKey³any„„„„µ±absent´³dict·„„„„„³NoiseServiceSpec´³andµ´³named³base´³refµ³noise„³ NoiseSpec„„´³named³ secretKey´³refµ„³SecretKeyField„„„„„³embeddedType€„„µ³externalServices„´³schema·³version‘³definitions·³Process´³orµµ±simple´³refµ„³CommandLine„„µ±full´³refµ„³FullProcess„„„„³Service´³refµ„³
DaemonService„³ClearEnv´³orµµ±present´³dict·³clearEnv´³named³clearEnv´³atom³Boolean„„„„„µ±invalid´³dict·³clearEnv´³named³clearEnv³any„„„„µ±absent´³dict·„„„„„³EnvValue´³orµµ±set´³atom³String„„µ±remove´³lit€„„µ±invalid³any„„„³Protocol´³orµµ±none´³lit³none„„µ±binarySyndicate´³lit³application/syndicate„„µ±
textSyndicate´³lit³text/syndicate„„„„³
|
||||
ByteString„„µ±string´³atom³String„„µ±nested´³seqof´³refµ„³IOList„„„„„³Metadata´³rec´³lit³metadata„´³tupleµ´³named³object³any„´³named³info´³dictof´³atom³Symbol„³any„„„„„³Description´³orµµ±present´³dict·³description´³named³description´³refµ„³IOList„„„„„µ±invalid´³dict·³description´³named³description³any„„„„µ±absent´³dict·„„„„„„³embeddedType€„„µ³externalServices„´³schema·³version‘³definitions·³Process´³orµµ±simple´³refµ„³CommandLine„„µ±full´³refµ„³FullProcess„„„„³Service´³refµ„³
DaemonService„³ClearEnv´³orµµ±present´³dict·³clearEnv´³named³clearEnv´³atom³Boolean„„„„„µ±invalid´³dict·³clearEnv´³named³clearEnv³any„„„„µ±absent´³dict·„„„„„³EnvValue´³orµµ±set´³atom³String„„µ±remove´³lit€„„µ±invalid³any„„„³Protocol´³orµµ±none´³lit³none„„µ±binarySyndicate´³lit³application/syndicate„„µ±
textSyndicate´³lit³text/syndicate„„„„³
|
||||
ProcessDir´³orµµ±present´³dict·³dir´³named³dir´³atom³String„„„„„µ±invalid´³dict·³dir´³named³dir³any„„„„µ±absent´³dict·„„„„„³
|
||||
ProcessEnv´³orµµ±present´³dict·³env´³named³env´³dictof´³refµ„³EnvVariable„´³refµ„³EnvValue„„„„„„µ±invalid´³dict·³env´³named³env³any„„„„µ±absent´³dict·„„„„„³CommandLine´³orµµ±shell´³atom³String„„µ±full´³refµ„³FullCommandLine„„„„³EnvVariable´³orµµ±string´³atom³String„„µ±symbol´³atom³Symbol„„µ±invalid³any„„„³FullProcess´³andµ´³dict·³argv´³named³argv´³refµ„³CommandLine„„„„´³named³env´³refµ„³
|
||||
ProcessEnv„„´³named³dir´³refµ„³
|
||||
|
|
|
@ -1,7 +0,0 @@
|
|||
version 1 .
|
||||
|
||||
API = gatekeeper.Resolve / noise.Connect .
|
||||
|
||||
NoiseService = <noise @spec noise.NoiseSpec @service #!any> .
|
||||
NoiseServiceSpec = @base noise.NoiseSpec & @secretKey SecretKeyField .
|
||||
SecretKeyField = @present { secretKey: bytes } / @invalid { secretKey: any } / @absent {} .
|
|
@ -20,41 +20,117 @@ use syndicate::value::NestedValue;
|
|||
use syndicate::schemas::dataspace;
|
||||
use syndicate::schemas::gatekeeper;
|
||||
use syndicate::schemas::noise;
|
||||
use syndicate::schemas::sturdy;
|
||||
|
||||
use crate::language::language;
|
||||
use crate::schemas::gatekeeper_mux::Api;
|
||||
use crate::schemas::gatekeeper_mux::NoiseServiceSpec;
|
||||
use crate::schemas::gatekeeper_mux::SecretKeyField;
|
||||
|
||||
// pub fn bind(
|
||||
// t: &mut Activation,
|
||||
// ds: &Arc<Cap>,
|
||||
// oid: syndicate::schemas::sturdy::_Any,
|
||||
// key: [u8; 16],
|
||||
// target: Arc<Cap>,
|
||||
// ) {
|
||||
// let sr = sturdy::SturdyRef::mint(oid.clone(), &key);
|
||||
// tracing::info!(cap = ?language().unparse(&sr), hex = %sr.to_hex());
|
||||
// ds.assert(t, language(), &gatekeeper::Bind { oid, key: key.to_vec(), target });
|
||||
// }
|
||||
use syndicate_macros::during;
|
||||
use syndicate_macros::pattern;
|
||||
|
||||
pub fn handle_assertion(
|
||||
ds: &mut Arc<Cap>,
|
||||
pub fn handle_binds(t: &mut Activation, ds: &Arc<Cap>) -> ActorResult {
|
||||
Ok(during!(t, ds, language(), <bind $description $target $observer>, |t: &mut Activation| {
|
||||
t.spawn_link(None, move |t| { handle_bind(t, description, target, observer) });
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
|
||||
fn handle_bind(
|
||||
t: &mut Activation,
|
||||
a: Api<AnyValue>,
|
||||
) -> DuringResult<Arc<Cap>> {
|
||||
match a {
|
||||
Api::Resolve(resolve_box) => handle_resolve(ds, t, *resolve_box),
|
||||
Api::Connect(connect_box) => handle_connect(ds, t, *connect_box),
|
||||
description: AnyValue,
|
||||
target: AnyValue,
|
||||
observer: AnyValue,
|
||||
) -> ActorResult {
|
||||
let _target = target.value().to_embedded()?;
|
||||
let observer = language().parse::<gatekeeper::BindObserver>(&observer)?;
|
||||
|
||||
if let Ok(s) = language().parse::<sturdy::SturdyService>(&description) {
|
||||
let sr = sturdy::SturdyRef::mint(s.oid, &s.key);
|
||||
if let gatekeeper::BindObserver::Present(o) = observer {
|
||||
o.assert(t, language(), &gatekeeper::Bound::Bound {
|
||||
step: language().unparse(&sr),
|
||||
});
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let Ok(s) = language().parse::<noise::NoiseService<AnyValue>>(&description) {
|
||||
match validate_noise_spec(s.spec) {
|
||||
Ok(spec) => if let gatekeeper::BindObserver::Present(o) = observer {
|
||||
o.assert(t, language(), &gatekeeper::Bound::Bound {
|
||||
step: language().unparse(&noise::NoiseRouteStep {
|
||||
spec: noise::NoiseSpec {
|
||||
key: spec.public_key,
|
||||
service: noise::ServiceSelector(spec.service),
|
||||
protocol: if spec.protocol == default_noise_protocol() {
|
||||
noise::NoiseProtocol::Absent
|
||||
} else {
|
||||
noise::NoiseProtocol::Present {
|
||||
protocol: spec.protocol,
|
||||
}
|
||||
},
|
||||
pre_shared_keys: if spec.psks.is_empty() {
|
||||
noise::NoisePreSharedKeys::Absent
|
||||
} else {
|
||||
noise::NoisePreSharedKeys::Present {
|
||||
pre_shared_keys: spec.psks,
|
||||
}
|
||||
},
|
||||
},
|
||||
}),
|
||||
});
|
||||
},
|
||||
Err(e) => {
|
||||
if let gatekeeper::BindObserver::Present(o) = observer {
|
||||
o.assert(t, language(), &gatekeeper::Bound::Rejected(
|
||||
Box::new(gatekeeper::Rejected {
|
||||
detail: AnyValue::new(format!("{}", &e)),
|
||||
})));
|
||||
}
|
||||
tracing::error!("Invalid noise bind description: {}", e);
|
||||
}
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let gatekeeper::BindObserver::Present(o) = observer {
|
||||
o.assert(t, language(), &gatekeeper::Bound::Rejected(
|
||||
Box::new(gatekeeper::Rejected {
|
||||
detail: AnyValue::symbol("unsupported"),
|
||||
})));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn eventually_retract<E>(h: Option<Handle>) -> DuringResult<E> {
|
||||
if let Some(h) = h {
|
||||
Ok(Some(Box::new(move |_state, t: &mut Activation| Ok(t.retract(h)))))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_resolve(
|
||||
pub fn handle_resolves(
|
||||
ds: &mut Arc<Cap>,
|
||||
t: &mut Activation,
|
||||
a: gatekeeper::Resolve,
|
||||
) -> DuringResult<Arc<Cap>> {
|
||||
let gatekeeper::Resolve { sturdyref, observer } = a;
|
||||
if let Ok(s) = language().parse::<sturdy::SturdyStep>(&a.step) {
|
||||
return handle_resolve_sturdyref(ds, t, s.0, a.observer);
|
||||
}
|
||||
if let Ok(s) = language().parse::<noise::NoiseStep<AnyValue>>(&a.step) {
|
||||
return handle_resolve_noise(ds, t, s.service.0, a.observer);
|
||||
}
|
||||
eventually_retract(ds.assert(t, language(), &gatekeeper::Rejected {
|
||||
detail: AnyValue::symbol("unsupported"),
|
||||
}))
|
||||
}
|
||||
|
||||
fn handle_resolve_sturdyref(
|
||||
ds: &mut Arc<Cap>,
|
||||
t: &mut Activation,
|
||||
sturdyref: sturdy::SturdyRef,
|
||||
observer: Arc<Cap>,
|
||||
) -> DuringResult<Arc<Cap>> {
|
||||
let queried_oid = sturdyref.oid.clone();
|
||||
let handler = syndicate::entity(observer)
|
||||
.on_asserted(move |observer, t, a: AnyValue| {
|
||||
|
@ -65,91 +141,111 @@ fn handle_resolve(
|
|||
Err(e) => {
|
||||
tracing::warn!(sturdyref = ?language().unparse(&sturdyref),
|
||||
"sturdyref failed validation: {}", e);
|
||||
Ok(None)
|
||||
eventually_retract(observer.assert(t, language(), &gatekeeper::Resolved::Rejected(
|
||||
Box::new(gatekeeper::Rejected {
|
||||
detail: AnyValue::symbol("sturdyref-failed-validation"),
|
||||
}))))
|
||||
},
|
||||
Ok(target) => {
|
||||
tracing::trace!(sturdyref = ?language().unparse(&sturdyref),
|
||||
?target,
|
||||
"sturdyref resolved");
|
||||
if let Some(h) = observer.assert(t, &(), &AnyValue::domain(target)) {
|
||||
Ok(Some(Box::new(move |_observer, t| Ok(t.retract(h)))))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
eventually_retract(observer.assert(t, language(), &gatekeeper::Resolved::Accepted {
|
||||
responder_session: target,
|
||||
}))
|
||||
}
|
||||
}
|
||||
})
|
||||
.create_cap(t);
|
||||
if let Some(oh) = ds.assert(t, language(), &dataspace::Observe {
|
||||
eventually_retract(ds.assert(t, language(), &dataspace::Observe {
|
||||
// TODO: codegen plugin to generate pattern constructors
|
||||
pattern: syndicate_macros::pattern!{<bind #(&queried_oid) $ $>},
|
||||
pattern: pattern!{<bind <ref #(&queried_oid) $> $ _>},
|
||||
observer: handler,
|
||||
}) {
|
||||
Ok(Some(Box::new(move |_ds, t| Ok(t.retract(oh)))))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
fn handle_connect(
|
||||
struct ValidatedNoiseSpec {
|
||||
service: AnyValue,
|
||||
protocol: String,
|
||||
pattern: HandshakePattern,
|
||||
psks: Vec<Vec<u8>>,
|
||||
secret_key: Option<Vec<u8>>,
|
||||
public_key: Vec<u8>,
|
||||
}
|
||||
|
||||
fn default_noise_protocol() -> String {
|
||||
language().unparse(&noise::DefaultProtocol).value().to_string().unwrap().clone()
|
||||
}
|
||||
|
||||
fn validate_noise_spec(
|
||||
spec: noise::NoiseServiceSpec<AnyValue>,
|
||||
) -> Result<ValidatedNoiseSpec, ActorError> {
|
||||
let protocol = match spec.base.protocol {
|
||||
noise::NoiseProtocol::Present { protocol } => protocol,
|
||||
noise::NoiseProtocol::Invalid { protocol } =>
|
||||
Err(format!("Invalid noise protocol {:?}", protocol))?,
|
||||
noise::NoiseProtocol::Absent => default_noise_protocol(),
|
||||
};
|
||||
|
||||
const PREFIX: &'static str = "Noise_";
|
||||
const SUFFIX: &'static str = "_25519_ChaChaPoly_BLAKE2s";
|
||||
if !protocol.starts_with(PREFIX) || !protocol.ends_with(SUFFIX) {
|
||||
Err(format!("Unsupported protocol {:?}", protocol))?;
|
||||
}
|
||||
|
||||
let pattern_name = &protocol[PREFIX.len()..(protocol.len()-SUFFIX.len())];
|
||||
let pattern = lookup_pattern(pattern_name).ok_or_else::<ActorError, _>(
|
||||
|| format!("Unsupported handshake pattern {:?}", pattern_name).into())?;
|
||||
|
||||
let psks = match spec.base.pre_shared_keys {
|
||||
noise::NoisePreSharedKeys::Present { pre_shared_keys } => pre_shared_keys,
|
||||
noise::NoisePreSharedKeys::Invalid { pre_shared_keys } =>
|
||||
Err(format!("Invalid pre-shared-keys {:?}", pre_shared_keys))?,
|
||||
noise::NoisePreSharedKeys::Absent => vec![],
|
||||
};
|
||||
|
||||
let secret_key = match spec.secret_key {
|
||||
noise::SecretKeyField::Present { secret_key } => Some(secret_key),
|
||||
noise::SecretKeyField::Invalid { secret_key } =>
|
||||
Err(format!("Invalid secret key {:?}", secret_key))?,
|
||||
noise::SecretKeyField::Absent => None,
|
||||
};
|
||||
|
||||
Ok(ValidatedNoiseSpec {
|
||||
service: spec.base.service.0,
|
||||
protocol,
|
||||
pattern,
|
||||
psks,
|
||||
secret_key,
|
||||
public_key: spec.base.key,
|
||||
})
|
||||
}
|
||||
|
||||
fn handle_resolve_noise(
|
||||
ds: &mut Arc<Cap>,
|
||||
t: &mut Activation,
|
||||
a: noise::Connect<AnyValue>,
|
||||
service_selector: AnyValue,
|
||||
initiator_session: Arc<Cap>,
|
||||
) -> DuringResult<Arc<Cap>> {
|
||||
let noise::Connect { service_selector, initiator_session } = a;
|
||||
let handler = syndicate::entity(())
|
||||
.on_asserted_facet(move |_state, t, a: AnyValue| {
|
||||
let initiator_session = Arc::clone(&initiator_session);
|
||||
t.spawn_link(None, move |t| {
|
||||
let bindings = a.value().to_sequence()?;
|
||||
let spec: NoiseServiceSpec<AnyValue> = language().parse(&bindings[0])?;
|
||||
let protocol = match spec.base.protocol {
|
||||
noise::NoiseProtocol::Present { protocol } =>
|
||||
protocol,
|
||||
noise::NoiseProtocol::Invalid { protocol } =>
|
||||
Err(format!("Invalid noise protocol {:?}", protocol))?,
|
||||
noise::NoiseProtocol::Absent =>
|
||||
language().unparse(&noise::DefaultProtocol).value().to_string()?.clone(),
|
||||
};
|
||||
let psks = match spec.base.pre_shared_keys {
|
||||
noise::NoisePreSharedKeys::Present { pre_shared_keys } =>
|
||||
pre_shared_keys,
|
||||
noise::NoisePreSharedKeys::Invalid { pre_shared_keys } =>
|
||||
Err(format!("Invalid pre-shared-keys {:?}", pre_shared_keys))?,
|
||||
noise::NoisePreSharedKeys::Absent =>
|
||||
vec![],
|
||||
};
|
||||
let secret_key = match spec.secret_key {
|
||||
SecretKeyField::Present { secret_key } =>
|
||||
Some(secret_key),
|
||||
SecretKeyField::Invalid { secret_key } =>
|
||||
Err(format!("Invalid secret key {:?}", secret_key))?,
|
||||
SecretKeyField::Absent =>
|
||||
None,
|
||||
};
|
||||
let spec = validate_noise_spec(language().parse(&bindings[0])?)?;
|
||||
let service = bindings[1].value().to_embedded()?;
|
||||
run_noise_responder(t,
|
||||
spec.base.service,
|
||||
protocol,
|
||||
psks,
|
||||
secret_key,
|
||||
initiator_session,
|
||||
Arc::clone(service))
|
||||
run_noise_responder(t, spec, initiator_session, Arc::clone(service))
|
||||
});
|
||||
Ok(())
|
||||
})
|
||||
.create_cap(t);
|
||||
if let Some(oh) = ds.assert(t, language(), &dataspace::Observe {
|
||||
eventually_retract(ds.assert(t, language(), &dataspace::Observe {
|
||||
// TODO: codegen plugin to generate pattern constructors
|
||||
pattern: syndicate_macros::pattern!{
|
||||
<noise $spec:NoiseServiceSpec{ { service: #(&service_selector) } } $service >
|
||||
pattern: pattern!{
|
||||
<bind <noise $spec:NoiseServiceSpec{ { service: #(&service_selector) } }> $service _>
|
||||
},
|
||||
observer: handler,
|
||||
}) {
|
||||
Ok(Some(Box::new(move |_ds, t| Ok(t.retract(oh)))))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
struct ResponderDetails {
|
||||
|
@ -304,29 +400,17 @@ fn lookup_pattern(name: &str) -> Option<HandshakePattern> {
|
|||
|
||||
fn run_noise_responder(
|
||||
t: &mut Activation,
|
||||
service_selector: AnyValue,
|
||||
protocol: String,
|
||||
psks: Vec<Vec<u8>>,
|
||||
secret_key: Option<Vec<u8>>,
|
||||
spec: ValidatedNoiseSpec,
|
||||
initiator_session: Arc<Cap>,
|
||||
service: Arc<Cap>,
|
||||
) -> ActorResult {
|
||||
const PREFIX: &'static str = "Noise_";
|
||||
const SUFFIX: &'static str = "_25519_ChaChaPoly_BLAKE2s";
|
||||
if !protocol.starts_with(PREFIX) || !protocol.ends_with(SUFFIX) {
|
||||
Err(format!("Unsupported protocol {:?}", protocol))?;
|
||||
}
|
||||
let pattern_name = &protocol[PREFIX.len()..(protocol.len()-SUFFIX.len())];
|
||||
let pattern = lookup_pattern(pattern_name).ok_or_else::<ActorError, _>(
|
||||
|| format!("Unsupported handshake pattern {:?}", pattern_name).into())?;
|
||||
|
||||
let hs = {
|
||||
let mut builder = noise_protocol::HandshakeStateBuilder::new();
|
||||
builder.set_pattern(pattern);
|
||||
builder.set_pattern(spec.pattern);
|
||||
builder.set_is_initiator(false);
|
||||
let prologue = PackedWriter::encode(&mut NoEmbeddedDomainCodec, &service_selector)?;
|
||||
let prologue = PackedWriter::encode(&mut NoEmbeddedDomainCodec, &spec.service)?;
|
||||
builder.set_prologue(&prologue);
|
||||
match secret_key {
|
||||
match spec.secret_key {
|
||||
None => (),
|
||||
Some(sk) => {
|
||||
let sk: [u8; 32] = sk.try_into().map_err(|_| "Bad secret key length")?;
|
||||
|
@ -334,7 +418,7 @@ fn run_noise_responder(
|
|||
},
|
||||
}
|
||||
let mut hs = builder.build_handshake_state();
|
||||
for psk in psks.into_iter() {
|
||||
for psk in spec.psks.into_iter() {
|
||||
hs.push_psk(&psk);
|
||||
}
|
||||
hs
|
||||
|
@ -347,6 +431,6 @@ fn run_noise_responder(
|
|||
|
||||
let responder_session =
|
||||
Cap::guard(crate::Language::arc(), t.create(ResponderState::Handshake(details, hs)));
|
||||
initiator_session.assert(t, language(), &noise::Accept { responder_session });
|
||||
initiator_session.assert(t, language(), &gatekeeper::Resolved::Accepted { responder_session });
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -110,7 +110,8 @@ async fn main() -> ActorResult {
|
|||
|
||||
let gatekeeper = Cap::guard(Language::arc(), t.create(
|
||||
syndicate::entity(Arc::clone(&server_config_ds))
|
||||
.on_asserted(gatekeeper::handle_assertion)));
|
||||
.on_asserted(gatekeeper::handle_resolves)));
|
||||
gatekeeper::handle_binds(t, &server_config_ds)?;
|
||||
|
||||
let mut env = Map::new();
|
||||
env.insert("config".to_owned(), AnyValue::domain(Arc::clone(&server_config_ds)));
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
use bytes::Buf;
|
||||
use bytes::BytesMut;
|
||||
|
||||
use crate::Language;
|
||||
use crate::language;
|
||||
use crate::actor::*;
|
||||
use crate::during;
|
||||
|
@ -36,6 +37,7 @@ use preserves::value::signed_integer::SignedInteger;
|
|||
use preserves_schema::Codec;
|
||||
use preserves_schema::Deserialize;
|
||||
use preserves_schema::ParseError;
|
||||
use preserves_schema::support::Unparse;
|
||||
|
||||
use std::io;
|
||||
use std::pin::Pin;
|
||||
|
@ -175,17 +177,18 @@ impl Membrane {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn connect_stream<I, O, E, F>(
|
||||
pub fn connect_stream<I, O, Step, E, F>(
|
||||
t: &mut Activation,
|
||||
i: I,
|
||||
o: O,
|
||||
output_text: bool,
|
||||
sturdyref: sturdy::SturdyRef,
|
||||
step: Step,
|
||||
initial_state: E,
|
||||
mut f: F,
|
||||
) where
|
||||
I: 'static + Send + AsyncRead,
|
||||
O: 'static + Send + AsyncWrite,
|
||||
Step: for<'a> Unparse<&'a Language<AnyValue>, AnyValue>,
|
||||
E: 'static + Send,
|
||||
F: 'static + Send + FnMut(&mut E, &mut Activation, Arc<Cap>) -> during::DuringResult<E>
|
||||
{
|
||||
|
@ -196,8 +199,8 @@ pub fn connect_stream<I, O, E, F>(
|
|||
let denotation = a.value().to_embedded()?;
|
||||
f(state, t, Arc::clone(denotation))
|
||||
}));
|
||||
gatekeeper.assert(t, language(), &gatekeeper::Resolve {
|
||||
sturdyref,
|
||||
gatekeeper.assert(t, language(), &gatekeeper::Resolve::<AnyValue> {
|
||||
step: language().unparse(&step),
|
||||
observer: Cap::new(&main_entity),
|
||||
});
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue