From a51851283fd58dc4867710bfd7c8a649b4156005 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sat, 11 Feb 2023 22:01:19 +0100 Subject: [PATCH] Repair route with no steps --- packages/core/src/runtime/dataflow.ts | 6 ++- packages/ws-relay/src/index.ts | 67 +++++++++++---------------- 2 files changed, 33 insertions(+), 40 deletions(-) diff --git a/packages/core/src/runtime/dataflow.ts b/packages/core/src/runtime/dataflow.ts index f1a7711..c153dd1 100644 --- a/packages/core/src/runtime/dataflow.ts +++ b/packages/core/src/runtime/dataflow.ts @@ -112,9 +112,13 @@ export abstract class Cell { return this.__value; } + changed() { + this.graph.recordDamage(this); + } + update(newValue: unknown) { if (!this.valuesEqual(this.__value, newValue)) { - this.graph.recordDamage(this); + this.changed(); this.__value = newValue; } } diff --git a/packages/ws-relay/src/index.ts b/packages/ws-relay/src/index.ts index 3c73a72..53d02e2 100644 --- a/packages/ws-relay/src/index.ts +++ b/packages/ws-relay/src/index.ts @@ -6,6 +6,7 @@ import { Bytes, Dataflow, Embedded, + IdentitySet, Observe, QuasiValue as Q, Ref, @@ -110,7 +111,7 @@ export function boot(ds: Ref, debug: boolean = false) { ws.binaryType = 'arraybuffer'; ws.onopen = () => facet.turn(() => succeed(ws)); ws.onclose = () => facet.turn(() => fail(Symbol.for('closed'))); - ws.onerror = (e) => facet.turn(() => fail(Symbol.for('websocket-error-event'))); + ws.onerror = () => facet.turn(() => fail(Symbol.for('websocket-error-event'))); } catch (e) { console.error('Failed opening websocket', addr.url, e); fail(Symbol.for('websocket-exception')); @@ -127,36 +128,26 @@ export function boot(ds: Ref, debug: boolean = false) { }) }) => { const route = Q.drop_lit(routePatValue, G.toRoute); if (!route) return; - const candidates: Dataflow.Field[] = []; + field candidates: IdentitySet = new IdentitySet(); route.transports.forEach(t => { const addr = T.toWebSocket(t); if (!addr) return; - let counter = 0; - field state: TransportState | null = null; - candidates.push(state); console.log('tracking', addr.url); during G.TransportConnection({ "addr": addr, - "control": $control: Embedded, - "resolved": $resolved: G.Resolved, + "control": $control_e: Embedded, + "resolved": G.Resolved.accepted($peer_e: Embedded), }) => { - const me = counter++; - switch (resolved._variant) { - case "accepted": - state.value = { - addr, - control: control.embeddedValue, - peer: resolved.responderSession, - }; - break; - case "Rejected": - state.value = null; - break; - } + const entry = { + addr, + control: control_e.embeddedValue, + peer: peer_e.embeddedValue, + }; + candidates.value.add(entry); + candidates.changed(); on stop { - if (counter === me) { - state.value = null; - } + candidates.value.delete(entry); + candidates.changed(); } } }); @@ -164,22 +155,20 @@ export function boot(ds: Ref, debug: boolean = false) { field rootPeer: Ref | null = null; dataflow { best.value = null; - for (const c of candidates) { - if (c.value !== null) { - if (best.value === null) best.value = c.value; - } + for (const c of candidates.value) { + best.value = c; + break; } rootPeer.value = best.value?.peer ?? null; } resolve(() => rootPeer.value, route.pathSteps, (r) => { - const s = best.value!; - console.log('leaf', s.addr.url, stringify(r)); + console.log('leaf', best.value?.addr?.url); assert G.ResolvePath({ "route": route, - "addr": fromJS(s.addr), - "control": s.control, - "resolved": r, - }); + "addr": fromJS(best.value!.addr), + "control": best.value!.control, + "resolved": r()! + }) when (r()); }); } } @@ -188,13 +177,13 @@ export function boot(ds: Ref, debug: boolean = false) { function resolve( e: () => Ref | null, steps: G.PathStep[], - k: (r: G.Resolved) => void, + k: (r: () => G.Resolved | null) => void, ) { if (steps.length === 0) { - const peer = e(); - k(peer === null - ? G.Resolved.Rejected(G.Rejected(Symbol.for('not-connected'))) - : G.Resolved.accepted(peer)); + k(() => { + const peer = e(); + return peer === null ? null : G.Resolved.accepted(peer); + }); } else { const [step, ...more] = steps; at ds { @@ -208,7 +197,7 @@ export function boot(ds: Ref, debug: boolean = false) { resolve(() => resolved.responderSession, more, k); break; case "Rejected": - k(resolved); + k(() => resolved); break; } }