Handle refinement to gatekeeper protocol allowing JIT binding and/or direct rejection
This commit is contained in:
parent
eb9d9bed0f
commit
55456621d4
|
@ -15,7 +15,7 @@ use std::convert::TryInto;
|
|||
use std::sync::Arc;
|
||||
|
||||
use syndicate::actor::*;
|
||||
use syndicate::during::DuringResult;
|
||||
use syndicate::enclose;
|
||||
use syndicate::value::NestedValue;
|
||||
use syndicate::schemas::dataspace;
|
||||
use syndicate::schemas::gatekeeper;
|
||||
|
@ -102,49 +102,78 @@ pub fn handle_binds(t: &mut Activation, ds: &Arc<Cap>) -> ActorResult {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn eventually_retract<E>(h: Option<Handle>) -> DuringResult<E> {
|
||||
if let Some(h) = h {
|
||||
Ok(Some(Box::new(move |_state, t: &mut Activation| Ok(t.retract(h)))))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_resolves(
|
||||
pub fn facet_handle_resolve(
|
||||
ds: &mut Arc<Cap>,
|
||||
t: &mut Activation,
|
||||
a: gatekeeper::Resolve,
|
||||
) -> DuringResult<Arc<Cap>> {
|
||||
) -> ActorResult {
|
||||
let mut detail: &'static str = "unsupported";
|
||||
|
||||
if a.step.step_type == sturdy_step_type() {
|
||||
detail = "invalid";
|
||||
if let Ok(s) = language().parse::<sturdy::SturdyStepDetail>(&a.step.detail) {
|
||||
return handle_resolve_sturdyref(ds, t, sturdy::SturdyRef { parameters: s.0 }, a.observer);
|
||||
t.facet(|t| {
|
||||
let f = handle_direct_resolution(ds, t, a.clone())?;
|
||||
await_bind_sturdyref(ds, t, sturdy::SturdyRef { parameters: s.0 }, a.observer, f)
|
||||
})?;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
if a.step.step_type == noise_step_type() {
|
||||
detail = "invalid";
|
||||
if let Ok(s) = language().parse::<noise::NoiseStepDetail<AnyValue>>(&a.step.detail) {
|
||||
return handle_resolve_noise(ds, t, s.0.0, a.observer);
|
||||
t.facet(|t| {
|
||||
let f = handle_direct_resolution(ds, t, a.clone())?;
|
||||
await_bind_noise(ds, t, s.0.0, a.observer, f)
|
||||
})?;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
eventually_retract(ds.assert(t, language(), &gatekeeper::Rejected {
|
||||
a.observer.assert(t, language(), &gatekeeper::Rejected {
|
||||
detail: AnyValue::symbol(detail),
|
||||
}))
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_resolve_sturdyref(
|
||||
fn handle_direct_resolution(
|
||||
ds: &mut Arc<Cap>,
|
||||
t: &mut Activation,
|
||||
a: gatekeeper::Resolve,
|
||||
) -> Result<FacetId, ActorError> {
|
||||
let outer_facet = t.facet_id();
|
||||
t.facet(move |t| {
|
||||
let handler = syndicate::entity(a.observer)
|
||||
.on_asserted(move |observer, t, a: AnyValue| {
|
||||
t.stop_facet_and_continue(outer_facet, Some(
|
||||
enclose!((observer, a) move |t: &mut Activation| {
|
||||
observer.assert(t, language(), &a);
|
||||
Ok(())
|
||||
})))?;
|
||||
Ok(None)
|
||||
})
|
||||
.create_cap(t);
|
||||
ds.assert(t, language(), &gatekeeper::Resolve {
|
||||
step: a.step.clone(),
|
||||
observer: handler,
|
||||
});
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn await_bind_sturdyref(
|
||||
ds: &mut Arc<Cap>,
|
||||
t: &mut Activation,
|
||||
sturdyref: sturdy::SturdyRef,
|
||||
observer: Arc<Cap>,
|
||||
) -> DuringResult<Arc<Cap>> {
|
||||
direct_resolution_facet: FacetId,
|
||||
) -> ActorResult {
|
||||
let queried_oid = sturdyref.parameters.oid.clone();
|
||||
let handler = syndicate::entity(observer)
|
||||
.on_asserted(move |observer, t, a: AnyValue| {
|
||||
t.stop_facet(direct_resolution_facet);
|
||||
let bindings = a.value().to_sequence()?;
|
||||
let key = bindings[0].value().to_bytestring()?;
|
||||
let unattenuated_target = bindings[1].value().to_embedded()?;
|
||||
|
@ -152,27 +181,29 @@ fn handle_resolve_sturdyref(
|
|||
Err(e) => {
|
||||
tracing::warn!(sturdyref = ?language().unparse(&sturdyref),
|
||||
"sturdyref failed validation: {}", e);
|
||||
eventually_retract(observer.assert(t, language(), &gatekeeper::Resolved::Rejected(
|
||||
observer.assert(t, language(), &gatekeeper::Resolved::Rejected(
|
||||
Box::new(gatekeeper::Rejected {
|
||||
detail: AnyValue::symbol("sturdyref-failed-validation"),
|
||||
}))))
|
||||
})));
|
||||
},
|
||||
Ok(target) => {
|
||||
tracing::trace!(sturdyref = ?language().unparse(&sturdyref),
|
||||
?target,
|
||||
"sturdyref resolved");
|
||||
eventually_retract(observer.assert(t, language(), &gatekeeper::Resolved::Accepted {
|
||||
observer.assert(t, language(), &gatekeeper::Resolved::Accepted {
|
||||
responder_session: target,
|
||||
}))
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
})
|
||||
.create_cap(t);
|
||||
eventually_retract(ds.assert(t, language(), &dataspace::Observe {
|
||||
ds.assert(t, language(), &dataspace::Observe {
|
||||
// TODO: codegen plugin to generate pattern constructors
|
||||
pattern: pattern!{<bind <ref { oid: #(&queried_oid), key: $ }> $ _>},
|
||||
observer: handler,
|
||||
}))
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct ValidatedNoiseSpec {
|
||||
|
@ -232,14 +263,16 @@ fn validate_noise_spec(
|
|||
})
|
||||
}
|
||||
|
||||
fn handle_resolve_noise(
|
||||
fn await_bind_noise(
|
||||
ds: &mut Arc<Cap>,
|
||||
t: &mut Activation,
|
||||
service_selector: AnyValue,
|
||||
initiator_session: Arc<Cap>,
|
||||
) -> DuringResult<Arc<Cap>> {
|
||||
direct_resolution_facet: FacetId,
|
||||
) -> ActorResult {
|
||||
let handler = syndicate::entity(())
|
||||
.on_asserted_facet(move |_state, t, a: AnyValue| {
|
||||
t.stop_facet(direct_resolution_facet);
|
||||
let initiator_session = Arc::clone(&initiator_session);
|
||||
t.spawn_link(None, move |t| {
|
||||
let bindings = a.value().to_sequence()?;
|
||||
|
@ -250,13 +283,14 @@ fn handle_resolve_noise(
|
|||
Ok(())
|
||||
})
|
||||
.create_cap(t);
|
||||
eventually_retract(ds.assert(t, language(), &dataspace::Observe {
|
||||
ds.assert(t, language(), &dataspace::Observe {
|
||||
// TODO: codegen plugin to generate pattern constructors
|
||||
pattern: pattern!{
|
||||
<bind <noise $spec:NoiseServiceSpec{ { service: #(&service_selector) } }> $service _>
|
||||
},
|
||||
observer: handler,
|
||||
}))
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct ResponderDetails {
|
||||
|
|
|
@ -120,7 +120,7 @@ async fn main() -> ActorResult {
|
|||
|
||||
let gatekeeper = Cap::guard(Language::arc(), t.create(
|
||||
syndicate::entity(Arc::clone(&server_config_ds))
|
||||
.on_asserted(gatekeeper::handle_resolves)));
|
||||
.on_asserted_facet(gatekeeper::facet_handle_resolve)));
|
||||
gatekeeper::handle_binds(t, &server_config_ds)?;
|
||||
|
||||
let mut env = Map::new();
|
||||
|
|
|
@ -27,7 +27,7 @@ pub fn on_demand(t: &mut Activation, ds: Arc<Cap>) {
|
|||
|
||||
fn run(t: &mut Activation, ds: Arc<Cap>, spec: Gatekeeper<AnyValue>) -> ActorResult {
|
||||
let resolver = t.create(syndicate::entity(Arc::clone(&spec.bindspace))
|
||||
.on_asserted(gatekeeper::handle_resolves));
|
||||
.on_asserted_facet(gatekeeper::facet_handle_resolve));
|
||||
ds.assert(t, language(), &syndicate::schemas::service::ServiceObject {
|
||||
service_name: language().unparse(&spec),
|
||||
object: AnyValue::domain(Cap::guard(Language::arc(), resolver)),
|
||||
|
|
|
@ -1510,8 +1510,7 @@ impl Activation {
|
|||
trace::FacetStopReason::ExplicitAction)
|
||||
}
|
||||
|
||||
/// Arranges for the [`Facet`] named by `facet_id` to be stopped cleanly when `self`
|
||||
/// commits.
|
||||
/// Cleanly stops the [`Facet`] named by `facet_id`.
|
||||
///
|
||||
/// Equivalent to `self.stop_facet_and_continue(facet_id, None)`, except that the lack of a
|
||||
/// continuation means that there's no need for this method to return `ActorResult`.
|
||||
|
@ -1520,15 +1519,15 @@ impl Activation {
|
|||
.expect("Non-failing stop_facet_and_continue")
|
||||
}
|
||||
|
||||
/// Arranges for the active facet to be stopped cleanly when `self` commits.
|
||||
/// Cleanly stops the active facet.
|
||||
///
|
||||
/// Equivalent to `self.stop_facet(self.facet.facet_id)`.
|
||||
/// Equivalent to `self.stop_facet(self.facet_id())`.
|
||||
pub fn stop(&mut self) {
|
||||
self.stop_facet(self.facet_id())
|
||||
}
|
||||
|
||||
/// Arranges for the active actor's root facet to be stopped cleanly when `self` commits;
|
||||
/// this is one way to arrange a clean shutdown of the entire actor.
|
||||
/// Cleanly stops the active actor's root facet.
|
||||
/// This is one way to arrange a clean shutdown of the entire actor.
|
||||
///
|
||||
/// Equivalent to `self.stop_facet(self.state.root)`.
|
||||
pub fn stop_root(&mut self) {
|
||||
|
|
Loading…
Reference in New Issue