Repair relay pins, to pin target refs too
This commit is contained in:
parent
eaa7268c9b
commit
fe427a67be
|
@ -2,12 +2,8 @@
|
|||
/// SPDX-FileCopyrightText: Copyright © 2016-2022 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
||||
|
||||
import * as S from '../gen/sturdy.js';
|
||||
import { Oid } from '../gen/protocol.js';
|
||||
import { Ref } from '../runtime/actor.js';
|
||||
import { Decoder, DecoderState, Encoder, EncoderState, GenericEmbedded, neverEmbeddedType, EmbeddedType, Value, EmbeddedWriter } from '@preserves/core';
|
||||
|
||||
export type WireSymbol = { oid: Oid, ref: Ref, count: number };
|
||||
|
||||
export const wireRefEmbeddedType: EmbeddedType<S.WireRef> & EmbeddedWriter<S.WireRef> = {
|
||||
decode(s: DecoderState): S.WireRef {
|
||||
return S.asWireRef(new Decoder<any>(s).next());
|
||||
|
|
|
@ -4,16 +4,34 @@
|
|||
import { Actor, Assertion, Entity, Facet, Handle, Ref, Turn } from '../runtime/actor.js';
|
||||
import { BytesLike, Decoder, Dictionary, embed, encode, IdentityMap, mapEmbeddeds, stringify, underlying, Value } from '@preserves/core';
|
||||
import * as IO from '../gen/protocol.js';
|
||||
import { wireRefEmbeddedType, WireSymbol } from './protocol.js';
|
||||
import { wireRefEmbeddedType } from './protocol.js';
|
||||
import { queueTask } from '../runtime/task.js';
|
||||
import { attenuate } from '../runtime/rewrite.js';
|
||||
import { fromAttenuation, WireRef } from '../gen/sturdy.js';
|
||||
|
||||
export class WireSymbol {
|
||||
count = 0;
|
||||
|
||||
constructor (
|
||||
public side: Membrane,
|
||||
public oid: IO.Oid,
|
||||
public ref: Ref,
|
||||
) {}
|
||||
|
||||
drop(): void {
|
||||
this.count--;
|
||||
if (this.count === 0) {
|
||||
this.side.byOid.delete(this.oid);
|
||||
this.side.byRef.delete(this.ref);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
export class SyncPeerEntity implements Entity {
|
||||
readonly relay: Relay;
|
||||
readonly peer: Ref;
|
||||
readonly handleMap = new IdentityMap<Handle, Handle>();
|
||||
e: WireSymbol | null = null;
|
||||
pins: Array<WireSymbol> = [];
|
||||
|
||||
constructor(relay: Relay, peer: Ref) {
|
||||
this.relay = relay;
|
||||
|
@ -31,7 +49,7 @@ export class SyncPeerEntity implements Entity {
|
|||
|
||||
message(body: Assertion): void {
|
||||
// We get to vanish from the indexes now
|
||||
this.relay.releaseRefOut(this.e!);
|
||||
this.pins.forEach(e => e.drop());
|
||||
Turn.active.message(this.peer, body);
|
||||
}
|
||||
|
||||
|
@ -55,7 +73,7 @@ export class RelayEntity implements Entity {
|
|||
|
||||
assert(assertion: Assertion, handle: Handle): void {
|
||||
this.send(IO.Event.Assert(IO.Assert({
|
||||
assertion: this.relay.register(assertion, handle),
|
||||
assertion: this.relay.register(this.oid, assertion, handle),
|
||||
handle
|
||||
})))
|
||||
}
|
||||
|
@ -66,29 +84,38 @@ export class RelayEntity implements Entity {
|
|||
}
|
||||
|
||||
message(body: Assertion): void {
|
||||
this.send(IO.Event.Message(IO.Message(this.relay.register(body, null))));
|
||||
this.send(IO.Event.Message(IO.Message(this.relay.register(null, body, null))));
|
||||
}
|
||||
|
||||
sync(peer: Ref): void {
|
||||
const peerEntity = new SyncPeerEntity(this.relay, peer);
|
||||
const exported: Array<WireSymbol> = [];
|
||||
const ior = this.relay.rewriteRefOut(Turn.ref(peerEntity), false, exported);
|
||||
peerEntity.e = exported[0];
|
||||
this.relay.grabImportedOid(this.oid, peerEntity.pins);
|
||||
const ior = this.relay.rewriteRefOut(Turn.ref(peerEntity), false, peerEntity.pins);
|
||||
this.send(IO.Event.Sync(IO.Sync(ior)));
|
||||
}
|
||||
}
|
||||
|
||||
export type WhichTable = "byOid" | "byRef";
|
||||
|
||||
export class Membrane {
|
||||
readonly byOid = new IdentityMap<IO.Oid, WireSymbol>();
|
||||
readonly byRef = new IdentityMap<Ref, WireSymbol>();
|
||||
|
||||
grab<Table extends "byOid" | "byRef">(table: Table,
|
||||
key: Parameters<Membrane[Table]['get']>[0],
|
||||
transient: boolean,
|
||||
f: () => WireSymbol): WireSymbol
|
||||
{
|
||||
grab<Table extends WhichTable>(table: Table,
|
||||
key: Parameters<Membrane[Table]['get']>[0],
|
||||
transient: boolean,
|
||||
f: () => WireSymbol): WireSymbol;
|
||||
grab<Table extends WhichTable>(table: Table,
|
||||
key: Parameters<Membrane[Table]['get']>[0],
|
||||
transient: boolean,
|
||||
f: null): WireSymbol | null;
|
||||
grab<Table extends WhichTable>(table: Table,
|
||||
key: Parameters<Membrane[Table]['get']>[0],
|
||||
transient: boolean,
|
||||
f: (() => WireSymbol) | null): WireSymbol | null {
|
||||
let e = this[table].get(key as any);
|
||||
if (e === void 0) {
|
||||
if (f === null) return null;
|
||||
e = f();
|
||||
this.byRef.set(e.ref, e);
|
||||
this.byOid.set(e.oid, e);
|
||||
|
@ -96,14 +123,6 @@ export class Membrane {
|
|||
if (!transient) e.count++;
|
||||
return e;
|
||||
}
|
||||
|
||||
drop(e: WireSymbol): void {
|
||||
e.count--;
|
||||
if (e.count === 0) {
|
||||
this.byOid.delete(e.oid);
|
||||
this.byRef.delete(e.ref);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const INERT_REF: Ref = {
|
||||
|
@ -128,7 +147,7 @@ export class Relay {
|
|||
readonly w: PacketWriter;
|
||||
readonly inboundAssertions = new IdentityMap<Handle, {
|
||||
localHandle: Handle,
|
||||
imported: Array<WireSymbol>,
|
||||
pins: Array<WireSymbol>,
|
||||
}>();
|
||||
readonly outboundAssertions = new IdentityMap<Handle, Array<WireSymbol>>();
|
||||
readonly exported = new Membrane();
|
||||
|
@ -166,40 +185,52 @@ export class Relay {
|
|||
}
|
||||
}
|
||||
|
||||
rewriteOut(assertion: Assertion, transient: boolean): [Value<WireRef>, Array<WireSymbol>]
|
||||
{
|
||||
const exported: Array<WireSymbol> = [];
|
||||
const rewritten = mapEmbeddeds(assertion, r => embed(this.rewriteRefOut(r, transient, exported)));
|
||||
return [rewritten, exported];
|
||||
}
|
||||
|
||||
rewriteIn(a: Value<WireRef>): [Assertion, Array<WireSymbol>]
|
||||
{
|
||||
const imported: Array<WireSymbol> = [];
|
||||
const rewritten = mapEmbeddeds(a, r => embed(this.rewriteRefIn(r, imported)));
|
||||
return [rewritten, imported];
|
||||
}
|
||||
|
||||
register(assertion: Assertion, handle: Handle | null): Value<WireRef> {
|
||||
const [rewritten, exported] = this.rewriteOut(assertion, handle === null);
|
||||
if (handle !== null) this.outboundAssertions.set(handle, exported);
|
||||
register(targetRemoteOid: IO.Oid, assertion: Assertion, handle: Handle): Value<WireRef>;
|
||||
register(targetRemoteOid: null, assertion: Assertion, handle: null): Value<WireRef>;
|
||||
register(targetRemoteOid: IO.Oid | null, assertion: Assertion, handle: Handle | null): Value<WireRef> {
|
||||
const transient = (handle === null);
|
||||
const pins: Array<WireSymbol> = [];
|
||||
const rewritten = mapEmbeddeds(assertion, r => embed(this.rewriteRefOut(r, transient, pins)));
|
||||
if (handle !== null) {
|
||||
if (targetRemoteOid !== null /* belt and suspenders */) {
|
||||
this.grabImportedOid(targetRemoteOid, pins);
|
||||
}
|
||||
this.outboundAssertions.set(handle, pins);
|
||||
}
|
||||
return rewritten;
|
||||
}
|
||||
|
||||
deregister(handle: Handle): void {
|
||||
(this.outboundAssertions.get(handle) ?? []).forEach(e => this.releaseRefOut(e));
|
||||
(this.outboundAssertions.get(handle) ?? []).forEach(e => e.drop());
|
||||
this.outboundAssertions.delete(handle);
|
||||
}
|
||||
|
||||
rewriteRefOut(r: Ref, transient: boolean, exported: Array<WireSymbol>): WireRef {
|
||||
grabImportedOid(oid: IO.Oid, pins: Array<WireSymbol>) {
|
||||
const e = this.imported.grab("byOid", oid, false, null);
|
||||
if (e === null) {
|
||||
throw new Error("Internal error: import table missing entry for oid " + oid);
|
||||
}
|
||||
pins.push(e);
|
||||
}
|
||||
|
||||
grabExportedOid(oid: IO.Oid, pins: Array<WireSymbol>): Ref {
|
||||
const e = this.exported.grab("byOid", oid, false, null);
|
||||
if (e === null) return INERT_REF;
|
||||
pins.push(e);
|
||||
return e.ref;
|
||||
}
|
||||
|
||||
rewriteRefOut(r: Ref, transient: boolean, pins: Array<WireSymbol>): WireRef {
|
||||
if (r.target instanceof RelayEntity && r.target.relay === this) {
|
||||
if (r.attenuation === void 0 || r.attenuation.length === 0) {
|
||||
// No extra conditions on this reference since it was sent to us.
|
||||
this.grabImportedOid(r.target.oid, pins);
|
||||
return WireRef.yours({ oid: r.target.oid, attenuation: [] });
|
||||
} else {
|
||||
// This reference has been attenuated since it was sent to us.
|
||||
// Do we trust the peer to enforce such attenuation on our behalf?
|
||||
if (this.trustPeer) {
|
||||
this.grabImportedOid(r.target.oid, pins);
|
||||
return WireRef.yours({ oid: r.target.oid, attenuation: r.attenuation });
|
||||
} else {
|
||||
// fall through: treat the attenuated ref as a local ref, and re-export it.
|
||||
|
@ -210,36 +241,38 @@ export class Relay {
|
|||
const e = this.exported.grab(
|
||||
"byRef", r, transient, () => {
|
||||
if (transient) throw new Error("Cannot send transient reference");
|
||||
return { oid: this.nextLocalOid++, ref: r, count: 0 };
|
||||
return new WireSymbol(this.exported, this.nextLocalOid++, r);
|
||||
});
|
||||
exported.push(e);
|
||||
pins.push(e);
|
||||
return WireRef.mine(e.oid);
|
||||
}
|
||||
|
||||
releaseRefOut(e: WireSymbol) {
|
||||
this.exported.drop(e);
|
||||
}
|
||||
|
||||
rewriteRefIn(n: WireRef, imported: Array<WireSymbol>): Ref {
|
||||
rewriteRefIn(n: WireRef, pins: Array<WireSymbol>): Ref {
|
||||
switch (n._variant) {
|
||||
case 'yours': {
|
||||
const r = this.lookupLocal(n.oid);
|
||||
if (n.attenuation.length === 0 || r === INERT_REF) {
|
||||
return r;
|
||||
const e = this.exported.grab("byOid", n.oid, false, null);
|
||||
if (e === null) {
|
||||
return INERT_REF;
|
||||
} else {
|
||||
type AttenuatedRef = Ref & { __attenuations?: Dictionary<any, Ref> };
|
||||
const ar = r as AttenuatedRef;
|
||||
if (ar.__attenuations === void 0) {
|
||||
ar.__attenuations = new Dictionary();
|
||||
pins.push(e);
|
||||
const r = e.ref;
|
||||
if (n.attenuation.length === 0) {
|
||||
return r;
|
||||
} else {
|
||||
type AttenuatedRef = Ref & { __attenuations?: Dictionary<any, Ref> };
|
||||
const ar = r as AttenuatedRef;
|
||||
if (ar.__attenuations === void 0) {
|
||||
ar.__attenuations = new Dictionary();
|
||||
}
|
||||
return ar.__attenuations.getOrSet(fromAttenuation(n.attenuation), () =>
|
||||
attenuate(r, ... n.attenuation));
|
||||
}
|
||||
return ar.__attenuations.getOrSet(fromAttenuation(n.attenuation), () =>
|
||||
attenuate(r, ... n.attenuation));
|
||||
}
|
||||
}
|
||||
case 'mine': {
|
||||
const e = this.imported.grab("byOid", n.oid, false, () =>
|
||||
({ oid: n.oid, ref: Turn.ref(new RelayEntity(this, n.oid)), count: 0 }));
|
||||
imported.push(e);
|
||||
new WireSymbol(this.imported, n.oid, Turn.ref(new RelayEntity(this, n.oid))));
|
||||
pins.push(e);
|
||||
return e.ref;
|
||||
}
|
||||
}
|
||||
|
@ -259,10 +292,6 @@ export class Relay {
|
|||
this.pendingTurn.push(IO.TurnEvent({ oid: remoteOid, event: m }));
|
||||
}
|
||||
|
||||
lookupLocal(localOid: IO.Oid): Ref {
|
||||
return this.exported.byOid.get(localOid)?.ref ?? INERT_REF;
|
||||
}
|
||||
|
||||
accept(bs: BytesLike): void {
|
||||
Turn.for(this.facet, () => {
|
||||
this.decoder.write(bs);
|
||||
|
@ -272,21 +301,26 @@ export class Relay {
|
|||
const wireTurn = IO.toTurn(rawTurn);
|
||||
if (wireTurn === void 0) throw new Error("Bad IO.Turn");
|
||||
if (this.debug) console.log('IN', stringify(rawTurn));
|
||||
wireTurn.forEach(v => {
|
||||
const { oid: localOid, event: m } = v;
|
||||
this.handle(this.lookupLocal(localOid), m);
|
||||
});
|
||||
wireTurn.forEach(v => this.handle(v.oid, v.event));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
handle(r: Ref, m: IO.Event<WireRef>) {
|
||||
rewriteIn(a: Value<WireRef>): [Assertion, Array<WireSymbol>]
|
||||
{
|
||||
const pins: Array<WireSymbol> = [];
|
||||
const rewritten = mapEmbeddeds(a, r => embed(this.rewriteRefIn(r, pins)));
|
||||
return [rewritten, pins];
|
||||
}
|
||||
|
||||
handle(localOid: IO.Oid, m: IO.Event<WireRef>) {
|
||||
switch (m._variant) {
|
||||
case 'Assert': {
|
||||
const [a, imported] = this.rewriteIn(m.value.assertion);
|
||||
const [a, pins] = this.rewriteIn(m.value.assertion);
|
||||
const r = this.grabExportedOid(localOid, pins);
|
||||
this.inboundAssertions.set(m.value.handle, {
|
||||
localHandle: Turn.active.assert(r, a),
|
||||
imported,
|
||||
pins,
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
@ -295,22 +329,24 @@ export class Relay {
|
|||
const h = this.inboundAssertions.get(remoteHandle);
|
||||
if (h === void 0) throw new Error(`Peer retracted invalid handle ${remoteHandle}`);
|
||||
this.inboundAssertions.delete(remoteHandle);
|
||||
h.imported.forEach(e => this.imported.drop(e));
|
||||
h.pins.forEach(e => e.drop());
|
||||
Turn.active.retract(h.localHandle);
|
||||
break;
|
||||
}
|
||||
case 'Message': {
|
||||
const [a, imported] = this.rewriteIn(m.value.body);
|
||||
if (imported.length > 0) throw new Error("Cannot receive transient reference");
|
||||
Turn.active.message(r, a);
|
||||
const [a, pins] = this.rewriteIn(m.value.body);
|
||||
if (pins.length > 0) throw new Error("Cannot receive transient reference");
|
||||
const r = this.exported.byOid.get(localOid)?.ref;
|
||||
if (r) Turn.active.message(r, a);
|
||||
break;
|
||||
}
|
||||
case 'Sync': {
|
||||
const imported: Array<WireSymbol> = [];
|
||||
const k = this.rewriteRefIn(m.value.peer, imported);
|
||||
const pins: Array<WireSymbol> = [];
|
||||
const r = this.grabExportedOid(localOid, pins);
|
||||
const k = this.rewriteRefIn(m.value.peer, pins);
|
||||
Turn.active.sync(r).then(() => {
|
||||
Turn.active.message(k, true);
|
||||
imported.forEach(e => this.imported.drop(e));
|
||||
pins.forEach(e => e.drop());
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue