From 92cc57d2cd2186c2c65fcc0aaea36d359d06ef39 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sat, 8 Jun 2024 02:18:03 +0200 Subject: [PATCH] Client-side noise protocol --- syndicate-server/src/resolution/client.rs | 3 + syndicate-server/src/resolution/noise.rs | 423 +++++++++++++++------- syndicate-server/src/resolution/sturdy.rs | 3 +- 3 files changed, 302 insertions(+), 127 deletions(-) diff --git a/syndicate-server/src/resolution/client.rs b/syndicate-server/src/resolution/client.rs index 55fe543..68e84db 100644 --- a/syndicate-server/src/resolution/client.rs +++ b/syndicate-server/src/resolution/client.rs @@ -38,6 +38,9 @@ pub fn start(t: &mut Activation, ds: Arc) { t.spawn(Some(AnyValue::symbol("sturdy_ref_step")), enclose!((ds) move |t| super::sturdy::handle_sturdy_path_steps(t, ds))); + + t.spawn(Some(AnyValue::symbol("noise_ref_step")), + enclose!((ds) move |t| super::noise::handle_noise_path_steps(t, ds))); } fn run(t: &mut Activation, ds: Arc, route: G::Route) -> ActorResult { diff --git a/syndicate-server/src/resolution/noise.rs b/syndicate-server/src/resolution/noise.rs index f46d57b..f93f051 100644 --- a/syndicate-server/src/resolution/noise.rs +++ b/syndicate-server/src/resolution/noise.rs @@ -13,17 +13,21 @@ use preserves_schema::Codec; use syndicate::actor::*; use syndicate::relay::Mutex; use syndicate::relay::TunnelRelay; +use syndicate::rpc; use syndicate::trace::TurnCause; use syndicate::value::NestedValue; use syndicate::value::NoEmbeddedDomainCodec; use syndicate::value::PackedWriter; +use syndicate::enclose; use syndicate_macros::during; use syndicate_macros::pattern; use syndicate::schemas::dataspace; use syndicate::schemas::gatekeeper; use syndicate::schemas::noise; +use syndicate::schemas::rpc as R; +use syndicate::schemas::sturdy; use crate::language; @@ -37,7 +41,7 @@ pub fn handle_noise_binds(t: &mut Activation, ds: &Arc) -> ActorResult { target.value().to_embedded()?; let observer = language().parse::(&observer)?; let spec = language().parse::>(&desc)?.0; - match validate_noise_spec(spec) { + match validate_noise_service_spec(spec) { Ok(spec) => if let gatekeeper::BindObserver::Present(o) = observer { o.assert(t, language(), &gatekeeper::Bound::Bound { path_step: Box::new(gatekeeper::PathStep { @@ -108,9 +112,9 @@ fn default_noise_protocol() -> String { } fn validate_noise_spec( - spec: noise::NoiseServiceSpec, + spec: noise::NoiseSpec, ) -> Result { - let protocol = match spec.base.protocol { + let protocol = match spec.protocol { noise::NoiseProtocol::Present { protocol } => protocol, noise::NoiseProtocol::Invalid { protocol } => Err(format!("Invalid noise protocol {:?}", protocol))?, @@ -127,28 +131,35 @@ fn validate_noise_spec( let pattern = lookup_pattern(pattern_name).ok_or_else::( || format!("Unsupported handshake pattern {:?}", pattern_name).into())?; - let psks = match spec.base.pre_shared_keys { + let psks = match spec.pre_shared_keys { noise::NoisePreSharedKeys::Present { pre_shared_keys } => pre_shared_keys, noise::NoisePreSharedKeys::Invalid { pre_shared_keys } => Err(format!("Invalid pre-shared-keys {:?}", pre_shared_keys))?, noise::NoisePreSharedKeys::Absent => vec![], }; - let secret_key = match spec.secret_key { + Ok(ValidatedNoiseSpec { + service: spec.service.0, + protocol, + pattern, + psks, + secret_key: None, + public_key: spec.key, + }) +} + +fn validate_noise_service_spec( + spec: noise::NoiseServiceSpec, +) -> Result { + let noise::NoiseServiceSpec { base, secret_key } = spec; + let v = validate_noise_spec(base)?; + let secret_key = match secret_key { noise::SecretKeyField::Present { secret_key } => Some(secret_key), noise::SecretKeyField::Invalid { secret_key } => Err(format!("Invalid secret key {:?}", secret_key))?, noise::SecretKeyField::Absent => None, }; - - Ok(ValidatedNoiseSpec { - service: spec.base.service.0, - protocol, - pattern, - psks, - secret_key, - public_key: spec.base.key, - }) + Ok(ValidatedNoiseSpec { secret_key, .. v }) } fn await_bind_noise( @@ -164,9 +175,14 @@ fn await_bind_noise( let observer = Arc::clone(&observer); t.spawn_link(None, move |t| { let bindings = a.value().to_sequence()?; - let spec = validate_noise_spec(language().parse(&bindings[0])?)?; - let service = bindings[1].value().to_embedded()?; - run_noise_responder(t, spec, observer, Arc::clone(service)) + let spec = validate_noise_service_spec(language().parse(&bindings[0])?)?; + let service = bindings[1].value().to_embedded()?.clone(); + let hs = make_handshake(&spec, false)?; + let responder_session = Cap::guard(crate::Language::arc(), t.create( + ResponderState::Introduction{ service, hs })); + observer.assert( + t, language(), &gatekeeper::Resolved::Accepted { responder_session }); + Ok(()) }); Ok(()) }) @@ -181,23 +197,28 @@ fn await_bind_noise( Ok(()) } -type HandshakeState = noise_protocol::HandshakeState; +type NoiseHandshakeState = noise_protocol::HandshakeState; + +struct HandshakeState { + peer: Arc, + hs: NoiseHandshakeState, + initial_ref: Option>, + initial_oid: Option, +} + +struct TransportState { + relay_input: Arc>>, + c_recv: CipherState, +} enum ResponderState { Invalid, // used during state transitions Introduction { service: Arc, - hs: HandshakeState, - }, - Handshake { - initiator_session: Arc, - service: Arc, - hs: HandshakeState, - }, - Transport { - relay_input: Arc>>, - c_recv: CipherState, + hs: NoiseHandshakeState, }, + Handshake(HandshakeState), + Transport(TransportState), } impl Entity for ResponderState { @@ -208,7 +229,12 @@ impl Entity for ResponderState { }; match std::mem::replace(self, ResponderState::Invalid) { ResponderState::Introduction { service, hs } => { - *self = ResponderState::Handshake { initiator_session, service, hs }; + *self = ResponderState::Handshake(HandshakeState { + peer: initiator_session, + hs, + initial_ref: Some(service.clone()), + initial_oid: None, + }); Ok(()) } _ => @@ -224,77 +250,125 @@ impl Entity for ResponderState { match self { ResponderState::Invalid | ResponderState::Introduction { .. } => Err("Received Packet in invalid ResponderState")?, - ResponderState::Handshake { initiator_session, service, hs } => match p { - noise::Packet::Complete(bs) => { - if bs.len() < hs.get_next_message_overhead() { - Err("Invalid handshake message for pattern")?; - } - if bs.len() > hs.get_next_message_overhead() { - Err("Cannot accept payload during handshake")?; - } - hs.read_message(&bs, &mut [])?; - let mut reply = vec![0u8; hs.get_next_message_overhead()]; - hs.write_message(&[], &mut reply[..])?; - initiator_session.message(t, language(), &noise::Packet::Complete(reply.into())); - if hs.completed() { - let (c_recv, mut c_send) = hs.get_ciphers(); - let (_, relay_input, mut relay_output) = - TunnelRelay::_run(t, Some(Arc::clone(service)), None, false); - let trace_collector = t.trace_collector(); - let initiator_session = Arc::clone(initiator_session); - let relay_output_name = Some(AnyValue::symbol("relay_output")); - let transport_facet = t.facet_ref(); - t.linked_task(relay_output_name.clone(), async move { - let account = Account::new(relay_output_name, trace_collector); - let cause = TurnCause::external("relay_output"); - loop { - match relay_output.recv().await { - None => return Ok(LinkedTaskTermination::KeepFacet), - Some(loaned_item) => { - const MAXSIZE: usize = 65535 - 16; /* Noise tag length is 16 */ - let p = if loaned_item.item.len() > MAXSIZE { - noise::Packet::Fragmented( - loaned_item.item - .chunks(MAXSIZE) - .map(|c| c_send.encrypt_vec(c)) - .collect()) - } else { - noise::Packet::Complete(c_send.encrypt_vec(&loaned_item.item)) - }; - if !transport_facet.activate(&account, Some(cause.clone()), |t| { - initiator_session.message(t, language(), &p); - Ok(()) - }) { - break; - } - } - } - } - Ok(LinkedTaskTermination::Normal) - }); - *self = ResponderState::Transport { relay_input, c_recv }; + ResponderState::Handshake(hss) => { + if let Some((None, ts)) = hss.handle_packet(t, p)? { + *self = ResponderState::Transport(ts); + } + } + ResponderState::Transport(ts) => ts.handle_packet(t, p)?, + } + Ok(()) + } +} + +impl HandshakeState { + fn handle_packet( + &mut self, + t: &mut Activation, + p: noise::Packet, + ) -> Result>, TransportState)>, ActorError> { + match p { + noise::Packet::Complete(bs) => { + if bs.len() < self.hs.get_next_message_overhead() { + Err("Invalid handshake message for pattern")?; + } + if bs.len() > self.hs.get_next_message_overhead() { + Err("Cannot accept payload during handshake")?; + } + self.hs.read_message(&bs, &mut [])?; + if self.hs.completed() { + self.complete_handshake(t) + } else { + self.send_handshake_packet(t) + } + } + _ => Err("Fragmented handshake is not allowed")?, + } + } + + fn send_handshake_packet( + &mut self, + t: &mut Activation, + ) -> Result>, TransportState)>, ActorError> { + let mut reply = vec![0u8; self.hs.get_next_message_overhead()]; + self.hs.write_message(&[], &mut reply[..])?; + self.peer.message(t, language(), &noise::Packet::Complete(reply.into())); + if self.hs.completed() { + self.complete_handshake(t) + } else { + Ok(None) + } + } + + fn complete_handshake( + &mut self, + t: &mut Activation, + ) -> Result>, TransportState)>, ActorError> { + let (c_i_to_r, c_r_to_i) = self.hs.get_ciphers(); + let (c_recv, mut c_send) = if self.hs.get_is_initiator() { + (c_r_to_i, c_i_to_r) + } else { + (c_i_to_r, c_r_to_i) + }; + let (peer_service, relay_input, mut relay_output) = + TunnelRelay::_run(t, self.initial_ref.clone(), self.initial_oid.clone(), false); + let trace_collector = t.trace_collector(); + let peer = self.peer.clone(); + let relay_output_name = Some(AnyValue::symbol("relay_output")); + let transport_facet = t.facet_ref(); + t.linked_task(relay_output_name.clone(), async move { + let account = Account::new(relay_output_name, trace_collector); + let cause = TurnCause::external("relay_output"); + loop { + match relay_output.recv().await { + None => return Ok(LinkedTaskTermination::KeepFacet), + Some(loaned_item) => { + const MAXSIZE: usize = 65535 - 16; /* Noise tag length is 16 */ + let p = if loaned_item.item.len() > MAXSIZE { + noise::Packet::Fragmented( + loaned_item.item + .chunks(MAXSIZE) + .map(|c| c_send.encrypt_vec(c)) + .collect()) + } else { + noise::Packet::Complete(c_send.encrypt_vec(&loaned_item.item)) + }; + if !transport_facet.activate(&account, Some(cause.clone()), |t| { + peer.message(t, language(), &p); + Ok(()) + }) { + break; + } } } - _ => Err("Fragmented handshake is not allowed")?, - }, - ResponderState::Transport { relay_input, c_recv } => { - let bs = match p { - noise::Packet::Complete(bs) => - c_recv.decrypt_vec(&bs[..]).map_err(|_| "Cannot decrypt packet")?, - noise::Packet::Fragmented(pieces) => { - let mut result = Vec::with_capacity(1024); - for piece in pieces { - result.extend(c_recv.decrypt_vec(&piece[..]) - .map_err(|_| "Cannot decrypt packet fragment")?); - } - result - } - }; - let mut g = relay_input.lock(); - let tr = g.as_mut().expect("initialized"); - tr.handle_inbound_datagram(t, &bs[..])?; } - } + Ok(LinkedTaskTermination::Normal) + }); + Ok(Some((peer_service, TransportState { relay_input, c_recv }))) + } +} + +impl TransportState { + fn handle_packet( + &mut self, + t: &mut Activation, + p: noise::Packet, + ) -> ActorResult { + let bs = match p { + noise::Packet::Complete(bs) => + self.c_recv.decrypt_vec(&bs[..]).map_err(|_| "Cannot decrypt packet")?, + noise::Packet::Fragmented(pieces) => { + let mut result = Vec::with_capacity(1024); + for piece in pieces { + result.extend(self.c_recv.decrypt_vec(&piece[..]) + .map_err(|_| "Cannot decrypt packet fragment")?); + } + result + } + }; + let mut g = self.relay_input.lock(); + let tr = g.as_mut().expect("initialized"); + tr.handle_inbound_datagram(t, &bs[..])?; Ok(()) } } @@ -355,34 +429,133 @@ fn lookup_pattern(name: &str) -> Option { }) } -fn run_noise_responder( - t: &mut Activation, - spec: ValidatedNoiseSpec, - observer: Arc, - service: Arc, -) -> ActorResult { - let hs = { - let mut builder = noise_protocol::HandshakeStateBuilder::new(); - builder.set_pattern(spec.pattern); - builder.set_is_initiator(false); - let prologue = PackedWriter::encode(&mut NoEmbeddedDomainCodec, &spec.service)?; - builder.set_prologue(&prologue); - match spec.secret_key { - None => (), - Some(sk) => { - let sk: [u8; 32] = sk.try_into().map_err(|_| "Bad secret key length")?; - builder.set_s(U8Array::from_slice(&sk)); - }, - } - let mut hs = builder.build_handshake_state(); - for psk in spec.psks.into_iter() { - hs.push_psk(&psk); - } - hs - }; +fn make_handshake( + spec: &ValidatedNoiseSpec, + is_initiator: bool, +) -> Result { + let mut builder = noise_protocol::HandshakeStateBuilder::new(); + builder.set_pattern(spec.pattern.clone()); + builder.set_is_initiator(is_initiator); + let prologue = PackedWriter::encode(&mut NoEmbeddedDomainCodec, &spec.service)?; + builder.set_prologue(&prologue); + match spec.secret_key.clone() { + None => (), + Some(sk) => { + let sk: [u8; 32] = sk.try_into().map_err(|_| "Bad secret key length")?; + builder.set_s(U8Array::from_slice(&sk)); + }, + } + builder.set_rs(U8Array::from_slice(&spec.public_key)); + let mut hs = builder.build_handshake_state(); + for psk in spec.psks.iter() { + hs.push_psk(psk); + } + Ok(hs) +} - let responder_session = - Cap::guard(crate::Language::arc(), t.create(ResponderState::Introduction{ service, hs })); - observer.assert(t, language(), &gatekeeper::Resolved::Accepted { responder_session }); +pub fn handle_noise_path_steps(t: &mut Activation, ds: Arc) -> ActorResult { + during!(t, ds, language(), + >>, + enclose!((ds) move |t: &mut Activation| { + if let Ok(spec) = language().parse::(&spec0) { + if let Some(origin) = origin.value().as_embedded().cloned() { + t.spawn_link(None, move |t| run_noise_initiator(t, ds, origin, spec)); + } + } + Ok(()) + })); Ok(()) } + +fn run_noise_initiator( + t: &mut Activation, + ds: Arc, + origin: Arc, + spec: noise::NoiseSpec, +) -> ActorResult { + let q = language().unparse(&gatekeeper::ResolvePathStep { + origin: origin.clone(), + path_step: gatekeeper::PathStep { + step_type: "noise".to_string(), + detail: language().unparse(&spec), + } + }); + let service = spec.service.clone(); + let validated = validate_noise_spec(spec)?; + let observer = Cap::guard(&language().syndicate, t.create( + syndicate::entity(()).on_asserted_facet( + enclose!((ds, q) move |_, t, r: gatekeeper::Resolved| { + match r { + gatekeeper::Resolved::Rejected(b) => { + ds.assert(t, language(), &rpc::answer( + language(), q.clone(), R::Result::Error { + error: b.detail })); + } + gatekeeper::Resolved::Accepted { responder_session } => + run_initiator_session( + t, ds.clone(), q.clone(), &validated, responder_session)?, + } + Ok(()) + })))); + origin.assert(t, language(), &gatekeeper::Resolve { + step: gatekeeper::Step { + step_type: "noise".to_string(), + detail: language().unparse(&service), + }, + observer, + }); + Ok(()) +} + +fn run_initiator_session( + t: &mut Activation, + ds: Arc, + question: AnyValue, + spec: &ValidatedNoiseSpec, + responder_session: Arc, +) -> ActorResult { + let initiator_session_ref = t.create_inert(); + let initiator_session = Cap::guard(crate::Language::arc(), initiator_session_ref.clone()); + responder_session.assert(t, language(), &noise::Initiator { initiator_session }); + let mut hss = HandshakeState { + peer: responder_session.clone(), + hs: make_handshake(spec, true)?, + initial_ref: None, + initial_oid: Some(sturdy::Oid(0.into())), + }; + if !hss.hs.completed() { + if hss.send_handshake_packet(t)?.is_some() { + // TODO: this might be a valid pattern, check + panic!("Unexpected complete handshake after no messages"); + } + } + initiator_session_ref.become_entity(InitiatorState::Handshake { ds, question, hss }); + Ok(()) +} + +enum InitiatorState { + Handshake { + ds: Arc, + question: AnyValue, + hss: HandshakeState, + }, + Transport(TransportState), +} + +impl Entity for InitiatorState { + fn message(&mut self, t: &mut Activation, p: noise::Packet) -> ActorResult { + match self { + InitiatorState::Handshake { hss, ds, question } => { + if let Some((Some(peer_service), ts)) = hss.handle_packet(t, p)? { + let ds = ds.clone(); + let question = question.clone(); + *self = InitiatorState::Transport(ts); + ds.assert(t, language(), &rpc::answer(language(), question, R::Result::Ok { + value: AnyValue::domain(peer_service) })); + } + } + InitiatorState::Transport(ts) => ts.handle_packet(t, p)?, + } + Ok(()) + } +} diff --git a/syndicate-server/src/resolution/sturdy.rs b/syndicate-server/src/resolution/sturdy.rs index 45a6f45..22c4df8 100644 --- a/syndicate-server/src/resolution/sturdy.rs +++ b/syndicate-server/src/resolution/sturdy.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use preserves_schema::Codec; use syndicate::actor::*; -use syndicate::during; use syndicate::rpc; use syndicate::value::NestedValue; @@ -107,7 +106,7 @@ pub fn handle_sturdy_path_steps(t: &mut Activation, ds: Arc) -> ActorResult enclose!((ds) move |t: &mut Activation| { if let Some(origin) = origin.value().as_embedded().cloned() { let observer = Cap::guard(&language().syndicate, t.create( - during::entity(()).on_asserted_facet( + syndicate::entity(()).on_asserted_facet( enclose!((origin, parameters) move |_, t, r: gatekeeper::Resolved| { ds.assert(t, language(), &rpc::answer( language(),