From 7e8dcef0e2c808f903a835993e38546c2f21d557 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 8 Feb 2023 18:01:38 +0100 Subject: [PATCH] Refactor gatekeeper implementation for new protocols. --- dev-scripts/benchmark-config.pr | 2 +- syndicate-server/examples/dirty/mod.rs | 6 +- syndicate-server/protocols/schema-bundle.bin | 4 +- .../protocols/schemas/gatekeeperMux.prs | 7 - syndicate-server/src/gatekeeper.rs | 278 ++++++++++++------ syndicate-server/src/main.rs | 3 +- syndicate/src/relay.rs | 11 +- 7 files changed, 195 insertions(+), 116 deletions(-) delete mode 100644 syndicate-server/protocols/schemas/gatekeeperMux.prs diff --git a/dev-scripts/benchmark-config.pr b/dev-scripts/benchmark-config.pr index b40e52a..51ff005 100644 --- a/dev-scripts/benchmark-config.pr +++ b/dev-scripts/benchmark-config.pr @@ -1,3 +1,3 @@ let ?root_ds = dataspace $gatekeeper>> - + $root_ds #f> diff --git a/syndicate-server/examples/dirty/mod.rs b/syndicate-server/examples/dirty/mod.rs index 355747d..6162e65 100644 --- a/syndicate-server/examples/dirty/mod.rs +++ b/syndicate-server/examples/dirty/mod.rs @@ -16,15 +16,15 @@ pub fn dirty_resolve(stream: &mut TcpStream, dataspace: &str) -> Result<(), Box< let iolang = Language::::default(); let sturdyref = sturdy::SturdyRef::from_hex(dataspace)?; - let sturdyref = iolang.parse(&syndicate::language().unparse(&sturdyref) - .copy_via(&mut |_| Err("no!"))?)?; + let sturdyref: IOValue = syndicate::language().unparse(&sturdyref) + .copy_via(&mut |_| Err("no!"))?; let resolve_turn = P::Turn(vec![ P::TurnEvent { oid: P::Oid(0.into()), event: P::Event::Assert(Box::new(P::Assert { assertion: P::Assertion(iolang.unparse(&gatekeeper::Resolve:: { - sturdyref, + step: sturdyref, observer: iolang.unparse(&sturdy::WireRef::Mine { oid: Box::new(sturdy::Oid(0.into())), }), diff --git a/syndicate-server/protocols/schema-bundle.bin b/syndicate-server/protocols/schema-bundle.bin index 604b4d7..3b263de 100644 --- a/syndicate-server/protocols/schema-bundle.bin +++ b/syndicate-server/protocols/schema-bundle.bin @@ -1,7 +1,5 @@ ´³bundle·µ³ documentation„´³schema·³version‘³ definitions·³Url´³orµµ±present´³dict·³url´³named³url´³atom³String„„„„„µ±invalid´³dict·³url´³named³url³any„„„„µ±absent´³dict·„„„„„³IOList´³orµµ±bytes´³atom³ -ByteString„„µ±string´³atom³String„„µ±nested´³seqof´³refµ„³IOList„„„„„³Metadata´³rec´³lit³metadata„´³tupleµ´³named³object³any„´³named³info´³dictof´³atom³Symbol„³any„„„„„³ Description´³orµµ±present´³dict·³ description´³named³ description´³refµ„³IOList„„„„„µ±invalid´³dict·³ description´³named³ description³any„„„„µ±absent´³dict·„„„„„„³ embeddedType€„„µ³ gatekeeperMux„´³schema·³version‘³ definitions·³API´³orµµ±Resolve´³refµ³ -gatekeeper„³Resolve„„µ±Connect´³refµ³noise„³Connect„„„„³ NoiseService´³rec´³lit³noise„´³tupleµ´³named³spec´³refµ³noise„³ NoiseSpec„„´³named³service´³embedded³any„„„„„³SecretKeyField´³orµµ±present´³dict·³ secretKey´³named³ secretKey´³atom³ -ByteString„„„„„µ±invalid´³dict·³ secretKey´³named³ secretKey³any„„„„µ±absent´³dict·„„„„„³NoiseServiceSpec´³andµ´³named³base´³refµ³noise„³ NoiseSpec„„´³named³ secretKey´³refµ„³SecretKeyField„„„„„³ embeddedType€„„µ³externalServices„´³schema·³version‘³ definitions·³Process´³orµµ±simple´³refµ„³ CommandLine„„µ±full´³refµ„³ FullProcess„„„„³Service´³refµ„³ DaemonService„³ClearEnv´³orµµ±present´³dict·³clearEnv´³named³clearEnv´³atom³Boolean„„„„„µ±invalid´³dict·³clearEnv´³named³clearEnv³any„„„„µ±absent´³dict·„„„„„³EnvValue´³orµµ±set´³atom³String„„µ±remove´³lit€„„µ±invalid³any„„„³Protocol´³orµµ±none´³lit³none„„µ±binarySyndicate´³lit³application/syndicate„„µ± textSyndicate´³lit³text/syndicate„„„„³ +ByteString„„µ±string´³atom³String„„µ±nested´³seqof´³refµ„³IOList„„„„„³Metadata´³rec´³lit³metadata„´³tupleµ´³named³object³any„´³named³info´³dictof´³atom³Symbol„³any„„„„„³ Description´³orµµ±present´³dict·³ description´³named³ description´³refµ„³IOList„„„„„µ±invalid´³dict·³ description´³named³ description³any„„„„µ±absent´³dict·„„„„„„³ embeddedType€„„µ³externalServices„´³schema·³version‘³ definitions·³Process´³orµµ±simple´³refµ„³ CommandLine„„µ±full´³refµ„³ FullProcess„„„„³Service´³refµ„³ DaemonService„³ClearEnv´³orµµ±present´³dict·³clearEnv´³named³clearEnv´³atom³Boolean„„„„„µ±invalid´³dict·³clearEnv´³named³clearEnv³any„„„„µ±absent´³dict·„„„„„³EnvValue´³orµµ±set´³atom³String„„µ±remove´³lit€„„µ±invalid³any„„„³Protocol´³orµµ±none´³lit³none„„µ±binarySyndicate´³lit³application/syndicate„„µ± textSyndicate´³lit³text/syndicate„„„„³ ProcessDir´³orµµ±present´³dict·³dir´³named³dir´³atom³String„„„„„µ±invalid´³dict·³dir´³named³dir³any„„„„µ±absent´³dict·„„„„„³ ProcessEnv´³orµµ±present´³dict·³env´³named³env´³dictof´³refµ„³ EnvVariable„´³refµ„³EnvValue„„„„„„µ±invalid´³dict·³env´³named³env³any„„„„µ±absent´³dict·„„„„„³ CommandLine´³orµµ±shell´³atom³String„„µ±full´³refµ„³FullCommandLine„„„„³ EnvVariable´³orµµ±string´³atom³String„„µ±symbol´³atom³Symbol„„µ±invalid³any„„„³ FullProcess´³andµ´³dict·³argv´³named³argv´³refµ„³ CommandLine„„„„´³named³env´³refµ„³ ProcessEnv„„´³named³dir´³refµ„³ diff --git a/syndicate-server/protocols/schemas/gatekeeperMux.prs b/syndicate-server/protocols/schemas/gatekeeperMux.prs deleted file mode 100644 index 6310b5e..0000000 --- a/syndicate-server/protocols/schemas/gatekeeperMux.prs +++ /dev/null @@ -1,7 +0,0 @@ -version 1 . - -API = gatekeeper.Resolve / noise.Connect . - -NoiseService = . -NoiseServiceSpec = @base noise.NoiseSpec & @secretKey SecretKeyField . -SecretKeyField = @present { secretKey: bytes } / @invalid { secretKey: any } / @absent {} . diff --git a/syndicate-server/src/gatekeeper.rs b/syndicate-server/src/gatekeeper.rs index a387a29..10f0bdc 100644 --- a/syndicate-server/src/gatekeeper.rs +++ b/syndicate-server/src/gatekeeper.rs @@ -20,41 +20,117 @@ use syndicate::value::NestedValue; use syndicate::schemas::dataspace; use syndicate::schemas::gatekeeper; use syndicate::schemas::noise; +use syndicate::schemas::sturdy; use crate::language::language; -use crate::schemas::gatekeeper_mux::Api; -use crate::schemas::gatekeeper_mux::NoiseServiceSpec; -use crate::schemas::gatekeeper_mux::SecretKeyField; -// pub fn bind( -// t: &mut Activation, -// ds: &Arc, -// oid: syndicate::schemas::sturdy::_Any, -// key: [u8; 16], -// target: Arc, -// ) { -// let sr = sturdy::SturdyRef::mint(oid.clone(), &key); -// tracing::info!(cap = ?language().unparse(&sr), hex = %sr.to_hex()); -// ds.assert(t, language(), &gatekeeper::Bind { oid, key: key.to_vec(), target }); -// } +use syndicate_macros::during; +use syndicate_macros::pattern; -pub fn handle_assertion( - ds: &mut Arc, +pub fn handle_binds(t: &mut Activation, ds: &Arc) -> ActorResult { + Ok(during!(t, ds, language(), , |t: &mut Activation| { + t.spawn_link(None, move |t| { handle_bind(t, description, target, observer) }); + Ok(()) + })) +} + +fn handle_bind( t: &mut Activation, - a: Api, -) -> DuringResult> { - match a { - Api::Resolve(resolve_box) => handle_resolve(ds, t, *resolve_box), - Api::Connect(connect_box) => handle_connect(ds, t, *connect_box), + description: AnyValue, + target: AnyValue, + observer: AnyValue, +) -> ActorResult { + let _target = target.value().to_embedded()?; + let observer = language().parse::(&observer)?; + + if let Ok(s) = language().parse::(&description) { + let sr = sturdy::SturdyRef::mint(s.oid, &s.key); + if let gatekeeper::BindObserver::Present(o) = observer { + o.assert(t, language(), &gatekeeper::Bound::Bound { + step: language().unparse(&sr), + }); + } + return Ok(()); + } + + if let Ok(s) = language().parse::>(&description) { + match validate_noise_spec(s.spec) { + Ok(spec) => if let gatekeeper::BindObserver::Present(o) = observer { + o.assert(t, language(), &gatekeeper::Bound::Bound { + step: language().unparse(&noise::NoiseRouteStep { + spec: noise::NoiseSpec { + key: spec.public_key, + service: noise::ServiceSelector(spec.service), + protocol: if spec.protocol == default_noise_protocol() { + noise::NoiseProtocol::Absent + } else { + noise::NoiseProtocol::Present { + protocol: spec.protocol, + } + }, + pre_shared_keys: if spec.psks.is_empty() { + noise::NoisePreSharedKeys::Absent + } else { + noise::NoisePreSharedKeys::Present { + pre_shared_keys: spec.psks, + } + }, + }, + }), + }); + }, + Err(e) => { + if let gatekeeper::BindObserver::Present(o) = observer { + o.assert(t, language(), &gatekeeper::Bound::Rejected( + Box::new(gatekeeper::Rejected { + detail: AnyValue::new(format!("{}", &e)), + }))); + } + tracing::error!("Invalid noise bind description: {}", e); + } + } + return Ok(()); + } + + if let gatekeeper::BindObserver::Present(o) = observer { + o.assert(t, language(), &gatekeeper::Bound::Rejected( + Box::new(gatekeeper::Rejected { + detail: AnyValue::symbol("unsupported"), + }))); + } + Ok(()) +} + +fn eventually_retract(h: Option) -> DuringResult { + if let Some(h) = h { + Ok(Some(Box::new(move |_state, t: &mut Activation| Ok(t.retract(h))))) + } else { + Ok(None) } } -fn handle_resolve( +pub fn handle_resolves( ds: &mut Arc, t: &mut Activation, a: gatekeeper::Resolve, ) -> DuringResult> { - let gatekeeper::Resolve { sturdyref, observer } = a; + if let Ok(s) = language().parse::(&a.step) { + return handle_resolve_sturdyref(ds, t, s.0, a.observer); + } + if let Ok(s) = language().parse::>(&a.step) { + return handle_resolve_noise(ds, t, s.service.0, a.observer); + } + eventually_retract(ds.assert(t, language(), &gatekeeper::Rejected { + detail: AnyValue::symbol("unsupported"), + })) +} + +fn handle_resolve_sturdyref( + ds: &mut Arc, + t: &mut Activation, + sturdyref: sturdy::SturdyRef, + observer: Arc, +) -> DuringResult> { let queried_oid = sturdyref.oid.clone(); let handler = syndicate::entity(observer) .on_asserted(move |observer, t, a: AnyValue| { @@ -65,91 +141,111 @@ fn handle_resolve( Err(e) => { tracing::warn!(sturdyref = ?language().unparse(&sturdyref), "sturdyref failed validation: {}", e); - Ok(None) + eventually_retract(observer.assert(t, language(), &gatekeeper::Resolved::Rejected( + Box::new(gatekeeper::Rejected { + detail: AnyValue::symbol("sturdyref-failed-validation"), + })))) }, Ok(target) => { tracing::trace!(sturdyref = ?language().unparse(&sturdyref), ?target, "sturdyref resolved"); - if let Some(h) = observer.assert(t, &(), &AnyValue::domain(target)) { - Ok(Some(Box::new(move |_observer, t| Ok(t.retract(h))))) - } else { - Ok(None) - } + eventually_retract(observer.assert(t, language(), &gatekeeper::Resolved::Accepted { + responder_session: target, + })) } } }) .create_cap(t); - if let Some(oh) = ds.assert(t, language(), &dataspace::Observe { + eventually_retract(ds.assert(t, language(), &dataspace::Observe { // TODO: codegen plugin to generate pattern constructors - pattern: syndicate_macros::pattern!{}, + pattern: pattern!{ $ _>}, observer: handler, - }) { - Ok(Some(Box::new(move |_ds, t| Ok(t.retract(oh))))) - } else { - Ok(None) - } + })) } -fn handle_connect( +struct ValidatedNoiseSpec { + service: AnyValue, + protocol: String, + pattern: HandshakePattern, + psks: Vec>, + secret_key: Option>, + public_key: Vec, +} + +fn default_noise_protocol() -> String { + language().unparse(&noise::DefaultProtocol).value().to_string().unwrap().clone() +} + +fn validate_noise_spec( + spec: noise::NoiseServiceSpec, +) -> Result { + let protocol = match spec.base.protocol { + noise::NoiseProtocol::Present { protocol } => protocol, + noise::NoiseProtocol::Invalid { protocol } => + Err(format!("Invalid noise protocol {:?}", protocol))?, + noise::NoiseProtocol::Absent => default_noise_protocol(), + }; + + const PREFIX: &'static str = "Noise_"; + const SUFFIX: &'static str = "_25519_ChaChaPoly_BLAKE2s"; + if !protocol.starts_with(PREFIX) || !protocol.ends_with(SUFFIX) { + Err(format!("Unsupported protocol {:?}", protocol))?; + } + + let pattern_name = &protocol[PREFIX.len()..(protocol.len()-SUFFIX.len())]; + let pattern = lookup_pattern(pattern_name).ok_or_else::( + || format!("Unsupported handshake pattern {:?}", pattern_name).into())?; + + let psks = match spec.base.pre_shared_keys { + noise::NoisePreSharedKeys::Present { pre_shared_keys } => pre_shared_keys, + noise::NoisePreSharedKeys::Invalid { pre_shared_keys } => + Err(format!("Invalid pre-shared-keys {:?}", pre_shared_keys))?, + noise::NoisePreSharedKeys::Absent => vec![], + }; + + let secret_key = match spec.secret_key { + noise::SecretKeyField::Present { secret_key } => Some(secret_key), + noise::SecretKeyField::Invalid { secret_key } => + Err(format!("Invalid secret key {:?}", secret_key))?, + noise::SecretKeyField::Absent => None, + }; + + Ok(ValidatedNoiseSpec { + service: spec.base.service.0, + protocol, + pattern, + psks, + secret_key, + public_key: spec.base.key, + }) +} + +fn handle_resolve_noise( ds: &mut Arc, t: &mut Activation, - a: noise::Connect, + service_selector: AnyValue, + initiator_session: Arc, ) -> DuringResult> { - let noise::Connect { service_selector, initiator_session } = a; let handler = syndicate::entity(()) .on_asserted_facet(move |_state, t, a: AnyValue| { let initiator_session = Arc::clone(&initiator_session); t.spawn_link(None, move |t| { let bindings = a.value().to_sequence()?; - let spec: NoiseServiceSpec = language().parse(&bindings[0])?; - let protocol = match spec.base.protocol { - noise::NoiseProtocol::Present { protocol } => - protocol, - noise::NoiseProtocol::Invalid { protocol } => - Err(format!("Invalid noise protocol {:?}", protocol))?, - noise::NoiseProtocol::Absent => - language().unparse(&noise::DefaultProtocol).value().to_string()?.clone(), - }; - let psks = match spec.base.pre_shared_keys { - noise::NoisePreSharedKeys::Present { pre_shared_keys } => - pre_shared_keys, - noise::NoisePreSharedKeys::Invalid { pre_shared_keys } => - Err(format!("Invalid pre-shared-keys {:?}", pre_shared_keys))?, - noise::NoisePreSharedKeys::Absent => - vec![], - }; - let secret_key = match spec.secret_key { - SecretKeyField::Present { secret_key } => - Some(secret_key), - SecretKeyField::Invalid { secret_key } => - Err(format!("Invalid secret key {:?}", secret_key))?, - SecretKeyField::Absent => - None, - }; + let spec = validate_noise_spec(language().parse(&bindings[0])?)?; let service = bindings[1].value().to_embedded()?; - run_noise_responder(t, - spec.base.service, - protocol, - psks, - secret_key, - initiator_session, - Arc::clone(service)) + run_noise_responder(t, spec, initiator_session, Arc::clone(service)) }); Ok(()) }) .create_cap(t); - if let Some(oh) = ds.assert(t, language(), &dataspace::Observe { + eventually_retract(ds.assert(t, language(), &dataspace::Observe { // TODO: codegen plugin to generate pattern constructors - pattern: syndicate_macros::pattern!{ - + pattern: pattern!{ + $service _> }, observer: handler, - }) { - Ok(Some(Box::new(move |_ds, t| Ok(t.retract(oh))))) - } else { - Ok(None) - } + })) } struct ResponderDetails { @@ -304,29 +400,17 @@ fn lookup_pattern(name: &str) -> Option { fn run_noise_responder( t: &mut Activation, - service_selector: AnyValue, - protocol: String, - psks: Vec>, - secret_key: Option>, + spec: ValidatedNoiseSpec, initiator_session: Arc, service: Arc, ) -> ActorResult { - const PREFIX: &'static str = "Noise_"; - const SUFFIX: &'static str = "_25519_ChaChaPoly_BLAKE2s"; - if !protocol.starts_with(PREFIX) || !protocol.ends_with(SUFFIX) { - Err(format!("Unsupported protocol {:?}", protocol))?; - } - let pattern_name = &protocol[PREFIX.len()..(protocol.len()-SUFFIX.len())]; - let pattern = lookup_pattern(pattern_name).ok_or_else::( - || format!("Unsupported handshake pattern {:?}", pattern_name).into())?; - let hs = { let mut builder = noise_protocol::HandshakeStateBuilder::new(); - builder.set_pattern(pattern); + builder.set_pattern(spec.pattern); builder.set_is_initiator(false); - let prologue = PackedWriter::encode(&mut NoEmbeddedDomainCodec, &service_selector)?; + let prologue = PackedWriter::encode(&mut NoEmbeddedDomainCodec, &spec.service)?; builder.set_prologue(&prologue); - match secret_key { + match spec.secret_key { None => (), Some(sk) => { let sk: [u8; 32] = sk.try_into().map_err(|_| "Bad secret key length")?; @@ -334,7 +418,7 @@ fn run_noise_responder( }, } let mut hs = builder.build_handshake_state(); - for psk in psks.into_iter() { + for psk in spec.psks.into_iter() { hs.push_psk(&psk); } hs @@ -347,6 +431,6 @@ fn run_noise_responder( let responder_session = Cap::guard(crate::Language::arc(), t.create(ResponderState::Handshake(details, hs))); - initiator_session.assert(t, language(), &noise::Accept { responder_session }); + initiator_session.assert(t, language(), &gatekeeper::Resolved::Accepted { responder_session }); Ok(()) } diff --git a/syndicate-server/src/main.rs b/syndicate-server/src/main.rs index 2a5ffeb..9d561f9 100644 --- a/syndicate-server/src/main.rs +++ b/syndicate-server/src/main.rs @@ -110,7 +110,8 @@ async fn main() -> ActorResult { let gatekeeper = Cap::guard(Language::arc(), t.create( syndicate::entity(Arc::clone(&server_config_ds)) - .on_asserted(gatekeeper::handle_assertion))); + .on_asserted(gatekeeper::handle_resolves))); + gatekeeper::handle_binds(t, &server_config_ds)?; let mut env = Map::new(); env.insert("config".to_owned(), AnyValue::domain(Arc::clone(&server_config_ds))); diff --git a/syndicate/src/relay.rs b/syndicate/src/relay.rs index 5c5eb81..57b409f 100644 --- a/syndicate/src/relay.rs +++ b/syndicate/src/relay.rs @@ -1,6 +1,7 @@ use bytes::Buf; use bytes::BytesMut; +use crate::Language; use crate::language; use crate::actor::*; use crate::during; @@ -36,6 +37,7 @@ use preserves::value::signed_integer::SignedInteger; use preserves_schema::Codec; use preserves_schema::Deserialize; use preserves_schema::ParseError; +use preserves_schema::support::Unparse; use std::io; use std::pin::Pin; @@ -175,17 +177,18 @@ impl Membrane { } } -pub fn connect_stream( +pub fn connect_stream( t: &mut Activation, i: I, o: O, output_text: bool, - sturdyref: sturdy::SturdyRef, + step: Step, initial_state: E, mut f: F, ) where I: 'static + Send + AsyncRead, O: 'static + Send + AsyncWrite, + Step: for<'a> Unparse<&'a Language, AnyValue>, E: 'static + Send, F: 'static + Send + FnMut(&mut E, &mut Activation, Arc) -> during::DuringResult { @@ -196,8 +199,8 @@ pub fn connect_stream( let denotation = a.value().to_embedded()?; f(state, t, Arc::clone(denotation)) })); - gatekeeper.assert(t, language(), &gatekeeper::Resolve { - sturdyref, + gatekeeper.assert(t, language(), &gatekeeper::Resolve:: { + step: language().unparse(&step), observer: Cap::new(&main_entity), }); }