From 55456621d440f41c4dc18a4213f5a8367e1d827e Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Fri, 22 Mar 2024 11:22:58 +0100 Subject: [PATCH] Handle refinement to gatekeeper protocol allowing JIT binding and/or direct rejection --- syndicate-server/src/gatekeeper.rs | 88 ++++++++++++++------- syndicate-server/src/main.rs | 2 +- syndicate-server/src/services/gatekeeper.rs | 2 +- syndicate/src/actor.rs | 11 ++- 4 files changed, 68 insertions(+), 35 deletions(-) diff --git a/syndicate-server/src/gatekeeper.rs b/syndicate-server/src/gatekeeper.rs index f56d74f..26b592d 100644 --- a/syndicate-server/src/gatekeeper.rs +++ b/syndicate-server/src/gatekeeper.rs @@ -15,7 +15,7 @@ use std::convert::TryInto; use std::sync::Arc; use syndicate::actor::*; -use syndicate::during::DuringResult; +use syndicate::enclose; use syndicate::value::NestedValue; use syndicate::schemas::dataspace; use syndicate::schemas::gatekeeper; @@ -102,49 +102,78 @@ pub fn handle_binds(t: &mut Activation, ds: &Arc) -> ActorResult { Ok(()) } -fn eventually_retract(h: Option) -> DuringResult { - if let Some(h) = h { - Ok(Some(Box::new(move |_state, t: &mut Activation| Ok(t.retract(h))))) - } else { - Ok(None) - } -} - -pub fn handle_resolves( +pub fn facet_handle_resolve( ds: &mut Arc, t: &mut Activation, a: gatekeeper::Resolve, -) -> DuringResult> { +) -> ActorResult { let mut detail: &'static str = "unsupported"; if a.step.step_type == sturdy_step_type() { detail = "invalid"; if let Ok(s) = language().parse::(&a.step.detail) { - return handle_resolve_sturdyref(ds, t, sturdy::SturdyRef { parameters: s.0 }, a.observer); + t.facet(|t| { + let f = handle_direct_resolution(ds, t, a.clone())?; + await_bind_sturdyref(ds, t, sturdy::SturdyRef { parameters: s.0 }, a.observer, f) + })?; + return Ok(()); } } if a.step.step_type == noise_step_type() { detail = "invalid"; if let Ok(s) = language().parse::>(&a.step.detail) { - return handle_resolve_noise(ds, t, s.0.0, a.observer); + t.facet(|t| { + let f = handle_direct_resolution(ds, t, a.clone())?; + await_bind_noise(ds, t, s.0.0, a.observer, f) + })?; + return Ok(()); } } - eventually_retract(ds.assert(t, language(), &gatekeeper::Rejected { + a.observer.assert(t, language(), &gatekeeper::Rejected { detail: AnyValue::symbol(detail), - })) + }); + + Ok(()) } -fn handle_resolve_sturdyref( +fn handle_direct_resolution( + ds: &mut Arc, + t: &mut Activation, + a: gatekeeper::Resolve, +) -> Result { + let outer_facet = t.facet_id(); + t.facet(move |t| { + let handler = syndicate::entity(a.observer) + .on_asserted(move |observer, t, a: AnyValue| { + t.stop_facet_and_continue(outer_facet, Some( + enclose!((observer, a) move |t: &mut Activation| { + observer.assert(t, language(), &a); + Ok(()) + })))?; + Ok(None) + }) + .create_cap(t); + ds.assert(t, language(), &gatekeeper::Resolve { + step: a.step.clone(), + observer: handler, + }); + Ok(()) + }) +} + +fn await_bind_sturdyref( ds: &mut Arc, t: &mut Activation, sturdyref: sturdy::SturdyRef, observer: Arc, -) -> DuringResult> { + direct_resolution_facet: FacetId, +) -> ActorResult { let queried_oid = sturdyref.parameters.oid.clone(); let handler = syndicate::entity(observer) .on_asserted(move |observer, t, a: AnyValue| { + t.stop_facet(direct_resolution_facet); let bindings = a.value().to_sequence()?; let key = bindings[0].value().to_bytestring()?; let unattenuated_target = bindings[1].value().to_embedded()?; @@ -152,27 +181,29 @@ fn handle_resolve_sturdyref( Err(e) => { tracing::warn!(sturdyref = ?language().unparse(&sturdyref), "sturdyref failed validation: {}", e); - eventually_retract(observer.assert(t, language(), &gatekeeper::Resolved::Rejected( + observer.assert(t, language(), &gatekeeper::Resolved::Rejected( Box::new(gatekeeper::Rejected { detail: AnyValue::symbol("sturdyref-failed-validation"), - })))) + }))); }, Ok(target) => { tracing::trace!(sturdyref = ?language().unparse(&sturdyref), ?target, "sturdyref resolved"); - eventually_retract(observer.assert(t, language(), &gatekeeper::Resolved::Accepted { + observer.assert(t, language(), &gatekeeper::Resolved::Accepted { responder_session: target, - })) + }); } } + Ok(None) }) .create_cap(t); - eventually_retract(ds.assert(t, language(), &dataspace::Observe { + ds.assert(t, language(), &dataspace::Observe { // TODO: codegen plugin to generate pattern constructors pattern: pattern!{ $ _>}, observer: handler, - })) + }); + Ok(()) } struct ValidatedNoiseSpec { @@ -232,14 +263,16 @@ fn validate_noise_spec( }) } -fn handle_resolve_noise( +fn await_bind_noise( ds: &mut Arc, t: &mut Activation, service_selector: AnyValue, initiator_session: Arc, -) -> DuringResult> { + direct_resolution_facet: FacetId, +) -> ActorResult { let handler = syndicate::entity(()) .on_asserted_facet(move |_state, t, a: AnyValue| { + t.stop_facet(direct_resolution_facet); let initiator_session = Arc::clone(&initiator_session); t.spawn_link(None, move |t| { let bindings = a.value().to_sequence()?; @@ -250,13 +283,14 @@ fn handle_resolve_noise( Ok(()) }) .create_cap(t); - eventually_retract(ds.assert(t, language(), &dataspace::Observe { + ds.assert(t, language(), &dataspace::Observe { // TODO: codegen plugin to generate pattern constructors pattern: pattern!{ $service _> }, observer: handler, - })) + }); + Ok(()) } struct ResponderDetails { diff --git a/syndicate-server/src/main.rs b/syndicate-server/src/main.rs index d03ccf0..7fc28a9 100644 --- a/syndicate-server/src/main.rs +++ b/syndicate-server/src/main.rs @@ -120,7 +120,7 @@ async fn main() -> ActorResult { let gatekeeper = Cap::guard(Language::arc(), t.create( syndicate::entity(Arc::clone(&server_config_ds)) - .on_asserted(gatekeeper::handle_resolves))); + .on_asserted_facet(gatekeeper::facet_handle_resolve))); gatekeeper::handle_binds(t, &server_config_ds)?; let mut env = Map::new(); diff --git a/syndicate-server/src/services/gatekeeper.rs b/syndicate-server/src/services/gatekeeper.rs index c0eb2ae..93ffd88 100644 --- a/syndicate-server/src/services/gatekeeper.rs +++ b/syndicate-server/src/services/gatekeeper.rs @@ -27,7 +27,7 @@ pub fn on_demand(t: &mut Activation, ds: Arc) { fn run(t: &mut Activation, ds: Arc, spec: Gatekeeper) -> ActorResult { let resolver = t.create(syndicate::entity(Arc::clone(&spec.bindspace)) - .on_asserted(gatekeeper::handle_resolves)); + .on_asserted_facet(gatekeeper::facet_handle_resolve)); ds.assert(t, language(), &syndicate::schemas::service::ServiceObject { service_name: language().unparse(&spec), object: AnyValue::domain(Cap::guard(Language::arc(), resolver)), diff --git a/syndicate/src/actor.rs b/syndicate/src/actor.rs index 8f3d481..50d70c1 100644 --- a/syndicate/src/actor.rs +++ b/syndicate/src/actor.rs @@ -1510,8 +1510,7 @@ impl Activation { trace::FacetStopReason::ExplicitAction) } - /// Arranges for the [`Facet`] named by `facet_id` to be stopped cleanly when `self` - /// commits. + /// Cleanly stops the [`Facet`] named by `facet_id`. /// /// Equivalent to `self.stop_facet_and_continue(facet_id, None)`, except that the lack of a /// continuation means that there's no need for this method to return `ActorResult`. @@ -1520,15 +1519,15 @@ impl Activation { .expect("Non-failing stop_facet_and_continue") } - /// Arranges for the active facet to be stopped cleanly when `self` commits. + /// Cleanly stops the active facet. /// - /// Equivalent to `self.stop_facet(self.facet.facet_id)`. + /// Equivalent to `self.stop_facet(self.facet_id())`. pub fn stop(&mut self) { self.stop_facet(self.facet_id()) } - /// Arranges for the active actor's root facet to be stopped cleanly when `self` commits; - /// this is one way to arrange a clean shutdown of the entire actor. + /// Cleanly stops the active actor's root facet. + /// This is one way to arrange a clean shutdown of the entire actor. /// /// Equivalent to `self.stop_facet(self.state.root)`. pub fn stop_root(&mut self) {