Implement new gatekeeper protocol for syndicate-js
This commit is contained in:
parent
f4c0c826df
commit
cd5452b624
|
@ -1,9 +1,9 @@
|
|||
let ?ds = dataspace
|
||||
|
||||
; Connect using <route [<ws "...">] <ref "syndicate" [] #[acowDB2/oI+6aSEC3YIxGg]>>
|
||||
<bind "syndicate" #x"" $ds>
|
||||
<bind <ref {oid: "syndicate" key: #x""}> $ds #f>
|
||||
|
||||
; Connect using <route [<ws "...">] <noise { service: "syndicate", key: #x"21f6cd4e11e7e37711d6b3084ff18cded8fc8abf293aa47d43e8bb86dda65516" }>>
|
||||
let ?sk = #x"7626eb6c0ee79cb928a0c3f6e29621e9119da6735859f1425956a49fd937e586"
|
||||
let ?pk = #x"21f6cd4e11e7e37711d6b3084ff18cded8fc8abf293aa47d43e8bb86dda65516"
|
||||
<noise { service: "syndicate", key: $pk, secretKey: $sk } $ds>
|
||||
<bind <noise { service: "syndicate", key: $pk, secretKey: $sk }> $ds #f>
|
||||
|
|
|
@ -4,10 +4,10 @@
|
|||
import { fromJS, Bytes, Dataspace, Ref, Sturdy, AnyValue, Reader, Schemas, Embedded, stringify } from "@syndicate-lang/core";
|
||||
import { boot as bootHtml, Anchor, template as html, HtmlFragments, GlobalEvent, UIAttribute, UIChangeableProperty } from "@syndicate-lang/html";
|
||||
import { boot as bootWakeDetector, WakeEvent } from "./wake-detector";
|
||||
import { boot as bootWsRelay, ForceRelayDisconnect, RelayAddress, Resolved, Noise } from "@syndicate-lang/ws-relay";
|
||||
import { boot as bootWsRelay } from "@syndicate-lang/ws-relay";
|
||||
import { Present, Says } from './gen/simpleChatProtocol';
|
||||
|
||||
const Transport = Schemas.transportAddress;
|
||||
import G = Schemas.gatekeeper;
|
||||
import N = Schemas.noise;
|
||||
|
||||
export function main() {
|
||||
document.getElementById('chat_form')!.onsubmit = e => { e.preventDefault(); return false; };
|
||||
|
@ -31,9 +31,9 @@ function bootChat(ds: Ref) {
|
|||
on asserted UIChangeableProperty('#nym', 'value', $v: string) => nym.value = v;
|
||||
|
||||
during UIChangeableProperty('#route', 'value', $routeText: string) => {
|
||||
let route: Noise.Route<Ref> | null = null;
|
||||
let route: G.Route<Ref> | null = null;
|
||||
try {
|
||||
route = Noise.asRoute(new Reader<Ref>(routeText).next());
|
||||
route = G.asRoute(new Reader<Ref>(routeText).next());
|
||||
} catch (e) {
|
||||
console.error(e);
|
||||
}
|
||||
|
@ -41,16 +41,20 @@ function bootChat(ds: Ref) {
|
|||
if (route) contactRemote(route);
|
||||
}
|
||||
|
||||
function contactRemote(route: Noise.Route<Ref>) {
|
||||
function contactRemote(route: G.Route<Ref>) {
|
||||
console.log('contactRemote', route);
|
||||
during Resolved({
|
||||
during G.ResolvePath({
|
||||
"route": route,
|
||||
"addr": $addr: RelayAddress,
|
||||
"resolved": $remoteDs_e: Embedded,
|
||||
"addr": $addr,
|
||||
"control": $control_e: Embedded,
|
||||
"resolved": G.Resolved.accepted($remoteDs_e: Embedded),
|
||||
}) => {
|
||||
const remoteDs = remoteDs_e.embeddedValue;
|
||||
const control = control_e.embeddedValue;
|
||||
|
||||
on message WakeEvent() => send message ForceRelayDisconnect(addr);
|
||||
on message WakeEvent() => at control {
|
||||
send message G.ForceDisconnect();
|
||||
}
|
||||
|
||||
outputState('connected', 'connected to ' + stringify(addr));
|
||||
on stop outputState('disconnected', 'disconnected from ' + stringify(addr));
|
||||
|
@ -94,28 +98,33 @@ function setDataspaceAddress() {
|
|||
`${localWs}://${document.location.hostname}:9001/`,
|
||||
];
|
||||
const transports: AnyValue[] =
|
||||
wsurls.map(u => fromJS(RelayAddress(Transport.WebSocket(u))));
|
||||
wsurls.map(u => fromJS(Schemas.transportAddress.WebSocket(u)));
|
||||
|
||||
route.value = stringify(Noise.Route<Ref>({
|
||||
route.value = stringify(G.Route<Ref>({
|
||||
"transports": transports,
|
||||
"steps": [Noise.RouteStep.NoiseStep(Noise.NoiseStep(Noise.NoiseSpec({
|
||||
"service": "syndicate",
|
||||
"key": Bytes.fromHex("21f6cd4e11e7e37711d6b3084ff18cded8fc8abf293aa47d43e8bb86dda65516"),
|
||||
"protocol": Noise.NoiseProtocol.absent(),
|
||||
"preSharedKeys": Noise.NoisePreSharedKeys.absent(),
|
||||
})))],
|
||||
"pathSteps": [G.PathStep({
|
||||
"stepType": N.$noise,
|
||||
"detail": fromJS(N.NoiseSpec({
|
||||
"service": "syndicate",
|
||||
"key": Bytes.fromHex("21f6cd4e11e7e37711d6b3084ff18cded8fc8abf293aa47d43e8bb86dda65516"),
|
||||
"protocol": N.NoiseProtocol.absent(),
|
||||
"preSharedKeys": N.NoisePreSharedKeys.absent(),
|
||||
})),
|
||||
})],
|
||||
}));
|
||||
|
||||
// To use a sturdyref instead:
|
||||
//
|
||||
// route.value = stringify(Noise.Route<Ref>({
|
||||
// route.value = stringify(G.Route<Ref>({
|
||||
// "transports": transports,
|
||||
// "steps": [Noise.RouteStep.GatekeeperStep(
|
||||
// Sturdy.SturdyRef({
|
||||
// "pathSteps": [G.PathStep({
|
||||
// "stepType": Sturdy.$ref,
|
||||
// "detail": fromJS(Sturdy.Parameters({
|
||||
// "oid": "syndicate",
|
||||
// "caveatChain": [],
|
||||
// "sig": Bytes.fromHex('69ca300c1dbfa08fba692102dd82311a'),
|
||||
// }))],
|
||||
// "caveats": Sturdy.CaveatsField.absent(),
|
||||
// })),
|
||||
// })],
|
||||
// }));
|
||||
//
|
||||
// ... and of course you can chain these things, depending on server setup.
|
||||
|
|
|
@ -36,6 +36,12 @@ export class Supervisor {
|
|||
readonly state: Field<StateName> = Turn.active.field('started');
|
||||
supervisee: Actor | null = null;
|
||||
|
||||
static always(nameFunction: () => AnyValue, bootFunction: LocalAction): Supervisor {
|
||||
return new Supervisor({ restartPolicy: SupervisorRestartPolicy.ALWAYS },
|
||||
nameFunction,
|
||||
bootFunction);
|
||||
}
|
||||
|
||||
constructor(config: Partial<SupervisorConfiguration>,
|
||||
nameFunction: () => AnyValue,
|
||||
bootFunction: LocalAction)
|
||||
|
|
|
@ -38,26 +38,35 @@ export function sturdyDecode(bs: Bytes): SturdyValue {
|
|||
}
|
||||
|
||||
export async function mint(oid: SturdyValue, secretKey: Bytes): Promise<S.SturdyRef> {
|
||||
return S.SturdyRef({
|
||||
oid,
|
||||
caveatChain: [],
|
||||
sig: await mac(secretKey, sturdyEncode(oid)),
|
||||
});
|
||||
return S.SturdyRef(S.Parameters({
|
||||
oid,
|
||||
sig: await mac(secretKey, sturdyEncode(oid)),
|
||||
caveats: S.CaveatsField.absent(),
|
||||
}));
|
||||
}
|
||||
|
||||
async function chainMac(key: Bytes | Promise<Bytes>, caveats: S.Caveat[]): Promise<Bytes> {
|
||||
return caveats.reduce(async (key, c) => mac(await key, sturdyEncode(S.fromCaveat(c))), key);
|
||||
}
|
||||
|
||||
export function caveatChain(r: S.SturdyRef): S.Caveat[] {
|
||||
switch (r.parameters.caveats._variant) {
|
||||
case "present": return r.parameters.caveats.caveats;
|
||||
case "absent": return [];
|
||||
case "invalid": throw new Error("Invalid caveats on sturdyref");
|
||||
}
|
||||
}
|
||||
|
||||
export async function attenuate(r: S.SturdyRef, ... a: S.Caveat[]): Promise<S.SturdyRef> {
|
||||
return S.SturdyRef({
|
||||
oid: r.oid,
|
||||
caveatChain: [... r.caveatChain, ... a],
|
||||
sig: await chainMac(r.sig, a),
|
||||
});
|
||||
if (a.length === 0) return r;
|
||||
return S.SturdyRef(S.Parameters({
|
||||
oid: r.parameters.oid,
|
||||
caveats: S.CaveatsField.present([... caveatChain(r), ... a]),
|
||||
sig: await chainMac(r.parameters.sig, a),
|
||||
}));
|
||||
}
|
||||
|
||||
export async function validate(r: S.SturdyRef, secretKey: Bytes): Promise<boolean> {
|
||||
const sig = await chainMac(await mac(secretKey, sturdyEncode(r.oid)), r.caveatChain);
|
||||
return is(sig, r.sig);
|
||||
const sig = await chainMac(await mac(secretKey, sturdyEncode(r.parameters.oid)), caveatChain(r));
|
||||
return is(sig, r.parameters.sig);
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import { Dataspace, Ref, Sturdy, Reader, Schemas, Embedded, randomId, fromJS } f
|
|||
import html from "@syndicate-lang/html";
|
||||
import wsRelay from "@syndicate-lang/ws-relay";
|
||||
import { ExampleDefinition } from './gen/example';
|
||||
import G = Schemas.gatekeeper;
|
||||
|
||||
export function main() {
|
||||
Dataspace.boot(ds => {
|
||||
|
@ -25,19 +26,19 @@ function bootApp(ds: Ref) {
|
|||
|
||||
const this_instance = randomId(16);
|
||||
|
||||
const route = wsRelay.Noise.Route<Ref>({
|
||||
const route = G.Route<Ref>({
|
||||
"transports": [fromJS(Schemas.transportAddress.WebSocket(
|
||||
`ws://${document.location.hostname}:9001/`))],
|
||||
"steps": [wsRelay.Noise.RouteStep.GatekeeperStep(Sturdy.asSturdyRef(
|
||||
new Reader<Ref>('<ref "syndicate" [] #[acowDB2/oI+6aSEC3YIxGg==]>').next()))],
|
||||
"pathSteps": [G.asPathStep(fromJS(Sturdy.asSturdyRef(
|
||||
new Reader<Ref>(
|
||||
'<ref {oid: "syndicate" sig: #[acowDB2/oI+6aSEC3YIxGg==]}>').next())))],
|
||||
});
|
||||
|
||||
during wsRelay.Resolved({
|
||||
during G.ResolvePath({
|
||||
"route": route,
|
||||
"resolved": $remoteDs_e: Embedded,
|
||||
"resolved": G.Resolved.accepted($remoteDs_e: Embedded),
|
||||
}) => {
|
||||
const remoteDs = remoteDs_e.embeddedValue;
|
||||
|
||||
at remoteDs {
|
||||
assert ExampleDefinition(this_instance);
|
||||
during ExampleDefinition($who: string) => {
|
||||
|
|
|
@ -9,4 +9,4 @@
|
|||
; Create a dataspace entity, and register it with the gatekeeper with name `"syndicate"` and an
|
||||
; empty secret key:
|
||||
let ?ds = dataspace
|
||||
<bind "syndicate" #x"" $ds>
|
||||
<bind <ref {oid: "syndicate" key: #x""}> $ds #f>
|
||||
|
|
|
@ -12,9 +12,7 @@
|
|||
"url": "https://git.syndicate-lang.org/syndicate-lang/syndicate-js"
|
||||
},
|
||||
"scripts": {
|
||||
"prepare": "yarn regenerate && yarn compile && yarn rollup",
|
||||
"regenerate": "rm -rf ./src/gen && preserves-schema-ts --module EntityRef=@syndicate-lang/core --module transportAddress=@syndicate-lang/core:Schemas.transportAddress --module noise=@syndicate-lang/core:Schemas.noise --module sturdy=@syndicate-lang/core:Schemas.sturdy --xref ../../node_modules/@syndicate-lang/core/protocols/schemas --output ./src/gen ./protocols/schemas",
|
||||
"regenerate:watch": "yarn regenerate --watch",
|
||||
"prepare": "yarn compile && yarn rollup",
|
||||
"compile": "syndicate-tsc",
|
||||
"compile:watch": "syndicate-tsc -w --verbose --intermediate-directory src.ts",
|
||||
"rollup": "rollup -c",
|
||||
|
@ -28,7 +26,6 @@
|
|||
"author": "Tony Garnock-Jones <tonyg@leastfixedpoint.com>",
|
||||
"dependencies": {
|
||||
"@preserves/core": ">=0.20.2",
|
||||
"@preserves/schema": ">=0.21.2",
|
||||
"@syndicate-lang/core": "^0.13.1",
|
||||
"salty-crypto": "0.3"
|
||||
},
|
||||
|
|
|
@ -1,8 +0,0 @@
|
|||
version 1 .
|
||||
|
||||
Resolved = <resolved @route noise.Route @addr RelayAddress @resolved #!any> .
|
||||
|
||||
ViaRelay = <via-relay @addr RelayAddress @assertion any> .
|
||||
ForceRelayDisconnect = <force-relay-disconnect @addr RelayAddress> .
|
||||
|
||||
RelayAddress = transportAddress.WebSocket .
|
|
@ -1,218 +1,403 @@
|
|||
/// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
/// SPDX-FileCopyrightText: Copyright © 2016-2023 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
||||
|
||||
import { QuasiValue as Q, Assertion, Bytes, Ref, Relay, Turn, Supervisor, SupervisorRestartPolicy, Observe, Schemas, assertionFacetObserver, canonicalEncode, fromJS, isEmbedded, underlying } from "@syndicate-lang/core";
|
||||
import * as G from "./gen/ws";
|
||||
export * from "./gen/ws";
|
||||
import * as N from "@syndicate-lang/core/lib/gen/noise";
|
||||
export * as Noise from "@syndicate-lang/core/lib/gen/noise";
|
||||
import {
|
||||
Assertion,
|
||||
Bytes,
|
||||
Dataflow,
|
||||
Embedded,
|
||||
Observe,
|
||||
QuasiValue as Q,
|
||||
Ref,
|
||||
Relay,
|
||||
Schemas,
|
||||
Supervisor,
|
||||
Turn,
|
||||
assertionFacetObserver,
|
||||
canonicalEncode,
|
||||
fromJS,
|
||||
isEmbedded,
|
||||
stringify,
|
||||
underlying,
|
||||
} from "@syndicate-lang/core";
|
||||
import G = Schemas.gatekeeper;
|
||||
import S = Schemas.sturdy;
|
||||
import N = Schemas.noise;
|
||||
import T = Schemas.transportAddress;
|
||||
import * as SaltyCrypto from 'salty-crypto';
|
||||
|
||||
type TransportState = {
|
||||
addr: T.WebSocket,
|
||||
control: Ref,
|
||||
peer: Ref,
|
||||
};
|
||||
|
||||
export function boot(ds: Ref, debug: boolean = false) {
|
||||
spawn named 'wsRelay' {
|
||||
spawn named 'transportConnector' {
|
||||
at ds {
|
||||
during Observe({ "pattern": :pattern G.Resolved({
|
||||
"route": \$routePatValue,
|
||||
"addr": \_,
|
||||
"resolved": \Q.bind(),
|
||||
during Observe({ "pattern": :pattern G.TransportConnection({
|
||||
"addr": \$addrPatValue,
|
||||
"control": \_,
|
||||
"resolved": \_,
|
||||
}) }) => {
|
||||
const route = Q.drop_lit(routePatValue, N.toRoute);
|
||||
if (!route) return;
|
||||
const addrs: G.RelayAddress[] = [];
|
||||
route.transports.forEach(t => {
|
||||
const a = G.toRelayAddress(t);
|
||||
if (a) addrs.push(a);
|
||||
});
|
||||
wsConnect(addrs, (e, addr) => {
|
||||
at ds {
|
||||
stop on message G.ForceRelayDisconnect(addr);
|
||||
}
|
||||
resolve(e, route.steps, e => {
|
||||
assert G.Resolved({
|
||||
"route": route,
|
||||
"addr": addr,
|
||||
"resolved": e,
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function tryConnection(addr: G.RelayAddress): Promise<WebSocket> {
|
||||
return new Promise((resolve, reject) => {
|
||||
console.log('@syndicate-lang/ws-relay trying', addr);
|
||||
const ws = new WebSocket(addr.url);
|
||||
ws.binaryType = 'arraybuffer';
|
||||
ws.onopen = () => resolve(ws);
|
||||
ws.onclose = () => reject(null);
|
||||
ws.onerror = (e) => reject(e);
|
||||
});
|
||||
}
|
||||
|
||||
async function establishConnection(
|
||||
addrs: G.RelayAddress[],
|
||||
): Promise<{ws: WebSocket, addr: G.RelayAddress} | null> {
|
||||
for (let i = 0; i < addrs.length; i++) {
|
||||
const addr = addrs[i];
|
||||
try {
|
||||
return { ws: await tryConnection(addr), addr };
|
||||
} catch (e) {
|
||||
console.log(
|
||||
'@syndicate-lang/ws-relay attempt to contact', addr.url, 'failed with', e);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function wsConnect(addrs: G.RelayAddress[], k: (e: Ref, addr: G.RelayAddress) => void) {
|
||||
let counter = 0;
|
||||
new Supervisor({
|
||||
restartPolicy: SupervisorRestartPolicy.ALWAYS,
|
||||
}, () => ['wsRelay', fromJS(addrs), counter++], () => {
|
||||
const facet = Turn.activeFacet;
|
||||
facet.preventInertCheck();
|
||||
|
||||
establishConnection(addrs).then(result => facet.turn(() => {
|
||||
if (result === null) {
|
||||
console.log('@syndicate-lang/ws-relay no successful connection');
|
||||
stop {}
|
||||
} else {
|
||||
const {ws, addr} = result;
|
||||
on stop {
|
||||
ws.close();
|
||||
}
|
||||
ws.onclose = () => facet.turn(() => { stop {} });
|
||||
ws.onerror = () => facet.turn(() =>
|
||||
Turn.active.crash(new Error("WebSocket error")));
|
||||
const relay = new Relay.Relay({
|
||||
debug,
|
||||
trustPeer: true,
|
||||
packetWriter: bs => ws.send(bs),
|
||||
setup(r: Relay.Relay) {
|
||||
ws.onmessage = e => facet.turn(() =>
|
||||
r.accept(new Uint8Array(e.data)));
|
||||
},
|
||||
initialOid: 0,
|
||||
});
|
||||
k(relay.peer!, addr);
|
||||
}
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
||||
function resolve(e: Ref, steps: N.RouteStep<Ref>[], k: (e: Ref) => void) {
|
||||
if (steps.length === 0) {
|
||||
k(e);
|
||||
} else {
|
||||
const [step, ...more] = steps;
|
||||
switch (step._variant) {
|
||||
case "NoiseStep": {
|
||||
noiseConnect(e, step.value.spec, e => resolve(e, more, k));
|
||||
break;
|
||||
}
|
||||
case "GatekeeperStep": {
|
||||
at e {
|
||||
assert Schemas.gatekeeper.Resolve({
|
||||
"sturdyref": step.value,
|
||||
"observer": create assertionFacetObserver(e => {
|
||||
if (isEmbedded(e)) {
|
||||
resolve(e.embeddedValue, more, k);
|
||||
}
|
||||
}),
|
||||
});
|
||||
}
|
||||
break;
|
||||
}
|
||||
default: ((_: never) => {})(step);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function noiseConnect(e: Ref, spec: N.NoiseSpec<Ref>, k: (e: Ref) => void) {
|
||||
const noiseSessionFacet = Turn.activeFacet;
|
||||
const algorithms = SaltyCrypto.Noise_25519_ChaChaPoly_BLAKE2s;
|
||||
const protocol =
|
||||
spec.protocol._variant === "present" ? spec.protocol.protocol :
|
||||
spec.protocol._variant === "absent" ? N.fromDefaultProtocol(N.DefaultProtocol()) as string :
|
||||
(() => { throw new Error("Invalid noise protocol name"); })();
|
||||
const patternName = SaltyCrypto.matchPattern(algorithms, protocol);
|
||||
if (patternName === null) throw new Error("Unsupported protocol " + protocol);
|
||||
const preSharedKeys =
|
||||
spec.preSharedKeys._variant === "present" ? spec.preSharedKeys.preSharedKeys :
|
||||
spec.preSharedKeys._variant === "absent" ? [] :
|
||||
(() => { throw new Error("Invalid pre-shared keys"); })();
|
||||
const prologue = underlying(canonicalEncode(spec.service));
|
||||
const H = new SaltyCrypto.Handshake(
|
||||
algorithms,
|
||||
patternName,
|
||||
'initiator',
|
||||
{
|
||||
prologue,
|
||||
remoteStaticPublicKey: underlying(spec.key),
|
||||
preSharedKeys: preSharedKeys.map(underlying),
|
||||
});
|
||||
let transportState: SaltyCrypto.TransportState | null = null;
|
||||
let responderSession: Ref | null = null;
|
||||
let relay: Relay.Relay | null = null;
|
||||
function maybeTransition(s: SaltyCrypto.TransportState | null) {
|
||||
if (transportState !== null) {
|
||||
throw new Error("Unexpected double-transition to transport state");
|
||||
}
|
||||
transportState = s;
|
||||
if (transportState !== null) {
|
||||
k(new Relay.Relay({
|
||||
debug,
|
||||
trustPeer: true,
|
||||
packetWriter: bs => noiseSessionFacet.turn(() => {
|
||||
const fragments = transportState!.send.encrypt_large(bs).map(Bytes.from);
|
||||
at responderSession! {
|
||||
send message ((fragments.length === 1)
|
||||
? N.Packet.complete(fragments[0])
|
||||
: N.Packet.fragmented(fragments));
|
||||
}
|
||||
}),
|
||||
setup(r: Relay.Relay) {
|
||||
relay = r;
|
||||
},
|
||||
initialOid: 0,
|
||||
}).peer!);
|
||||
}
|
||||
}
|
||||
at e {
|
||||
assert N.Connect({
|
||||
"serviceSelector": spec.service,
|
||||
"initiatorSession": create ({
|
||||
... assertionFacetObserver(e => {
|
||||
const accept = N.asAccept(e);
|
||||
responderSession = accept.responderSession;
|
||||
const { packet, finished } = H.writeMessage(new Uint8Array());
|
||||
at responderSession {
|
||||
send message Bytes.from(packet);
|
||||
}
|
||||
maybeTransition(finished);
|
||||
}),
|
||||
message(body: Assertion) {
|
||||
const p = N.asPacket(body);
|
||||
if (transportState) {
|
||||
const packet = transportState.recv.decrypt_large(
|
||||
p._variant === 'complete'
|
||||
? [underlying(p.value)]
|
||||
: p.value.map(underlying));
|
||||
relay!.accept(packet);
|
||||
} else {
|
||||
if (p._variant !== 'complete') {
|
||||
throw new Error("Unexpected fragmentation in handshake");
|
||||
}
|
||||
const { message, finished } = H.readMessage(underlying(p.value));
|
||||
if (message.byteLength !== 0) {
|
||||
throw new Error("Unexpected payload during handshake");
|
||||
}
|
||||
maybeTransition(finished);
|
||||
}
|
||||
},
|
||||
}),
|
||||
const addr = Q.drop_lit(addrPatValue, T.toWebSocket);
|
||||
if (!addr) return;
|
||||
let counter =0;
|
||||
Supervisor.always(() => ['transportConnector', fromJS(addr), counter++], () => {
|
||||
console.log('connecting', addr.url, counter);
|
||||
connectTo(addr);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function connectTo(addr: T.WebSocket) {
|
||||
const facet = Turn.activeFacet;
|
||||
facet.preventInertCheck();
|
||||
const controlEntity = {
|
||||
message(a0: Assertion): void {
|
||||
const a = G.toTransportControl(a0);
|
||||
if (!a) return;
|
||||
stop {} // ForceDisconnect
|
||||
},
|
||||
};
|
||||
let final = false;
|
||||
function succeed(ws: WebSocket) {
|
||||
if (final) return;
|
||||
final = true;
|
||||
console.log('opened', ws);
|
||||
on stop {
|
||||
console.log('closing', ws);
|
||||
ws.close();
|
||||
}
|
||||
ws.onclose = () => facet.turn(() => { stop {} });
|
||||
ws.onerror = () => facet.turn(() =>
|
||||
Turn.active.crash(new Error("WebSocket error")));
|
||||
const relay = new Relay.Relay({
|
||||
debug,
|
||||
trustPeer: true,
|
||||
packetWriter: bs => ws.send(bs),
|
||||
setup(r: Relay.Relay) {
|
||||
ws.onmessage = e => facet.turn(() =>
|
||||
r.accept(new Uint8Array(e.data)));
|
||||
},
|
||||
initialOid: 0,
|
||||
});
|
||||
console.log('succeed', addr.url);
|
||||
at ds {
|
||||
assert G.TransportConnection<Ref>({
|
||||
"addr": fromJS(addr),
|
||||
"control": create controlEntity,
|
||||
"resolved": G.Resolved.accepted(relay.peer!),
|
||||
});
|
||||
}
|
||||
}
|
||||
function fail(detail: Assertion) {
|
||||
if (final) return;
|
||||
final = true;
|
||||
console.log('fail', addr.url, detail);
|
||||
at ds {
|
||||
assert G.TransportConnection<Ref>({
|
||||
"addr": fromJS(addr),
|
||||
"control": create controlEntity,
|
||||
"resolved": G.Resolved.Rejected(G.Rejected(detail)),
|
||||
});
|
||||
}
|
||||
setTimeout(() => facet.turn(() => { stop {} }), 10000);
|
||||
}
|
||||
try {
|
||||
const ws = new WebSocket(addr.url);
|
||||
ws.binaryType = 'arraybuffer';
|
||||
ws.onopen = () => facet.turn(() => succeed(ws));
|
||||
ws.onclose = () => facet.turn(() => fail(Symbol.for('closed')));
|
||||
ws.onerror = (e) => facet.turn(() => fail(Symbol.for('websocket-error-event')));
|
||||
} catch (e) {
|
||||
console.error('Failed opening websocket', addr.url, e);
|
||||
fail(Symbol.for('websocket-exception'));
|
||||
}
|
||||
}
|
||||
|
||||
spawn named 'pathResolver' {
|
||||
at ds {
|
||||
during Observe({ "pattern": :pattern G.ResolvePath({
|
||||
"route": \$routePatValue,
|
||||
"addr": \_,
|
||||
"control": \_,
|
||||
"resolved": \_,
|
||||
}) }) => {
|
||||
const route = Q.drop_lit(routePatValue, G.toRoute);
|
||||
if (!route) return;
|
||||
const candidates: Dataflow.Field<TransportState | null>[] = [];
|
||||
route.transports.forEach(t => {
|
||||
const addr = T.toWebSocket(t);
|
||||
if (!addr) return;
|
||||
let counter = 0;
|
||||
field state: TransportState | null = null;
|
||||
candidates.push(state);
|
||||
console.log('tracking', addr.url);
|
||||
during G.TransportConnection({
|
||||
"addr": addr,
|
||||
"control": $control: Embedded,
|
||||
"resolved": $resolved: G.Resolved,
|
||||
}) => {
|
||||
const me = counter++;
|
||||
switch (resolved._variant) {
|
||||
case "accepted":
|
||||
state.value = {
|
||||
addr,
|
||||
control: control.embeddedValue,
|
||||
peer: resolved.responderSession,
|
||||
};
|
||||
break;
|
||||
case "Rejected":
|
||||
state.value = null;
|
||||
break;
|
||||
}
|
||||
on stop {
|
||||
if (counter === me) {
|
||||
state.value = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
field best: TransportState | null = null;
|
||||
field rootPeer: Ref | null = null;
|
||||
dataflow {
|
||||
best.value = null;
|
||||
for (const c of candidates) {
|
||||
if (c.value !== null) {
|
||||
if (best.value === null) best.value = c.value;
|
||||
}
|
||||
}
|
||||
rootPeer.value = best.value?.peer ?? null;
|
||||
}
|
||||
resolve(() => rootPeer.value, route.pathSteps, (r) => {
|
||||
const s = best.value!;
|
||||
console.log('leaf', s.addr.url, stringify(r));
|
||||
assert G.ResolvePath<Ref>({
|
||||
"route": route,
|
||||
"addr": fromJS(s.addr),
|
||||
"control": s.control,
|
||||
"resolved": r,
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function resolve(
|
||||
e: () => Ref | null,
|
||||
steps: G.PathStep[],
|
||||
k: (r: G.Resolved) => void,
|
||||
) {
|
||||
if (steps.length === 0) {
|
||||
const peer = e();
|
||||
k(peer === null
|
||||
? G.Resolved.Rejected(G.Rejected(Symbol.for('not-connected')))
|
||||
: G.Resolved.accepted(peer));
|
||||
} else {
|
||||
const [step, ...more] = steps;
|
||||
at ds {
|
||||
during G.ResolvedPathStep({
|
||||
"origin": (e()!),
|
||||
"pathStep": step,
|
||||
"resolved": $resolved: G.Resolved,
|
||||
}) when (e()) => {
|
||||
switch (resolved._variant) {
|
||||
case "accepted":
|
||||
resolve(() => resolved.responderSession, more, k);
|
||||
break;
|
||||
case "Rejected":
|
||||
k(resolved);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
spawn named 'noiseStep' {
|
||||
at ds {
|
||||
during Observe({ "pattern": :pattern G.ResolvedPathStep({
|
||||
"origin": \$originPatValue,
|
||||
"pathStep": G.PathStep({
|
||||
"stepType": N.$noise,
|
||||
"detail": \$detailPatValue,
|
||||
}),
|
||||
"resolved": \_,
|
||||
}) }) => {
|
||||
const origin0 = Q.drop_lit(originPatValue);
|
||||
if (!origin0 || !isEmbedded(origin0)) return;
|
||||
const origin = origin0.embeddedValue;
|
||||
|
||||
const detail0 = Q.drop_lit(detailPatValue, N.toNoisePathStepDetail);
|
||||
if (!detail0) return;
|
||||
const spec: N.NoiseSpec<Ref> = detail0;
|
||||
|
||||
const algorithms = SaltyCrypto.Noise_25519_ChaChaPoly_BLAKE2s;
|
||||
const protocol =
|
||||
spec.protocol._variant === "present" ? spec.protocol.protocol :
|
||||
spec.protocol._variant === "absent" ? N.fromDefaultProtocol(N.DefaultProtocol()) as string :
|
||||
(() => { throw new Error("Invalid noise protocol name"); })();
|
||||
const patternName = SaltyCrypto.matchPattern(algorithms, protocol);
|
||||
if (patternName === null) throw new Error("Unsupported protocol " + protocol);
|
||||
const preSharedKeys =
|
||||
spec.preSharedKeys._variant === "present" ? spec.preSharedKeys.preSharedKeys :
|
||||
spec.preSharedKeys._variant === "absent" ? [] :
|
||||
(() => { throw new Error("Invalid pre-shared keys"); })();
|
||||
const prologue = underlying(canonicalEncode(spec.service));
|
||||
|
||||
const H = new SaltyCrypto.Handshake(
|
||||
algorithms,
|
||||
patternName,
|
||||
'initiator',
|
||||
{
|
||||
prologue,
|
||||
remoteStaticPublicKey: underlying(spec.key),
|
||||
preSharedKeys: preSharedKeys.map(underlying),
|
||||
});
|
||||
|
||||
let transportState: SaltyCrypto.TransportState | null = null;
|
||||
let responderSession: Ref | null = null;
|
||||
let relay: Relay.Relay | null = null;
|
||||
|
||||
function maybeTransition(s: SaltyCrypto.TransportState | null) {
|
||||
if (transportState !== null) {
|
||||
throw new Error("Unexpected double-transition to transport state");
|
||||
}
|
||||
transportState = s;
|
||||
if (transportState !== null) {
|
||||
const noiseSessionFacet = Turn.activeFacet;
|
||||
const peer = new Relay.Relay({
|
||||
debug,
|
||||
trustPeer: true,
|
||||
packetWriter: bs => noiseSessionFacet.turn(() => {
|
||||
const fragments = transportState!.send.encrypt_large(bs).map(Bytes.from);
|
||||
at responderSession! {
|
||||
send message ((fragments.length === 1)
|
||||
? N.Packet.complete(fragments[0])
|
||||
: N.Packet.fragmented(fragments));
|
||||
}
|
||||
}),
|
||||
setup(r: Relay.Relay) {
|
||||
relay = r;
|
||||
},
|
||||
initialOid: 0,
|
||||
}).peer!;
|
||||
assert G.ResolvedPathStep<Ref>({
|
||||
"origin": origin,
|
||||
"pathStep": G.PathStep({
|
||||
"stepType": N.$noise,
|
||||
"detail": fromJS(spec),
|
||||
}),
|
||||
"resolved": G.Resolved.accepted(peer),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
react {
|
||||
at origin {
|
||||
assert G.Resolve({
|
||||
"step": G.Step({
|
||||
"stepType": N.$noise,
|
||||
"detail": fromJS(N.ServiceSelector(spec.service)),
|
||||
}),
|
||||
"observer": create ({
|
||||
... assertionFacetObserver(e => {
|
||||
const response = G.toResolved(e);
|
||||
if (!response) return;
|
||||
switch (response._variant) {
|
||||
case "accepted":
|
||||
responderSession = response.responderSession;
|
||||
const { packet, finished } = H.writeMessage(new Uint8Array());
|
||||
at responderSession {
|
||||
send message Bytes.from(packet);
|
||||
}
|
||||
maybeTransition(finished);
|
||||
break;
|
||||
case "Rejected":
|
||||
stop {
|
||||
at ds {
|
||||
assert G.ResolvedPathStep<Ref>({
|
||||
"origin": origin,
|
||||
"pathStep": G.PathStep({
|
||||
"stepType": N.$noise,
|
||||
"detail": fromJS(N.NoisePathStepDetail(spec)),
|
||||
}),
|
||||
"resolved": response,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
message(body: Assertion) {
|
||||
const p = N.asPacket(body);
|
||||
if (transportState) {
|
||||
const packet = transportState.recv.decrypt_large(
|
||||
p._variant === 'complete'
|
||||
? [underlying(p.value)]
|
||||
: p.value.map(underlying));
|
||||
relay!.accept(packet);
|
||||
} else {
|
||||
if (p._variant !== 'complete') {
|
||||
throw new Error("Unexpected fragmentation in handshake");
|
||||
}
|
||||
const { message, finished } = H.readMessage(underlying(p.value));
|
||||
if (message.byteLength !== 0) {
|
||||
throw new Error("Unexpected payload during handshake");
|
||||
}
|
||||
maybeTransition(finished);
|
||||
}
|
||||
},
|
||||
}),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
spawn named 'sturdyRefStep' {
|
||||
at ds {
|
||||
during Observe({ "pattern": :pattern G.ResolvedPathStep({
|
||||
"origin": \$originPatValue,
|
||||
"pathStep": G.PathStep({
|
||||
"stepType": S.$ref,
|
||||
"detail": \$detailPatValue,
|
||||
}),
|
||||
"resolved": \_,
|
||||
}) }) => {
|
||||
const origin0 = Q.drop_lit(originPatValue);
|
||||
if (!origin0 || !isEmbedded(origin0)) return;
|
||||
const origin = origin0.embeddedValue;
|
||||
|
||||
const detail0 = Q.drop_lit(detailPatValue, S.toSturdyPathStepDetail);
|
||||
if (!detail0) return;
|
||||
const parameters: S.Parameters = detail0;
|
||||
|
||||
at origin {
|
||||
assert G.Resolve({
|
||||
"step": G.Step({
|
||||
"stepType": S.$ref,
|
||||
"detail": fromJS(parameters),
|
||||
}),
|
||||
"observer": create assertionFacetObserver(e => {
|
||||
const response = G.toResolved(e);
|
||||
if (!response) return;
|
||||
at ds {
|
||||
assert G.ResolvedPathStep<Ref>({
|
||||
"origin": origin,
|
||||
"pathStep": G.PathStep({
|
||||
"stepType": S.$ref,
|
||||
"detail": fromJS(parameters),
|
||||
}),
|
||||
"resolved": response,
|
||||
});
|
||||
}
|
||||
}),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5804,7 +5804,7 @@ safe-regex-test@^1.0.0:
|
|||
resolved "https://registry.yarnpkg.com/safer-buffer/-/safer-buffer-2.1.2.tgz#44fa161b0187b9549dd84bb91802f9bd8385cd6a"
|
||||
integrity sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg==
|
||||
|
||||
salty-crypto@0.3:
|
||||
salty-crypto@0.3, salty-crypto@0.3.1:
|
||||
version "0.3.1"
|
||||
resolved "https://registry.yarnpkg.com/salty-crypto/-/salty-crypto-0.3.1.tgz#1242cd948d8152aff104d0945980996b1602ba0e"
|
||||
integrity sha512-dCbF8/UzMV8oXPmMCHWzSp7u8G6NA+pHbaFBVqVgLVBiyxuwVn9iPbyt9WXT0EoXfL9sKt7U1mV+bgsGw5cV7A==
|
||||
|
|
Loading…
Reference in New Issue