2021-12-01 16:24:29 +00:00
|
|
|
/// SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
/// SPDX-FileCopyrightText: Copyright © 2016-2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
2021-12-01 16:13:00 +00:00
|
|
|
|
|
|
|
import { Actor, Assertion, Entity, Facet, Handle, Ref, Turn } from '../runtime/actor.js';
|
|
|
|
import { BytesLike, Decoder, Dictionary, embed, encode, IdentityMap, mapEmbeddeds, underlying, Value } from '@preserves/core';
|
|
|
|
import * as IO from '../gen/protocol.js';
|
|
|
|
import { wireRefEmbeddedType, WireSymbol } from './protocol.js';
|
|
|
|
import { queueTask } from '../runtime/task.js';
|
|
|
|
import { attenuate } from '../runtime/rewrite.js';
|
|
|
|
import { fromAttenuation, WireRef } from '../gen/sturdy.js';
|
|
|
|
|
|
|
|
export class SyncPeerEntity implements Entity {
|
|
|
|
readonly relay: Relay;
|
|
|
|
readonly peer: Ref;
|
|
|
|
readonly handleMap = new IdentityMap<Handle, Handle>();
|
|
|
|
e: WireSymbol | null = null;
|
|
|
|
|
|
|
|
constructor(relay: Relay, peer: Ref) {
|
|
|
|
this.relay = relay;
|
|
|
|
this.peer = peer;
|
|
|
|
}
|
|
|
|
|
|
|
|
assert(turn: Turn, assertion: Assertion, handle: Handle): void {
|
|
|
|
this.handleMap.set(handle, turn.assert(this.peer, assertion));
|
|
|
|
}
|
|
|
|
|
|
|
|
retract(turn: Turn, handle: Handle): void {
|
|
|
|
turn.retract(this.handleMap.get(handle)!);
|
|
|
|
this.handleMap.delete(handle);
|
|
|
|
}
|
|
|
|
|
|
|
|
message(turn: Turn, body: Assertion): void {
|
|
|
|
// We get to vanish from the indexes now
|
|
|
|
this.relay.releaseRefOut(this.e!);
|
|
|
|
turn.message(this.peer, body);
|
|
|
|
}
|
|
|
|
|
|
|
|
sync(turn: Turn, peer: Ref): void {
|
|
|
|
turn._sync(this.peer, peer);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
export class RelayEntity implements Entity {
|
|
|
|
readonly relay: Relay;
|
|
|
|
readonly oid: IO.Oid;
|
|
|
|
|
|
|
|
constructor(relay: Relay, oid: IO.Oid) {
|
|
|
|
this.relay = relay;
|
|
|
|
this.oid = oid;
|
|
|
|
}
|
|
|
|
|
|
|
|
send(m: IO.Event<WireRef>): void {
|
|
|
|
this.relay.send(this.oid, m);
|
|
|
|
}
|
|
|
|
|
|
|
|
assert(_turn: Turn, assertion: Assertion, handle: Handle): void {
|
|
|
|
this.send(IO.Event.Assert(IO.Assert({
|
|
|
|
assertion: this.relay.register(assertion, handle),
|
|
|
|
handle
|
|
|
|
})))
|
|
|
|
}
|
|
|
|
|
|
|
|
retract(_turn: Turn, handle: Handle): void {
|
|
|
|
this.relay.deregister(handle);
|
|
|
|
this.send(IO.Event.Retract(IO.Retract(handle)));
|
|
|
|
}
|
|
|
|
|
|
|
|
message(_turn: Turn, body: Assertion): void {
|
|
|
|
this.send(IO.Event.Message(IO.Message(this.relay.register(body, null))));
|
|
|
|
}
|
|
|
|
|
|
|
|
sync(turn: Turn, peer: Ref): void {
|
|
|
|
const peerEntity = new SyncPeerEntity(this.relay, peer);
|
|
|
|
const exported: Array<WireSymbol> = [];
|
|
|
|
const ior = this.relay.rewriteRefOut(turn.ref(peerEntity), false, exported);
|
|
|
|
peerEntity.e = exported[0];
|
|
|
|
this.send(IO.Event.Sync(IO.Sync(ior)));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
export class Membrane {
|
|
|
|
readonly byOid = new IdentityMap<IO.Oid, WireSymbol>();
|
|
|
|
readonly byRef = new IdentityMap<Ref, WireSymbol>();
|
|
|
|
|
|
|
|
grab<Table extends "byOid" | "byRef">(table: Table,
|
|
|
|
key: Parameters<Membrane[Table]['get']>[0],
|
|
|
|
transient: boolean,
|
|
|
|
f: () => WireSymbol): WireSymbol
|
|
|
|
{
|
|
|
|
let e = this[table].get(key as any);
|
|
|
|
if (e === void 0) {
|
|
|
|
e = f();
|
|
|
|
this.byRef.set(e.ref, e);
|
|
|
|
this.byOid.set(e.oid, e);
|
|
|
|
}
|
|
|
|
if (!transient) e.count++;
|
|
|
|
return e;
|
|
|
|
}
|
|
|
|
|
|
|
|
drop(e: WireSymbol): void {
|
|
|
|
e.count--;
|
|
|
|
if (e.count === 0) {
|
|
|
|
this.byOid.delete(e.oid);
|
|
|
|
this.byRef.delete(e.ref);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
export const INERT_REF: Ref = {
|
|
|
|
relay: (() => {
|
|
|
|
const a = new Actor(t => t.stop());
|
|
|
|
return a.root;
|
|
|
|
})(),
|
|
|
|
target: {},
|
|
|
|
};
|
|
|
|
|
|
|
|
export type PacketWriter = (bs: Uint8Array) => void;
|
|
|
|
|
|
|
|
export interface RelayOptions {
|
|
|
|
packetWriter: PacketWriter;
|
|
|
|
setup(t: Turn, r: Relay): void;
|
|
|
|
debug?: boolean;
|
|
|
|
trustPeer?: boolean;
|
|
|
|
}
|
|
|
|
|
|
|
|
export class Relay {
|
|
|
|
readonly facet: Facet;
|
|
|
|
readonly w: PacketWriter;
|
|
|
|
readonly inboundAssertions = new IdentityMap<Handle, {
|
|
|
|
localHandle: Handle,
|
|
|
|
imported: Array<WireSymbol>,
|
|
|
|
}>();
|
|
|
|
readonly outboundAssertions = new IdentityMap<Handle, Array<WireSymbol>>();
|
|
|
|
readonly exported = new Membrane();
|
|
|
|
readonly imported = new Membrane();
|
|
|
|
nextLocalOid: IO.Oid = 0;
|
|
|
|
pendingTurn: IO.Turn<WireRef> = [];
|
|
|
|
debug: boolean;
|
|
|
|
trustPeer: boolean;
|
|
|
|
|
|
|
|
readonly decoder = new Decoder(void 0, {
|
|
|
|
includeAnnotations: false,
|
|
|
|
embeddedDecode: wireRefEmbeddedType,
|
|
|
|
});
|
|
|
|
|
|
|
|
constructor(t: Turn, options: RelayOptions) {
|
|
|
|
this.facet = t.activeFacet;
|
|
|
|
this.w = options.packetWriter;
|
|
|
|
this.debug = options.debug ?? false;
|
|
|
|
this.trustPeer = options.trustPeer ?? true;
|
|
|
|
|
|
|
|
this.facet.preventInertCheck();
|
|
|
|
options.setup(t, this);
|
|
|
|
}
|
|
|
|
|
|
|
|
rewriteOut(assertion: Assertion, transient: boolean): [Value<WireRef>, Array<WireSymbol>]
|
|
|
|
{
|
|
|
|
const exported: Array<WireSymbol> = [];
|
|
|
|
const rewritten = mapEmbeddeds(assertion, r => embed(this.rewriteRefOut(r, transient, exported)));
|
|
|
|
return [rewritten, exported];
|
|
|
|
}
|
|
|
|
|
|
|
|
rewriteIn(t: Turn, a: Value<WireRef>): [Assertion, Array<WireSymbol>]
|
|
|
|
{
|
|
|
|
const imported: Array<WireSymbol> = [];
|
|
|
|
const rewritten = mapEmbeddeds(a, r => embed(this.rewriteRefIn(t, r, imported)));
|
|
|
|
return [rewritten, imported];
|
|
|
|
}
|
|
|
|
|
|
|
|
register(assertion: Assertion, handle: Handle | null): Value<WireRef> {
|
|
|
|
const [rewritten, exported] = this.rewriteOut(assertion, handle === null);
|
|
|
|
if (handle !== null) this.outboundAssertions.set(handle, exported);
|
|
|
|
return rewritten;
|
|
|
|
}
|
|
|
|
|
|
|
|
deregister(handle: Handle): void {
|
|
|
|
(this.outboundAssertions.get(handle) ?? []).forEach(e => this.releaseRefOut(e));
|
|
|
|
this.outboundAssertions.delete(handle);
|
|
|
|
}
|
|
|
|
|
|
|
|
rewriteRefOut(r: Ref, transient: boolean, exported: Array<WireSymbol>): WireRef {
|
|
|
|
if (r.target instanceof RelayEntity && r.target.relay === this) {
|
|
|
|
if (r.attenuation === void 0 || r.attenuation.length === 0) {
|
|
|
|
// No extra conditions on this reference since it was sent to us.
|
|
|
|
return WireRef.yours({ oid: r.target.oid, attenuation: [] });
|
|
|
|
} else {
|
|
|
|
// This reference has been attenuated since it was sent to us.
|
|
|
|
// Do we trust the peer to enforce such attenuation on our behalf?
|
|
|
|
if (this.trustPeer) {
|
|
|
|
return WireRef.yours({ oid: r.target.oid, attenuation: r.attenuation });
|
|
|
|
} else {
|
|
|
|
// fall through: treat the attenuated ref as a local ref, and re-export it.
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
const e = this.exported.grab(
|
|
|
|
"byRef", r, transient, () => {
|
|
|
|
if (transient) throw new Error("Cannot send transient reference");
|
|
|
|
return { oid: this.nextLocalOid++, ref: r, count: 0 };
|
|
|
|
});
|
|
|
|
exported.push(e);
|
|
|
|
return WireRef.mine(e.oid);
|
|
|
|
}
|
|
|
|
|
|
|
|
releaseRefOut(e: WireSymbol) {
|
|
|
|
this.exported.drop(e);
|
|
|
|
}
|
|
|
|
|
|
|
|
rewriteRefIn(t: Turn, n: WireRef, imported: Array<WireSymbol>): Ref {
|
|
|
|
switch (n._variant) {
|
|
|
|
case 'yours': {
|
|
|
|
const r = this.lookupLocal(n.oid);
|
|
|
|
if (n.attenuation.length === 0 || r === INERT_REF) {
|
|
|
|
return r;
|
|
|
|
} else {
|
|
|
|
type AttenuatedRef = Ref & { __attenuations?: Dictionary<any, Ref> };
|
|
|
|
const ar = r as AttenuatedRef;
|
|
|
|
if (ar.__attenuations === void 0) {
|
|
|
|
ar.__attenuations = new Dictionary();
|
|
|
|
}
|
|
|
|
return ar.__attenuations.getOrSet(fromAttenuation(n.attenuation), () =>
|
|
|
|
attenuate(r, ... n.attenuation));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
case 'mine': {
|
|
|
|
const e = this.imported.grab("byOid", n.oid, false, () =>
|
|
|
|
({ oid: n.oid, ref: t.ref(new RelayEntity(this, n.oid)), count: 0 }));
|
|
|
|
imported.push(e);
|
|
|
|
return e.ref;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
send(remoteOid: IO.Oid, m: IO.Event<WireRef>): void {
|
|
|
|
if (this.pendingTurn.length === 0) {
|
|
|
|
queueTask(() => {
|
|
|
|
if (this.debug) console.log('OUT', IO.fromTurn(this.pendingTurn).asPreservesText());
|
|
|
|
this.w(underlying(encode(IO.fromTurn(this.pendingTurn), {
|
|
|
|
canonical: true,
|
|
|
|
embeddedEncode: wireRefEmbeddedType,
|
|
|
|
})));
|
|
|
|
this.pendingTurn = [];
|
|
|
|
});
|
|
|
|
}
|
|
|
|
this.pendingTurn.push(IO.TurnEvent({ oid: remoteOid, event: m }));
|
|
|
|
}
|
|
|
|
|
|
|
|
lookupLocal(localOid: IO.Oid): Ref {
|
|
|
|
return this.exported.byOid.get(localOid)?.ref ?? INERT_REF;
|
|
|
|
}
|
|
|
|
|
|
|
|
accept(bs: BytesLike): void {
|
|
|
|
Turn.for(this.facet, t => {
|
|
|
|
this.decoder.write(bs);
|
|
|
|
while (true) {
|
|
|
|
const rawTurn = this.decoder.try_next();
|
|
|
|
if (rawTurn === void 0) break;
|
|
|
|
const wireTurn = IO.toTurn(rawTurn);
|
|
|
|
if (wireTurn === void 0) throw new Error("Bad IO.Turn");
|
|
|
|
if (this.debug) console.log('IN', rawTurn.asPreservesText());
|
|
|
|
wireTurn.forEach(v => {
|
|
|
|
const { oid: localOid, event: m } = v;
|
|
|
|
this.handle(t, this.lookupLocal(localOid), m);
|
|
|
|
});
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
handle(t: Turn, r: Ref, m: IO.Event<WireRef>) {
|
|
|
|
switch (m._variant) {
|
|
|
|
case 'Assert': {
|
|
|
|
const [a, imported] = this.rewriteIn(t, m.value.assertion);
|
|
|
|
this.inboundAssertions.set(m.value.handle, {
|
|
|
|
localHandle: t.assert(r, a),
|
|
|
|
imported,
|
|
|
|
});
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
case 'Retract': {
|
|
|
|
const remoteHandle = m.value.handle;
|
|
|
|
const h = this.inboundAssertions.get(remoteHandle);
|
|
|
|
if (h === void 0) throw new Error(`Peer retracted invalid handle ${remoteHandle}`);
|
|
|
|
this.inboundAssertions.delete(remoteHandle);
|
|
|
|
h.imported.forEach(e => this.imported.drop(e));
|
|
|
|
t.retract(h.localHandle);
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
case 'Message': {
|
|
|
|
const [a, imported] = this.rewriteIn(t, m.value.body);
|
|
|
|
if (imported.length > 0) throw new Error("Cannot receive transient reference");
|
|
|
|
t.message(r, a);
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
case 'Sync': {
|
|
|
|
const imported: Array<WireSymbol> = [];
|
|
|
|
const k = this.rewriteRefIn(t, m.value.peer, imported);
|
|
|
|
t.sync(r).then(t => {
|
|
|
|
t.message(k, true);
|
|
|
|
imported.forEach(e => this.imported.drop(e));
|
|
|
|
});
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
export interface RelayActorOptions extends RelayOptions {
|
|
|
|
initialOid?: IO.Oid;
|
|
|
|
initialRef?: Ref;
|
|
|
|
nextLocalOid?: IO.Oid;
|
|
|
|
}
|
|
|
|
|
|
|
|
export function spawnRelay(t: Turn, options: RelayActorOptions & {initialOid: IO.Oid}): Promise<Ref>;
|
|
|
|
export function spawnRelay(t: Turn, options: Omit<RelayActorOptions, 'initialOid'>): Promise<null>;
|
|
|
|
export function spawnRelay(t: Turn, options: RelayActorOptions): Promise<Ref | null>
|
|
|
|
{
|
|
|
|
return new Promise(resolve => {
|
|
|
|
t.spawn(t => {
|
|
|
|
const relay = new Relay(t, options);
|
|
|
|
if (options.initialRef !== void 0) {
|
|
|
|
relay.rewriteRefOut(options.initialRef, false, []);
|
|
|
|
}
|
|
|
|
if (options.initialOid !== void 0) {
|
|
|
|
resolve(relay.rewriteRefIn(t, WireRef.mine(options.initialOid), []));
|
|
|
|
} else {
|
|
|
|
resolve(null);
|
|
|
|
}
|
|
|
|
if (options.nextLocalOid !== void 0) {
|
|
|
|
relay.nextLocalOid = (options.nextLocalOid === 0) ? 1 : options.nextLocalOid;
|
|
|
|
}
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|