import { Actor, Assertion, Entity, Facet, Handle, Ref, Turn } from '../runtime/actor.js';
import { BytesLike, Decoder, Dictionary, embed, encode, IdentityMap, mapEmbeddeds, underlying, Value } from '@preserves/core';
import * as IO from '../gen/protocol.js';
import { wireRefEmbeddedType, WireSymbol } from './protocol.js';
import { queueTask } from '../runtime/task.js';
import { attenuate } from '../runtime/rewrite.js';
import { fromAttenuation, WireRef } from '../gen/sturdy.js';

/**
 * Local stand-in entity created by {@link RelayEntity.sync}. Everything it
 * receives is forwarded to `peer`; receiving a message additionally releases
 * the exported wire symbol stored in `e`.
 */
export class SyncPeerEntity implements Entity {
    readonly relay: Relay;
    readonly peer: Ref;
    /** Maps each handle asserted at this entity to the handle of the forwarded assertion at `peer`. */
    readonly handleMap = new IdentityMap<Handle, Handle>();
    /** Wire symbol under which this entity was exported; assigned by {@link RelayEntity.sync}. */
    e: WireSymbol | null = null;

    constructor(relay: Relay, peer: Ref) {
        this.relay = relay;
        this.peer = peer;
    }

    /** Re-asserts `assertion` at `peer` and remembers the resulting handle. */
    assert(turn: Turn, assertion: Assertion, handle: Handle): void {
        this.handleMap.set(handle, turn.assert(this.peer, assertion));
    }

    /** Retracts the forwarded assertion recorded for `handle` and forgets the mapping. */
    retract(turn: Turn, handle: Handle): void {
        turn.retract(this.handleMap.get(handle)!);
        this.handleMap.delete(handle);
    }

    /** Releases this entity's export entry, then forwards `body` to `peer`. */
    message(turn: Turn, body: Assertion): void {
        // We get to vanish from the indexes now
        this.relay.releaseRefOut(this.e!);
        turn.message(this.peer, body);
    }

    /** Forwards the sync request to `peer`. */
    sync(turn: Turn, peer: Ref): void {
        turn._sync(this.peer, peer);
    }
}

/**
 * Local proxy for an object identified by `oid` on the other side of `relay`.
 * Each entity operation is converted to the matching `IO.Event` and queued on
 * the relay for that oid.
 */
export class RelayEntity implements Entity {
    readonly relay: Relay;
    readonly oid: IO.Oid;

    constructor(relay: Relay, oid: IO.Oid) {
        this.relay = relay;
        this.oid = oid;
    }

    /** Queues `m` on the relay, addressed to this entity's oid. */
    send(m: IO.Event): void {
        this.relay.send(this.oid, m);
    }

    /** Registers the assertion's outbound references under `handle` and sends an `Assert` event. */
    assert(_turn: Turn, assertion: Assertion, handle: Handle): void {
        this.send(IO.Event.Assert(IO.Assert({
            assertion: this.relay.register(assertion, handle),
            handle
        })));
    }

    /** Releases the references registered under `handle` and sends a `Retract` event. */
    retract(_turn: Turn, handle: Handle): void {
        this.relay.deregister(handle);
        this.send(IO.Event.Retract(IO.Retract(handle)));
    }

    /** Sends a `Message` event; the body is registered with a `null` handle, i.e. rewritten as transient. */
    message(_turn: Turn, body: Assertion): void {
        this.send(IO.Event.Message(IO.Message(this.relay.register(body, null))));
    }

    /**
     * Exports a fresh {@link SyncPeerEntity} wrapping `peer`, records the
     * export's wire symbol on it, and sends a `Sync` event carrying the
     * exported reference.
     */
    sync(turn: Turn, peer: Ref): void {
        const peerEntity = new SyncPeerEntity(this.relay, peer);
        const exported: Array<WireSymbol> = [];
        const ior = this.relay.rewriteRefOut(turn.ref(peerEntity), false, exported);
        peerEntity.e = exported[0];
        this.send(IO.Event.Sync(IO.Sync(ior)));
    }
}

/**
 * Table of {@link WireSymbol}s indexed both by oid and by ref, with a
 * per-symbol reference count.
 */
export class Membrane {
    readonly byOid = new IdentityMap<IO.Oid, WireSymbol>();
    readonly byRef = new IdentityMap<Ref, WireSymbol>();

    /**
     * Looks `key` up in the chosen table. When absent, `f` is called to build
     * a new symbol, which is then entered into both tables.
     *
     * @param table which index to search (`"byOid"` or `"byRef"`)
     * @param key the oid or ref to look up, according to `table`
     * @param transient when `false`, the symbol's count is incremented
     * @param f factory for a missing symbol; an exception it throws propagates
     *          before either table is modified
     * @returns the existing or newly created symbol
     */
    grab<Table extends "byOid" | "byRef">(
        table: Table,
        key: Parameters<Membrane[Table]['get']>[0],
        transient: boolean,
        f: () => WireSymbol,
    ): WireSymbol {
        let e = this[table].get(key as any);
        if (e === void 0) {
            e = f();
            this.byRef.set(e.ref, e);
            this.byOid.set(e.oid, e);
        }
        if (!transient) e.count++;
        return e;
    }

    /** Decrements the symbol's count; at exactly zero it is removed from both tables. */
    drop(e: WireSymbol): void {
        e.count--;
        if (e.count === 0) {
            this.byOid.delete(e.oid);
            this.byRef.delete(e.ref);
        }
    }
}

/**
 * Placeholder ref returned by {@link Relay.lookupLocal} for unknown oids. Its
 * `relay` is the root of an actor whose boot function immediately calls
 * `stop()`; its target is an empty object.
 */
export const INERT_REF: Ref = {
    relay: (() => {
        const a = new Actor(t => t.stop());
        return a.root;
    })(),
    target: {},
};

/** Sink for encoded outbound packets. */
export type PacketWriter = (bs: Uint8Array) => void;

export interface RelayOptions {
    /** Receives each encoded outbound turn. */
    packetWriter: PacketWriter;
    /** Called at the end of the {@link Relay} constructor. */
    setup(t: Turn, r: Relay): void;
    /** Log inbound/outbound turns to the console. Defaults to `false`. */
    debug?: boolean;
    /**
     * Whether attenuated references that came from the peer may be sent back
     * as `yours` references carrying their attenuation. Defaults to `true`.
     */
    trustPeer?: boolean;
}

/**
 * Translates between local entity operations and wire-level `IO.Turn`
 * packets, rewriting embedded references in both directions and tracking
 * them in the `exported` and `imported` membranes.
 */
export class Relay {
    readonly facet: Facet;
    readonly w: PacketWriter;
    /** Peer handle -> the local handle it was asserted under plus the symbols imported for it. */
    readonly inboundAssertions = new IdentityMap<Handle, {
        localHandle: Handle,
        imported: Array<WireSymbol>,
    }>();
    /** Local handle -> the symbols exported for that assertion. */
    readonly outboundAssertions = new IdentityMap<Handle, Array<WireSymbol>>();
    readonly exported = new Membrane();
    readonly imported = new Membrane();
    nextLocalOid: IO.Oid = 0;
    /** Events accumulated since the last flush; see {@link Relay.send}. */
    pendingTurn: IO.Turn = [];
    debug: boolean;
    trustPeer: boolean;

    readonly decoder = new Decoder(void 0, {
        includeAnnotations: false,
        embeddedDecode: wireRefEmbeddedType,
    });

    /**
     * Binds the relay to the turn's active facet, applies option defaults,
     * calls `preventInertCheck()` on the facet and finally runs
     * `options.setup`.
     */
    constructor(t: Turn, options: RelayOptions) {
        this.facet = t.activeFacet;
        this.w = options.packetWriter;
        this.debug = options.debug ?? false;
        this.trustPeer = options.trustPeer ?? true;

        this.facet.preventInertCheck();
        options.setup(t, this);
    }

    /**
     * Rewrites every embedded ref of `assertion` into its wire form.
     *
     * @returns the rewritten value and the symbols exported while rewriting
     */
    rewriteOut(assertion: Assertion, transient: boolean): [Value<WireRef>, Array<WireSymbol>] {
        const exported: Array<WireSymbol> = [];
        const rewritten = mapEmbeddeds(assertion, r => embed(this.rewriteRefOut(r, transient, exported)));
        return [rewritten, exported];
    }

    /**
     * Rewrites every embedded wire ref of `a` into a local ref.
     *
     * @returns the rewritten assertion and the symbols imported while rewriting
     */
    rewriteIn(t: Turn, a: Value<WireRef>): [Assertion, Array<WireSymbol>] {
        const imported: Array<WireSymbol> = [];
        const rewritten = mapEmbeddeds(a, r => embed(this.rewriteRefIn(t, r, imported)));
        return [rewritten, imported];
    }

    /**
     * Rewrites `assertion` for transmission. With a non-null `handle` the
     * exported symbols are remembered under it so that {@link Relay.deregister}
     * can release them; a `null` handle rewrites in transient mode and
     * remembers nothing.
     */
    register(assertion: Assertion, handle: Handle | null): Value<WireRef> {
        const [rewritten, exported] = this.rewriteOut(assertion, handle === null);
        if (handle !== null) this.outboundAssertions.set(handle, exported);
        return rewritten;
    }

    /** Releases every symbol remembered for `handle` (none if unknown) and forgets the handle. */
    deregister(handle: Handle): void {
        (this.outboundAssertions.get(handle) ?? []).forEach(e => this.releaseRefOut(e));
        this.outboundAssertions.delete(handle);
    }

    /**
     * Converts a local ref to a wire ref.
     *
     * A ref whose target is a {@link RelayEntity} of this relay goes back as
     * `yours` (with its attenuation, if any, only when `trustPeer` is set).
     * Anything else is exported through the `exported` membrane as `mine`,
     * and its symbol is appended to `exported`.
     *
     * @throws Error("Cannot send transient reference") when `transient` is
     *         set and the ref has no existing export entry
     */
    rewriteRefOut(r: Ref, transient: boolean, exported: Array<WireSymbol>): WireRef {
        if (r.target instanceof RelayEntity && r.target.relay === this) {
            if (r.attenuation === void 0 || r.attenuation.length === 0) {
                // No extra conditions on this reference since it was sent to us.
                return WireRef.yours({ oid: r.target.oid, attenuation: [] });
            } else {
                // This reference has been attenuated since it was sent to us.
                // Do we trust the peer to enforce such attenuation on our behalf?
                if (this.trustPeer) {
                    return WireRef.yours({ oid: r.target.oid, attenuation: r.attenuation });
                } else {
                    // fall through: treat the attenuated ref as a local ref, and re-export it.
                }
            }
        }

        const e = this.exported.grab(
            "byRef", r, transient, () => {
                if (transient) throw new Error("Cannot send transient reference");
                return { oid: this.nextLocalOid++, ref: r, count: 0 };
            });
        exported.push(e);
        return WireRef.mine(e.oid);
    }

    /** Drops one count of `e` from the `exported` membrane. */
    releaseRefOut(e: WireSymbol) {
        this.exported.drop(e);
    }

    /**
     * Converts a wire ref to a local ref.
     *
     * `yours` resolves through {@link Relay.lookupLocal}; a non-empty
     * attenuation yields an attenuated ref that is cached on the looked-up
     * ref, keyed by the attenuation. `mine` is imported (non-transiently)
     * through the `imported` membrane, creating a {@link RelayEntity} on
     * first sight, and its symbol is appended to `imported`.
     */
    rewriteRefIn(t: Turn, n: WireRef, imported: Array<WireSymbol>): Ref {
        switch (n._variant) {
            case 'yours': {
                const r = this.lookupLocal(n.oid);
                if (n.attenuation.length === 0 || r === INERT_REF) {
                    return r;
                } else {
                    // NOTE(review): the type arguments of Dictionary were reconstructed from
                    // usage (values are Refs) - confirm against @preserves/core's declaration.
                    type AttenuatedRef = Ref & { __attenuations?: Dictionary<any, Ref> };
                    const ar = r as AttenuatedRef;
                    if (ar.__attenuations === void 0) {
                        ar.__attenuations = new Dictionary();
                    }
                    return ar.__attenuations.getOrSet(fromAttenuation(n.attenuation), () =>
                        attenuate(r, ... n.attenuation));
                }
            }
            case 'mine': {
                const e = this.imported.grab("byOid", n.oid, false, () =>
                    ({ oid: n.oid, ref: t.ref(new RelayEntity(this, n.oid)), count: 0 }));
                imported.push(e);
                return e.ref;
            }
        }
    }

    /**
     * Appends an event addressed to `remoteOid` to the pending turn. When the
     * pending turn was empty, a task is queued that encodes the whole pending
     * turn, hands it to the packet writer and resets the pending turn.
     */
    send(remoteOid: IO.Oid, m: IO.Event): void {
        if (this.pendingTurn.length === 0) {
            queueTask(() => {
                if (this.debug) console.log('OUT', IO.fromTurn(this.pendingTurn).asPreservesText());
                this.w(underlying(encode(IO.fromTurn(this.pendingTurn), {
                    canonical: true,
                    embeddedEncode: wireRefEmbeddedType,
                })));
                this.pendingTurn = [];
            });
        }
        this.pendingTurn.push(IO.TurnEvent({ oid: remoteOid, event: m }));
    }

    /** Returns the exported ref for `localOid`, or {@link INERT_REF} when none is known. */
    lookupLocal(localOid: IO.Oid): Ref {
        return this.exported.byOid.get(localOid)?.ref ?? INERT_REF;
    }

    /**
     * Feeds `bs` to the decoder and, within a single turn on the relay's
     * facet, dispatches every event of every complete wire turn decoded.
     *
     * @throws Error("Bad IO.Turn") when a decoded value is not a valid turn
     */
    accept(bs: BytesLike): void {
        Turn.for(this.facet, t => {
            this.decoder.write(bs);
            while (true) {
                const rawTurn = this.decoder.try_next();
                if (rawTurn === void 0) break;
                const wireTurn = IO.toTurn(rawTurn);
                if (wireTurn === void 0) throw new Error("Bad IO.Turn");
                if (this.debug) console.log('IN', rawTurn.asPreservesText());
                wireTurn.forEach(v => {
                    const { oid: localOid, event: m } = v;
                    this.handle(t, this.lookupLocal(localOid), m);
                });
            }
        });
    }

    /**
     * Applies one inbound event to the local ref `r`.
     *
     * @throws Error when the peer retracts a handle that is not recorded, or
     *         when a message carries `mine` references
     */
    handle(t: Turn, r: Ref, m: IO.Event) {
        switch (m._variant) {
            case 'Assert': {
                const [a, imported] = this.rewriteIn(t, m.value.assertion);
                this.inboundAssertions.set(m.value.handle, {
                    localHandle: t.assert(r, a),
                    imported,
                });
                break;
            }
            case 'Retract': {
                const remoteHandle = m.value.handle;
                const h = this.inboundAssertions.get(remoteHandle);
                if (h === void 0) throw new Error(`Peer retracted invalid handle ${remoteHandle}`);
                this.inboundAssertions.delete(remoteHandle);
                h.imported.forEach(e => this.imported.drop(e));
                t.retract(h.localHandle);
                break;
            }
            case 'Message': {
                const [a, imported] = this.rewriteIn(t, m.value.body);
                // NOTE(review): symbols imported by rewriteIn are not dropped before this throw.
                if (imported.length > 0) throw new Error("Cannot receive transient reference");
                t.message(r, a);
                break;
            }
            case 'Sync': {
                const imported: Array<WireSymbol> = [];
                const k = this.rewriteRefIn(t, m.value.peer, imported);
                t.sync(r).then(t => {
                    // Once the local sync completes, notify the peer's sync entity and
                    // release the references imported for it.
                    t.message(k, true);
                    imported.forEach(e => this.imported.drop(e));
                });
                break;
            }
        }
    }
}

export interface RelayActorOptions extends RelayOptions {
    /** Peer oid to import; the resulting ref is what {@link spawnRelay} resolves to. */
    initialOid?: IO.Oid;
    /** Local ref to export immediately after the relay is constructed. */
    initialRef?: Ref;
    /** Value for the relay's `nextLocalOid`, applied after the steps above; `0` is replaced by `1`. */
    nextLocalOid?: IO.Oid;
}

/**
 * Spawns an actor that hosts a new {@link Relay}.
 *
 * In order: exports `initialRef` if given; resolves the promise with the ref
 * imported for `initialOid` if given, otherwise with `null`; then applies
 * `nextLocalOid` if given.
 */
export function spawnRelay(t: Turn, options: RelayActorOptions & {initialOid: IO.Oid}): Promise<Ref>;
export function spawnRelay(t: Turn, options: Omit<RelayActorOptions, 'initialOid'>): Promise<null>;
export function spawnRelay(t: Turn, options: RelayActorOptions): Promise<Ref | null> {
    return new Promise(resolve => {
        t.spawn(t => {
            const relay = new Relay(t, options);
            if (options.initialRef !== void 0) {
                relay.rewriteRefOut(options.initialRef, false, []);
            }
            if (options.initialOid !== void 0) {
                resolve(relay.rewriteRefIn(t, WireRef.mine(options.initialOid), []));
            } else {
                resolve(null);
            }
            if (options.nextLocalOid !== void 0) {
                // NOTE(review): presumably 0 is avoided because it is the oid an
                // initialRef receives by default - confirm.
                relay.nextLocalOid = (options.nextLocalOid === 0) ? 1 : options.nextLocalOid;
            }
        });
    });
}