404 lines
17 KiB
TypeScript
404 lines
17 KiB
TypeScript
/// SPDX-License-Identifier: GPL-3.0-or-later
|
|
/// SPDX-FileCopyrightText: Copyright © 2016-2023 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
|
|
|
import {
|
|
Assertion,
|
|
Bytes,
|
|
Dataflow,
|
|
Embedded,
|
|
Observe,
|
|
QuasiValue as Q,
|
|
Ref,
|
|
Relay,
|
|
Schemas,
|
|
Supervisor,
|
|
Turn,
|
|
assertionFacetObserver,
|
|
canonicalEncode,
|
|
fromJS,
|
|
isEmbedded,
|
|
stringify,
|
|
underlying,
|
|
} from "@syndicate-lang/core";
|
|
import G = Schemas.gatekeeper;
|
|
import S = Schemas.sturdy;
|
|
import N = Schemas.noise;
|
|
import T = Schemas.transportAddress;
|
|
import * as SaltyCrypto from 'salty-crypto';
|
|
|
|
type TransportState = {
|
|
addr: T.WebSocket,
|
|
control: Ref,
|
|
peer: Ref,
|
|
};
|
|
|
|
export function boot(ds: Ref, debug: boolean = false) {
|
|
spawn named 'transportConnector' {
|
|
at ds {
|
|
during Observe({ "pattern": :pattern G.TransportConnection({
|
|
"addr": \$addrPatValue,
|
|
"control": \_,
|
|
"resolved": \_,
|
|
}) }) => {
|
|
const addr = Q.drop_lit(addrPatValue, T.toWebSocket);
|
|
if (!addr) return;
|
|
let counter =0;
|
|
Supervisor.always(() => ['transportConnector', fromJS(addr), counter++], () => {
|
|
console.log('connecting', addr.url, counter);
|
|
connectTo(addr);
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
function connectTo(addr: T.WebSocket) {
|
|
const facet = Turn.activeFacet;
|
|
facet.preventInertCheck();
|
|
const controlEntity = {
|
|
message(a0: Assertion): void {
|
|
const a = G.toTransportControl(a0);
|
|
if (!a) return;
|
|
stop {} // ForceDisconnect
|
|
},
|
|
};
|
|
let final = false;
|
|
function succeed(ws: WebSocket) {
|
|
if (final) return;
|
|
final = true;
|
|
console.log('opened', ws);
|
|
on stop {
|
|
console.log('closing', ws);
|
|
ws.close();
|
|
}
|
|
ws.onclose = () => facet.turn(() => { stop {} });
|
|
ws.onerror = () => facet.turn(() =>
|
|
Turn.active.crash(new Error("WebSocket error")));
|
|
const relay = new Relay.Relay({
|
|
debug,
|
|
trustPeer: true,
|
|
packetWriter: bs => ws.send(bs),
|
|
setup(r: Relay.Relay) {
|
|
ws.onmessage = e => facet.turn(() =>
|
|
r.accept(new Uint8Array(e.data)));
|
|
},
|
|
initialOid: 0,
|
|
});
|
|
console.log('succeed', addr.url);
|
|
at ds {
|
|
assert G.TransportConnection<Ref>({
|
|
"addr": fromJS(addr),
|
|
"control": create controlEntity,
|
|
"resolved": G.Resolved.accepted(relay.peer!),
|
|
});
|
|
}
|
|
}
|
|
function fail(detail: Assertion) {
|
|
if (final) return;
|
|
final = true;
|
|
console.log('fail', addr.url, detail);
|
|
at ds {
|
|
assert G.TransportConnection<Ref>({
|
|
"addr": fromJS(addr),
|
|
"control": create controlEntity,
|
|
"resolved": G.Resolved.Rejected(G.Rejected(detail)),
|
|
});
|
|
}
|
|
setTimeout(() => facet.turn(() => { stop {} }), 10000);
|
|
}
|
|
try {
|
|
const ws = new WebSocket(addr.url);
|
|
ws.binaryType = 'arraybuffer';
|
|
ws.onopen = () => facet.turn(() => succeed(ws));
|
|
ws.onclose = () => facet.turn(() => fail(Symbol.for('closed')));
|
|
ws.onerror = (e) => facet.turn(() => fail(Symbol.for('websocket-error-event')));
|
|
} catch (e) {
|
|
console.error('Failed opening websocket', addr.url, e);
|
|
fail(Symbol.for('websocket-exception'));
|
|
}
|
|
}
|
|
|
|
spawn named 'pathResolver' {
|
|
at ds {
|
|
during Observe({ "pattern": :pattern G.ResolvePath({
|
|
"route": \$routePatValue,
|
|
"addr": \_,
|
|
"control": \_,
|
|
"resolved": \_,
|
|
}) }) => {
|
|
const route = Q.drop_lit(routePatValue, G.toRoute);
|
|
if (!route) return;
|
|
const candidates: Dataflow.Field<TransportState | null>[] = [];
|
|
route.transports.forEach(t => {
|
|
const addr = T.toWebSocket(t);
|
|
if (!addr) return;
|
|
let counter = 0;
|
|
field state: TransportState | null = null;
|
|
candidates.push(state);
|
|
console.log('tracking', addr.url);
|
|
during G.TransportConnection({
|
|
"addr": addr,
|
|
"control": $control: Embedded,
|
|
"resolved": $resolved: G.Resolved,
|
|
}) => {
|
|
const me = counter++;
|
|
switch (resolved._variant) {
|
|
case "accepted":
|
|
state.value = {
|
|
addr,
|
|
control: control.embeddedValue,
|
|
peer: resolved.responderSession,
|
|
};
|
|
break;
|
|
case "Rejected":
|
|
state.value = null;
|
|
break;
|
|
}
|
|
on stop {
|
|
if (counter === me) {
|
|
state.value = null;
|
|
}
|
|
}
|
|
}
|
|
});
|
|
field best: TransportState | null = null;
|
|
field rootPeer: Ref | null = null;
|
|
dataflow {
|
|
best.value = null;
|
|
for (const c of candidates) {
|
|
if (c.value !== null) {
|
|
if (best.value === null) best.value = c.value;
|
|
}
|
|
}
|
|
rootPeer.value = best.value?.peer ?? null;
|
|
}
|
|
resolve(() => rootPeer.value, route.pathSteps, (r) => {
|
|
const s = best.value!;
|
|
console.log('leaf', s.addr.url, stringify(r));
|
|
assert G.ResolvePath<Ref>({
|
|
"route": route,
|
|
"addr": fromJS(s.addr),
|
|
"control": s.control,
|
|
"resolved": r,
|
|
});
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
function resolve(
|
|
e: () => Ref | null,
|
|
steps: G.PathStep[],
|
|
k: (r: G.Resolved) => void,
|
|
) {
|
|
if (steps.length === 0) {
|
|
const peer = e();
|
|
k(peer === null
|
|
? G.Resolved.Rejected(G.Rejected(Symbol.for('not-connected')))
|
|
: G.Resolved.accepted(peer));
|
|
} else {
|
|
const [step, ...more] = steps;
|
|
at ds {
|
|
during G.ResolvedPathStep({
|
|
"origin": (e()!),
|
|
"pathStep": step,
|
|
"resolved": $resolved: G.Resolved,
|
|
}) when (e()) => {
|
|
switch (resolved._variant) {
|
|
case "accepted":
|
|
resolve(() => resolved.responderSession, more, k);
|
|
break;
|
|
case "Rejected":
|
|
k(resolved);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
spawn named 'noiseStep' {
|
|
at ds {
|
|
during Observe({ "pattern": :pattern G.ResolvedPathStep({
|
|
"origin": \$originPatValue,
|
|
"pathStep": G.PathStep({
|
|
"stepType": N.$noise,
|
|
"detail": \$detailPatValue,
|
|
}),
|
|
"resolved": \_,
|
|
}) }) => {
|
|
const origin0 = Q.drop_lit(originPatValue);
|
|
if (!origin0 || !isEmbedded(origin0)) return;
|
|
const origin = origin0.embeddedValue;
|
|
|
|
const detail0 = Q.drop_lit(detailPatValue, N.toNoisePathStepDetail);
|
|
if (!detail0) return;
|
|
const spec: N.NoiseSpec<Ref> = detail0;
|
|
|
|
const algorithms = SaltyCrypto.Noise_25519_ChaChaPoly_BLAKE2s;
|
|
const protocol =
|
|
spec.protocol._variant === "present" ? spec.protocol.protocol :
|
|
spec.protocol._variant === "absent" ? N.fromDefaultProtocol(N.DefaultProtocol()) as string :
|
|
(() => { throw new Error("Invalid noise protocol name"); })();
|
|
const patternName = SaltyCrypto.matchPattern(algorithms, protocol);
|
|
if (patternName === null) throw new Error("Unsupported protocol " + protocol);
|
|
const preSharedKeys =
|
|
spec.preSharedKeys._variant === "present" ? spec.preSharedKeys.preSharedKeys :
|
|
spec.preSharedKeys._variant === "absent" ? [] :
|
|
(() => { throw new Error("Invalid pre-shared keys"); })();
|
|
const prologue = underlying(canonicalEncode(spec.service));
|
|
|
|
const H = new SaltyCrypto.Handshake(
|
|
algorithms,
|
|
patternName,
|
|
'initiator',
|
|
{
|
|
prologue,
|
|
remoteStaticPublicKey: underlying(spec.key),
|
|
preSharedKeys: preSharedKeys.map(underlying),
|
|
});
|
|
|
|
let transportState: SaltyCrypto.TransportState | null = null;
|
|
let responderSession: Ref | null = null;
|
|
let relay: Relay.Relay | null = null;
|
|
|
|
function maybeTransition(s: SaltyCrypto.TransportState | null) {
|
|
if (transportState !== null) {
|
|
throw new Error("Unexpected double-transition to transport state");
|
|
}
|
|
transportState = s;
|
|
if (transportState !== null) {
|
|
const noiseSessionFacet = Turn.activeFacet;
|
|
const peer = new Relay.Relay({
|
|
debug,
|
|
trustPeer: true,
|
|
packetWriter: bs => noiseSessionFacet.turn(() => {
|
|
const fragments = transportState!.send.encrypt_large(bs).map(Bytes.from);
|
|
at responderSession! {
|
|
send message ((fragments.length === 1)
|
|
? N.Packet.complete(fragments[0])
|
|
: N.Packet.fragmented(fragments));
|
|
}
|
|
}),
|
|
setup(r: Relay.Relay) {
|
|
relay = r;
|
|
},
|
|
initialOid: 0,
|
|
}).peer!;
|
|
assert G.ResolvedPathStep<Ref>({
|
|
"origin": origin,
|
|
"pathStep": G.PathStep({
|
|
"stepType": N.$noise,
|
|
"detail": fromJS(spec),
|
|
}),
|
|
"resolved": G.Resolved.accepted(peer),
|
|
});
|
|
}
|
|
}
|
|
|
|
react {
|
|
at origin {
|
|
assert G.Resolve({
|
|
"step": G.Step({
|
|
"stepType": N.$noise,
|
|
"detail": fromJS(N.ServiceSelector(spec.service)),
|
|
}),
|
|
"observer": create ({
|
|
... assertionFacetObserver(e => {
|
|
const response = G.toResolved(e);
|
|
if (!response) return;
|
|
switch (response._variant) {
|
|
case "accepted":
|
|
responderSession = response.responderSession;
|
|
const { packet, finished } = H.writeMessage(new Uint8Array());
|
|
at responderSession {
|
|
send message Bytes.from(packet);
|
|
}
|
|
maybeTransition(finished);
|
|
break;
|
|
case "Rejected":
|
|
stop {
|
|
at ds {
|
|
assert G.ResolvedPathStep<Ref>({
|
|
"origin": origin,
|
|
"pathStep": G.PathStep({
|
|
"stepType": N.$noise,
|
|
"detail": fromJS(N.NoisePathStepDetail(spec)),
|
|
}),
|
|
"resolved": response,
|
|
});
|
|
}
|
|
}
|
|
}
|
|
}),
|
|
message(body: Assertion) {
|
|
const p = N.asPacket(body);
|
|
if (transportState) {
|
|
const packet = transportState.recv.decrypt_large(
|
|
p._variant === 'complete'
|
|
? [underlying(p.value)]
|
|
: p.value.map(underlying));
|
|
relay!.accept(packet);
|
|
} else {
|
|
if (p._variant !== 'complete') {
|
|
throw new Error("Unexpected fragmentation in handshake");
|
|
}
|
|
const { message, finished } = H.readMessage(underlying(p.value));
|
|
if (message.byteLength !== 0) {
|
|
throw new Error("Unexpected payload during handshake");
|
|
}
|
|
maybeTransition(finished);
|
|
}
|
|
},
|
|
}),
|
|
});
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
spawn named 'sturdyRefStep' {
|
|
at ds {
|
|
during Observe({ "pattern": :pattern G.ResolvedPathStep({
|
|
"origin": \$originPatValue,
|
|
"pathStep": G.PathStep({
|
|
"stepType": S.$ref,
|
|
"detail": \$detailPatValue,
|
|
}),
|
|
"resolved": \_,
|
|
}) }) => {
|
|
const origin0 = Q.drop_lit(originPatValue);
|
|
if (!origin0 || !isEmbedded(origin0)) return;
|
|
const origin = origin0.embeddedValue;
|
|
|
|
const detail0 = Q.drop_lit(detailPatValue, S.toSturdyPathStepDetail);
|
|
if (!detail0) return;
|
|
const parameters: S.Parameters = detail0;
|
|
|
|
at origin {
|
|
assert G.Resolve({
|
|
"step": G.Step({
|
|
"stepType": S.$ref,
|
|
"detail": fromJS(parameters),
|
|
}),
|
|
"observer": create assertionFacetObserver(e => {
|
|
const response = G.toResolved(e);
|
|
if (!response) return;
|
|
at ds {
|
|
assert G.ResolvedPathStep<Ref>({
|
|
"origin": origin,
|
|
"pathStep": G.PathStep({
|
|
"stepType": S.$ref,
|
|
"detail": fromJS(parameters),
|
|
}),
|
|
"resolved": response,
|
|
});
|
|
}
|
|
}),
|
|
});
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|