Client-side resolution protocol
/ build (push) Successful in 4m32s Details

This commit is contained in:
Tony Garnock-Jones 2024-06-07 22:18:50 +02:00
parent 57037d6f8c
commit 46f4071d4f
3 changed files with 178 additions and 9 deletions

View File

@ -1,10 +1,136 @@
use preserves_schema::Codec;
use syndicate::dataspace::Dataspace;
use syndicate::preserves::value::Set;
use std::sync::Arc;
use syndicate::actor::*;
use syndicate::rpc;
use syndicate::supervise::{Supervisor, SupervisorConfiguration};
use syndicate::value::NestedValue;
use syndicate::schemas::gatekeeper;
use syndicate::schemas::transport_address;
use syndicate::schemas::gatekeeper as G;
use syndicate::schemas::rpc as R;
use crate::language;
use syndicate::enclose;
use syndicate::preserves::rec;
use syndicate_macros::during;
pub fn start(t: &mut Activation, ds: Arc<Cap>) {
t.spawn(Some(AnyValue::symbol("path_resolver")), enclose!((ds) move |t| {
during!(t, ds, language(), <q <resolve-path $route0>>, |t| {
if let Ok(route) = language().parse::<G::Route>(&route0) {
Supervisor::start(
t,
Some(rec![AnyValue::symbol("path_resolver"), language().unparse(&route)]),
SupervisorConfiguration::default(),
|_t, _s| Ok(()),
enclose!((ds) move |t| enclose!((ds, route) run(t, ds, route))))
} else {
tracing::debug!(?route0, "Ignoring bogus route");
Ok(())
}
});
Ok(())
}));
t.spawn(Some(AnyValue::symbol("sturdy_ref_step")),
enclose!((ds) move |t| super::sturdy::handle_sturdy_path_steps(t, ds)));
}
fn run(t: &mut Activation, ds: Arc<Cap>, route: G::Route) -> ActorResult {
let candidates = t.named_field("candidates", Set::new());
for addr in &route.transports {
ds.assert(t, language(), &rpc::question(language(), G::ConnectTransport { addr: addr.clone() }));
enclose!((candidates) during!(
t, ds, language(),
<a <connect-transport #(addr)> <ok $c: G::ConnectedTransport::<AnyValue>>>,
|t: &mut Activation| {
t.get_mut(&candidates).insert(c.clone());
t.on_stop(enclose!((candidates, c) move |t: &mut Activation| {
t.get_mut(&candidates).remove(&c);
Ok(())
}));
Ok(())
}));
}
let best = t.named_field("best", None);
let root_peer = t.named_field("rootPeer", None);
t.dataflow(enclose!((best, root_peer) move |t| {
let c = t.get(&candidates).first().cloned();
t.set(&root_peer, c.as_ref().map(
|G::ConnectedTransport { responder_session, .. }| responder_session.clone()));
t.set(&best, c);
Ok(())
}))?;
let steps_ref = t.create(Dataspace::new(None));
let steps_ds = Cap::new(&steps_ref);
let mut handle_zero = None;
t.dataflow(enclose!((root_peer) move |t| {
let p = t.get(&root_peer).as_ref().cloned();
t.update(&mut handle_zero, &steps_ref, p.map(|p| AnyValue::new(
vec![AnyValue::new(0), AnyValue::domain(p)])));
Ok(())
}))?;
for (i, step) in route.path_steps.clone().into_iter().enumerate() {
enclose!((ds, steps_ds) during!(
t, steps_ds, language(),
[#(&AnyValue::new(i)), $origin: G::ResolvedPathStep::<AnyValue>],
enclose!((ds, step, steps_ds) move |t: &mut Activation| {
let q = G::ResolvePathStep { origin: origin.0, path_step: step };
ds.assert(t, language(), &rpc::question(language(), q.clone()));
let q2 = q.clone();
during!(
t, ds, language(),
<a #(&language().unparse(&q2)) $a>,
enclose!((q) |t| {
if let Ok(a) = language().parse::<R::Result>(&a) {
match a {
R::Result::Error { .. } => {
ds.assert(t, language(), &rpc::answer(language(), q, a));
}
R::Result::Ok { value } => {
if let Some(next) = value.value().as_embedded() {
steps_ds.assert(t, language(), &AnyValue::new(
vec![AnyValue::new(i + 1),
AnyValue::domain(next.clone())]));
} else {
ds.assert(t, language(), &rpc::answer(
language(), q, R::Result::Error {
error: AnyValue::symbol("invalid-path-step-result"),
}));
}
}
}
}
Ok(())
}));
Ok(())
})));
}
let i = route.path_steps.len();
during!(t, steps_ds, language(),
[#(&AnyValue::new(i)), $r: G::ResolvedPathStep::<AnyValue>],
enclose!((best, ds, route) move |t: &mut Activation| {
let G::ConnectedTransport { addr, control, .. } =
t.get(&best).as_ref().unwrap().clone();
let responder_session = r.0;
ds.assert(t, language(), &rpc::answer(
language(),
G::ResolvePath { route },
R::Result::Ok { value: language().unparse(
&G::ResolvedPath { addr, control, responder_session }) }));
Ok(())
}));
Ok(())
}

View File

@ -3,14 +3,18 @@ use std::sync::Arc;
use preserves_schema::Codec;
use syndicate::actor::*;
use syndicate::during;
use syndicate::rpc;
use syndicate::value::NestedValue;
use syndicate::enclose;
use syndicate_macros::during;
use syndicate_macros::pattern;
use syndicate::schemas::dataspace;
use syndicate::schemas::gatekeeper;
use syndicate::schemas::sturdy;
use syndicate::schemas::rpc as R;
use crate::language;
@ -96,3 +100,42 @@ fn await_bind_sturdyref(
});
Ok(())
}
pub fn handle_sturdy_path_steps(t: &mut Activation, ds: Arc<Cap>) -> ActorResult {
during!(t, ds, language(),
<q <resolve-path-step $origin <ref $parameters: sturdy::SturdyPathStepDetail::<AnyValue>>>>,
enclose!((ds) move |t: &mut Activation| {
if let Some(origin) = origin.value().as_embedded().cloned() {
let observer = Cap::guard(&language().syndicate, t.create(
during::entity(()).on_asserted_facet(
enclose!((origin, parameters) move |_, t, r: gatekeeper::Resolved| {
ds.assert(t, language(), &rpc::answer(
language(),
gatekeeper::ResolvePathStep {
origin: origin.clone(),
path_step: gatekeeper::PathStep {
step_type: "ref".to_string(),
detail: language().unparse(&parameters),
},
},
match r {
gatekeeper::Resolved::Accepted { responder_session } =>
R::Result::Ok { value: language().unparse(
&gatekeeper::ResolvedPathStep(responder_session)) },
gatekeeper::Resolved::Rejected(b) =>
R::Result::Error { error: b.detail },
}));
Ok(())
}))));
origin.assert(t, language(), &gatekeeper::Resolve {
step: gatekeeper::Step {
step_type: "ref".to_string(),
detail: language().unparse(&parameters),
},
observer,
});
}
Ok(())
}));
Ok(())
}

View File

@ -1,17 +1,17 @@
use preserves_schema::support::Codec;
use preserves_schema::support::Unparse;
use crate::actor::AnyValue;
use crate::language;
use crate::schemas::rpc as R;
pub fn question<L, Q: Unparse<L, AnyValue>>(literals: L, request: Q) -> R::Question {
R::Question {
request: request.unparse(literals),
}
}
pub fn answer<'a, L, Q: Unparse<&'a L, AnyValue>, A: Unparse<&'a L, AnyValue>>(literals: &'a L, request: Q, response: A) -> R::Answer {
R::Answer {
request: request.unparse(literals),
response: response.unparse(literals),
}
}
pub fn answer_value<'a, L, Q: Unparse<&'a L, AnyValue>, A: Unparse<&'a L, AnyValue>>(literals: &'a L, request: Q, response: A) -> AnyValue {
language().unparse(&answer(literals, request, response))
}