Client-side noise protocol
/ build (push) Successful in 4m32s Details

This commit is contained in:
Tony Garnock-Jones 2024-06-08 02:18:03 +02:00
parent 46f4071d4f
commit 92cc57d2cd
3 changed files with 302 additions and 127 deletions

View File

@ -38,6 +38,9 @@ pub fn start(t: &mut Activation, ds: Arc<Cap>) {
t.spawn(Some(AnyValue::symbol("sturdy_ref_step")),
enclose!((ds) move |t| super::sturdy::handle_sturdy_path_steps(t, ds)));
t.spawn(Some(AnyValue::symbol("noise_ref_step")),
enclose!((ds) move |t| super::noise::handle_noise_path_steps(t, ds)));
}
fn run(t: &mut Activation, ds: Arc<Cap>, route: G::Route) -> ActorResult {

View File

@ -13,17 +13,21 @@ use preserves_schema::Codec;
use syndicate::actor::*;
use syndicate::relay::Mutex;
use syndicate::relay::TunnelRelay;
use syndicate::rpc;
use syndicate::trace::TurnCause;
use syndicate::value::NestedValue;
use syndicate::value::NoEmbeddedDomainCodec;
use syndicate::value::PackedWriter;
use syndicate::enclose;
use syndicate_macros::during;
use syndicate_macros::pattern;
use syndicate::schemas::dataspace;
use syndicate::schemas::gatekeeper;
use syndicate::schemas::noise;
use syndicate::schemas::rpc as R;
use syndicate::schemas::sturdy;
use crate::language;
@ -37,7 +41,7 @@ pub fn handle_noise_binds(t: &mut Activation, ds: &Arc<Cap>) -> ActorResult {
target.value().to_embedded()?;
let observer = language().parse::<gatekeeper::BindObserver>(&observer)?;
let spec = language().parse::<noise::NoiseDescriptionDetail<AnyValue>>(&desc)?.0;
match validate_noise_spec(spec) {
match validate_noise_service_spec(spec) {
Ok(spec) => if let gatekeeper::BindObserver::Present(o) = observer {
o.assert(t, language(), &gatekeeper::Bound::Bound {
path_step: Box::new(gatekeeper::PathStep {
@ -108,9 +112,9 @@ fn default_noise_protocol() -> String {
}
fn validate_noise_spec(
spec: noise::NoiseServiceSpec<AnyValue>,
spec: noise::NoiseSpec<AnyValue>,
) -> Result<ValidatedNoiseSpec, ActorError> {
let protocol = match spec.base.protocol {
let protocol = match spec.protocol {
noise::NoiseProtocol::Present { protocol } => protocol,
noise::NoiseProtocol::Invalid { protocol } =>
Err(format!("Invalid noise protocol {:?}", protocol))?,
@ -127,28 +131,35 @@ fn validate_noise_spec(
let pattern = lookup_pattern(pattern_name).ok_or_else::<ActorError, _>(
|| format!("Unsupported handshake pattern {:?}", pattern_name).into())?;
let psks = match spec.base.pre_shared_keys {
let psks = match spec.pre_shared_keys {
noise::NoisePreSharedKeys::Present { pre_shared_keys } => pre_shared_keys,
noise::NoisePreSharedKeys::Invalid { pre_shared_keys } =>
Err(format!("Invalid pre-shared-keys {:?}", pre_shared_keys))?,
noise::NoisePreSharedKeys::Absent => vec![],
};
let secret_key = match spec.secret_key {
Ok(ValidatedNoiseSpec {
service: spec.service.0,
protocol,
pattern,
psks,
secret_key: None,
public_key: spec.key,
})
}
fn validate_noise_service_spec(
spec: noise::NoiseServiceSpec<AnyValue>,
) -> Result<ValidatedNoiseSpec, ActorError> {
let noise::NoiseServiceSpec { base, secret_key } = spec;
let v = validate_noise_spec(base)?;
let secret_key = match secret_key {
noise::SecretKeyField::Present { secret_key } => Some(secret_key),
noise::SecretKeyField::Invalid { secret_key } =>
Err(format!("Invalid secret key {:?}", secret_key))?,
noise::SecretKeyField::Absent => None,
};
Ok(ValidatedNoiseSpec {
service: spec.base.service.0,
protocol,
pattern,
psks,
secret_key,
public_key: spec.base.key,
})
Ok(ValidatedNoiseSpec { secret_key, .. v })
}
fn await_bind_noise(
@ -164,9 +175,14 @@ fn await_bind_noise(
let observer = Arc::clone(&observer);
t.spawn_link(None, move |t| {
let bindings = a.value().to_sequence()?;
let spec = validate_noise_spec(language().parse(&bindings[0])?)?;
let service = bindings[1].value().to_embedded()?;
run_noise_responder(t, spec, observer, Arc::clone(service))
let spec = validate_noise_service_spec(language().parse(&bindings[0])?)?;
let service = bindings[1].value().to_embedded()?.clone();
let hs = make_handshake(&spec, false)?;
let responder_session = Cap::guard(crate::Language::arc(), t.create(
ResponderState::Introduction{ service, hs }));
observer.assert(
t, language(), &gatekeeper::Resolved::Accepted { responder_session });
Ok(())
});
Ok(())
})
@ -181,23 +197,28 @@ fn await_bind_noise(
Ok(())
}
type HandshakeState = noise_protocol::HandshakeState<X25519, ChaCha20Poly1305, Blake2s>;
type NoiseHandshakeState = noise_protocol::HandshakeState<X25519, ChaCha20Poly1305, Blake2s>;
struct HandshakeState {
peer: Arc<Cap>,
hs: NoiseHandshakeState,
initial_ref: Option<Arc<Cap>>,
initial_oid: Option<sturdy::Oid>,
}
struct TransportState {
relay_input: Arc<Mutex<Option<TunnelRelay>>>,
c_recv: CipherState<ChaCha20Poly1305>,
}
enum ResponderState {
Invalid, // used during state transitions
Introduction {
service: Arc<Cap>,
hs: HandshakeState,
},
Handshake {
initiator_session: Arc<Cap>,
service: Arc<Cap>,
hs: HandshakeState,
},
Transport {
relay_input: Arc<Mutex<Option<TunnelRelay>>>,
c_recv: CipherState<ChaCha20Poly1305>,
hs: NoiseHandshakeState,
},
Handshake(HandshakeState),
Transport(TransportState),
}
impl Entity<noise::SessionItem> for ResponderState {
@ -208,7 +229,12 @@ impl Entity<noise::SessionItem> for ResponderState {
};
match std::mem::replace(self, ResponderState::Invalid) {
ResponderState::Introduction { service, hs } => {
*self = ResponderState::Handshake { initiator_session, service, hs };
*self = ResponderState::Handshake(HandshakeState {
peer: initiator_session,
hs,
initial_ref: Some(service.clone()),
initial_oid: None,
});
Ok(())
}
_ =>
@ -224,77 +250,125 @@ impl Entity<noise::SessionItem> for ResponderState {
match self {
ResponderState::Invalid | ResponderState::Introduction { .. } =>
Err("Received Packet in invalid ResponderState")?,
ResponderState::Handshake { initiator_session, service, hs } => match p {
noise::Packet::Complete(bs) => {
if bs.len() < hs.get_next_message_overhead() {
Err("Invalid handshake message for pattern")?;
}
if bs.len() > hs.get_next_message_overhead() {
Err("Cannot accept payload during handshake")?;
}
hs.read_message(&bs, &mut [])?;
let mut reply = vec![0u8; hs.get_next_message_overhead()];
hs.write_message(&[], &mut reply[..])?;
initiator_session.message(t, language(), &noise::Packet::Complete(reply.into()));
if hs.completed() {
let (c_recv, mut c_send) = hs.get_ciphers();
let (_, relay_input, mut relay_output) =
TunnelRelay::_run(t, Some(Arc::clone(service)), None, false);
let trace_collector = t.trace_collector();
let initiator_session = Arc::clone(initiator_session);
let relay_output_name = Some(AnyValue::symbol("relay_output"));
let transport_facet = t.facet_ref();
t.linked_task(relay_output_name.clone(), async move {
let account = Account::new(relay_output_name, trace_collector);
let cause = TurnCause::external("relay_output");
loop {
match relay_output.recv().await {
None => return Ok(LinkedTaskTermination::KeepFacet),
Some(loaned_item) => {
const MAXSIZE: usize = 65535 - 16; /* Noise tag length is 16 */
let p = if loaned_item.item.len() > MAXSIZE {
noise::Packet::Fragmented(
loaned_item.item
.chunks(MAXSIZE)
.map(|c| c_send.encrypt_vec(c))
.collect())
} else {
noise::Packet::Complete(c_send.encrypt_vec(&loaned_item.item))
};
if !transport_facet.activate(&account, Some(cause.clone()), |t| {
initiator_session.message(t, language(), &p);
Ok(())
}) {
break;
}
}
}
}
Ok(LinkedTaskTermination::Normal)
});
*self = ResponderState::Transport { relay_input, c_recv };
ResponderState::Handshake(hss) => {
if let Some((None, ts)) = hss.handle_packet(t, p)? {
*self = ResponderState::Transport(ts);
}
}
ResponderState::Transport(ts) => ts.handle_packet(t, p)?,
}
Ok(())
}
}
impl HandshakeState {
fn handle_packet(
&mut self,
t: &mut Activation,
p: noise::Packet,
) -> Result<Option<(Option<Arc<Cap>>, TransportState)>, ActorError> {
match p {
noise::Packet::Complete(bs) => {
if bs.len() < self.hs.get_next_message_overhead() {
Err("Invalid handshake message for pattern")?;
}
if bs.len() > self.hs.get_next_message_overhead() {
Err("Cannot accept payload during handshake")?;
}
self.hs.read_message(&bs, &mut [])?;
if self.hs.completed() {
self.complete_handshake(t)
} else {
self.send_handshake_packet(t)
}
}
_ => Err("Fragmented handshake is not allowed")?,
}
}
fn send_handshake_packet(
&mut self,
t: &mut Activation,
) -> Result<Option<(Option<Arc<Cap>>, TransportState)>, ActorError> {
let mut reply = vec![0u8; self.hs.get_next_message_overhead()];
self.hs.write_message(&[], &mut reply[..])?;
self.peer.message(t, language(), &noise::Packet::Complete(reply.into()));
if self.hs.completed() {
self.complete_handshake(t)
} else {
Ok(None)
}
}
fn complete_handshake(
&mut self,
t: &mut Activation,
) -> Result<Option<(Option<Arc<Cap>>, TransportState)>, ActorError> {
let (c_i_to_r, c_r_to_i) = self.hs.get_ciphers();
let (c_recv, mut c_send) = if self.hs.get_is_initiator() {
(c_r_to_i, c_i_to_r)
} else {
(c_i_to_r, c_r_to_i)
};
let (peer_service, relay_input, mut relay_output) =
TunnelRelay::_run(t, self.initial_ref.clone(), self.initial_oid.clone(), false);
let trace_collector = t.trace_collector();
let peer = self.peer.clone();
let relay_output_name = Some(AnyValue::symbol("relay_output"));
let transport_facet = t.facet_ref();
t.linked_task(relay_output_name.clone(), async move {
let account = Account::new(relay_output_name, trace_collector);
let cause = TurnCause::external("relay_output");
loop {
match relay_output.recv().await {
None => return Ok(LinkedTaskTermination::KeepFacet),
Some(loaned_item) => {
const MAXSIZE: usize = 65535 - 16; /* Noise tag length is 16 */
let p = if loaned_item.item.len() > MAXSIZE {
noise::Packet::Fragmented(
loaned_item.item
.chunks(MAXSIZE)
.map(|c| c_send.encrypt_vec(c))
.collect())
} else {
noise::Packet::Complete(c_send.encrypt_vec(&loaned_item.item))
};
if !transport_facet.activate(&account, Some(cause.clone()), |t| {
peer.message(t, language(), &p);
Ok(())
}) {
break;
}
}
}
_ => Err("Fragmented handshake is not allowed")?,
},
ResponderState::Transport { relay_input, c_recv } => {
let bs = match p {
noise::Packet::Complete(bs) =>
c_recv.decrypt_vec(&bs[..]).map_err(|_| "Cannot decrypt packet")?,
noise::Packet::Fragmented(pieces) => {
let mut result = Vec::with_capacity(1024);
for piece in pieces {
result.extend(c_recv.decrypt_vec(&piece[..])
.map_err(|_| "Cannot decrypt packet fragment")?);
}
result
}
};
let mut g = relay_input.lock();
let tr = g.as_mut().expect("initialized");
tr.handle_inbound_datagram(t, &bs[..])?;
}
}
Ok(LinkedTaskTermination::Normal)
});
Ok(Some((peer_service, TransportState { relay_input, c_recv })))
}
}
impl TransportState {
fn handle_packet(
&mut self,
t: &mut Activation,
p: noise::Packet,
) -> ActorResult {
let bs = match p {
noise::Packet::Complete(bs) =>
self.c_recv.decrypt_vec(&bs[..]).map_err(|_| "Cannot decrypt packet")?,
noise::Packet::Fragmented(pieces) => {
let mut result = Vec::with_capacity(1024);
for piece in pieces {
result.extend(self.c_recv.decrypt_vec(&piece[..])
.map_err(|_| "Cannot decrypt packet fragment")?);
}
result
}
};
let mut g = self.relay_input.lock();
let tr = g.as_mut().expect("initialized");
tr.handle_inbound_datagram(t, &bs[..])?;
Ok(())
}
}
@ -355,34 +429,133 @@ fn lookup_pattern(name: &str) -> Option<HandshakePattern> {
})
}
fn run_noise_responder(
t: &mut Activation,
spec: ValidatedNoiseSpec,
observer: Arc<Cap>,
service: Arc<Cap>,
) -> ActorResult {
let hs = {
let mut builder = noise_protocol::HandshakeStateBuilder::new();
builder.set_pattern(spec.pattern);
builder.set_is_initiator(false);
let prologue = PackedWriter::encode(&mut NoEmbeddedDomainCodec, &spec.service)?;
builder.set_prologue(&prologue);
match spec.secret_key {
None => (),
Some(sk) => {
let sk: [u8; 32] = sk.try_into().map_err(|_| "Bad secret key length")?;
builder.set_s(U8Array::from_slice(&sk));
},
}
let mut hs = builder.build_handshake_state();
for psk in spec.psks.into_iter() {
hs.push_psk(&psk);
}
hs
};
fn make_handshake(
spec: &ValidatedNoiseSpec,
is_initiator: bool,
) -> Result<NoiseHandshakeState, ActorError> {
let mut builder = noise_protocol::HandshakeStateBuilder::new();
builder.set_pattern(spec.pattern.clone());
builder.set_is_initiator(is_initiator);
let prologue = PackedWriter::encode(&mut NoEmbeddedDomainCodec, &spec.service)?;
builder.set_prologue(&prologue);
match spec.secret_key.clone() {
None => (),
Some(sk) => {
let sk: [u8; 32] = sk.try_into().map_err(|_| "Bad secret key length")?;
builder.set_s(U8Array::from_slice(&sk));
},
}
builder.set_rs(U8Array::from_slice(&spec.public_key));
let mut hs = builder.build_handshake_state();
for psk in spec.psks.iter() {
hs.push_psk(psk);
}
Ok(hs)
}
let responder_session =
Cap::guard(crate::Language::arc(), t.create(ResponderState::Introduction{ service, hs }));
observer.assert(t, language(), &gatekeeper::Resolved::Accepted { responder_session });
pub fn handle_noise_path_steps(t: &mut Activation, ds: Arc<Cap>) -> ActorResult {
during!(t, ds, language(),
<q <resolve-path-step $origin <noise $spec0>>>,
enclose!((ds) move |t: &mut Activation| {
if let Ok(spec) = language().parse::<noise::NoiseSpec>(&spec0) {
if let Some(origin) = origin.value().as_embedded().cloned() {
t.spawn_link(None, move |t| run_noise_initiator(t, ds, origin, spec));
}
}
Ok(())
}));
Ok(())
}
fn run_noise_initiator(
t: &mut Activation,
ds: Arc<Cap>,
origin: Arc<Cap>,
spec: noise::NoiseSpec,
) -> ActorResult {
let q = language().unparse(&gatekeeper::ResolvePathStep {
origin: origin.clone(),
path_step: gatekeeper::PathStep {
step_type: "noise".to_string(),
detail: language().unparse(&spec),
}
});
let service = spec.service.clone();
let validated = validate_noise_spec(spec)?;
let observer = Cap::guard(&language().syndicate, t.create(
syndicate::entity(()).on_asserted_facet(
enclose!((ds, q) move |_, t, r: gatekeeper::Resolved| {
match r {
gatekeeper::Resolved::Rejected(b) => {
ds.assert(t, language(), &rpc::answer(
language(), q.clone(), R::Result::Error {
error: b.detail }));
}
gatekeeper::Resolved::Accepted { responder_session } =>
run_initiator_session(
t, ds.clone(), q.clone(), &validated, responder_session)?,
}
Ok(())
}))));
origin.assert(t, language(), &gatekeeper::Resolve {
step: gatekeeper::Step {
step_type: "noise".to_string(),
detail: language().unparse(&service),
},
observer,
});
Ok(())
}
fn run_initiator_session(
t: &mut Activation,
ds: Arc<Cap>,
question: AnyValue,
spec: &ValidatedNoiseSpec,
responder_session: Arc<Cap>,
) -> ActorResult {
let initiator_session_ref = t.create_inert();
let initiator_session = Cap::guard(crate::Language::arc(), initiator_session_ref.clone());
responder_session.assert(t, language(), &noise::Initiator { initiator_session });
let mut hss = HandshakeState {
peer: responder_session.clone(),
hs: make_handshake(spec, true)?,
initial_ref: None,
initial_oid: Some(sturdy::Oid(0.into())),
};
if !hss.hs.completed() {
if hss.send_handshake_packet(t)?.is_some() {
// TODO: this might be a valid pattern, check
panic!("Unexpected complete handshake after no messages");
}
}
initiator_session_ref.become_entity(InitiatorState::Handshake { ds, question, hss });
Ok(())
}
enum InitiatorState {
Handshake {
ds: Arc<Cap>,
question: AnyValue,
hss: HandshakeState,
},
Transport(TransportState),
}
impl Entity<noise::Packet> for InitiatorState {
fn message(&mut self, t: &mut Activation, p: noise::Packet) -> ActorResult {
match self {
InitiatorState::Handshake { hss, ds, question } => {
if let Some((Some(peer_service), ts)) = hss.handle_packet(t, p)? {
let ds = ds.clone();
let question = question.clone();
*self = InitiatorState::Transport(ts);
ds.assert(t, language(), &rpc::answer(language(), question, R::Result::Ok {
value: AnyValue::domain(peer_service) }));
}
}
InitiatorState::Transport(ts) => ts.handle_packet(t, p)?,
}
Ok(())
}
}

View File

@ -3,7 +3,6 @@ use std::sync::Arc;
use preserves_schema::Codec;
use syndicate::actor::*;
use syndicate::during;
use syndicate::rpc;
use syndicate::value::NestedValue;
@ -107,7 +106,7 @@ pub fn handle_sturdy_path_steps(t: &mut Activation, ds: Arc<Cap>) -> ActorResult
enclose!((ds) move |t: &mut Activation| {
if let Some(origin) = origin.value().as_embedded().cloned() {
let observer = Cap::guard(&language().syndicate, t.create(
during::entity(()).on_asserted_facet(
syndicate::entity(()).on_asserted_facet(
enclose!((origin, parameters) move |_, t, r: gatekeeper::Resolved| {
ds.assert(t, language(), &rpc::answer(
language(),