From 54c2ddfd9922525926d0a1259cc1da855db205a2 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Tue, 31 Jan 2023 16:22:09 +0100 Subject: [PATCH] Try multiple transports in order --- packages/ws-relay/src/index.ts | 113 +++++++++++++++++++++------------ 1 file changed, 73 insertions(+), 40 deletions(-) diff --git a/packages/ws-relay/src/index.ts b/packages/ws-relay/src/index.ts index decd0e5..87435b3 100644 --- a/packages/ws-relay/src/index.ts +++ b/packages/ws-relay/src/index.ts @@ -1,7 +1,7 @@ /// SPDX-License-Identifier: GPL-3.0-or-later /// SPDX-FileCopyrightText: Copyright © 2016-2023 Tony Garnock-Jones -import { QuasiValue as Q, Assertion, Bytes, Ref, Relay, Turn, Supervisor, SupervisorRestartPolicy, Observe, Schemas, assertionFacetObserver, canonicalEncode, isEmbedded, underlying } from "@syndicate-lang/core"; +import { QuasiValue as Q, Assertion, Bytes, Ref, Relay, Turn, Supervisor, SupervisorRestartPolicy, Observe, Schemas, assertionFacetObserver, canonicalEncode, fromJS, isEmbedded, underlying } from "@syndicate-lang/core"; import * as G from "./gen/ws"; export * from "./gen/ws"; import * as N from "@syndicate-lang/core/lib/gen/noise"; @@ -18,55 +18,88 @@ export function boot(ds: Ref, debug: boolean = false) { }) }) => { const route = Q.drop_lit(routePatValue, N.toRoute); if (!route) return; - let addr0: G.RelayAddress | undefined; - route.transports.forEach(t => addr0 = addr0 ?? G.toRelayAddress(t)); - if (!addr0) return; - const addr = addr0; - wsConnect(addr, e => resolve(e, route.steps, e => { - assert G.Resolved({ - "route": route, - "addr": addr, - "resolved": e, + const addrs: G.RelayAddress[] = []; + route.transports.forEach(t => { + const a = G.toRelayAddress(t); + if (a) addrs.push(a); + }); + wsConnect(addrs, (e, addr) => { + at ds { + stop on message G.ForceRelayDisconnect(addr); + } + resolve(e, route.steps, e => { + assert G.Resolved({ + "route": route, + "addr": addr, + "resolved": e, + }); }); - })); - at ds { - stop on message G.ForceRelayDisconnect(addr); - } + }); } } - function wsConnect(addr: G.RelayAddress, k: (e: Ref) => void) { - console.log('wsConnect', addr); + function tryConnection(addr: G.RelayAddress): Promise { + return new Promise((resolve, reject) => { + console.log('trying', addr); + const ws = new WebSocket(addr.url); + ws.binaryType = 'arraybuffer'; + ws.onopen = () => resolve(ws); + ws.onclose = () => reject(null); + ws.onerror = (e) => reject(e); + }); + } + + async function establishConnection( + addrs: G.RelayAddress[], + ): Promise<{ws: WebSocket, addr: G.RelayAddress} | null> { + for (let i = 0; i < addrs.length; i++) { + const addr = addrs[i]; + try { + return { ws: await tryConnection(addr), addr }; + } catch (e) { + console.log('attempt to contact', addr.url, 'failed with', e); + } + } + return null; + } + + function wsConnect(addrs: G.RelayAddress[], k: (e: Ref, addr: G.RelayAddress) => void) { + console.log('wsConnect', addrs); let counter = 0; new Supervisor({ restartPolicy: SupervisorRestartPolicy.ALWAYS, - }, () => ['wsRelay', G.fromRelayAddress(addr), counter++], () => { + }, () => ['wsRelay', fromJS(addrs), counter++], () => { console.log('try!', counter - 1); const facet = Turn.activeFacet; - const ws = new WebSocket(addr.url); facet.preventInertCheck(); - on stop { - console.log('on stop triggered'); - ws.close(); - } - ws.binaryType = 'arraybuffer'; - ws.onclose = () => facet.turn(() => { stop {} }); - ws.onerror = () => facet.turn(() => - Turn.active.crash(new Error("WebSocket error"))); - ws.onopen = () => facet.turn(() => { - console.log('hey cool'); - const relay = new Relay.Relay({ - debug, - trustPeer: true, - packetWriter: bs => ws.send(bs), - setup(r: Relay.Relay) { - ws.onmessage = e => facet.turn(() => - r.accept(new Uint8Array(e.data))); - }, - initialOid: 0, - }); - k(relay.peer!); - }); + + establishConnection(addrs).then(result => facet.turn(() => { + if (result === null) { + console.log('no successful connection'); + stop {} + } else { + const {ws, addr} = result; + console.log('hey cool', addr); + on stop { + console.log('on stop triggered'); + ws.close(); + } + ws.onclose = () => facet.turn(() => { stop {} }); + ws.onerror = () => facet.turn(() => + Turn.active.crash(new Error("WebSocket error"))); + const relay = new Relay.Relay({ + debug, + trustPeer: true, + packetWriter: bs => ws.send(bs), + setup(r: Relay.Relay) { + ws.onmessage = e => facet.turn(() => + r.accept(new Uint8Array(e.data))); + }, + initialOid: 0, + }); + k(relay.peer!, addr); + } + })); }); }