diff --git a/packages/core/src/transport/membrane.ts b/packages/core/src/transport/membrane.ts new file mode 100644 index 0000000..077fd64 --- /dev/null +++ b/packages/core/src/transport/membrane.ts @@ -0,0 +1,308 @@ +/// SPDX-License-Identifier: GPL-3.0-or-later +/// SPDX-FileCopyrightText: Copyright © 2016-2024 Tony Garnock-Jones + +import { Actor, Ref, Handle, Assertion, Entity, Turn } from '../runtime/actor.js'; +import { Value, Embedded, mapEmbeddeds, IdentityMap, Dictionary, stringify } from '@preserves/core'; +import * as IO from '../gen/protocol.js'; +import { fromCaveat, WireRef } from '../gen/sturdy.js'; +import { attenuate } from '../runtime/rewrite.js'; + +export class WireSymbol { + count = 0; + + constructor ( + public side: Membrane, + public oid: IO.Oid, + public ref: Ref, + ) {} + + drop(): void { + this.count--; + if (this.count === 0) { + this.side.byOid.delete(this.oid); + this.side.byRef.delete(this.ref); + } + } +}; + +export type WhichTable = "byOid" | "byRef"; + +export class Membrane { + readonly byOid = new IdentityMap(); + readonly byRef = new IdentityMap(); + + grab(table: Table, + key: Parameters[0], + transient: boolean, + f: () => WireSymbol): WireSymbol; + grab
(table: Table, + key: Parameters[0], + transient: boolean, + f: null): WireSymbol | null; + grab
(table: Table, + key: Parameters[0], + transient: boolean, + f: (() => WireSymbol) | null): WireSymbol | null { + let e = this[table].get(key as any); + if (e === void 0) { + if (f === null) return null; + e = f(); + this.byRef.set(e.ref, e); + this.byOid.set(e.oid, e); + } + if (!transient) e.count++; + return e; + } +} + +export const INERT_REF: Ref = { + relay: Actor.boot(() => Turn.active.stop(Turn.activeFacet)).root, + target: {}, +}; + +export interface ProxyInbound { + proxyPacket(packet: IO.Packet>): void; +} + +export interface ProxyOutbound { + send(remoteOid: IO.Oid, event: IO.Event>): void; + proxyAssertion(targetRemoteOid: IO.Oid, assertion: Assertion, handle: Handle): Value>; + proxyRetract(handle: Handle): void; + proxyMessage(assertion: Assertion): Value>; + proxySync(targetRemoteOid: IO.Oid, peer: Ref): Embedded; +} + +export abstract class LayerBoundary implements ProxyOutbound, ProxyInbound { + readonly inboundAssertions = new IdentityMap, + }>(); + readonly outboundAssertions = new IdentityMap>(); + readonly exported = new Membrane(); + readonly imported = new Membrane(); + + constructor(public trustPeer = true, public nextLocalOid: IO.Oid = 0) {} + + abstract send(remoteOid: IO.Oid, event: IO.Event>): void; + + proxyAssertion(targetRemoteOid: IO.Oid, assertion: Assertion, handle: Handle): Value> { + const pins: Array = []; + const rewritten = mapEmbeddeds(assertion, r => this.rewriteRefOut(r, false, pins)); + this.grabImportedOid(targetRemoteOid, pins); + this.outboundAssertions.set(handle, pins); + return rewritten; + } + + proxyRetract(handle: Handle): void { + (this.outboundAssertions.get(handle) ?? []).forEach(e => e.drop()); + this.outboundAssertions.delete(handle); + } + + proxyMessage(assertion: Assertion): Value> { + const pins: Array = []; + return mapEmbeddeds(assertion, r => this.rewriteRefOut(r, true, pins)); + } + + proxySync(targetRemoteOid: IO.Oid, peer: Ref): Embedded { + const peerEntity = new SyncPeerEntity(peer); + this.grabImportedOid(targetRemoteOid, peerEntity.pins); + return this.rewriteRefOut(Turn.ref(peerEntity), false, peerEntity.pins); + } + + grabImportedOid(oid: IO.Oid, pins: Array): void { + const e = this.imported.grab("byOid", oid, false, null); + if (e === null) { + throw new Error("Internal error: import table missing entry for oid " + oid); + } + pins.push(e); + } + + grabExportedOid(oid: IO.Oid, pins: Array): Ref { + const e = this.exported.grab("byOid", oid, false, null); + if (e === null) return INERT_REF; + pins.push(e); + return e.ref; + } + + rewriteRefOut(r: Ref, transient: boolean, pins: Array): Embedded { + if (r.target instanceof ProxyEntity && r.target.relay === this) { + if (r.attenuation === void 0 || r.attenuation.length === 0) { + // No extra conditions on this reference since it was sent to us. + this.grabImportedOid(r.target.oid, pins); + return new Embedded(WireRef.yours({ oid: r.target.oid, attenuation: [] })); + } else { + // This reference has been attenuated since it was sent to us. + // Do we trust the peer to enforce such attenuation on our behalf? + if (this.trustPeer) { + this.grabImportedOid(r.target.oid, pins); + return new Embedded(WireRef.yours({ oid: r.target.oid, attenuation: r.attenuation })); + } else { + // fall through: treat the attenuated ref as a local ref, and re-export it. + } + } + } + + const e = this.exported.grab( + "byRef", r, transient, () => { + if (transient) throw new Error("Cannot send transient reference"); + return new WireSymbol(this.exported, this.nextLocalOid++, r); + }); + pins.push(e); + return new Embedded(WireRef.mine(e.oid)); + } + + rewriteRefIn(nw: Embedded, pins: Array): Ref { + const n = nw.value; + switch (n._variant) { + case 'yours': { + const e = this.exported.grab("byOid", n.oid, false, null); + if (e === null) { + return INERT_REF; + } else { + pins.push(e); + const r = e.ref; + if (n.attenuation.length === 0) { + return r; + } else { + type AttenuatedRef = Ref & { __attenuations?: Dictionary }; + const ar = r as AttenuatedRef; + if (ar.__attenuations === void 0) { + ar.__attenuations = new Dictionary(); + } + return ar.__attenuations.getOrSet(n.attenuation.map(fromCaveat), () => + attenuate(r, ... n.attenuation)); + } + } + } + case 'mine': { + const e = this.imported.grab("byOid", n.oid, false, () => + new WireSymbol(this.imported, n.oid, Turn.ref(new ProxyEntity(this, n.oid)))); + pins.push(e); + return e.ref; + } + } + } + + rewriteIn(a: Value>): [Assertion, Array] + { + const pins: Array = []; + const rewritten = mapEmbeddeds(a, r => this.rewriteRefIn(r, pins)); + return [rewritten, pins]; + } + + handle(localOid: IO.Oid, m: IO.Event>): void { + switch (m._variant) { + case 'Assert': { + const [a, pins] = this.rewriteIn(m.value.assertion); + const r = this.grabExportedOid(localOid, pins); + this.inboundAssertions.set(m.value.handle, { + localHandle: Turn.active.assert(r, a), + pins, + }); + break; + } + case 'Retract': { + const remoteHandle = m.value.handle; + const h = this.inboundAssertions.get(remoteHandle); + if (h === void 0) throw new Error(`Peer retracted invalid handle ${remoteHandle}`); + this.inboundAssertions.delete(remoteHandle); + h.pins.forEach(e => e.drop()); + Turn.active.retract(h.localHandle); + break; + } + case 'Message': { + const [a, pins] = this.rewriteIn(m.value.body); + if (pins.length > 0) throw new Error("Cannot receive transient reference"); + const r = this.exported.byOid.get(localOid)?.ref; + if (r) Turn.active.message(r, a); + break; + } + case 'Sync': { + const pins: Array = []; + const r = this.grabExportedOid(localOid, pins); + const k = this.rewriteRefIn(m.value.peer, pins); + Turn.active.sync(r).then(() => { + Turn.active.message(k, true); + pins.forEach(e => e.drop()); + }); + break; + } + } + } + + proxyPacket(packet: IO.Packet>): void { + switch (packet._variant) { + case 'Turn': + packet.value.forEach(v => this.handle(v.oid, v.event)); + break; + case 'Error': + throw new Error(`Remote peer terminated relay: ${stringify(packet.value)}`); + case 'Extension': + // Ignore unknown extensions. + break; + } + } +} + +export class ProxyEntity implements Entity { + readonly relay: ProxyOutbound; + readonly oid: IO.Oid; + + constructor(relay: ProxyOutbound, oid: IO.Oid) { + this.relay = relay; + this.oid = oid; + } + + send(m: IO.Event>): void { + this.relay.send(this.oid, m); + } + + assert(assertion: Assertion, handle: Handle): void { + this.send(IO.Event.Assert(IO.Assert({ + assertion: this.relay.proxyAssertion(this.oid, assertion, handle), + handle + }))) + } + + retract(handle: Handle): void { + this.relay.proxyRetract(handle); + this.send(IO.Event.Retract(IO.Retract(handle))); + } + + message(body: Assertion): void { + this.send(IO.Event.Message(IO.Message(this.relay.proxyMessage(body)))); + } + + sync(peer: Ref): void { + this.send(IO.Event.Sync(IO.Sync(this.relay.proxySync(this.oid, peer)))); + } +} + +export class SyncPeerEntity implements Entity { + readonly peer: Ref; + readonly handleMap = new IdentityMap(); + pins: Array = []; + + constructor(peer: Ref) { + this.peer = peer; + } + + assert(assertion: Assertion, handle: Handle): void { + this.handleMap.set(handle, Turn.active.assert(this.peer, assertion)); + } + + retract(handle: Handle): void { + Turn.active.retract(this.handleMap.get(handle)!); + this.handleMap.delete(handle); + } + + message(body: Assertion): void { + // We get to vanish from the indexes now + this.pins.forEach(e => e.drop()); + Turn.active.message(this.peer, body); + } + + sync(peer: Ref): void { + Turn.active._sync(this.peer, peer); + } +} diff --git a/packages/core/src/transport/protocol.ts b/packages/core/src/transport/protocol.ts index 0c927a7..f5602c1 100644 --- a/packages/core/src/transport/protocol.ts +++ b/packages/core/src/transport/protocol.ts @@ -2,7 +2,7 @@ /// SPDX-FileCopyrightText: Copyright © 2016-2024 Tony Garnock-Jones import * as S from '../gen/sturdy.js'; -import { Decoder, DecoderState, Encoder, EncoderState, GenericEmbedded, neverEmbeddedType, EmbeddedType, Value, Embedded, EmbeddedWriter } from '@preserves/core'; +import { Decoder, DecoderState, Encoder, EncoderState, neverEmbeddedType, EmbeddedType, Value, Embedded, EmbeddedWriter } from '@preserves/core'; export const wireRefEmbeddedType: EmbeddedType> & EmbeddedWriter> = { decode(s: DecoderState): Embedded { @@ -13,11 +13,11 @@ export const wireRefEmbeddedType: EmbeddedType> & EmbeddedWr new Encoder(s, neverEmbeddedType).push(S.fromWireRef(v.value)); }, - fromValue(v: Value): Embedded { + fromValue(v: Value): Embedded { return new Embedded(S.asWireRef(v as Value)); }, - toValue(v: Embedded): Value { - return S.fromWireRef(v.value) as Value; + toValue(v: Embedded): Value { + return S.fromWireRef(v.value) as Value; } }; diff --git a/packages/core/src/transport/relay.ts b/packages/core/src/transport/relay.ts index 6651d8e..7aaf857 100644 --- a/packages/core/src/transport/relay.ts +++ b/packages/core/src/transport/relay.ts @@ -1,136 +1,15 @@ /// SPDX-License-Identifier: GPL-3.0-or-later /// SPDX-FileCopyrightText: Copyright © 2016-2024 Tony Garnock-Jones -import { Actor, Assertion, Entity, Facet, Handle, Ref, Turn } from '../runtime/actor.js'; -import { BytesLike, Decoder, Dictionary, encode, IdentityMap, mapEmbeddeds, Embedded, stringify, underlying, Value } from '@preserves/core'; +import { Assertion, Facet, Ref, Turn } from '../runtime/actor.js'; +import { BytesLike, Decoder, encode, Embedded, stringify, underlying } from '@preserves/core'; import * as IO from '../gen/protocol.js'; import { wireRefEmbeddedType } from './protocol.js'; -import { attenuate } from '../runtime/rewrite.js'; -import { fromCaveat, WireRef } from '../gen/sturdy.js'; +import { WireRef } from '../gen/sturdy.js'; +import { LayerBoundary } from './membrane.js'; const FLUSH = Symbol.for('flush'); -export class WireSymbol { - count = 0; - - constructor ( - public side: Membrane, - public oid: IO.Oid, - public ref: Ref, - ) {} - - drop(): void { - this.count--; - if (this.count === 0) { - this.side.byOid.delete(this.oid); - this.side.byRef.delete(this.ref); - } - } -}; - -export class SyncPeerEntity implements Entity { - readonly relay: Relay; - readonly peer: Ref; - readonly handleMap = new IdentityMap(); - pins: Array = []; - - constructor(relay: Relay, peer: Ref) { - this.relay = relay; - this.peer = peer; - } - - assert(assertion: Assertion, handle: Handle): void { - this.handleMap.set(handle, Turn.active.assert(this.peer, assertion)); - } - - retract(handle: Handle): void { - Turn.active.retract(this.handleMap.get(handle)!); - this.handleMap.delete(handle); - } - - message(body: Assertion): void { - // We get to vanish from the indexes now - this.pins.forEach(e => e.drop()); - Turn.active.message(this.peer, body); - } - - sync(peer: Ref): void { - Turn.active._sync(this.peer, peer); - } -} - -export class RelayEntity implements Entity { - readonly relay: Relay; - readonly oid: IO.Oid; - - constructor(relay: Relay, oid: IO.Oid) { - this.relay = relay; - this.oid = oid; - } - - send(m: IO.Event>): void { - this.relay.send(this.oid, m); - } - - assert(assertion: Assertion, handle: Handle): void { - this.send(IO.Event.Assert(IO.Assert({ - assertion: this.relay.register(this.oid, assertion, handle), - handle - }))) - } - - retract(handle: Handle): void { - this.relay.deregister(handle); - this.send(IO.Event.Retract(IO.Retract(handle))); - } - - message(body: Assertion): void { - this.send(IO.Event.Message(IO.Message(this.relay.register(null, body, null)))); - } - - sync(peer: Ref): void { - const peerEntity = new SyncPeerEntity(this.relay, peer); - this.relay.grabImportedOid(this.oid, peerEntity.pins); - const ior = this.relay.rewriteRefOut(Turn.ref(peerEntity), false, peerEntity.pins); - this.send(IO.Event.Sync(IO.Sync(ior))); - } -} - -export type WhichTable = "byOid" | "byRef"; - -export class Membrane { - readonly byOid = new IdentityMap(); - readonly byRef = new IdentityMap(); - - grab
(table: Table, - key: Parameters[0], - transient: boolean, - f: () => WireSymbol): WireSymbol; - grab
(table: Table, - key: Parameters[0], - transient: boolean, - f: null): WireSymbol | null; - grab
(table: Table, - key: Parameters[0], - transient: boolean, - f: (() => WireSymbol) | null): WireSymbol | null { - let e = this[table].get(key as any); - if (e === void 0) { - if (f === null) return null; - e = f(); - this.byRef.set(e.ref, e); - this.byOid.set(e.oid, e); - } - if (!transient) e.count++; - return e; - } -} - -export const INERT_REF: Ref = { - relay: Actor.boot(() => Turn.active.stop(Turn.activeFacet)).root, - target: {}, -}; - export type PacketWriter = (bs: Uint8Array) => void; export interface RelayOptions { @@ -143,22 +22,13 @@ export interface RelayOptions { nextLocalOid?: IO.Oid; } -export class Relay { +export class Relay extends LayerBoundary { readonly facet: Facet; readonly selfRef: Ref; readonly w: PacketWriter; - readonly inboundAssertions = new IdentityMap, - }>(); - readonly outboundAssertions = new IdentityMap>(); - readonly exported = new Membrane(); - readonly imported = new Membrane(); readonly peer: Ref | null; - nextLocalOid: IO.Oid = 0; pendingTurn: IO.Turn> = []; debug: boolean; - trustPeer: boolean; readonly decoder = new Decoder(void 0, { includeAnnotations: false, @@ -166,11 +36,12 @@ export class Relay { }); constructor(options: RelayOptions) { + super(options.trustPeer, options.nextLocalOid); + this.facet = Turn.activeFacet; this.selfRef = Turn.ref(this); this.w = options.packetWriter; this.debug = options.debug ?? false; - this.trustPeer = options.trustPeer ?? true; this.facet.preventInertCheck(); options.setup(this); @@ -182,109 +53,11 @@ export class Relay { this.peer = (options.initialOid !== void 0) ? this.rewriteRefIn(new Embedded(WireRef.mine(options.initialOid)), []) : null; - - if (options.nextLocalOid !== void 0) { - this.nextLocalOid = (options.nextLocalOid === 0) ? 1 : options.nextLocalOid; - } - } - - register(targetRemoteOid: IO.Oid, assertion: Assertion, handle: Handle): Value>; - register(targetRemoteOid: null, assertion: Assertion, handle: null): Value>; - register(targetRemoteOid: IO.Oid | null, assertion: Assertion, handle: Handle | null): Value> { - const transient = (handle === null); - const pins: Array = []; - const rewritten = mapEmbeddeds(assertion, r => this.rewriteRefOut(r, transient, pins)); - if (handle !== null) { - if (targetRemoteOid !== null /* belt and suspenders */) { - this.grabImportedOid(targetRemoteOid, pins); - } - this.outboundAssertions.set(handle, pins); - } - return rewritten; - } - - deregister(handle: Handle): void { - (this.outboundAssertions.get(handle) ?? []).forEach(e => e.drop()); - this.outboundAssertions.delete(handle); - } - - grabImportedOid(oid: IO.Oid, pins: Array) { - const e = this.imported.grab("byOid", oid, false, null); - if (e === null) { - throw new Error("Internal error: import table missing entry for oid " + oid); - } - pins.push(e); - } - - grabExportedOid(oid: IO.Oid, pins: Array): Ref { - const e = this.exported.grab("byOid", oid, false, null); - if (e === null) return INERT_REF; - pins.push(e); - return e.ref; - } - - rewriteRefOut(r: Ref, transient: boolean, pins: Array): Embedded { - if (r.target instanceof RelayEntity && r.target.relay === this) { - if (r.attenuation === void 0 || r.attenuation.length === 0) { - // No extra conditions on this reference since it was sent to us. - this.grabImportedOid(r.target.oid, pins); - return new Embedded(WireRef.yours({ oid: r.target.oid, attenuation: [] })); - } else { - // This reference has been attenuated since it was sent to us. - // Do we trust the peer to enforce such attenuation on our behalf? - if (this.trustPeer) { - this.grabImportedOid(r.target.oid, pins); - return new Embedded(WireRef.yours({ oid: r.target.oid, attenuation: r.attenuation })); - } else { - // fall through: treat the attenuated ref as a local ref, and re-export it. - } - } - } - - const e = this.exported.grab( - "byRef", r, transient, () => { - if (transient) throw new Error("Cannot send transient reference"); - return new WireSymbol(this.exported, this.nextLocalOid++, r); - }); - pins.push(e); - return new Embedded(WireRef.mine(e.oid)); - } - - rewriteRefIn(nw: Embedded, pins: Array): Ref { - const n = nw.value; - switch (n._variant) { - case 'yours': { - const e = this.exported.grab("byOid", n.oid, false, null); - if (e === null) { - return INERT_REF; - } else { - pins.push(e); - const r = e.ref; - if (n.attenuation.length === 0) { - return r; - } else { - type AttenuatedRef = Ref & { __attenuations?: Dictionary }; - const ar = r as AttenuatedRef; - if (ar.__attenuations === void 0) { - ar.__attenuations = new Dictionary(); - } - return ar.__attenuations.getOrSet(n.attenuation.map(fromCaveat), () => - attenuate(r, ... n.attenuation)); - } - } - } - case 'mine': { - const e = this.imported.grab("byOid", n.oid, false, () => - new WireSymbol(this.imported, n.oid, Turn.ref(new RelayEntity(this, n.oid)))); - pins.push(e); - return e.ref; - } - } } message(body: Assertion) { if (body === FLUSH) { - if (this.debug) console.log('OUT', stringify(IO.fromTurn(this.pendingTurn)), this.pendingTurn); + if (this.debug) console.log('OUT', stringify(IO.fromTurn(this.pendingTurn))); this.w(underlying(encode(IO.fromTurn(this.pendingTurn), { canonical: true, embeddedEncode: wireRefEmbeddedType, @@ -301,63 +74,16 @@ export class Relay { } accept(bs: BytesLike): void { - Turn.for(this.facet, () => { + this.facet.turn(() => { this.decoder.write(bs); while (true) { - const rawTurn = this.decoder.try_next(); - if (rawTurn === void 0) break; - const wireTurn = IO.toTurn(rawTurn); - if (wireTurn === void 0) throw new Error("Bad IO.Turn"); - if (this.debug) console.log('IN', stringify(rawTurn)); - wireTurn.forEach(v => this.handle(v.oid, v.event)); + const rawPacket = this.decoder.try_next(); + if (rawPacket === void 0) break; + const wirePacket = IO.toPacket(rawPacket); + if (wirePacket === void 0) throw new Error("Bad IO.Packet"); + if (this.debug) console.log('IN', stringify(rawPacket)); + this.proxyPacket(wirePacket); } }); } - - rewriteIn(a: Value>): [Assertion, Array] - { - const pins: Array = []; - const rewritten = mapEmbeddeds(a, r => this.rewriteRefIn(r, pins)); - return [rewritten, pins]; - } - - handle(localOid: IO.Oid, m: IO.Event>) { - switch (m._variant) { - case 'Assert': { - const [a, pins] = this.rewriteIn(m.value.assertion); - const r = this.grabExportedOid(localOid, pins); - this.inboundAssertions.set(m.value.handle, { - localHandle: Turn.active.assert(r, a), - pins, - }); - break; - } - case 'Retract': { - const remoteHandle = m.value.handle; - const h = this.inboundAssertions.get(remoteHandle); - if (h === void 0) throw new Error(`Peer retracted invalid handle ${remoteHandle}`); - this.inboundAssertions.delete(remoteHandle); - h.pins.forEach(e => e.drop()); - Turn.active.retract(h.localHandle); - break; - } - case 'Message': { - const [a, pins] = this.rewriteIn(m.value.body); - if (pins.length > 0) throw new Error("Cannot receive transient reference"); - const r = this.exported.byOid.get(localOid)?.ref; - if (r) Turn.active.message(r, a); - break; - } - case 'Sync': { - const pins: Array = []; - const r = this.grabExportedOid(localOid, pins); - const k = this.rewriteRefIn(m.value.peer, pins); - Turn.active.sync(r).then(() => { - Turn.active.message(k, true); - pins.forEach(e => e.drop()); - }); - break; - } - } - } }