novy-syndicate/src/relay.ts

323 lines
10 KiB
TypeScript

import { Actor, Assertion, Entity, Handle, Ref, Turn } from './actor.js';
import { Bytes, decode, encode, IdentityMap, mapPointers, underlying, Value } from 'preserves';
import {
Oid,
Assert,
EntityMessage,
Message,
Retract,
Sync,
WireSymbol,
_Assert,
_Retract,
_Message,
_Sync,
WireRef,
TurnMessage,
} from './protocol.js';
export class SyncPeerEntity implements Entity {
readonly relay: Relay;
readonly peer: Ref;
readonly handleMap = new IdentityMap<Handle, Handle>();
e: WireSymbol | null = null;
constructor(relay: Relay, peer: Ref) {
this.relay = relay;
this.peer = peer;
}
assert(turn: Turn, assertion: Assertion, handle: Handle): void {
this.handleMap.set(handle, turn.assert(this.peer, assertion));
}
retract(turn: Turn, handle: Handle): void {
turn.retract(this.handleMap.get(handle)!);
this.handleMap.delete(handle);
}
message(turn: Turn, body: Assertion): void {
// We get to vanish from the indexes now
this.relay.releaseRefOut(this.e!);
turn.message(this.peer, body);
}
sync(turn: Turn, peer: Ref): void {
turn._sync(this.peer, peer);
}
}
export class RelayEntity implements Entity {
readonly relay: Relay;
readonly e: WireSymbol;
constructor(relay: Relay, e: WireSymbol) {
this.relay = relay;
this.e = e;
}
send(m: EntityMessage): void {
this.relay.send(this.e.name.oid, m);
}
assert(_turn: Turn, assertion: Assertion, handle: Handle): void {
this.send(Assert(this.relay.register(assertion, handle), handle));
}
retract(_turn: Turn, handle: Handle): void {
this.relay.deregister(handle);
this.send(Retract(handle));
}
message(_turn: Turn, body: Assertion): void {
this.send(Message(this.relay.register(body, null)));
}
sync(turn: Turn, peer: Ref): void {
const peerEntity = new SyncPeerEntity(this.relay, peer);
peerEntity.e = this.relay.rewriteRefOut(turn.ref(peerEntity), false, null);
this.send(Sync(peerEntity.e.name));
}
}
export class Membrane {
readonly byOid = new IdentityMap<Oid, WireSymbol>();
readonly byRef = new IdentityMap<Ref, WireSymbol>();
grab<Table extends "byOid" | "byRef">(table: Table,
key: (Table extends "byOid" ? Oid : Ref),
transient: boolean,
f: () => WireSymbol): WireSymbol
{
let e =
(this[table] as IdentityMap<Table extends "byOid" ? Oid : Ref, WireSymbol>)
.get(key);
if (e === void 0) {
e = f();
this.byRef.set(e.ref, e);
this.byOid.set(e.name.oid, e);
}
if (!transient) e.count++;
return e;
}
drop(e: WireSymbol): void {
e.count--;
if (e.count === 0) {
this.byOid.delete(e.name.oid);
this.byRef.delete(e.ref);
}
}
}
export const INERT_REF: Ref = {
relay: (() => {
const a = new Actor();
a.exitReason = { ok: true };
return a;
})(),
target: {},
};
export type PacketWriter = (bs: Uint8Array) => void;
export class Relay {
readonly actor: Actor;
readonly w: PacketWriter;
readonly inboundAssertions = new IdentityMap<Handle, {
localHandle: Handle,
imported: Array<WireSymbol>,
}>();
readonly outboundAssertions = new IdentityMap<Handle, Array<WireSymbol>>();
readonly exported = new Membrane();
readonly imported = new Membrane();
nextLocalOid: Oid = 0;
pendingTurn: TurnMessage = [];
debug: boolean;
constructor(actor: Actor, w: PacketWriter, debug: boolean) {
this.actor = actor;
this.w = w;
this.debug = debug;
}
rewriteOut(assertion: Assertion, transient: boolean): [Value<WireRef>, Array<WireSymbol>]
{
const exported: Array<WireSymbol> = [];
const rewritten =
mapPointers(assertion, r => this.rewriteRefOut(r, transient, exported).name);
return [rewritten, exported];
}
rewriteIn(t: Turn, a: Value<WireRef>): [Assertion, Array<WireSymbol>]
{
const imported: Array<WireSymbol> = [];
const rewritten = mapPointers(a, r => this.rewriteRefIn(t, r, imported));
return [rewritten, imported];
}
register(assertion: Assertion, handle: Handle | null): Value<WireRef> {
const [rewritten, exported] = this.rewriteOut(assertion, handle === null);
if (handle !== null) this.outboundAssertions.set(handle, exported);
return rewritten;
}
deregister(handle: Handle): void {
(this.outboundAssertions.get(handle) ?? []).forEach(e => this.releaseRefOut(e));
}
rewriteRefOut(r: Ref, transient: boolean, exported: Array<WireSymbol> | null): WireSymbol {
if (r.target instanceof RelayEntity && r.target.relay === this) {
return r.target.e;
} else {
const e = this.exported.grab("byRef", r, transient, () => {
if (transient) throw new Error("Cannot send transient reference");
return { name: { loc: "mine", oid: this.nextLocalOid++ }, ref: r, count: 0 };
});
exported?.push(e);
return e;
}
}
releaseRefOut(e: WireSymbol) {
this.exported.drop(e);
}
rewriteRefIn(t: Turn, n: WireRef, imported: Array<WireSymbol> | null): Ref {
switch (n.loc) {
case 'your':
return this.lookupLocal(n.oid);
case 'mine': {
const e = this.imported.grab("byOid", n.oid, false, () => {
const e: WireSymbol = {
name: { loc: 'your', oid: n.oid },
ref: null as any,
count: 0,
};
e.ref = t.ref(new RelayEntity(this, e as WireSymbol));
return e;
});
imported?.push(e);
return e.ref;
}
}
}
send(remoteOid: Oid, m: EntityMessage): void {
if (this.pendingTurn.length === 0) {
queueMicrotask(() => {
if (this.debug) console.log('OUT', this.pendingTurn.asPreservesText());
this.w(underlying(encode<WireRef>(this.pendingTurn, {
canonical: true,
encodePointer: n => {
switch (n.loc) {
case 'mine': return [0, n.oid];
case 'your': return [1, n.oid];
}
},
})));
this.pendingTurn = [];
});
}
this.pendingTurn.push([remoteOid, m]);
}
lookupLocal(localOid: Oid): Ref {
return this.exported.byOid.get(localOid)?.ref ?? INERT_REF;
}
accept(bs0: Uint8Array): void {
Turn.for(this.actor, t => {
const bs = Bytes.from(bs0);
const wireTurn = decode<WireRef>(bs, {
decodePointer: v => {
if (!Array.isArray(v) || v.length !== 2) {
throw new Error(
`Received invalid object reference ${v.asPreservesText()} from peer`);
}
const loc = v[0] === 0 ? 'mine' : 'your';
return { loc, oid: v[1] as Oid };
},
});
if (this.debug) console.log('IN', wireTurn.asPreservesText());
if (!Array.isArray(wireTurn)) invalidTopLevelMessage(wireTurn);
wireTurn.forEach(v => {
if (Array.isArray(v) && v.length === 2 && typeof v[0] === 'number') {
const [localOid, m] = v;
// TODO: deep check that m is EntityMessage
this.handle(t, this.lookupLocal(localOid), m as EntityMessage);
} else {
invalidTopLevelMessage(wireTurn);
}
});
});
}
handle(t: Turn, r: Ref, m: EntityMessage) {
switch (m.label) {
case _Assert: {
const [a, imported] = this.rewriteIn(t, Assert._.assertion(m));
this.inboundAssertions.set(Assert._.handle(m), {
localHandle: t.assert(r, a),
imported,
});
break;
}
case _Retract: {
const remoteHandle = Retract._.handle(m);
const h = this.inboundAssertions.get(remoteHandle);
if (h === void 0) throw new Error(`Peer retracted invalid handle ${remoteHandle}`);
this.inboundAssertions.delete(remoteHandle);
h.imported.forEach(e => this.imported.drop(e));
t.retract(h.localHandle);
break;
}
case _Message: {
const [a, imported] = this.rewriteIn(t, Message._.body(m));
if (imported.length > 0) throw new Error("Cannot receive transient reference");
t.message(r, a);
break;
}
case _Sync: {
const imported: Array<WireSymbol> = [];
const k = this.rewriteRefIn(t, Sync._.peer(m), imported);
t.sync(r).then(t => {
t.message(k, true);
imported.forEach(e => this.imported.drop(e));
});
break;
}
}
}
}
function invalidTopLevelMessage(m: Value<WireRef>): never {
throw new Error(
`Received invalid top-level protocol message from peer: ${m.asPreservesText()}`);
}
export type RelayOptions = {
packetWriter: PacketWriter,
setup(t: Turn, r: Relay): void,
initialOid?: Oid,
initialRef?: Ref,
debug?: boolean,
};
export function spawnRelay(t: Turn, options: RelayOptions): Promise<Ref | null> {
return new Promise(resolve => {
t.spawn(t => {
const relay = new Relay(t.actor, options.packetWriter, options.debug ?? false);
options.setup(t, relay);
if (options.initialRef !== void 0) {
relay.rewriteRefOut(options.initialRef, false, null);
}
if (options.initialOid !== void 0) {
resolve(relay.rewriteRefIn(t, { loc: 'mine', oid: options.initialOid }, null));
} else {
resolve(null);
}
});
});
}