/// SPDX-License-Identifier: GPL-3.0-or-later
/// SPDX-FileCopyrightText: Copyright © 2016-2023 Tony Garnock-Jones

import {
    Assertion,
    Bytes,
    Dataflow,
    Embedded,
    IdentitySet,
    Observe,
    QuasiValue as Q,
    Ref,
    Relay,
    Schemas,
    Supervisor,
    Turn,
    assertionFacetObserver,
    canonicalEncode,
    fromJS,
    isEmbedded,
    stringify,
    underlying,
} from "@syndicate-lang/core";
import G = Schemas.gatekeeper;
import S = Schemas.sturdy;
import N = Schemas.noise;
import T = Schemas.transportAddress;
import * as SaltyCrypto from 'salty-crypto';

// One usable transport, as recorded by the 'pathResolver' actor below: the
// WebSocket address it was opened for, the control entity published with it,
// and the peer reference published with it.
type TransportState = {
    addr: T.WebSocket,
    control: Ref,
    peer: Ref,
};

/**
 * Spawns the four named actors defined in this function
 * ('transportConnector', 'pathResolver', 'noiseStep' and 'sturdyRefStep'),
 * all of which observe and assert records at `ds`.
 *
 * @param ds    Reference that every actor here observes and asserts at.
 * @param debug Passed through unchanged as the `debug` option of each
 *              `Relay.Relay` constructed below. Defaults to `false`.
 */
export function boot(ds: Ref, debug: boolean = false) {
    // --- transportConnector -------------------------------------------------
    // Reacts to observers of G.TransportConnection records at `ds`, and starts
    // a supervised connection attempt for the address named in the pattern.
    spawn named 'transportConnector' {
        at ds {
            during Observe({
                "pattern": :pattern G.TransportConnection({
                    "addr": \$addrPatValue,
                    "control": \_,
                    "resolved": \_,
                })
            }) => {
                // Only proceed when the observed address pattern converts to a
                // T.WebSocket; any other result is ignored.
                const addr = Q.drop_lit(addrPatValue, T.toWebSocket);
                if (!addr) return;

                // `counter` is post-incremented each time the name thunk below
                // is evaluated, so the logged value is the attempt number.
                // NOTE(review): presumably Supervisor.always re-runs the body
                // after it terminates — confirm against the Supervisor docs.
                let counter =0;
                Supervisor.always(() => ['transportConnector', fromJS(addr), counter++], () => {
                    console.log('connecting', addr.url, counter);
                    connectTo(addr);
                });
            }
        }
    }

    // Opens a WebSocket to `addr.url` and publishes exactly one
    // G.TransportConnection record at `ds`: an accepted one carrying a relay
    // peer if the socket opens, or a Rejected one if it closes, errors or
    // throws first.
    function connectTo(addr: T.WebSocket) {
        // Capture the facet active at call time; every WebSocket callback
        // below re-enters it via facet.turn(...).
        const facet = Turn.activeFacet;
        facet.preventInertCheck();

        // Entity published as the connection's "control" field. Any message
        // that parses as a G.TransportControl stops the facet.
        const controlEntity = {
            message(a0: Assertion): void {
                const a = G.toTransportControl(a0);
                if (!a) return;
                stop {} // ForceDisconnect
            },
        };

        // Latch: whichever of succeed()/fail() runs first wins; later calls
        // to either return immediately.
        let final = false;

        // Called when the socket opens: wires the socket to a new Relay and
        // asserts the accepted connection.
        function succeed(ws: WebSocket) {
            if (final) return;
            final = true;
            console.log('opened', ws);
            // Close the socket when the facet stops.
            on stop {
                console.log('closing', ws);
                ws.close();
            }
            // Replace the pre-open handlers installed in the try-block below:
            // from here on a close stops the facet and an error crashes the
            // active turn.
            ws.onclose = () => facet.turn(() => { stop {} });
            ws.onerror = () => facet.turn(() => Turn.active.crash(new Error("WebSocket error")));
            const relay = new Relay.Relay({
                debug,
                trustPeer: true,
                // Outbound packets go straight onto the socket.
                packetWriter: bs => ws.send(bs),
                // Inbound socket messages are handed to the relay inside a
                // turn of the captured facet.
                setup(r: Relay.Relay) {
                    ws.onmessage = e => facet.turn(() => r.accept(new Uint8Array(e.data)));
                },
                initialOid: 0,
            });
            console.log('succeed', addr.url);
            at ds {
                assert G.TransportConnection({
                    "addr": fromJS(addr),
                    "control": create controlEntity,
                    "resolved": G.Resolved.accepted(relay.peer!),
                });
            }
        }

        // Called when the socket closes or errors before opening, or when
        // constructing it throws: asserts a Rejected connection carrying
        // `detail`, then schedules the facet to stop 10000 ms later.
        function fail(detail: Assertion) {
            if (final) return;
            final = true;
            console.log('fail', addr.url, detail);
            at ds {
                assert G.TransportConnection({
                    "addr": fromJS(addr),
                    "control": create controlEntity,
                    "resolved": G.Resolved.Rejected(G.Rejected(detail)),
                });
            }
            setTimeout(() => facet.turn(() => { stop {} }), 10000);
        }

        try {
            const ws = new WebSocket(addr.url);
            ws.binaryType = 'arraybuffer';
            // Pre-open handlers; succeed() overwrites onclose/onerror once
            // the socket has opened.
            ws.onopen = () => facet.turn(() => succeed(ws));
            ws.onclose = () => facet.turn(() => fail(Symbol.for('closed')));
            ws.onerror = () => facet.turn(() => fail(Symbol.for('websocket-error-event')));
        } catch (e) {
            console.error('Failed opening websocket', addr.url, e);
            fail(Symbol.for('websocket-exception'));
        }
    }

    // --- pathResolver -------------------------------------------------------
    // Reacts to observers of G.ResolvePath records at `ds`: tracks accepted
    // transport connections for the route's WebSocket transports, picks one,
    // resolves the route's path steps starting from that transport's peer,
    // and asserts the resulting G.ResolvePath record.
    spawn named 'pathResolver' {
        at ds {
            during Observe({
                "pattern": :pattern G.ResolvePath({
                    "route": \$routePatValue,
                    "addr": \_,
                    "control": \_,
                    "resolved": \_,
                })
            }) => {
                // Only proceed when the observed route pattern converts to a
                // G.Route.
                const route = Q.drop_lit(routePatValue, G.toRoute);
                if (!route) return;

                // Set of entries ({ addr, control, peer }) for the currently
                // accepted transport connections of this route.
                // NOTE(review): IdentitySet is written without a type
                // argument here — confirm whether one was intended.
                field candidates: IdentitySet = new IdentitySet();
                route.transports.forEach(t => {
                    // Transports that do not parse as T.WebSocket are skipped.
                    const addr = T.toWebSocket(t);
                    if (!addr) return;
                    console.log('tracking', addr.url);
                    during G.TransportConnection({
                        "addr": addr,
                        "control": $control_e: Embedded,
                        "resolved": G.Resolved.accepted($peer_e: Embedded),
                    }) => {
                        const entry = {
                            addr,
                            control: control_e.embeddedValue,
                            peer: peer_e.embeddedValue,
                        };
                        // The set is mutated in place, so changed() is called
                        // explicitly after each add and delete.
                        candidates.value.add(entry);
                        candidates.changed();
                        on stop {
                            candidates.value.delete(entry);
                            candidates.changed();
                        }
                    }
                });

                field best: TransportState | null = null;
                field rootPeer: Ref | null = null;
                // `best` becomes the first entry produced by iterating
                // `candidates` (or null when there is none); `rootPeer` is
                // that entry's peer, or null.
                // NOTE(review): presumably this block is re-run whenever a
                // field it reads changes — confirm against the dataflow docs.
                dataflow {
                    best.value = null;
                    for (const c of candidates.value) {
                        best.value = c;
                        break;
                    }
                    rootPeer.value = best.value?.peer ?? null;
                }

                // Walk the route's path steps starting at rootPeer; `r` is a
                // thunk giving the final G.Resolved, or null when there is
                // none yet. The assertion is guarded by `when (r())`.
                resolve(() => rootPeer.value, route.pathSteps, (r) => {
                    console.log('leaf', best.value?.addr?.url);
                    assert G.ResolvePath({
                        "route": route,
                        "addr": fromJS(best.value!.addr),
                        "control": best.value!.control,
                        "resolved": r()!
                    }) when (r());
                });
            }
        }
    }

    // Resolves `steps` one at a time, starting from the reference returned
    // by `e`.
    //
    //   e     - thunk giving the current origin reference, or null if none.
    //   steps - remaining path steps; only the first is handled per call.
    //   k     - continuation, given a thunk that yields the final G.Resolved
    //           (or null when `e()` is null at the end of the path).
    function resolve(
        e: () => Ref | null,
        steps: G.PathStep[],
        k: (r: () => G.Resolved | null) => void,
    ) {
        if (steps.length === 0) {
            // End of the path: the current origin is the result, reported
            // as accepted.
            k(() => {
                const peer = e();
                return peer === null ? null : G.Resolved.accepted(peer);
            });
        } else {
            const [step, ...more] = steps;
            at ds {
                // Watch for a G.ResolvedPathStep at `ds` for the current
                // origin and first step; guarded by `when (e())`.
                during G.ResolvedPathStep({
                    "origin": (e()!),
                    "pathStep": step,
                    "resolved": $resolved: G.Resolved,
                }) when (e()) => {
                    switch (resolved._variant) {
                        case "accepted":
                            // Continue with the remaining steps, using this
                            // step's responder session as the next origin.
                            resolve(() => resolved.responderSession, more, k);
                            break;
                        case "Rejected":
                            // A rejection ends the walk; hand it to k as-is.
                            k(() => resolved);
                            break;
                    }
                }
            }
        }
    }

    // --- noiseStep ----------------------------------------------------------
    // Reacts to observers of G.ResolvedPathStep records whose step type is
    // N.$noise: runs a handshake as 'initiator' with the service reached via
    // the step's origin, then relays over the resulting encrypted session.
    spawn named 'noiseStep' {
        at ds {
            during Observe({
                "pattern": :pattern G.ResolvedPathStep({
                    "origin": \$originPatValue,
                    "pathStep": G.PathStep({
                        "stepType": N.$noise,
                        "detail": \$detailPatValue,
                    }),
                    "resolved": \_,
                })
            }) => {
                // The origin must be an embedded value and the detail must
                // parse as N.NoisePathStepDetail; otherwise do nothing.
                const origin0 = Q.drop_lit(originPatValue);
                if (!origin0 || !isEmbedded(origin0)) return;
                const origin = origin0.embeddedValue;

                const detail0 = Q.drop_lit(detailPatValue, N.toNoisePathStepDetail);
                if (!detail0) return;

                const spec: N.NoiseSpec = detail0;

                const algorithms = SaltyCrypto.Noise_25519_ChaChaPoly_BLAKE2s;
                // Protocol name: the explicit one when present, the schema's
                // default when absent; any other variant throws.
                const protocol =
                    spec.protocol._variant === "present" ? spec.protocol.protocol :
                    spec.protocol._variant === "absent" ? N.fromDefaultProtocol(N.DefaultProtocol()) as string :
                    (() => { throw new Error("Invalid noise protocol name"); })();
                const patternName = SaltyCrypto.matchPattern(algorithms, protocol);
                if (patternName === null) throw new Error("Unsupported protocol " + protocol);
                // Pre-shared keys: the supplied list when present, empty when
                // absent; any other variant throws.
                const preSharedKeys =
                    spec.preSharedKeys._variant === "present" ? spec.preSharedKeys.preSharedKeys :
                    spec.preSharedKeys._variant === "absent" ? [] :
                    (() => { throw new Error("Invalid pre-shared keys"); })();
                // The handshake prologue is the canonical encoding of the
                // service selector.
                const prologue = underlying(canonicalEncode(spec.service));

                const H = new SaltyCrypto.Handshake(
                    algorithms,
                    patternName,
                    'initiator',
                    {
                        prologue,
                        remoteStaticPublicKey: underlying(spec.key),
                        preSharedKeys: preSharedKeys.map(underlying),
                    });

                // Session state. `transportState` stays null while the
                // handshake is in progress, `responderSession` is set when
                // the Resolve request is accepted, and `relay` is set when
                // the Relay below is constructed.
                let transportState: SaltyCrypto.TransportState | null = null;
                let responderSession: Ref | null = null;
                let relay: Relay.Relay | null = null;

                // Records the outcome `s` of a handshake step. When `s` is
                // non-null, builds the Relay that runs over the encrypted
                // session and asserts the step as accepted. Throws if
                // `transportState` is already set.
                function maybeTransition(s: SaltyCrypto.TransportState | null) {
                    if (transportState !== null) {
                        throw new Error("Unexpected double-transition to transport state");
                    }
                    transportState = s;
                    if (transportState !== null) {
                        const noiseSessionFacet = Turn.activeFacet;
                        const peer = new Relay.Relay({
                            debug,
                            trustPeer: true,
                            // Outbound relay packets are encrypted and sent
                            // to the responder session: as one complete
                            // packet when encryption yields a single
                            // fragment, otherwise as a fragmented packet.
                            packetWriter: bs => noiseSessionFacet.turn(() => {
                                const fragments = transportState!.send.encrypt_large(bs).map(Bytes.from);
                                at responderSession! {
                                    send message ((fragments.length === 1)
                                        ? N.Packet.complete(fragments[0])
                                        : N.Packet.fragmented(fragments));
                                }
                            }),
                            // Keep the relay so message() below can feed it
                            // decrypted packets.
                            setup(r: Relay.Relay) {
                                relay = r;
                            },
                            initialOid: 0,
                        }).peer!;
                        assert G.ResolvedPathStep({
                            "origin": origin,
                            "pathStep": G.PathStep({
                                "stepType": N.$noise,
                                "detail": fromJS(spec),
                            }),
                            "resolved": G.Resolved.accepted(peer),
                        });
                    }
                }

                react {
                    at origin {
                        // Ask the origin to resolve the noise service. The
                        // observer entity receives the G.Resolved response
                        // as an assertion and handshake/transport packets
                        // as messages.
                        assert G.Resolve({
                            "step": G.Step({
                                "stepType": N.$noise,
                                "detail": fromJS(N.ServiceSelector(spec.service)),
                            }),
                            "observer": create ({
                                ... assertionFacetObserver(e => {
                                    const response = G.toResolved(e);
                                    if (!response) return;
                                    switch (response._variant) {
                                        case "accepted":
                                            // Accepted: remember the session
                                            // and send the first handshake
                                            // message, written with an empty
                                            // payload.
                                            responderSession = response.responderSession;
                                            const { packet, finished } = H.writeMessage(new Uint8Array());
                                            at responderSession {
                                                send message Bytes.from(packet);
                                            }
                                            maybeTransition(finished);
                                            break;
                                        case "Rejected":
                                            // Rejected: stop this facet and
                                            // publish the rejection at `ds`
                                            // as the resolution of this step.
                                            stop {
                                                at ds {
                                                    assert G.ResolvedPathStep({
                                                        "origin": origin,
                                                        "pathStep": G.PathStep({
                                                            "stepType": N.$noise,
                                                            "detail": fromJS(N.NoisePathStepDetail(spec)),
                                                        }),
                                                        "resolved": response,
                                                    });
                                                }
                                            }
                                    }
                                }),
                                message(body: Assertion) {
                                    const p = N.asPacket(body);
                                    if (transportState) {
                                        // Transport phase: decrypt the
                                        // packet (one fragment or several)
                                        // and pass the plaintext to the
                                        // relay.
                                        const packet = transportState.recv.decrypt_large(
                                            p._variant === 'complete'
                                                ? [underlying(p.value)]
                                                : p.value.map(underlying));
                                        relay!.accept(packet);
                                    } else {
                                        // Handshake phase: only complete
                                        // packets with an empty payload are
                                        // accepted; anything else throws.
                                        if (p._variant !== 'complete') {
                                            throw new Error("Unexpected fragmentation in handshake");
                                        }
                                        const { message, finished } = H.readMessage(underlying(p.value));
                                        if (message.byteLength !== 0) {
                                            throw new Error("Unexpected payload during handshake");
                                        }
                                        maybeTransition(finished);
                                    }
                                },
                            }),
                        });
                    }
                }
            }
        }
    }

    // --- sturdyRefStep ------------------------------------------------------
    // Reacts to observers of G.ResolvedPathStep records whose step type is
    // S.$ref: forwards a G.Resolve request to the step's origin and
    // republishes whatever G.Resolved response arrives as a
    // G.ResolvedPathStep at `ds`.
    spawn named 'sturdyRefStep' {
        at ds {
            during Observe({
                "pattern": :pattern G.ResolvedPathStep({
                    "origin": \$originPatValue,
                    "pathStep": G.PathStep({
                        "stepType": S.$ref,
                        "detail": \$detailPatValue,
                    }),
                    "resolved": \_,
                })
            }) => {
                // The origin must be an embedded value and the detail must
                // parse as S.SturdyPathStepDetail; otherwise do nothing.
                const origin0 = Q.drop_lit(originPatValue);
                if (!origin0 || !isEmbedded(origin0)) return;
                const origin = origin0.embeddedValue;

                const detail0 = Q.drop_lit(detailPatValue, S.toSturdyPathStepDetail);
                if (!detail0) return;

                const parameters: S.Parameters = detail0;
                at origin {
                    assert G.Resolve({
                        "step": G.Step({
                            "stepType": S.$ref,
                            "detail": fromJS(parameters),
                        }),
                        "observer": create assertionFacetObserver(e => {
                            // Values that do not parse as G.Resolved are
                            // ignored; accepted and Rejected responses are
                            // both republished unchanged.
                            const response = G.toResolved(e);
                            if (!response) return;
                            at ds {
                                assert G.ResolvedPathStep({
                                    "origin": origin,
                                    "pathStep": G.PathStep({
                                        "stepType": S.$ref,
                                        "detail": fromJS(parameters),
                                    }),
                                    "resolved": response,
                                });
                            }
                        }),
                    });
                }
            }
        }
    }
}