syndicate-js/packages/ws-relay/src/index.ts

480 lines
20 KiB
TypeScript

/// SPDX-License-Identifier: GPL-3.0-or-later
/// SPDX-FileCopyrightText: Copyright © 2016-2023 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
import {
AnyValue,
Assertion,
Bytes,
Dataspace,
Embedded,
IdentitySet,
Observe,
QuasiValue as Q,
Record,
Ref,
Relay,
Schemas,
Supervisor,
Turn,
assertionFacetObserver,
canonicalEncode,
decode,
fromJS,
isEmbedded,
stringify,
underlying,
} from "@syndicate-lang/core";
import G = Schemas.gatekeeper;
import S = Schemas.sturdy;
import N = Schemas.noise;
import T = Schemas.transportAddress;
import E = Schemas.stdenv;
import * as SaltyCrypto from 'salty-crypto';
export * as SaltyCrypto from 'salty-crypto';
/**
 * One accepted transport connection, as tracked by the path resolver when it
 * chooses which connection to route a `ResolvePath` request over.
 */
type TransportState = {
    /** WebSocket address the connection was established to. */
    addr: T.WebSocket;
    /** Entity carried in the connection's `control` field. */
    control: Ref;
    /** Entity carried in the connection's accepted `resolved` field. */
    peer: Ref;
};
export function boot(ds = Dataspace.global, debug: boolean = false, WebSocketConstructor?: typeof WebSocket) {
// Actor that watches `ds` for observers of `G.TransportConnection`. Whenever
// the observed pattern pins `addr` to a literal WebSocket address, a connection
// attempt for that address is handed to `Supervisor.always`.
spawn named 'transportConnector' {
    at ds {
        during Observe({ "pattern": :pattern G.TransportConnection({
            "addr": \$addrPatValue,
            "control": \_,
            "resolved": \_,
        }) }) => {
            // Ignore patterns whose address is not a literal WebSocket transport.
            const addr = Q.drop_lit(addrPatValue, T.toWebSocket);
            if (!addr) return;

            // Bumped each time the supervisor asks for a name, so every
            // attempt is named distinctly.
            let attempts = 0;
            Supervisor.always(
                () => ['transportConnector', fromJS(addr), attempts++],
                () => {
                    console.log('connecting', addr.url, attempts);
                    connectTo(addr);
                },
            );
        }
    }
}
/**
 * Opens a WebSocket to `addr` and publishes the outcome at `ds` as a
 * `G.TransportConnection` assertion.
 *
 * - When the socket opens, a `Relay` is wired to it and the connection is
 *   asserted as accepted, with the relay's peer as the resolved entity.
 * - When the socket closes or errors before opening (or its constructor
 *   throws), the connection is asserted as `Rejected` with a symbol describing
 *   the failure, and the facet is stopped 10 seconds later.
 *
 * Must be called inside a turn: the active facet at call time owns the
 * connection, and stopping that facet tears the connection down.
 *
 * @param addr WebSocket transport address to connect to.
 */
function connectTo(addr: T.WebSocket) {
    const facet = Turn.activeFacet;
    facet.preventInertCheck();

    // Entity published in the `control` field; any message that parses as a
    // TransportControl stops the facet.
    const controlEntity = {
        message(a0: Assertion): void {
            const a = G.toTransportControl(a0);
            if (!a) return;
            stop {} // ForceDisconnect
        },
    };

    // Set once the attempt has been settled (either way) so that only the
    // first of succeed/fail takes effect.
    let final = false;

    function succeed(ws: WebSocket) {
        if (final) return;
        final = true;
        console.log('opened', addr.url);
        on stop {
            console.log('closing', addr.url);
            ws.close();
        }
        // From here on, a close ends the facet and an error crashes the turn.
        ws.onclose = () => facet.turn(() => { stop {} });
        ws.onerror = () => facet.turn(() =>
            Turn.active.crash(new Error("WebSocket error")));
        const relay = new Relay.Relay({
            debug,
            trustPeer: true,
            packetWriter: bs => ws.send(bs),
            setup(r: Relay.Relay) {
                ws.onmessage = e => facet.turn(() =>
                    r.accept(new Uint8Array(e.data)));
            },
            initialOid: 0,
        });
        console.log('succeed', addr.url);
        at ds {
            assert G.TransportConnection<Ref>({
                "addr": fromJS(addr),
                "control": create controlEntity,
                "resolved": G.Resolved.accepted(relay.peer!),
            });
        }
    }

    function fail(detail: Assertion) {
        if (final) return;
        final = true;
        console.log('fail', addr.url, detail);
        at ds {
            assert G.TransportConnection<Ref>({
                "addr": fromJS(addr),
                "control": create controlEntity,
                "resolved": G.Resolved.Rejected(G.Rejected(detail)),
            });
        }
        // Keep the rejection visible for a while, then end the facet.
        setTimeout(() => facet.turn(() => { stop {} }), 10000);
    }

    try {
        const ws = new (WebSocketConstructor ?? WebSocket)(addr.url);
        ws.binaryType = 'arraybuffer';
        ws.onopen = () => facet.turn(() => succeed(ws));
        ws.onclose = () => facet.turn(() => fail(Symbol.for('closed')));
        ws.onerror = () => facet.turn(() => fail(Symbol.for('websocket-error-event')));
        // If the facet is stopped while the socket is still connecting,
        // neither succeed() nor fail() has run, so nothing else would ever
        // close the socket. Abort the pending connection here. Once the
        // attempt is settled, succeed()'s own stop handler (or the socket's
        // own closure, in the fail case) takes care of it.
        on stop {
            if (!final) ws.close();
        }
    } catch (e) {
        console.error('Failed opening websocket', addr.url, e);
        fail(Symbol.for('websocket-exception'));
    }
}
// Actor that serves observers of `G.ResolvePath` at `ds`. For each observed
// pattern whose `route` is a literal route, it tracks accepted transport
// connections for the route's WebSocket transports, picks one of them, walks
// the route's path steps with `resolve`, and asserts the resulting
// `G.ResolvePath` record.
spawn named 'pathResolver' {
at ds {
during Observe({ "pattern": :pattern G.ResolvePath({
"route": \$routePatValue,
"addr": \_,
"control": \_,
"resolved": \_,
}) }) => {
// Only patterns carrying a literal, well-formed route are handled.
const route = Q.drop_lit(routePatValue, G.toRoute);
if (!route) return;
// Every currently-accepted connection for any of the route's transports.
field candidates: IdentitySet<TransportState> = new IdentitySet();
route.transports.forEach(t => {
// Transports that do not parse as a WebSocket address are skipped.
const addr = T.toWebSocket(t);
if (!addr) return;
console.log('tracking', addr.url);
during G.TransportConnection({
"addr": addr,
"control": $control_e: Embedded,
"resolved": G.Resolved.accepted($peer_e: Embedded),
}) => {
const entry = {
addr,
control: control_e.embeddedValue,
peer: peer_e.embeddedValue,
};
// The set is mutated in place, so the field is explicitly marked changed.
candidates.value.add(entry);
candidates.changed();
on stop {
candidates.value.delete(entry);
candidates.changed();
}
}
});
// `best` is the chosen candidate (or null); `rootPeer` is its peer (or null).
field best: TransportState | null = null;
field rootPeer: Ref | null = null;
dataflow {
// NOTE(review): presumably needed because this block both writes and reads
// `best` -- confirm against the dataflow documentation.
best.suppressCycleWarning();
best.value = null;
// Take the first candidate in the set's iteration order, if there is one.
for (const c of candidates.value) {
best.value = c;
break;
}
rootPeer.value = best.value?.peer ?? null;
}
// Walk the path steps starting from the chosen peer; the callback receives a
// thunk yielding the final resolution (or null while there is none).
resolve(() => rootPeer.value, route.pathSteps, (r) => {
console.log('leaf', best.value?.addr?.url);
// Asserted only while r() is truthy.
assert G.ResolvePath<Ref>({
"route": route,
"addr": fromJS(best.value!.addr),
"control": best.value!.control,
"resolved": r()!
}) when (r());
});
}
}
}
/**
 * Walks `steps` one at a time, starting from the entity produced by `e`.
 *
 * With no steps left, `k` is called with a thunk that yields an accepted
 * resolution wrapping `e()`, or null when `e()` is null. Otherwise the
 * function waits at `ds` for a `G.ResolvedPathStep` for the first step,
 * originating at `e()`: an accepted result recurses on the remaining steps
 * from the responder session, and a `Rejected` result is handed to `k` as-is.
 *
 * @param e Thunk yielding the current origin entity, or null when there is none.
 * @param steps Path steps still to be resolved.
 * @param k Continuation receiving a thunk for the final resolution.
 */
function resolve(
e: () => Ref | null,
steps: G.PathStep[],
k: (r: () => G.Resolved | null) => void,
) {
if (steps.length === 0) {
k(() => {
const peer = e();
return peer === null ? null : G.Resolved.accepted(peer);
});
} else {
const [step, ...more] = steps;
at ds {
// The `when (e())` clause guards the `e()!` used in the pattern.
during G.ResolvedPathStep({
"origin": (e()!),
"pathStep": step,
"resolved": $resolved: G.Resolved,
}) when (e()) => {
switch (resolved._variant) {
case "accepted":
// Include a call to our e() in the e we pass in to the recursive
// call to resolve(). e() returning non-null is a precondition for
// the call; if that precondition ever changes, we want to NOT
// reevaluate the body of any assertion, so we should test it
// before we do.
//
// Concrete example of a problem that occurs if the `e() && ...`
// isn't there: connected all OK, the websocket disconnects,
// best.value gets set null, the previous assertion of ResolvePath
// at the end of the chain gets reevaluated because
// resolved.responderSession is still non-null, it includes
// best.value!, which is now null, boom. With the call to e(), we
// short circuit and so the assertion becomes null at that point.
//
resolve(() => e() && resolved.responderSession, more, k);
break;
case "Rejected":
// A rejection ends the walk: report it directly to the continuation.
k(() => resolved);
break;
}
}
}
}
}
// Actor that serves observers of `G.ResolvedPathStep` at `ds` whose step type
// is `N.$noise`. For a literal embedded origin and a literal noise step detail,
// it asks the origin to resolve the noise service, runs a Noise handshake as
// initiator over the resulting session, and, once the handshake completes,
// asserts an accepted `G.ResolvedPathStep` whose resolved entity is the peer of
// a relay that speaks through the encrypted session.
spawn named 'noiseStep' {
at ds {
during Observe({ "pattern": :pattern G.ResolvedPathStep({
"origin": \$originPatValue,
"pathStep": G.PathStep({
"stepType": N.$noise,
"detail": \$detailPatValue,
}),
"resolved": \_,
}) }) => {
// Both the origin (an embedded entity) and the step detail must be literals.
const origin0 = Q.drop_lit(originPatValue);
if (!origin0 || !isEmbedded(origin0)) return;
const origin = origin0.embeddedValue;
const detail0 = Q.drop_lit(detailPatValue, N.toNoisePathStepDetail);
if (!detail0) return;
const spec: N.NoiseSpec<Ref> = detail0;
const algorithms = SaltyCrypto.Noise_25519_ChaChaPoly_BLAKE2s;
// Use the protocol name from the spec when present, else the schema default.
const protocol =
spec.protocol._variant === "present" ? spec.protocol.protocol :
spec.protocol._variant === "absent" ? N.fromDefaultProtocol(N.DefaultProtocol()) as string :
(() => { throw new Error("Invalid noise protocol name"); })();
const patternName = SaltyCrypto.matchPattern(algorithms, protocol);
if (patternName === null) throw new Error("Unsupported protocol " + protocol);
// Pre-shared keys default to none when the spec omits them.
const preSharedKeys =
spec.preSharedKeys._variant === "present" ? spec.preSharedKeys.preSharedKeys :
spec.preSharedKeys._variant === "absent" ? [] :
(() => { throw new Error("Invalid pre-shared keys"); })();
// The handshake prologue is the canonical encoding of the service selector.
const prologue = underlying(canonicalEncode(spec.service));
const H = new SaltyCrypto.Handshake(
algorithms,
patternName,
'initiator',
{
prologue,
remoteStaticPublicKey: underlying(spec.key),
preSharedKeys: preSharedKeys.map(underlying),
});
// Session state: all three stay null until the corresponding stage is reached.
let transportState: SaltyCrypto.TransportState | null = null;
let responderSession: Ref | null = null;
let relay: Relay.Relay | null = null;
// Called after each handshake message with that message's `finished` value.
// A non-null value means the handshake is complete: record the transport
// state, build the relay on top of it, and publish the accepted step.
// Throws if a transport state has already been recorded.
function maybeTransition(s: SaltyCrypto.TransportState | null) {
if (transportState !== null) {
throw new Error("Unexpected double-transition to transport state");
}
transportState = s;
if (transportState !== null) {
const noiseSessionFacet = Turn.activeFacet;
const peer = new Relay.Relay({
debug,
trustPeer: true,
// Outbound relay packets are encrypted and sent to the responder session:
// as a single `complete` packet when encryption yields one fragment,
// otherwise as a `fragmented` packet.
packetWriter: bs => noiseSessionFacet.turn(() => {
const fragments = transportState!.send.encrypt_large(bs).map(Bytes.from);
at responderSession! {
send message ((fragments.length === 1)
? N.Packet.complete(fragments[0])
: N.Packet.fragmented(fragments));
}
}),
setup(r: Relay.Relay) {
// Keep the relay so inbound decrypted packets can be fed to it.
relay = r;
},
initialOid: 0,
}).peer!;
assert G.ResolvedPathStep<Ref>({
"origin": origin,
"pathStep": G.PathStep({
"stepType": N.$noise,
"detail": fromJS(spec),
}),
"resolved": G.Resolved.accepted(peer),
});
}
}
react {
at origin {
// Ask the origin to resolve the noise service, supplying an observer entity
// that receives both the resolution (as an assertion) and subsequent
// session packets (as messages).
assert G.Resolve({
"step": G.Step({
"stepType": N.$noise,
"detail": fromJS(N.ServiceSelector(spec.service)),
}),
"observer": create ({
... assertionFacetObserver(e => {
const response = G.toResolved(e);
if (!response) return;
switch (response._variant) {
case "accepted":
// Session established: send the first handshake message (empty payload).
responderSession = response.responderSession;
const { packet, finished } = H.writeMessage(new Uint8Array());
at responderSession {
send message Bytes.from(packet);
}
maybeTransition(finished);
break;
case "Rejected":
// Propagate the rejection as the result of this path step.
stop {
at ds {
assert G.ResolvedPathStep<Ref>({
"origin": origin,
"pathStep": G.PathStep({
"stepType": N.$noise,
"detail": fromJS(N.NoisePathStepDetail(spec)),
}),
"resolved": response,
});
}
}
}
}),
message(body: Assertion) {
const p = N.asPacket(body);
if (transportState) {
// Handshake done: decrypt the packet (or its fragments) and hand the
// plaintext to the relay.
const packet = transportState.recv.decrypt_large(
p._variant === 'complete'
? [underlying(p.value)]
: p.value.map(underlying));
relay!.accept(packet);
} else {
// Still handshaking: messages must be unfragmented and carry no payload.
if (p._variant !== 'complete') {
throw new Error("Unexpected fragmentation in handshake");
}
const { message, finished } = H.readMessage(underlying(p.value));
if (message.byteLength !== 0) {
throw new Error("Unexpected payload during handshake");
}
maybeTransition(finished);
}
},
}),
});
}
}
}
}
}
// Actor that serves observers of `G.ResolvedPathStep` at `ds` whose step type
// is `S.$ref`. It forwards the step to the origin entity as a `G.Resolve`
// request and republishes whatever resolution the origin reports.
spawn named 'sturdyRefStep' {
    at ds {
        during Observe({ "pattern": :pattern G.ResolvedPathStep({
            "origin": \$originPatValue,
            "pathStep": G.PathStep({
                "stepType": S.$ref,
                "detail": \$detailPatValue,
            }),
            "resolved": \_,
        }) }) => {
            // Both the origin (an embedded entity) and the step detail must
            // be literals in the observed pattern.
            const originLiteral = Q.drop_lit(originPatValue);
            if (!originLiteral || !isEmbedded(originLiteral)) return;
            const detailLiteral = Q.drop_lit(detailPatValue, S.toSturdyPathStepDetail);
            if (!detailLiteral) return;

            const origin = originLiteral.embeddedValue;
            const parameters: S.Parameters = detailLiteral;

            at origin {
                assert G.Resolve({
                    "step": G.Step({
                        "stepType": S.$ref,
                        "detail": fromJS(parameters),
                    }),
                    "observer": create assertionFacetObserver(e => {
                        // Anything that is not a well-formed resolution is ignored.
                        const response = G.toResolved(e);
                        if (!response) return;
                        at ds {
                            assert G.ResolvedPathStep<Ref>({
                                "origin": origin,
                                "pathStep": G.PathStep({
                                    "stepType": S.$ref,
                                    "detail": fromJS(parameters),
                                }),
                                "resolved": response,
                            });
                        }
                    }),
                });
            }
        }
    }
}
}
/**
 * Expands a standard route into a general `G.Route`.
 *
 * A route of the `general` variant is returned unwrapped. Any other route is
 * turned into a route with two path steps: a noise step (built from `service`
 * and `key`, with protocol and pre-shared keys absent) followed by a sturdy-ref
 * step (built from `oid`, `sig` and, when non-empty, `caveats`).
 *
 * @param route The standard route to expand.
 * @returns The equivalent general route.
 * @throws Error if the route lists a transport other than a `wsUrl`.
 */
export function unpackStandardRoute<R>(route: E.StandardRoute<R>): G.Route<R> {
    if (route._variant === 'general') {
        return route.value;
    }

    const protocol = N.NoiseProtocol.absent<R>();
    const preSharedKeys = N.NoisePreSharedKeys.absent<R>();
    // An empty caveat list is represented by leaving the field out entirely.
    const caveats = route.caveats.length === 0
        ? S.CaveatsField.absent<R>()
        : S.CaveatsField.present<R>(route.caveats);

    return G.Route({
        transports: route.transports.map(transport => {
            if (transport._variant === 'wsUrl') {
                return T.fromWebSocket(T.WebSocket(transport.value));
            }
            const rendered = stringify(E.fromStandardTransport(transport));
            throw new Error(`Unsupported transport: ${rendered}`);
        }),
        pathSteps: [
            G.PathStep({
                stepType: N.$noise,
                detail: N.fromNoiseSpec(N.NoiseSpec({
                    service: route.service,
                    key: route.key,
                    protocol,
                    preSharedKeys,
                })),
            }),
            G.PathStep({
                stepType: S.$ref,
                detail: S.fromParameters(S.Parameters({
                    oid: route.oid,
                    sig: route.sig,
                    caveats,
                })),
            }),
        ],
    });
}
/**
 * Decodes a base64-encoded standard route into a general `G.Route`.
 *
 * Characters outside the base64 alphabets (standard and URL-safe, plus `=`)
 * are stripped before decoding.
 *
 * @param s Text containing the base64-encoded route.
 * @returns The unpacked route, or null when the decoded value is not a
 *   standard route or when decoding or unpacking throws (the error is logged).
 */
export function decodeStandardRoute(s: string): G.Route | null {
    try {
        const base64 = s.replace(/[^-_+/A-Za-z0-9=]/g, '');
        const route = E.toStandardRoute<Ref>(decode(Bytes.fromBase64(base64)));
        if (route === undefined) {
            return null;
        }
        return unpackStandardRoute(route);
    } catch (e) {
        console.error('Decoding standard route:', e);
        return null;
    }
}
/**
 * Expresses interest at `ds` in an accepted `G.ResolvePath` for `route` and
 * runs `connectedFacet` each time one appears.
 *
 * @param route The route to resolve, either as a `G.Route` or as an
 *   already-encoded record.
 * @param connectedFacet Callback invoked with the resolved remote entity, the
 *   control entity and the transport address of each accepted resolution.
 * @param ds Dataspace in which to look for resolutions; defaults to the global
 *   dataspace.
 */
export function contactRemote(
    route: G.Route<Ref> | Record<AnyValue, Array<AnyValue>, Ref>,
    connectedFacet: (
        remoteObject: Ref,
        controlObject: Ref,
        transportAddr: AnyValue,
    ) => void,
    ds = Dataspace.global,
) {
    // Routes given in structured form are encoded; records are used as-is.
    const routePattern = !Record.isRecord(route)
        ? G.fromRoute(G.Route(route))
        : route;
    at ds {
        during G.ResolvePath({
            "route": routePattern,
            "addr": $addr,
            "control": $control_e: Embedded,
            "resolved": G.Resolved.accepted($resolved_e: Embedded),
        }) => {
            connectedFacet(resolved_e.embeddedValue, control_e.embeddedValue, addr);
        }
    }
}