syndicate-js/packages/core/src/transport/membrane.ts

308 lines
11 KiB
TypeScript

/// SPDX-License-Identifier: GPL-3.0-or-later
/// SPDX-FileCopyrightText: Copyright © 2016-2024 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
import { Actor, Ref, Handle, Assertion, Entity, Turn } from '../runtime/actor.js';
import { Value, Embedded, mapEmbeddeds, IdentityMap, Dictionary, stringify } from '@preserves/core';
import * as IO from '../gen/protocol.js';
import { fromCaveat, WireRef } from '../gen/sturdy.js';
import { attenuate } from '../runtime/rewrite.js';
/**
 * A reference-counted entry shared by both indexes of a `Membrane`: it ties a
 * wire-level oid to the local `Ref` it stands for.
 *
 * `count` is raised by non-transient `Membrane.grab` calls and lowered by
 * `drop`; the entry removes itself from its membrane when `count` reaches zero.
 */
export class WireSymbol {
    /** Number of outstanding pins on this entry. */
    count = 0;

    constructor(public side: Membrane, public oid: IO.Oid, public ref: Ref) {}

    /** Releases one pin, unregistering the entry from `side` once none remain. */
    drop(): void {
        this.count -= 1;
        if (this.count !== 0) return;

        // Last pin gone: forget the mapping in both directions.
        const { byOid, byRef } = this.side;
        byOid.delete(this.oid);
        byRef.delete(this.ref);
    }
}
/** Names the `Membrane` index (`byOid` or `byRef`) that `Membrane.grab` should look a key up in. */
export type WhichTable = "byOid" | "byRef";
/**
 * A two-way index of `WireSymbol`s: the same entries are reachable by wire oid
 * (`byOid`) and by local reference (`byRef`).
 */
export class Membrane {
    readonly byOid = new IdentityMap<IO.Oid, WireSymbol>();
    readonly byRef = new IdentityMap<Ref, WireSymbol>();

    /**
     * Looks `key` up in the index named by `table`.
     *
     * When no entry exists, `f` (if non-null) is called to build one, and the
     * result is registered in *both* indexes; with `f === null` a miss yields
     * `null` instead. Unless `transient` is true, the entry's `count` is
     * incremented before it is returned, so the caller owes a matching
     * `WireSymbol.drop()`.
     */
    grab<Table extends WhichTable>(table: Table,
                                   key: Parameters<Membrane[Table]['get']>[0],
                                   transient: boolean,
                                   f: () => WireSymbol): WireSymbol;
    grab<Table extends WhichTable>(table: Table,
                                   key: Parameters<Membrane[Table]['get']>[0],
                                   transient: boolean,
                                   f: null): WireSymbol | null;
    grab<Table extends WhichTable>(table: Table,
                                   key: Parameters<Membrane[Table]['get']>[0],
                                   transient: boolean,
                                   f: (() => WireSymbol) | null): WireSymbol | null {
        const found = this[table].get(key as any);
        let entry: WireSymbol;
        if (found !== void 0) {
            entry = found;
        } else {
            if (f === null) return null;
            entry = f();
            this.byRef.set(entry.ref, entry);
            this.byOid.set(entry.oid, entry);
        }
        if (!transient) {
            entry.count += 1;
        }
        return entry;
    }
}
/**
 * Placeholder reference handed out when the peer names an oid that has no
 * entry in the export table (see `grabExportedOid` and the `yours` case of
 * `rewriteRefIn`).
 *
 * Its target is an empty object, so it defines none of the `Entity` methods.
 * The actor whose root it is attached to is booted when this module is loaded,
 * with a boot action that stops its own active facet.
 * NOTE(review): presumably anything sent to this ref is silently discarded —
 * confirm against Turn's delivery logic.
 */
export const INERT_REF = new Ref(
Actor.boot(() => Turn.active.stop(Turn.activeFacet)).root,
{});
/** The receiving half of a relay: consumes packets that arrived from the peer. */
export interface ProxyInbound {
/**
 * Processes one packet received from the peer.
 * `LayerBoundary`'s implementation dispatches every event of a `Turn` packet,
 * throws on an `Error` packet, and ignores `Extension` packets.
 */
proxyPacket(packet: IO.Packet<Embedded<WireRef>>): void;
}
/**
 * The sending half of a relay, as used by `ProxyEntity`: rewrites local values
 * into wire form and transmits events to the peer.
 */
export interface ProxyOutbound {
/** Transmits `event`, addressed to the peer-side entity identified by `remoteOid`. */
send(remoteOid: IO.Oid, event: IO.Event<Embedded<WireRef>>): void;
/**
 * Converts `assertion` to wire form for an assertion made at `targetRemoteOid`,
 * associating any bookkeeping with `handle` so `proxyRetract` can undo it.
 */
proxyAssertion(targetRemoteOid: IO.Oid, assertion: Assertion, handle: Handle): Value<Embedded<WireRef>>;
/** Releases whatever `proxyAssertion` recorded for `handle`. */
proxyRetract(handle: Handle): void;
/** Converts a message body to wire form. */
proxyMessage(assertion: Assertion): Value<Embedded<WireRef>>;
/**
 * Produces the wire reference to send as the `peer` of a sync request aimed
 * at `targetRemoteOid`; `peer` is the local ref that should eventually be notified.
 */
proxySync(targetRemoteOid: IO.Oid, peer: Ref): Embedded<WireRef>;
}
/**
 * One end of a relay. Translates between local `Ref`s and the `WireRef`s
 * exchanged with a remote peer, and maintains the reference counts ("pins")
 * that keep membrane entries alive while assertions or syncs mentioning them
 * are outstanding.
 *
 * Subclasses supply `send`, the actual transmission of an event to the peer.
 */
export abstract class LayerBoundary implements ProxyOutbound, ProxyInbound {
    /**
     * Assertions made by the peer, keyed by the peer's handle: the handle of
     * the corresponding local assertion, plus the pins to drop on retraction.
     */
    readonly inboundAssertions = new IdentityMap<Handle, {
        localHandle: Handle,
        pins: Array<WireSymbol>,
    }>();

    /** Pins held on behalf of each assertion we have sent to the peer, keyed by local handle. */
    readonly outboundAssertions = new IdentityMap<Handle, Array<WireSymbol>>();

    /** Local refs we have offered to the peer (sent as `mine`, received back as `yours`). */
    readonly exported = new Membrane();

    /** Peer refs offered to us (received as `mine`), each wrapped in a `ProxyEntity`. */
    readonly imported = new Membrane();

    /**
     * @param trustPeer when true, an attenuated reference to one of the peer's
     *   own entities is sent back as `yours` with the attenuation attached,
     *   relying on the peer to enforce it; when false such a reference is
     *   re-exported as a local ref instead.
     * @param nextLocalOid the oid to allocate to the next newly-exported ref.
     */
    constructor(public trustPeer = true, public nextLocalOid: IO.Oid = 0) {}

    abstract send(remoteOid: IO.Oid, event: IO.Event<Embedded<WireRef>>): void;

    /**
     * Rewrites `assertion` for the wire, pinning every reference it mentions as
     * well as the target oid, and remembers the pins under `handle`.
     *
     * @throws Error if the import table has no entry for an oid that must be pinned.
     */
    proxyAssertion(targetRemoteOid: IO.Oid, assertion: Assertion, handle: Handle): Value<Embedded<WireRef>> {
        const pins: Array<WireSymbol> = [];
        const rewritten = mapEmbeddeds(assertion, r => this.rewriteRefOut(r, false, pins));
        this.grabImportedOid(targetRemoteOid, pins);
        this.outboundAssertions.set(handle, pins);
        return rewritten;
    }

    /** Drops the pins recorded for `handle` by `proxyAssertion`; unknown handles are ignored. */
    proxyRetract(handle: Handle): void {
        (this.outboundAssertions.get(handle) ?? []).forEach(e => e.drop());
        this.outboundAssertions.delete(handle);
    }

    /**
     * Rewrites a message body for the wire. References are handled
     * transiently: no reference counts are changed, so the collected pins are
     * simply discarded.
     *
     * @throws Error ("Cannot send transient reference") if the body mentions a
     *   local ref that has not already been exported.
     */
    proxyMessage(assertion: Assertion): Value<Embedded<WireRef>> {
        const pins: Array<WireSymbol> = [];
        return mapEmbeddeds(assertion, r => this.rewriteRefOut(r, true, pins));
    }

    /**
     * Builds the wire reference for the `peer` of an outbound sync. A fresh
     * `SyncPeerEntity` wrapping `peer` is exported; it owns the pins taken here
     * (on the target oid and on its own export entry) and releases them when it
     * receives a message.
     */
    proxySync(targetRemoteOid: IO.Oid, peer: Ref): Embedded<WireRef> {
        const peerEntity = new SyncPeerEntity(peer);
        this.grabImportedOid(targetRemoteOid, peerEntity.pins);
        return this.rewriteRefOut(Turn.ref(peerEntity), false, peerEntity.pins);
    }

    /**
     * Looks up the import entry for `oid` and appends it to `pins`.
     *
     * @param transient when true the entry's count is left untouched, so the
     *   caller must NOT later `drop()` it; defaults to false (count is bumped).
     * @throws Error if the import table has no entry for `oid`.
     */
    grabImportedOid(oid: IO.Oid, pins: Array<WireSymbol>, transient = false): void {
        const e = this.imported.grab("byOid", oid, transient, null);
        if (e === null) {
            throw new Error("Internal error: import table missing entry for oid " + oid);
        }
        pins.push(e);
    }

    /**
     * Pins the export entry for `oid` and returns its local ref, or returns
     * `INERT_REF` (pinning nothing) when the oid is not exported.
     */
    grabExportedOid(oid: IO.Oid, pins: Array<WireSymbol>): Ref {
        const e = this.exported.grab("byOid", oid, false, null);
        if (e === null) return INERT_REF;
        pins.push(e);
        return e.ref;
    }

    /**
     * Converts a local ref to its wire form, appending the membrane entry used
     * to `pins`.
     *
     * A proxy for one of this relay's own imports goes back as `yours`;
     * anything else is (re-)exported as `mine`.
     *
     * @param transient when true, no reference counts are changed and no new
     *   export entry may be created; entries pushed to `pins` in that mode must
     *   not be dropped by the caller.
     * @throws Error ("Cannot send transient reference") when `transient` is
     *   true and `r` would need a new export entry.
     */
    rewriteRefOut(r: Ref, transient: boolean, pins: Array<WireSymbol>): Embedded<WireRef> {
        if (r.target instanceof ProxyEntity && r.target.relay === this) {
            if (r.attenuation === void 0 || r.attenuation.length === 0) {
                // No extra conditions on this reference since it was sent to us.
                // Honour `transient` here too: a transient use must not leave a
                // count behind, because nothing will ever drop it.
                this.grabImportedOid(r.target.oid, pins, transient);
                return new Embedded<WireRef>(WireRef.yours({ oid: r.target.oid, attenuation: [] }));
            } else {
                // This reference has been attenuated since it was sent to us.
                // Do we trust the peer to enforce such attenuation on our behalf?
                if (this.trustPeer) {
                    this.grabImportedOid(r.target.oid, pins, transient);
                    return new Embedded<WireRef>(WireRef.yours({ oid: r.target.oid, attenuation: r.attenuation }));
                } else {
                    // fall through: treat the attenuated ref as a local ref, and re-export it.
                }
            }
        }

        const e = this.exported.grab(
            "byRef", r, transient, () => {
                if (transient) throw new Error("Cannot send transient reference");
                return new WireSymbol(this.exported, this.nextLocalOid++, r);
            });
        pins.push(e);
        return new Embedded<WireRef>(WireRef.mine(e.oid));
    }

    /**
     * Converts a wire reference received from the peer into a local ref,
     * pinning (non-transiently) the membrane entry involved.
     *
     * - `yours`: one of our exports. Unknown oids yield `INERT_REF` with no pin.
     *   A non-empty attenuation yields an attenuated ref, cached per distinct
     *   attenuation on the underlying ref so repeated receipts return the same
     *   object.
     * - `mine`: one of the peer's entities. An import entry wrapping a new
     *   `ProxyEntity` is created on first sight.
     */
    rewriteRefIn(nw: Embedded<WireRef>, pins: Array<WireSymbol>): Ref {
        const n = nw.value;
        switch (n._variant) {
            case 'yours': {
                const e = this.exported.grab("byOid", n.oid, false, null);
                if (e === null) {
                    return INERT_REF;
                } else {
                    pins.push(e);
                    const r = e.ref;
                    if (n.attenuation.length === 0) {
                        return r;
                    } else {
                        type AttenuatedRef = Ref & { __attenuations?: Dictionary<any, Ref> };
                        const ar = r as AttenuatedRef;
                        if (ar.__attenuations === void 0) {
                            ar.__attenuations = new Dictionary();
                        }
                        return ar.__attenuations.getOrSet(n.attenuation.map(fromCaveat), () =>
                            attenuate(r, ... n.attenuation));
                    }
                }
            }
            case 'mine': {
                const e = this.imported.grab("byOid", n.oid, false, () =>
                    new WireSymbol(this.imported, n.oid, Turn.ref(new ProxyEntity(this, n.oid))));
                pins.push(e);
                return e.ref;
            }
        }
    }

    /**
     * Rewrites every embedded wire reference in `a` to a local ref.
     *
     * @returns the rewritten value together with the pins taken while rewriting.
     */
    rewriteIn(a: Value<Embedded<WireRef>>): [Assertion, Array<WireSymbol>]
    {
        const pins: Array<WireSymbol> = [];
        const rewritten = mapEmbeddeds(a, r => this.rewriteRefIn(r, pins));
        return [rewritten, pins];
    }

    /**
     * Applies one event from the peer, addressed to our export `localOid`, to
     * the active turn.
     *
     * @throws Error if the peer retracts a handle it never asserted, or sends a
     *   message whose body required pinning a reference.
     */
    handle(localOid: IO.Oid, m: IO.Event<Embedded<WireRef>>): void {
        switch (m._variant) {
            case 'Assert': {
                const [a, pins] = this.rewriteIn(m.value.assertion);
                const r = this.grabExportedOid(localOid, pins);
                this.inboundAssertions.set(m.value.handle, {
                    localHandle: Turn.active.assert(r, a),
                    pins,
                });
                break;
            }
            case 'Retract': {
                const remoteHandle = m.value.handle;
                const h = this.inboundAssertions.get(remoteHandle);
                if (h === void 0) throw new Error(`Peer retracted invalid handle ${remoteHandle}`);
                this.inboundAssertions.delete(remoteHandle);
                h.pins.forEach(e => e.drop());
                Turn.active.retract(h.localHandle);
                break;
            }
            case 'Message': {
                const [a, pins] = this.rewriteIn(m.value.body);
                if (pins.length > 0) {
                    // rewriteIn already bumped these counts (and may have created
                    // import entries); release them so rejecting the message
                    // does not leave the membranes permanently pinned.
                    pins.forEach(e => e.drop());
                    throw new Error("Cannot receive transient reference");
                }
                // Messages take no pin on their target; an unknown oid is silently ignored.
                const r = this.exported.byOid.get(localOid)?.ref;
                if (r) Turn.active.message(r, a);
                break;
            }
            case 'Sync': {
                const pins: Array<WireSymbol> = [];
                const r = this.grabExportedOid(localOid, pins);
                const k = this.rewriteRefIn(m.value.peer, pins);
                // Keep target and continuation pinned until the sync completes.
                Turn.active.sync(r).then(() => {
                    Turn.active.message(k, true);
                    pins.forEach(e => e.drop());
                });
                break;
            }
        }
    }

    /**
     * Processes one packet from the peer: handles each event of a `Turn`,
     * throws on `Error`, and ignores `Extension`.
     *
     * @throws Error when the peer sent an `Error` packet.
     */
    proxyPacket(packet: IO.Packet<Embedded<WireRef>>): void {
        switch (packet._variant) {
            case 'Turn':
                packet.value.forEach(v => this.handle(v.oid, v.event));
                break;
            case 'Error':
                throw new Error(`Remote peer terminated relay: ${stringify(packet.value)}`);
            case 'Extension':
                // Ignore unknown extensions.
                break;
        }
    }
}
/**
 * Local stand-in for an entity living on the far side of a relay. Every
 * `Entity` event it receives is rewritten by `relay` and sent to the peer,
 * addressed to `oid`.
 */
export class ProxyEntity implements Entity {
    constructor(readonly relay: ProxyOutbound, readonly oid: IO.Oid) {}

    /** Hands `m` to the relay for delivery to this proxy's remote oid. */
    send(m: IO.Event<Embedded<WireRef>>): void {
        const { relay, oid } = this;
        relay.send(oid, m);
    }

    /** Forwards an assertion; the relay records what it needs to undo it under `handle`. */
    assert(assertion: Assertion, handle: Handle): void {
        const wireAssertion = this.relay.proxyAssertion(this.oid, assertion, handle);
        const event = IO.Event.Assert(IO.Assert({ assertion: wireAssertion, handle }));
        this.send(event);
    }

    /** Releases the relay's bookkeeping for `handle`, then forwards the retraction. */
    retract(handle: Handle): void {
        this.relay.proxyRetract(handle);
        const event = IO.Event.Retract(IO.Retract(handle));
        this.send(event);
    }

    /** Forwards a message body. */
    message(body: Assertion): void {
        const wireBody = this.relay.proxyMessage(body);
        const event = IO.Event.Message(IO.Message(wireBody));
        this.send(event);
    }

    /** Forwards a sync request whose completion should be reported to `peer`. */
    sync(peer: Ref): void {
        const wirePeer = this.relay.proxySync(this.oid, peer);
        const event = IO.Event.Sync(IO.Sync(wirePeer));
        this.send(event);
    }
}
/**
 * Entity exported to the peer as the continuation of an outbound sync (see
 * `LayerBoundary.proxySync`). Whatever it receives is relayed to `peer`; the
 * first message it receives also releases the membrane pins that were taken
 * when the sync was sent.
 */
export class SyncPeerEntity implements Entity {
    /** The local ref that events are relayed to. */
    readonly peer: Ref;

    /** Maps handles of assertions made on this entity to the handles of the relayed assertions on `peer`. */
    readonly handleMap = new IdentityMap<Handle, Handle>();

    /** Pins held for the duration of the sync; filled in by `LayerBoundary.proxySync`. */
    pins: Array<WireSymbol> = [];

    constructor(peer: Ref) {
        this.peer = peer;
    }

    /** Re-asserts `assertion` on `peer`, remembering the resulting handle. */
    assert(assertion: Assertion, handle: Handle): void {
        this.handleMap.set(handle, Turn.active.assert(this.peer, assertion));
    }

    /** Retracts the relayed assertion for `handle`; handles never asserted here are ignored. */
    retract(handle: Handle): void {
        const relayed = this.handleMap.get(handle);
        if (relayed === void 0) return;
        Turn.active.retract(relayed);
        this.handleMap.delete(handle);
    }

    /** Releases the sync's pins (once) and relays `body` to `peer`. */
    message(body: Assertion): void {
        // We get to vanish from the indexes now. Draining the array makes the
        // release happen at most once, so a repeated message cannot decrement
        // the same counts a second time.
        this.pins.splice(0).forEach(e => e.drop());
        Turn.active.message(this.peer, body);
    }

    /** Passes a sync request straight through to `peer`. */
    sync(peer: Ref): void {
        Turn.active._sync(this.peer, peer);
    }
}