From fe427a67be9b4ab7421f11d4953353875dea5d3e Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 11 Jan 2023 11:48:25 +0100 Subject: [PATCH] Repair relay pins, to pin target refs too --- packages/core/src/transport/protocol.ts | 4 - packages/core/src/transport/relay.ts | 190 ++++++++++++++---------- 2 files changed, 113 insertions(+), 81 deletions(-) diff --git a/packages/core/src/transport/protocol.ts b/packages/core/src/transport/protocol.ts index b5e039c..f5ceb0d 100644 --- a/packages/core/src/transport/protocol.ts +++ b/packages/core/src/transport/protocol.ts @@ -2,12 +2,8 @@ /// SPDX-FileCopyrightText: Copyright © 2016-2022 Tony Garnock-Jones import * as S from '../gen/sturdy.js'; -import { Oid } from '../gen/protocol.js'; -import { Ref } from '../runtime/actor.js'; import { Decoder, DecoderState, Encoder, EncoderState, GenericEmbedded, neverEmbeddedType, EmbeddedType, Value, EmbeddedWriter } from '@preserves/core'; -export type WireSymbol = { oid: Oid, ref: Ref, count: number }; - export const wireRefEmbeddedType: EmbeddedType & EmbeddedWriter = { decode(s: DecoderState): S.WireRef { return S.asWireRef(new Decoder(s).next()); diff --git a/packages/core/src/transport/relay.ts b/packages/core/src/transport/relay.ts index bacf98c..0f6c49b 100644 --- a/packages/core/src/transport/relay.ts +++ b/packages/core/src/transport/relay.ts @@ -4,16 +4,34 @@ import { Actor, Assertion, Entity, Facet, Handle, Ref, Turn } from '../runtime/actor.js'; import { BytesLike, Decoder, Dictionary, embed, encode, IdentityMap, mapEmbeddeds, stringify, underlying, Value } from '@preserves/core'; import * as IO from '../gen/protocol.js'; -import { wireRefEmbeddedType, WireSymbol } from './protocol.js'; +import { wireRefEmbeddedType } from './protocol.js'; import { queueTask } from '../runtime/task.js'; import { attenuate } from '../runtime/rewrite.js'; import { fromAttenuation, WireRef } from '../gen/sturdy.js'; +export class WireSymbol { + count = 0; + + constructor ( + public side: Membrane, + public oid: IO.Oid, + public ref: Ref, + ) {} + + drop(): void { + this.count--; + if (this.count === 0) { + this.side.byOid.delete(this.oid); + this.side.byRef.delete(this.ref); + } + } +}; + export class SyncPeerEntity implements Entity { readonly relay: Relay; readonly peer: Ref; readonly handleMap = new IdentityMap(); - e: WireSymbol | null = null; + pins: Array = []; constructor(relay: Relay, peer: Ref) { this.relay = relay; @@ -31,7 +49,7 @@ export class SyncPeerEntity implements Entity { message(body: Assertion): void { // We get to vanish from the indexes now - this.relay.releaseRefOut(this.e!); + this.pins.forEach(e => e.drop()); Turn.active.message(this.peer, body); } @@ -55,7 +73,7 @@ export class RelayEntity implements Entity { assert(assertion: Assertion, handle: Handle): void { this.send(IO.Event.Assert(IO.Assert({ - assertion: this.relay.register(assertion, handle), + assertion: this.relay.register(this.oid, assertion, handle), handle }))) } @@ -66,29 +84,38 @@ export class RelayEntity implements Entity { } message(body: Assertion): void { - this.send(IO.Event.Message(IO.Message(this.relay.register(body, null)))); + this.send(IO.Event.Message(IO.Message(this.relay.register(null, body, null)))); } sync(peer: Ref): void { const peerEntity = new SyncPeerEntity(this.relay, peer); - const exported: Array = []; - const ior = this.relay.rewriteRefOut(Turn.ref(peerEntity), false, exported); - peerEntity.e = exported[0]; + this.relay.grabImportedOid(this.oid, peerEntity.pins); + const ior = this.relay.rewriteRefOut(Turn.ref(peerEntity), false, peerEntity.pins); this.send(IO.Event.Sync(IO.Sync(ior))); } } +export type WhichTable = "byOid" | "byRef"; + export class Membrane { readonly byOid = new IdentityMap(); readonly byRef = new IdentityMap(); - grab(table: Table, - key: Parameters[0], - transient: boolean, - f: () => WireSymbol): WireSymbol - { + grab
(table: Table, + key: Parameters[0], + transient: boolean, + f: () => WireSymbol): WireSymbol; + grab
(table: Table, + key: Parameters[0], + transient: boolean, + f: null): WireSymbol | null; + grab
(table: Table, + key: Parameters[0], + transient: boolean, + f: (() => WireSymbol) | null): WireSymbol | null { let e = this[table].get(key as any); if (e === void 0) { + if (f === null) return null; e = f(); this.byRef.set(e.ref, e); this.byOid.set(e.oid, e); @@ -96,14 +123,6 @@ export class Membrane { if (!transient) e.count++; return e; } - - drop(e: WireSymbol): void { - e.count--; - if (e.count === 0) { - this.byOid.delete(e.oid); - this.byRef.delete(e.ref); - } - } } export const INERT_REF: Ref = { @@ -128,7 +147,7 @@ export class Relay { readonly w: PacketWriter; readonly inboundAssertions = new IdentityMap, + pins: Array, }>(); readonly outboundAssertions = new IdentityMap>(); readonly exported = new Membrane(); @@ -166,40 +185,52 @@ export class Relay { } } - rewriteOut(assertion: Assertion, transient: boolean): [Value, Array] - { - const exported: Array = []; - const rewritten = mapEmbeddeds(assertion, r => embed(this.rewriteRefOut(r, transient, exported))); - return [rewritten, exported]; - } - - rewriteIn(a: Value): [Assertion, Array] - { - const imported: Array = []; - const rewritten = mapEmbeddeds(a, r => embed(this.rewriteRefIn(r, imported))); - return [rewritten, imported]; - } - - register(assertion: Assertion, handle: Handle | null): Value { - const [rewritten, exported] = this.rewriteOut(assertion, handle === null); - if (handle !== null) this.outboundAssertions.set(handle, exported); + register(targetRemoteOid: IO.Oid, assertion: Assertion, handle: Handle): Value; + register(targetRemoteOid: null, assertion: Assertion, handle: null): Value; + register(targetRemoteOid: IO.Oid | null, assertion: Assertion, handle: Handle | null): Value { + const transient = (handle === null); + const pins: Array = []; + const rewritten = mapEmbeddeds(assertion, r => embed(this.rewriteRefOut(r, transient, pins))); + if (handle !== null) { + if (targetRemoteOid !== null /* belt and suspenders */) { + this.grabImportedOid(targetRemoteOid, pins); + } + this.outboundAssertions.set(handle, pins); + } return rewritten; } deregister(handle: Handle): void { - (this.outboundAssertions.get(handle) ?? []).forEach(e => this.releaseRefOut(e)); + (this.outboundAssertions.get(handle) ?? []).forEach(e => e.drop()); this.outboundAssertions.delete(handle); } - rewriteRefOut(r: Ref, transient: boolean, exported: Array): WireRef { + grabImportedOid(oid: IO.Oid, pins: Array) { + const e = this.imported.grab("byOid", oid, false, null); + if (e === null) { + throw new Error("Internal error: import table missing entry for oid " + oid); + } + pins.push(e); + } + + grabExportedOid(oid: IO.Oid, pins: Array): Ref { + const e = this.exported.grab("byOid", oid, false, null); + if (e === null) return INERT_REF; + pins.push(e); + return e.ref; + } + + rewriteRefOut(r: Ref, transient: boolean, pins: Array): WireRef { if (r.target instanceof RelayEntity && r.target.relay === this) { if (r.attenuation === void 0 || r.attenuation.length === 0) { // No extra conditions on this reference since it was sent to us. + this.grabImportedOid(r.target.oid, pins); return WireRef.yours({ oid: r.target.oid, attenuation: [] }); } else { // This reference has been attenuated since it was sent to us. // Do we trust the peer to enforce such attenuation on our behalf? if (this.trustPeer) { + this.grabImportedOid(r.target.oid, pins); return WireRef.yours({ oid: r.target.oid, attenuation: r.attenuation }); } else { // fall through: treat the attenuated ref as a local ref, and re-export it. @@ -210,36 +241,38 @@ export class Relay { const e = this.exported.grab( "byRef", r, transient, () => { if (transient) throw new Error("Cannot send transient reference"); - return { oid: this.nextLocalOid++, ref: r, count: 0 }; + return new WireSymbol(this.exported, this.nextLocalOid++, r); }); - exported.push(e); + pins.push(e); return WireRef.mine(e.oid); } - releaseRefOut(e: WireSymbol) { - this.exported.drop(e); - } - - rewriteRefIn(n: WireRef, imported: Array): Ref { + rewriteRefIn(n: WireRef, pins: Array): Ref { switch (n._variant) { case 'yours': { - const r = this.lookupLocal(n.oid); - if (n.attenuation.length === 0 || r === INERT_REF) { - return r; + const e = this.exported.grab("byOid", n.oid, false, null); + if (e === null) { + return INERT_REF; } else { - type AttenuatedRef = Ref & { __attenuations?: Dictionary }; - const ar = r as AttenuatedRef; - if (ar.__attenuations === void 0) { - ar.__attenuations = new Dictionary(); + pins.push(e); + const r = e.ref; + if (n.attenuation.length === 0) { + return r; + } else { + type AttenuatedRef = Ref & { __attenuations?: Dictionary }; + const ar = r as AttenuatedRef; + if (ar.__attenuations === void 0) { + ar.__attenuations = new Dictionary(); + } + return ar.__attenuations.getOrSet(fromAttenuation(n.attenuation), () => + attenuate(r, ... n.attenuation)); } - return ar.__attenuations.getOrSet(fromAttenuation(n.attenuation), () => - attenuate(r, ... n.attenuation)); } } case 'mine': { const e = this.imported.grab("byOid", n.oid, false, () => - ({ oid: n.oid, ref: Turn.ref(new RelayEntity(this, n.oid)), count: 0 })); - imported.push(e); + new WireSymbol(this.imported, n.oid, Turn.ref(new RelayEntity(this, n.oid)))); + pins.push(e); return e.ref; } } @@ -259,10 +292,6 @@ export class Relay { this.pendingTurn.push(IO.TurnEvent({ oid: remoteOid, event: m })); } - lookupLocal(localOid: IO.Oid): Ref { - return this.exported.byOid.get(localOid)?.ref ?? INERT_REF; - } - accept(bs: BytesLike): void { Turn.for(this.facet, () => { this.decoder.write(bs); @@ -272,21 +301,26 @@ export class Relay { const wireTurn = IO.toTurn(rawTurn); if (wireTurn === void 0) throw new Error("Bad IO.Turn"); if (this.debug) console.log('IN', stringify(rawTurn)); - wireTurn.forEach(v => { - const { oid: localOid, event: m } = v; - this.handle(this.lookupLocal(localOid), m); - }); + wireTurn.forEach(v => this.handle(v.oid, v.event)); } }); } - handle(r: Ref, m: IO.Event) { + rewriteIn(a: Value): [Assertion, Array] + { + const pins: Array = []; + const rewritten = mapEmbeddeds(a, r => embed(this.rewriteRefIn(r, pins))); + return [rewritten, pins]; + } + + handle(localOid: IO.Oid, m: IO.Event) { switch (m._variant) { case 'Assert': { - const [a, imported] = this.rewriteIn(m.value.assertion); + const [a, pins] = this.rewriteIn(m.value.assertion); + const r = this.grabExportedOid(localOid, pins); this.inboundAssertions.set(m.value.handle, { localHandle: Turn.active.assert(r, a), - imported, + pins, }); break; } @@ -295,22 +329,24 @@ export class Relay { const h = this.inboundAssertions.get(remoteHandle); if (h === void 0) throw new Error(`Peer retracted invalid handle ${remoteHandle}`); this.inboundAssertions.delete(remoteHandle); - h.imported.forEach(e => this.imported.drop(e)); + h.pins.forEach(e => e.drop()); Turn.active.retract(h.localHandle); break; } case 'Message': { - const [a, imported] = this.rewriteIn(m.value.body); - if (imported.length > 0) throw new Error("Cannot receive transient reference"); - Turn.active.message(r, a); + const [a, pins] = this.rewriteIn(m.value.body); + if (pins.length > 0) throw new Error("Cannot receive transient reference"); + const r = this.exported.byOid.get(localOid)?.ref; + if (r) Turn.active.message(r, a); break; } case 'Sync': { - const imported: Array = []; - const k = this.rewriteRefIn(m.value.peer, imported); + const pins: Array = []; + const r = this.grabExportedOid(localOid, pins); + const k = this.rewriteRefIn(m.value.peer, pins); Turn.active.sync(r).then(() => { Turn.active.message(k, true); - imported.forEach(e => this.imported.drop(e)); + pins.forEach(e => e.drop()); }); break; }