Try multiple transports in order

This commit is contained in:
Tony Garnock-Jones 2023-01-31 16:22:09 +01:00
parent 60ecab200c
commit 54c2ddfd99
1 changed files with 73 additions and 40 deletions

View File

@ -1,7 +1,7 @@
/// SPDX-License-Identifier: GPL-3.0-or-later /// SPDX-License-Identifier: GPL-3.0-or-later
/// SPDX-FileCopyrightText: Copyright © 2016-2023 Tony Garnock-Jones <tonyg@leastfixedpoint.com> /// SPDX-FileCopyrightText: Copyright © 2016-2023 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
import { QuasiValue as Q, Assertion, Bytes, Ref, Relay, Turn, Supervisor, SupervisorRestartPolicy, Observe, Schemas, assertionFacetObserver, canonicalEncode, isEmbedded, underlying } from "@syndicate-lang/core"; import { QuasiValue as Q, Assertion, Bytes, Ref, Relay, Turn, Supervisor, SupervisorRestartPolicy, Observe, Schemas, assertionFacetObserver, canonicalEncode, fromJS, isEmbedded, underlying } from "@syndicate-lang/core";
import * as G from "./gen/ws"; import * as G from "./gen/ws";
export * from "./gen/ws"; export * from "./gen/ws";
import * as N from "@syndicate-lang/core/lib/gen/noise"; import * as N from "@syndicate-lang/core/lib/gen/noise";
@ -18,55 +18,88 @@ export function boot(ds: Ref, debug: boolean = false) {
}) }) => { }) }) => {
const route = Q.drop_lit(routePatValue, N.toRoute); const route = Q.drop_lit(routePatValue, N.toRoute);
if (!route) return; if (!route) return;
let addr0: G.RelayAddress | undefined; const addrs: G.RelayAddress[] = [];
route.transports.forEach(t => addr0 = addr0 ?? G.toRelayAddress(t)); route.transports.forEach(t => {
if (!addr0) return; const a = G.toRelayAddress(t);
const addr = addr0; if (a) addrs.push(a);
wsConnect(addr, e => resolve(e, route.steps, e => { });
assert G.Resolved({ wsConnect(addrs, (e, addr) => {
"route": route, at ds {
"addr": addr, stop on message G.ForceRelayDisconnect(addr);
"resolved": e, }
resolve(e, route.steps, e => {
assert G.Resolved({
"route": route,
"addr": addr,
"resolved": e,
});
}); });
})); });
at ds {
stop on message G.ForceRelayDisconnect(addr);
}
} }
} }
function wsConnect(addr: G.RelayAddress, k: (e: Ref) => void) { function tryConnection(addr: G.RelayAddress): Promise<WebSocket> {
console.log('wsConnect', addr); return new Promise((resolve, reject) => {
console.log('trying', addr);
const ws = new WebSocket(addr.url);
ws.binaryType = 'arraybuffer';
ws.onopen = () => resolve(ws);
ws.onclose = () => reject(null);
ws.onerror = (e) => reject(e);
});
}
async function establishConnection(
addrs: G.RelayAddress[],
): Promise<{ws: WebSocket, addr: G.RelayAddress} | null> {
for (let i = 0; i < addrs.length; i++) {
const addr = addrs[i];
try {
return { ws: await tryConnection(addr), addr };
} catch (e) {
console.log('attempt to contact', addr.url, 'failed with', e);
}
}
return null;
}
function wsConnect(addrs: G.RelayAddress[], k: (e: Ref, addr: G.RelayAddress) => void) {
console.log('wsConnect', addrs);
let counter = 0; let counter = 0;
new Supervisor({ new Supervisor({
restartPolicy: SupervisorRestartPolicy.ALWAYS, restartPolicy: SupervisorRestartPolicy.ALWAYS,
}, () => ['wsRelay', G.fromRelayAddress(addr), counter++], () => { }, () => ['wsRelay', fromJS(addrs), counter++], () => {
console.log('try!', counter - 1); console.log('try!', counter - 1);
const facet = Turn.activeFacet; const facet = Turn.activeFacet;
const ws = new WebSocket(addr.url);
facet.preventInertCheck(); facet.preventInertCheck();
on stop {
console.log('on stop triggered'); establishConnection(addrs).then(result => facet.turn(() => {
ws.close(); if (result === null) {
} console.log('no successful connection');
ws.binaryType = 'arraybuffer'; stop {}
ws.onclose = () => facet.turn(() => { stop {} }); } else {
ws.onerror = () => facet.turn(() => const {ws, addr} = result;
Turn.active.crash(new Error("WebSocket error"))); console.log('hey cool', addr);
ws.onopen = () => facet.turn(() => { on stop {
console.log('hey cool'); console.log('on stop triggered');
const relay = new Relay.Relay({ ws.close();
debug, }
trustPeer: true, ws.onclose = () => facet.turn(() => { stop {} });
packetWriter: bs => ws.send(bs), ws.onerror = () => facet.turn(() =>
setup(r: Relay.Relay) { Turn.active.crash(new Error("WebSocket error")));
ws.onmessage = e => facet.turn(() => const relay = new Relay.Relay({
r.accept(new Uint8Array(e.data))); debug,
}, trustPeer: true,
initialOid: 0, packetWriter: bs => ws.send(bs),
}); setup(r: Relay.Relay) {
k(relay.peer!); ws.onmessage = e => facet.turn(() =>
}); r.accept(new Uint8Array(e.data)));
},
initialOid: 0,
});
k(relay.peer!, addr);
}
}));
}); });
} }