/// SPDX-License-Identifier: GPL-3.0-or-later /// SPDX-FileCopyrightText: Copyright © 2016-2023 Tony Garnock-Jones import { QuasiValue as Q, Assertion, Bytes, Ref, Relay, Turn, Supervisor, SupervisorRestartPolicy, Observe, Schemas, assertionFacetObserver, canonicalEncode, fromJS, isEmbedded, underlying } from "@syndicate-lang/core"; import * as G from "./gen/ws"; export * from "./gen/ws"; import * as N from "@syndicate-lang/core/lib/gen/noise"; export * as Noise from "@syndicate-lang/core/lib/gen/noise"; import * as SaltyCrypto from 'salty-crypto'; export function boot(ds: Ref, debug: boolean = false) { spawn named 'wsRelay' { at ds { during Observe({ "pattern": :pattern G.Resolved({ "route": \$routePatValue, "addr": \_, "resolved": \Q.bind(), }) }) => { const route = Q.drop_lit(routePatValue, N.toRoute); if (!route) return; const addrs: G.RelayAddress[] = []; route.transports.forEach(t => { const a = G.toRelayAddress(t); if (a) addrs.push(a); }); wsConnect(addrs, (e, addr) => { at ds { stop on message G.ForceRelayDisconnect(addr); } resolve(e, route.steps, e => { assert G.Resolved({ "route": route, "addr": addr, "resolved": e, }); }); }); } } function tryConnection(addr: G.RelayAddress): Promise { return new Promise((resolve, reject) => { console.log('@syndicate-lang/ws-relay trying', addr); const ws = new WebSocket(addr.url); ws.binaryType = 'arraybuffer'; ws.onopen = () => resolve(ws); ws.onclose = () => reject(null); ws.onerror = (e) => reject(e); }); } async function establishConnection( addrs: G.RelayAddress[], ): Promise<{ws: WebSocket, addr: G.RelayAddress} | null> { for (let i = 0; i < addrs.length; i++) { const addr = addrs[i]; try { return { ws: await tryConnection(addr), addr }; } catch (e) { console.log( '@syndicate-lang/ws-relay attempt to contact', addr.url, 'failed with', e); } } return null; } function wsConnect(addrs: G.RelayAddress[], k: (e: Ref, addr: G.RelayAddress) => void) { let counter = 0; new Supervisor({ restartPolicy: SupervisorRestartPolicy.ALWAYS, }, () => ['wsRelay', fromJS(addrs), counter++], () => { const facet = Turn.activeFacet; facet.preventInertCheck(); establishConnection(addrs).then(result => facet.turn(() => { if (result === null) { console.log('@syndicate-lang/ws-relay no successful connection'); stop {} } else { const {ws, addr} = result; on stop { ws.close(); } ws.onclose = () => facet.turn(() => { stop {} }); ws.onerror = () => facet.turn(() => Turn.active.crash(new Error("WebSocket error"))); const relay = new Relay.Relay({ debug, trustPeer: true, packetWriter: bs => ws.send(bs), setup(r: Relay.Relay) { ws.onmessage = e => facet.turn(() => r.accept(new Uint8Array(e.data))); }, initialOid: 0, }); k(relay.peer!, addr); } })); }); } function resolve(e: Ref, steps: N.RouteStep[], k: (e: Ref) => void) { if (steps.length === 0) { k(e); } else { const [step, ...more] = steps; switch (step._variant) { case "NoiseStep": { noiseConnect(e, step.value.spec, e => resolve(e, more, k)); break; } case "GatekeeperStep": { at e { assert Schemas.gatekeeper.Resolve({ "sturdyref": step.value, "observer": create assertionFacetObserver(e => { if (isEmbedded(e)) { resolve(e.embeddedValue, more, k); } }), }); } break; } default: ((_: never) => {})(step); } } } function noiseConnect(e: Ref, spec: N.NoiseSpec, k: (e: Ref) => void) { const noiseSessionFacet = Turn.activeFacet; const algorithms = SaltyCrypto.Noise_25519_ChaChaPoly_BLAKE2s; const protocol = spec.protocol._variant === "present" ? spec.protocol.protocol : spec.protocol._variant === "absent" ? N.fromDefaultProtocol(N.DefaultProtocol()) as string : (() => { throw new Error("Invalid noise protocol name"); })(); const patternName = SaltyCrypto.matchPattern(algorithms, protocol); if (patternName === null) throw new Error("Unsupported protocol " + protocol); const preSharedKeys = spec.preSharedKeys._variant === "present" ? spec.preSharedKeys.preSharedKeys : spec.preSharedKeys._variant === "absent" ? [] : (() => { throw new Error("Invalid pre-shared keys"); })(); const prologue = underlying(canonicalEncode(spec.service)); const H = new SaltyCrypto.Handshake( algorithms, patternName, 'initiator', { prologue, remoteStaticPublicKey: underlying(spec.key), preSharedKeys: preSharedKeys.map(underlying), }); let transportState: SaltyCrypto.TransportState | null = null; let responderSession: Ref | null = null; let relay: Relay.Relay | null = null; function maybeTransition(s: SaltyCrypto.TransportState | null) { if (transportState !== null) { throw new Error("Unexpected double-transition to transport state"); } transportState = s; if (transportState !== null) { k(new Relay.Relay({ debug, trustPeer: true, packetWriter: bs => noiseSessionFacet.turn(() => { const fragments = transportState!.send.encrypt_large(bs).map(Bytes.from); at responderSession! { send message ((fragments.length === 1) ? N.Packet.complete(fragments[0]) : N.Packet.fragmented(fragments)); } }), setup(r: Relay.Relay) { relay = r; }, initialOid: 0, }).peer!); } } at e { assert N.Connect({ "serviceSelector": spec.service, "initiatorSession": create ({ ... assertionFacetObserver(e => { const accept = N.asAccept(e); responderSession = accept.responderSession; const { packet, finished } = H.writeMessage(new Uint8Array()); at responderSession { send message Bytes.from(packet); } maybeTransition(finished); }), message(body: Assertion) { const p = N.asPacket(body); if (transportState) { const packet = transportState.recv.decrypt_large( p._variant === 'complete' ? [underlying(p.value)] : p.value.map(underlying)); relay!.accept(packet); } else { if (p._variant !== 'complete') { throw new Error("Unexpected fragmentation in handshake"); } const { message, finished } = H.readMessage(underlying(p.value)); if (message.byteLength !== 0) { throw new Error("Unexpected payload during handshake"); } maybeTransition(finished); } }, }), }); } } } }