diff --git a/syndicate-server/src/resolution/client.rs b/syndicate-server/src/resolution/client.rs index b31ba86..55fe543 100644 --- a/syndicate-server/src/resolution/client.rs +++ b/syndicate-server/src/resolution/client.rs @@ -1,10 +1,136 @@ +use preserves_schema::Codec; +use syndicate::dataspace::Dataspace; +use syndicate::preserves::value::Set; + use std::sync::Arc; use syndicate::actor::*; +use syndicate::rpc; +use syndicate::supervise::{Supervisor, SupervisorConfiguration}; +use syndicate::value::NestedValue; -use syndicate::schemas::gatekeeper; -use syndicate::schemas::transport_address; +use syndicate::schemas::gatekeeper as G; +use syndicate::schemas::rpc as R; + +use crate::language; + +use syndicate::enclose; +use syndicate::preserves::rec; +use syndicate_macros::during; pub fn start(t: &mut Activation, ds: Arc) { - + t.spawn(Some(AnyValue::symbol("path_resolver")), enclose!((ds) move |t| { + during!(t, ds, language(), >, |t| { + if let Ok(route) = language().parse::(&route0) { + Supervisor::start( + t, + Some(rec![AnyValue::symbol("path_resolver"), language().unparse(&route)]), + SupervisorConfiguration::default(), + |_t, _s| Ok(()), + enclose!((ds) move |t| enclose!((ds, route) run(t, ds, route)))) + } else { + tracing::debug!(?route0, "Ignoring bogus route"); + Ok(()) + } + }); + Ok(()) + })); + + t.spawn(Some(AnyValue::symbol("sturdy_ref_step")), + enclose!((ds) move |t| super::sturdy::handle_sturdy_path_steps(t, ds))); +} + +fn run(t: &mut Activation, ds: Arc, route: G::Route) -> ActorResult { + let candidates = t.named_field("candidates", Set::new()); + + for addr in &route.transports { + ds.assert(t, language(), &rpc::question(language(), G::ConnectTransport { addr: addr.clone() })); + enclose!((candidates) during!( + t, ds, language(), + >>, + |t: &mut Activation| { + t.get_mut(&candidates).insert(c.clone()); + t.on_stop(enclose!((candidates, c) move |t: &mut Activation| { + t.get_mut(&candidates).remove(&c); + Ok(()) + })); + Ok(()) + })); + } + + let best = t.named_field("best", None); + let root_peer = t.named_field("rootPeer", None); + + t.dataflow(enclose!((best, root_peer) move |t| { + let c = t.get(&candidates).first().cloned(); + t.set(&root_peer, c.as_ref().map( + |G::ConnectedTransport { responder_session, .. }| responder_session.clone())); + t.set(&best, c); + Ok(()) + }))?; + + let steps_ref = t.create(Dataspace::new(None)); + let steps_ds = Cap::new(&steps_ref); + + let mut handle_zero = None; + t.dataflow(enclose!((root_peer) move |t| { + let p = t.get(&root_peer).as_ref().cloned(); + t.update(&mut handle_zero, &steps_ref, p.map(|p| AnyValue::new( + vec![AnyValue::new(0), AnyValue::domain(p)]))); + Ok(()) + }))?; + + for (i, step) in route.path_steps.clone().into_iter().enumerate() { + enclose!((ds, steps_ds) during!( + t, steps_ds, language(), + [#(&AnyValue::new(i)), $origin: G::ResolvedPathStep::], + enclose!((ds, step, steps_ds) move |t: &mut Activation| { + let q = G::ResolvePathStep { origin: origin.0, path_step: step }; + ds.assert(t, language(), &rpc::question(language(), q.clone())); + let q2 = q.clone(); + during!( + t, ds, language(), + , + enclose!((q) |t| { + if let Ok(a) = language().parse::(&a) { + match a { + R::Result::Error { .. } => { + ds.assert(t, language(), &rpc::answer(language(), q, a)); + } + R::Result::Ok { value } => { + if let Some(next) = value.value().as_embedded() { + steps_ds.assert(t, language(), &AnyValue::new( + vec![AnyValue::new(i + 1), + AnyValue::domain(next.clone())])); + } else { + ds.assert(t, language(), &rpc::answer( + language(), q, R::Result::Error { + error: AnyValue::symbol("invalid-path-step-result"), + })); + } + } + } + } + Ok(()) + })); + Ok(()) + }))); + } + + let i = route.path_steps.len(); + during!(t, steps_ds, language(), + [#(&AnyValue::new(i)), $r: G::ResolvedPathStep::], + enclose!((best, ds, route) move |t: &mut Activation| { + let G::ConnectedTransport { addr, control, .. } = + t.get(&best).as_ref().unwrap().clone(); + let responder_session = r.0; + ds.assert(t, language(), &rpc::answer( + language(), + G::ResolvePath { route }, + R::Result::Ok { value: language().unparse( + &G::ResolvedPath { addr, control, responder_session }) })); + Ok(()) + })); + + Ok(()) } diff --git a/syndicate-server/src/resolution/sturdy.rs b/syndicate-server/src/resolution/sturdy.rs index e57ed04..45a6f45 100644 --- a/syndicate-server/src/resolution/sturdy.rs +++ b/syndicate-server/src/resolution/sturdy.rs @@ -3,14 +3,18 @@ use std::sync::Arc; use preserves_schema::Codec; use syndicate::actor::*; +use syndicate::during; +use syndicate::rpc; use syndicate::value::NestedValue; +use syndicate::enclose; use syndicate_macros::during; use syndicate_macros::pattern; use syndicate::schemas::dataspace; use syndicate::schemas::gatekeeper; use syndicate::schemas::sturdy; +use syndicate::schemas::rpc as R; use crate::language; @@ -96,3 +100,42 @@ fn await_bind_sturdyref( }); Ok(()) } + +pub fn handle_sturdy_path_steps(t: &mut Activation, ds: Arc) -> ActorResult { + during!(t, ds, language(), + >>>, + enclose!((ds) move |t: &mut Activation| { + if let Some(origin) = origin.value().as_embedded().cloned() { + let observer = Cap::guard(&language().syndicate, t.create( + during::entity(()).on_asserted_facet( + enclose!((origin, parameters) move |_, t, r: gatekeeper::Resolved| { + ds.assert(t, language(), &rpc::answer( + language(), + gatekeeper::ResolvePathStep { + origin: origin.clone(), + path_step: gatekeeper::PathStep { + step_type: "ref".to_string(), + detail: language().unparse(¶meters), + }, + }, + match r { + gatekeeper::Resolved::Accepted { responder_session } => + R::Result::Ok { value: language().unparse( + &gatekeeper::ResolvedPathStep(responder_session)) }, + gatekeeper::Resolved::Rejected(b) => + R::Result::Error { error: b.detail }, + })); + Ok(()) + })))); + origin.assert(t, language(), &gatekeeper::Resolve { + step: gatekeeper::Step { + step_type: "ref".to_string(), + detail: language().unparse(¶meters), + }, + observer, + }); + } + Ok(()) + })); + Ok(()) +} diff --git a/syndicate/src/rpc.rs b/syndicate/src/rpc.rs index 21982f3..8794396 100644 --- a/syndicate/src/rpc.rs +++ b/syndicate/src/rpc.rs @@ -1,17 +1,17 @@ -use preserves_schema::support::Codec; use preserves_schema::support::Unparse; use crate::actor::AnyValue; -use crate::language; use crate::schemas::rpc as R; +pub fn question>(literals: L, request: Q) -> R::Question { + R::Question { + request: request.unparse(literals), + } +} + pub fn answer<'a, L, Q: Unparse<&'a L, AnyValue>, A: Unparse<&'a L, AnyValue>>(literals: &'a L, request: Q, response: A) -> R::Answer { R::Answer { request: request.unparse(literals), response: response.unparse(literals), } } - -pub fn answer_value<'a, L, Q: Unparse<&'a L, AnyValue>, A: Unparse<&'a L, AnyValue>>(literals: &'a L, request: Q, response: A) -> AnyValue { - language().unparse(&answer(literals, request, response)) -}