diff --git a/packages/ws-relay/package.json b/packages/ws-relay/package.json index 3bb0fb4..5ac5c9e 100644 --- a/packages/ws-relay/package.json +++ b/packages/ws-relay/package.json @@ -13,7 +13,7 @@ }, "scripts": { "prepare": "yarn regenerate && yarn compile && yarn rollup", - "regenerate": "rm -rf ./src/gen && preserves-schema-ts --module EntityRef=@syndicate-lang/core --module transportAddress=@syndicate-lang/core:Schemas.transportAddress --module sturdy=@syndicate-lang/core:Schemas.sturdy --xref ../../node_modules/@syndicate-lang/core/protocols/schemas --output ./src/gen ./protocols/schemas", + "regenerate": "rm -rf ./src/gen && preserves-schema-ts --module EntityRef=@syndicate-lang/core --module transportAddress=@syndicate-lang/core:Schemas.transportAddress --module noise=@syndicate-lang/core:Schemas.noise --module sturdy=@syndicate-lang/core:Schemas.sturdy --xref ../../node_modules/@syndicate-lang/core/protocols/schemas --output ./src/gen ./protocols/schemas", "regenerate:watch": "yarn regenerate --watch", "compile": "syndicate-tsc", "compile:watch": "syndicate-tsc -w --verbose --intermediate-directory src.ts", @@ -29,7 +29,8 @@ "dependencies": { "@preserves/core": ">=0.20.2", "@preserves/schema": ">=0.21.2", - "@syndicate-lang/core": "^0.11.9" + "@syndicate-lang/core": "^0.11.9", + "salty-crypto": "0.2" }, "devDependencies": { "@syndicate-lang/ts-plugin": "^0.11.10", diff --git a/packages/ws-relay/protocols/schemas/ws.prs b/packages/ws-relay/protocols/schemas/ws.prs index c2e1613..35bcbd1 100644 --- a/packages/ws-relay/protocols/schemas/ws.prs +++ b/packages/ws-relay/protocols/schemas/ws.prs @@ -1,6 +1,6 @@ version 1 . -Resolved = . +Resolved = . ViaRelay = . ForceRelayDisconnect = . diff --git a/packages/ws-relay/src/index.ts b/packages/ws-relay/src/index.ts index 24dbdac..decd0e5 100644 --- a/packages/ws-relay/src/index.ts +++ b/packages/ws-relay/src/index.ts @@ -1,71 +1,188 @@ /// SPDX-License-Identifier: GPL-3.0-or-later /// SPDX-FileCopyrightText: Copyright © 2016-2023 Tony Garnock-Jones -import { QuasiValue as Q, Ref, Relay, Turn, Supervisor, SupervisorRestartPolicy, Observe, Schemas, assertionFacetObserver, isEmbedded } from "@syndicate-lang/core"; +import { QuasiValue as Q, Assertion, Bytes, Ref, Relay, Turn, Supervisor, SupervisorRestartPolicy, Observe, Schemas, assertionFacetObserver, canonicalEncode, isEmbedded, underlying } from "@syndicate-lang/core"; import * as G from "./gen/ws"; export * from "./gen/ws"; +import * as N from "@syndicate-lang/core/lib/gen/noise"; +export * as Noise from "@syndicate-lang/core/lib/gen/noise"; +import * as SaltyCrypto from 'salty-crypto'; export function boot(ds: Ref, debug: boolean = false) { spawn named 'wsRelay' { at ds { during Observe({ "pattern": :pattern G.Resolved({ - "addr": \$addrPatValue, - "sturdyref": \$sturdyRefPatValue, + "route": \$routePatValue, + "addr": \_, "resolved": \Q.bind(), }) }) => { - const addr = Q.drop_lit(addrPatValue, G.toRelayAddress); - const sturdyref = Q.drop_lit(sturdyRefPatValue, Schemas.sturdy.toSturdyRef); - if (addr && sturdyref) { - assert G.ViaRelay({ + const route = Q.drop_lit(routePatValue, N.toRoute); + if (!route) return; + let addr0: G.RelayAddress | undefined; + route.transports.forEach(t => addr0 = addr0 ?? G.toRelayAddress(t)); + if (!addr0) return; + const addr = addr0; + wsConnect(addr, e => resolve(e, route.steps, e => { + assert G.Resolved({ + "route": route, "addr": addr, - "assertion": Schemas.gatekeeper.fromResolve(Schemas.gatekeeper.Resolve({ - "sturdyref": sturdyref, - "observer": create assertionFacetObserver(e => { - if (isEmbedded(e)) { - assert G.fromResolved(G.Resolved({ - "addr": addr, - "sturdyref": sturdyref, - "resolved": e.embeddedValue, - })); - } - }), - })), + "resolved": e, }); + })); + at ds { + stop on message G.ForceRelayDisconnect(addr); } } + } - during G.ViaRelay({ "addr": $addrValue }) => spawn named ['wsRelay', addrValue] { - let counter = 0; - new Supervisor({ - restartPolicy: SupervisorRestartPolicy.ALWAYS, - }, () => ['wsRelay', addrValue, counter++], () => { - const addr = G.toRelayAddress(addrValue); - if (addr !== void 0) { - stop on message G.ForceRelayDisconnect(addrValue); - const facet = Turn.activeFacet; - const ws = new WebSocket(addr.url); - ws.binaryType = 'arraybuffer'; - ws.onclose = () => facet.turn(() => { stop {} }); - ws.onerror = () => facet.turn(() => - Turn.active.crash(new Error("WebSocket error"))); - ws.onopen = () => facet.turn(() => { - const relay = new Relay.Relay({ - debug, - trustPeer: true, - packetWriter: bs => ws.send(bs), - setup(r: Relay.Relay) { - ws.onmessage = e => facet.turn(() => - r.accept(new Uint8Array(e.data))); - }, - initialOid: 0, - }); - during G.ViaRelay({ "addr": addrValue, "assertion": $a }) => { - at relay.peer! { - assert a; - } - } - }); + function wsConnect(addr: G.RelayAddress, k: (e: Ref) => void) { + console.log('wsConnect', addr); + let counter = 0; + new Supervisor({ + restartPolicy: SupervisorRestartPolicy.ALWAYS, + }, () => ['wsRelay', G.fromRelayAddress(addr), counter++], () => { + console.log('try!', counter - 1); + const facet = Turn.activeFacet; + const ws = new WebSocket(addr.url); + facet.preventInertCheck(); + on stop { + console.log('on stop triggered'); + ws.close(); + } + ws.binaryType = 'arraybuffer'; + ws.onclose = () => facet.turn(() => { stop {} }); + ws.onerror = () => facet.turn(() => + Turn.active.crash(new Error("WebSocket error"))); + ws.onopen = () => facet.turn(() => { + console.log('hey cool'); + const relay = new Relay.Relay({ + debug, + trustPeer: true, + packetWriter: bs => ws.send(bs), + setup(r: Relay.Relay) { + ws.onmessage = e => facet.turn(() => + r.accept(new Uint8Array(e.data))); + }, + initialOid: 0, + }); + k(relay.peer!); + }); + }); + } + + function resolve(e: Ref, steps: N.RouteStep[], k: (e: Ref) => void) { + console.log('resolve', e, steps); + if (steps.length === 0) { + k(e); + } else { + const [step, ...more] = steps; + switch (step._variant) { + case "NoiseStep": { + noiseConnect(e, step.value.spec, e => resolve(e, more, k)); + break; } + case "GatekeeperStep": { + at e { + assert Schemas.gatekeeper.Resolve({ + "sturdyref": step.value, + "observer": create assertionFacetObserver(e => { + if (isEmbedded(e)) { + resolve(e.embeddedValue, more, k); + } + }), + }); + } + break; + } + default: ((_: never) => {})(step); + } + } + } + + function noiseConnect(e: Ref, spec: N.NoiseSpec, k: (e: Ref) => void) { + const noiseSessionFacet = Turn.activeFacet; + const algorithms = SaltyCrypto.Noise_25519_ChaChaPoly_BLAKE2s; + const protocol = + spec.protocol._variant === "present" ? spec.protocol.protocol : + spec.protocol._variant === "absent" ? N.fromDefaultProtocol(N.DefaultProtocol()) as string : + (() => { throw new Error("Invalid noise protocol name"); })(); + const patternName = SaltyCrypto.matchPattern(algorithms, protocol); + if (patternName === null) throw new Error("Unsupported protocol " + protocol); + const preSharedKeys = + spec.preSharedKeys._variant === "present" ? spec.preSharedKeys.preSharedKeys : + spec.preSharedKeys._variant === "absent" ? [] : + (() => { throw new Error("Invalid pre-shared keys"); })(); + const prologue = underlying(canonicalEncode(spec.service)); + const H = new SaltyCrypto.Handshake( + algorithms, + patternName, + 'initiator', + { + prologue, + remoteStaticPublicKey: underlying(spec.key), + preSharedKeys: preSharedKeys.map(underlying), + }); + console.log('HANDSHAKE', H); + let transportState: SaltyCrypto.TransportState | null = null; + let responderSession: Ref | null = null; + let relay: Relay.Relay | null = null; + function maybeTransition(s: SaltyCrypto.TransportState | null) { + if (transportState !== null) { + throw new Error("Unexpected double-transition to transport state"); + } + transportState = s; + if (transportState !== null) { + k(new Relay.Relay({ + debug, + trustPeer: true, + packetWriter: bs => noiseSessionFacet.turn(() => { + const fragments = transportState!.send.encrypt_large(bs).map(Bytes.from); + at responderSession! { + send message ((fragments.length === 1) + ? N.Packet.complete(fragments[0]) + : N.Packet.fragmented(fragments)); + } + }), + setup(r: Relay.Relay) { + relay = r; + }, + initialOid: 0, + }).peer!); + } + } + at e { + assert N.Connect({ + "serviceSelector": spec.service, + "initiatorSession": create ({ + ... assertionFacetObserver(e => { + const accept = N.asAccept(e); + responderSession = accept.responderSession; + const { packet, finished } = H.writeMessage(new Uint8Array()); + at responderSession { + send message Bytes.from(packet); + } + maybeTransition(finished); + }), + message(body: Assertion) { + const p = N.asPacket(body); + if (transportState) { + const packet = transportState.recv.decrypt_large( + p._variant === 'complete' + ? [underlying(p.value)] + : p.value.map(underlying)); + relay!.accept(packet); + } else { + if (p._variant !== 'complete') { + throw new Error("Unexpected fragmentation in handshake"); + } + const { message, finished } = H.readMessage(underlying(p.value)); + if (message.byteLength !== 0) { + throw new Error("Unexpected payload during handshake"); + } + maybeTransition(finished); + } + }, + }), }); } }