Refactor Relay while puzzling over sessions
This commit is contained in:
parent
300e15b674
commit
1958e4ec49
|
@ -0,0 +1,308 @@
|
||||||
|
/// SPDX-License-Identifier: GPL-3.0-or-later
|
||||||
|
/// SPDX-FileCopyrightText: Copyright © 2016-2024 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
||||||
|
|
||||||
|
import { Actor, Ref, Handle, Assertion, Entity, Turn } from '../runtime/actor.js';
|
||||||
|
import { Value, Embedded, mapEmbeddeds, IdentityMap, Dictionary, stringify } from '@preserves/core';
|
||||||
|
import * as IO from '../gen/protocol.js';
|
||||||
|
import { fromCaveat, WireRef } from '../gen/sturdy.js';
|
||||||
|
import { attenuate } from '../runtime/rewrite.js';
|
||||||
|
|
||||||
|
export class WireSymbol {
|
||||||
|
count = 0;
|
||||||
|
|
||||||
|
constructor (
|
||||||
|
public side: Membrane,
|
||||||
|
public oid: IO.Oid,
|
||||||
|
public ref: Ref,
|
||||||
|
) {}
|
||||||
|
|
||||||
|
drop(): void {
|
||||||
|
this.count--;
|
||||||
|
if (this.count === 0) {
|
||||||
|
this.side.byOid.delete(this.oid);
|
||||||
|
this.side.byRef.delete(this.ref);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
export type WhichTable = "byOid" | "byRef";
|
||||||
|
|
||||||
|
export class Membrane {
|
||||||
|
readonly byOid = new IdentityMap<IO.Oid, WireSymbol>();
|
||||||
|
readonly byRef = new IdentityMap<Ref, WireSymbol>();
|
||||||
|
|
||||||
|
grab<Table extends WhichTable>(table: Table,
|
||||||
|
key: Parameters<Membrane[Table]['get']>[0],
|
||||||
|
transient: boolean,
|
||||||
|
f: () => WireSymbol): WireSymbol;
|
||||||
|
grab<Table extends WhichTable>(table: Table,
|
||||||
|
key: Parameters<Membrane[Table]['get']>[0],
|
||||||
|
transient: boolean,
|
||||||
|
f: null): WireSymbol | null;
|
||||||
|
grab<Table extends WhichTable>(table: Table,
|
||||||
|
key: Parameters<Membrane[Table]['get']>[0],
|
||||||
|
transient: boolean,
|
||||||
|
f: (() => WireSymbol) | null): WireSymbol | null {
|
||||||
|
let e = this[table].get(key as any);
|
||||||
|
if (e === void 0) {
|
||||||
|
if (f === null) return null;
|
||||||
|
e = f();
|
||||||
|
this.byRef.set(e.ref, e);
|
||||||
|
this.byOid.set(e.oid, e);
|
||||||
|
}
|
||||||
|
if (!transient) e.count++;
|
||||||
|
return e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const INERT_REF: Ref = {
|
||||||
|
relay: Actor.boot(() => Turn.active.stop(Turn.activeFacet)).root,
|
||||||
|
target: {},
|
||||||
|
};
|
||||||
|
|
||||||
|
export interface ProxyInbound {
|
||||||
|
proxyPacket(packet: IO.Packet<Embedded<WireRef>>): void;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface ProxyOutbound {
|
||||||
|
send(remoteOid: IO.Oid, event: IO.Event<Embedded<WireRef>>): void;
|
||||||
|
proxyAssertion(targetRemoteOid: IO.Oid, assertion: Assertion, handle: Handle): Value<Embedded<WireRef>>;
|
||||||
|
proxyRetract(handle: Handle): void;
|
||||||
|
proxyMessage(assertion: Assertion): Value<Embedded<WireRef>>;
|
||||||
|
proxySync(targetRemoteOid: IO.Oid, peer: Ref): Embedded<WireRef>;
|
||||||
|
}
|
||||||
|
|
||||||
|
export abstract class LayerBoundary implements ProxyOutbound, ProxyInbound {
|
||||||
|
readonly inboundAssertions = new IdentityMap<Handle, {
|
||||||
|
localHandle: Handle,
|
||||||
|
pins: Array<WireSymbol>,
|
||||||
|
}>();
|
||||||
|
readonly outboundAssertions = new IdentityMap<Handle, Array<WireSymbol>>();
|
||||||
|
readonly exported = new Membrane();
|
||||||
|
readonly imported = new Membrane();
|
||||||
|
|
||||||
|
constructor(public trustPeer = true, public nextLocalOid: IO.Oid = 0) {}
|
||||||
|
|
||||||
|
abstract send(remoteOid: IO.Oid, event: IO.Event<Embedded<WireRef>>): void;
|
||||||
|
|
||||||
|
proxyAssertion(targetRemoteOid: IO.Oid, assertion: Assertion, handle: Handle): Value<Embedded<WireRef>> {
|
||||||
|
const pins: Array<WireSymbol> = [];
|
||||||
|
const rewritten = mapEmbeddeds(assertion, r => this.rewriteRefOut(r, false, pins));
|
||||||
|
this.grabImportedOid(targetRemoteOid, pins);
|
||||||
|
this.outboundAssertions.set(handle, pins);
|
||||||
|
return rewritten;
|
||||||
|
}
|
||||||
|
|
||||||
|
proxyRetract(handle: Handle): void {
|
||||||
|
(this.outboundAssertions.get(handle) ?? []).forEach(e => e.drop());
|
||||||
|
this.outboundAssertions.delete(handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
proxyMessage(assertion: Assertion): Value<Embedded<WireRef>> {
|
||||||
|
const pins: Array<WireSymbol> = [];
|
||||||
|
return mapEmbeddeds(assertion, r => this.rewriteRefOut(r, true, pins));
|
||||||
|
}
|
||||||
|
|
||||||
|
proxySync(targetRemoteOid: IO.Oid, peer: Ref): Embedded<WireRef> {
|
||||||
|
const peerEntity = new SyncPeerEntity(peer);
|
||||||
|
this.grabImportedOid(targetRemoteOid, peerEntity.pins);
|
||||||
|
return this.rewriteRefOut(Turn.ref(peerEntity), false, peerEntity.pins);
|
||||||
|
}
|
||||||
|
|
||||||
|
grabImportedOid(oid: IO.Oid, pins: Array<WireSymbol>): void {
|
||||||
|
const e = this.imported.grab("byOid", oid, false, null);
|
||||||
|
if (e === null) {
|
||||||
|
throw new Error("Internal error: import table missing entry for oid " + oid);
|
||||||
|
}
|
||||||
|
pins.push(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
grabExportedOid(oid: IO.Oid, pins: Array<WireSymbol>): Ref {
|
||||||
|
const e = this.exported.grab("byOid", oid, false, null);
|
||||||
|
if (e === null) return INERT_REF;
|
||||||
|
pins.push(e);
|
||||||
|
return e.ref;
|
||||||
|
}
|
||||||
|
|
||||||
|
rewriteRefOut(r: Ref, transient: boolean, pins: Array<WireSymbol>): Embedded<WireRef> {
|
||||||
|
if (r.target instanceof ProxyEntity && r.target.relay === this) {
|
||||||
|
if (r.attenuation === void 0 || r.attenuation.length === 0) {
|
||||||
|
// No extra conditions on this reference since it was sent to us.
|
||||||
|
this.grabImportedOid(r.target.oid, pins);
|
||||||
|
return new Embedded<WireRef>(WireRef.yours({ oid: r.target.oid, attenuation: [] }));
|
||||||
|
} else {
|
||||||
|
// This reference has been attenuated since it was sent to us.
|
||||||
|
// Do we trust the peer to enforce such attenuation on our behalf?
|
||||||
|
if (this.trustPeer) {
|
||||||
|
this.grabImportedOid(r.target.oid, pins);
|
||||||
|
return new Embedded<WireRef>(WireRef.yours({ oid: r.target.oid, attenuation: r.attenuation }));
|
||||||
|
} else {
|
||||||
|
// fall through: treat the attenuated ref as a local ref, and re-export it.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const e = this.exported.grab(
|
||||||
|
"byRef", r, transient, () => {
|
||||||
|
if (transient) throw new Error("Cannot send transient reference");
|
||||||
|
return new WireSymbol(this.exported, this.nextLocalOid++, r);
|
||||||
|
});
|
||||||
|
pins.push(e);
|
||||||
|
return new Embedded<WireRef>(WireRef.mine(e.oid));
|
||||||
|
}
|
||||||
|
|
||||||
|
rewriteRefIn(nw: Embedded<WireRef>, pins: Array<WireSymbol>): Ref {
|
||||||
|
const n = nw.value;
|
||||||
|
switch (n._variant) {
|
||||||
|
case 'yours': {
|
||||||
|
const e = this.exported.grab("byOid", n.oid, false, null);
|
||||||
|
if (e === null) {
|
||||||
|
return INERT_REF;
|
||||||
|
} else {
|
||||||
|
pins.push(e);
|
||||||
|
const r = e.ref;
|
||||||
|
if (n.attenuation.length === 0) {
|
||||||
|
return r;
|
||||||
|
} else {
|
||||||
|
type AttenuatedRef = Ref & { __attenuations?: Dictionary<any, Ref> };
|
||||||
|
const ar = r as AttenuatedRef;
|
||||||
|
if (ar.__attenuations === void 0) {
|
||||||
|
ar.__attenuations = new Dictionary();
|
||||||
|
}
|
||||||
|
return ar.__attenuations.getOrSet(n.attenuation.map(fromCaveat), () =>
|
||||||
|
attenuate(r, ... n.attenuation));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case 'mine': {
|
||||||
|
const e = this.imported.grab("byOid", n.oid, false, () =>
|
||||||
|
new WireSymbol(this.imported, n.oid, Turn.ref(new ProxyEntity(this, n.oid))));
|
||||||
|
pins.push(e);
|
||||||
|
return e.ref;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rewriteIn(a: Value<Embedded<WireRef>>): [Assertion, Array<WireSymbol>]
|
||||||
|
{
|
||||||
|
const pins: Array<WireSymbol> = [];
|
||||||
|
const rewritten = mapEmbeddeds(a, r => this.rewriteRefIn(r, pins));
|
||||||
|
return [rewritten, pins];
|
||||||
|
}
|
||||||
|
|
||||||
|
handle(localOid: IO.Oid, m: IO.Event<Embedded<WireRef>>): void {
|
||||||
|
switch (m._variant) {
|
||||||
|
case 'Assert': {
|
||||||
|
const [a, pins] = this.rewriteIn(m.value.assertion);
|
||||||
|
const r = this.grabExportedOid(localOid, pins);
|
||||||
|
this.inboundAssertions.set(m.value.handle, {
|
||||||
|
localHandle: Turn.active.assert(r, a),
|
||||||
|
pins,
|
||||||
|
});
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'Retract': {
|
||||||
|
const remoteHandle = m.value.handle;
|
||||||
|
const h = this.inboundAssertions.get(remoteHandle);
|
||||||
|
if (h === void 0) throw new Error(`Peer retracted invalid handle ${remoteHandle}`);
|
||||||
|
this.inboundAssertions.delete(remoteHandle);
|
||||||
|
h.pins.forEach(e => e.drop());
|
||||||
|
Turn.active.retract(h.localHandle);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'Message': {
|
||||||
|
const [a, pins] = this.rewriteIn(m.value.body);
|
||||||
|
if (pins.length > 0) throw new Error("Cannot receive transient reference");
|
||||||
|
const r = this.exported.byOid.get(localOid)?.ref;
|
||||||
|
if (r) Turn.active.message(r, a);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'Sync': {
|
||||||
|
const pins: Array<WireSymbol> = [];
|
||||||
|
const r = this.grabExportedOid(localOid, pins);
|
||||||
|
const k = this.rewriteRefIn(m.value.peer, pins);
|
||||||
|
Turn.active.sync(r).then(() => {
|
||||||
|
Turn.active.message(k, true);
|
||||||
|
pins.forEach(e => e.drop());
|
||||||
|
});
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
proxyPacket(packet: IO.Packet<Embedded<WireRef>>): void {
|
||||||
|
switch (packet._variant) {
|
||||||
|
case 'Turn':
|
||||||
|
packet.value.forEach(v => this.handle(v.oid, v.event));
|
||||||
|
break;
|
||||||
|
case 'Error':
|
||||||
|
throw new Error(`Remote peer terminated relay: ${stringify(packet.value)}`);
|
||||||
|
case 'Extension':
|
||||||
|
// Ignore unknown extensions.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export class ProxyEntity implements Entity {
|
||||||
|
readonly relay: ProxyOutbound;
|
||||||
|
readonly oid: IO.Oid;
|
||||||
|
|
||||||
|
constructor(relay: ProxyOutbound, oid: IO.Oid) {
|
||||||
|
this.relay = relay;
|
||||||
|
this.oid = oid;
|
||||||
|
}
|
||||||
|
|
||||||
|
send(m: IO.Event<Embedded<WireRef>>): void {
|
||||||
|
this.relay.send(this.oid, m);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(assertion: Assertion, handle: Handle): void {
|
||||||
|
this.send(IO.Event.Assert(IO.Assert({
|
||||||
|
assertion: this.relay.proxyAssertion(this.oid, assertion, handle),
|
||||||
|
handle
|
||||||
|
})))
|
||||||
|
}
|
||||||
|
|
||||||
|
retract(handle: Handle): void {
|
||||||
|
this.relay.proxyRetract(handle);
|
||||||
|
this.send(IO.Event.Retract(IO.Retract(handle)));
|
||||||
|
}
|
||||||
|
|
||||||
|
message(body: Assertion): void {
|
||||||
|
this.send(IO.Event.Message(IO.Message(this.relay.proxyMessage(body))));
|
||||||
|
}
|
||||||
|
|
||||||
|
sync(peer: Ref): void {
|
||||||
|
this.send(IO.Event.Sync(IO.Sync(this.relay.proxySync(this.oid, peer))));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export class SyncPeerEntity implements Entity {
|
||||||
|
readonly peer: Ref;
|
||||||
|
readonly handleMap = new IdentityMap<Handle, Handle>();
|
||||||
|
pins: Array<WireSymbol> = [];
|
||||||
|
|
||||||
|
constructor(peer: Ref) {
|
||||||
|
this.peer = peer;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(assertion: Assertion, handle: Handle): void {
|
||||||
|
this.handleMap.set(handle, Turn.active.assert(this.peer, assertion));
|
||||||
|
}
|
||||||
|
|
||||||
|
retract(handle: Handle): void {
|
||||||
|
Turn.active.retract(this.handleMap.get(handle)!);
|
||||||
|
this.handleMap.delete(handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
message(body: Assertion): void {
|
||||||
|
// We get to vanish from the indexes now
|
||||||
|
this.pins.forEach(e => e.drop());
|
||||||
|
Turn.active.message(this.peer, body);
|
||||||
|
}
|
||||||
|
|
||||||
|
sync(peer: Ref): void {
|
||||||
|
Turn.active._sync(this.peer, peer);
|
||||||
|
}
|
||||||
|
}
|
|
@ -2,7 +2,7 @@
|
||||||
/// SPDX-FileCopyrightText: Copyright © 2016-2024 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
/// SPDX-FileCopyrightText: Copyright © 2016-2024 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
||||||
|
|
||||||
import * as S from '../gen/sturdy.js';
|
import * as S from '../gen/sturdy.js';
|
||||||
import { Decoder, DecoderState, Encoder, EncoderState, GenericEmbedded, neverEmbeddedType, EmbeddedType, Value, Embedded, EmbeddedWriter } from '@preserves/core';
|
import { Decoder, DecoderState, Encoder, EncoderState, neverEmbeddedType, EmbeddedType, Value, Embedded, EmbeddedWriter } from '@preserves/core';
|
||||||
|
|
||||||
export const wireRefEmbeddedType: EmbeddedType<Embedded<S.WireRef>> & EmbeddedWriter<Embedded<S.WireRef>> = {
|
export const wireRefEmbeddedType: EmbeddedType<Embedded<S.WireRef>> & EmbeddedWriter<Embedded<S.WireRef>> = {
|
||||||
decode(s: DecoderState): Embedded<S.WireRef> {
|
decode(s: DecoderState): Embedded<S.WireRef> {
|
||||||
|
@ -13,11 +13,11 @@ export const wireRefEmbeddedType: EmbeddedType<Embedded<S.WireRef>> & EmbeddedWr
|
||||||
new Encoder<any>(s, neverEmbeddedType).push(S.fromWireRef(v.value));
|
new Encoder<any>(s, neverEmbeddedType).push(S.fromWireRef(v.value));
|
||||||
},
|
},
|
||||||
|
|
||||||
fromValue(v: Value<GenericEmbedded>): Embedded<S.WireRef> {
|
fromValue(v: Value): Embedded<S.WireRef> {
|
||||||
return new Embedded<S.WireRef>(S.asWireRef(v as Value<S._embedded>));
|
return new Embedded<S.WireRef>(S.asWireRef(v as Value<S._embedded>));
|
||||||
},
|
},
|
||||||
|
|
||||||
toValue(v: Embedded<S.WireRef>): Value<GenericEmbedded> {
|
toValue(v: Embedded<S.WireRef>): Value {
|
||||||
return S.fromWireRef(v.value) as Value<GenericEmbedded>;
|
return S.fromWireRef(v.value) as Value;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -1,136 +1,15 @@
|
||||||
/// SPDX-License-Identifier: GPL-3.0-or-later
|
/// SPDX-License-Identifier: GPL-3.0-or-later
|
||||||
/// SPDX-FileCopyrightText: Copyright © 2016-2024 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
/// SPDX-FileCopyrightText: Copyright © 2016-2024 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
|
||||||
|
|
||||||
import { Actor, Assertion, Entity, Facet, Handle, Ref, Turn } from '../runtime/actor.js';
|
import { Assertion, Facet, Ref, Turn } from '../runtime/actor.js';
|
||||||
import { BytesLike, Decoder, Dictionary, encode, IdentityMap, mapEmbeddeds, Embedded, stringify, underlying, Value } from '@preserves/core';
|
import { BytesLike, Decoder, encode, Embedded, stringify, underlying } from '@preserves/core';
|
||||||
import * as IO from '../gen/protocol.js';
|
import * as IO from '../gen/protocol.js';
|
||||||
import { wireRefEmbeddedType } from './protocol.js';
|
import { wireRefEmbeddedType } from './protocol.js';
|
||||||
import { attenuate } from '../runtime/rewrite.js';
|
import { WireRef } from '../gen/sturdy.js';
|
||||||
import { fromCaveat, WireRef } from '../gen/sturdy.js';
|
import { LayerBoundary } from './membrane.js';
|
||||||
|
|
||||||
const FLUSH = Symbol.for('flush');
|
const FLUSH = Symbol.for('flush');
|
||||||
|
|
||||||
export class WireSymbol {
|
|
||||||
count = 0;
|
|
||||||
|
|
||||||
constructor (
|
|
||||||
public side: Membrane,
|
|
||||||
public oid: IO.Oid,
|
|
||||||
public ref: Ref,
|
|
||||||
) {}
|
|
||||||
|
|
||||||
drop(): void {
|
|
||||||
this.count--;
|
|
||||||
if (this.count === 0) {
|
|
||||||
this.side.byOid.delete(this.oid);
|
|
||||||
this.side.byRef.delete(this.ref);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
export class SyncPeerEntity implements Entity {
|
|
||||||
readonly relay: Relay;
|
|
||||||
readonly peer: Ref;
|
|
||||||
readonly handleMap = new IdentityMap<Handle, Handle>();
|
|
||||||
pins: Array<WireSymbol> = [];
|
|
||||||
|
|
||||||
constructor(relay: Relay, peer: Ref) {
|
|
||||||
this.relay = relay;
|
|
||||||
this.peer = peer;
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(assertion: Assertion, handle: Handle): void {
|
|
||||||
this.handleMap.set(handle, Turn.active.assert(this.peer, assertion));
|
|
||||||
}
|
|
||||||
|
|
||||||
retract(handle: Handle): void {
|
|
||||||
Turn.active.retract(this.handleMap.get(handle)!);
|
|
||||||
this.handleMap.delete(handle);
|
|
||||||
}
|
|
||||||
|
|
||||||
message(body: Assertion): void {
|
|
||||||
// We get to vanish from the indexes now
|
|
||||||
this.pins.forEach(e => e.drop());
|
|
||||||
Turn.active.message(this.peer, body);
|
|
||||||
}
|
|
||||||
|
|
||||||
sync(peer: Ref): void {
|
|
||||||
Turn.active._sync(this.peer, peer);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export class RelayEntity implements Entity {
|
|
||||||
readonly relay: Relay;
|
|
||||||
readonly oid: IO.Oid;
|
|
||||||
|
|
||||||
constructor(relay: Relay, oid: IO.Oid) {
|
|
||||||
this.relay = relay;
|
|
||||||
this.oid = oid;
|
|
||||||
}
|
|
||||||
|
|
||||||
send(m: IO.Event<Embedded<WireRef>>): void {
|
|
||||||
this.relay.send(this.oid, m);
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(assertion: Assertion, handle: Handle): void {
|
|
||||||
this.send(IO.Event.Assert(IO.Assert({
|
|
||||||
assertion: this.relay.register(this.oid, assertion, handle),
|
|
||||||
handle
|
|
||||||
})))
|
|
||||||
}
|
|
||||||
|
|
||||||
retract(handle: Handle): void {
|
|
||||||
this.relay.deregister(handle);
|
|
||||||
this.send(IO.Event.Retract(IO.Retract(handle)));
|
|
||||||
}
|
|
||||||
|
|
||||||
message(body: Assertion): void {
|
|
||||||
this.send(IO.Event.Message(IO.Message(this.relay.register(null, body, null))));
|
|
||||||
}
|
|
||||||
|
|
||||||
sync(peer: Ref): void {
|
|
||||||
const peerEntity = new SyncPeerEntity(this.relay, peer);
|
|
||||||
this.relay.grabImportedOid(this.oid, peerEntity.pins);
|
|
||||||
const ior = this.relay.rewriteRefOut(Turn.ref(peerEntity), false, peerEntity.pins);
|
|
||||||
this.send(IO.Event.Sync(IO.Sync(ior)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export type WhichTable = "byOid" | "byRef";
|
|
||||||
|
|
||||||
export class Membrane {
|
|
||||||
readonly byOid = new IdentityMap<IO.Oid, WireSymbol>();
|
|
||||||
readonly byRef = new IdentityMap<Ref, WireSymbol>();
|
|
||||||
|
|
||||||
grab<Table extends WhichTable>(table: Table,
|
|
||||||
key: Parameters<Membrane[Table]['get']>[0],
|
|
||||||
transient: boolean,
|
|
||||||
f: () => WireSymbol): WireSymbol;
|
|
||||||
grab<Table extends WhichTable>(table: Table,
|
|
||||||
key: Parameters<Membrane[Table]['get']>[0],
|
|
||||||
transient: boolean,
|
|
||||||
f: null): WireSymbol | null;
|
|
||||||
grab<Table extends WhichTable>(table: Table,
|
|
||||||
key: Parameters<Membrane[Table]['get']>[0],
|
|
||||||
transient: boolean,
|
|
||||||
f: (() => WireSymbol) | null): WireSymbol | null {
|
|
||||||
let e = this[table].get(key as any);
|
|
||||||
if (e === void 0) {
|
|
||||||
if (f === null) return null;
|
|
||||||
e = f();
|
|
||||||
this.byRef.set(e.ref, e);
|
|
||||||
this.byOid.set(e.oid, e);
|
|
||||||
}
|
|
||||||
if (!transient) e.count++;
|
|
||||||
return e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export const INERT_REF: Ref = {
|
|
||||||
relay: Actor.boot(() => Turn.active.stop(Turn.activeFacet)).root,
|
|
||||||
target: {},
|
|
||||||
};
|
|
||||||
|
|
||||||
export type PacketWriter = (bs: Uint8Array) => void;
|
export type PacketWriter = (bs: Uint8Array) => void;
|
||||||
|
|
||||||
export interface RelayOptions {
|
export interface RelayOptions {
|
||||||
|
@ -143,22 +22,13 @@ export interface RelayOptions {
|
||||||
nextLocalOid?: IO.Oid;
|
nextLocalOid?: IO.Oid;
|
||||||
}
|
}
|
||||||
|
|
||||||
export class Relay {
|
export class Relay extends LayerBoundary {
|
||||||
readonly facet: Facet;
|
readonly facet: Facet;
|
||||||
readonly selfRef: Ref;
|
readonly selfRef: Ref;
|
||||||
readonly w: PacketWriter;
|
readonly w: PacketWriter;
|
||||||
readonly inboundAssertions = new IdentityMap<Handle, {
|
|
||||||
localHandle: Handle,
|
|
||||||
pins: Array<WireSymbol>,
|
|
||||||
}>();
|
|
||||||
readonly outboundAssertions = new IdentityMap<Handle, Array<WireSymbol>>();
|
|
||||||
readonly exported = new Membrane();
|
|
||||||
readonly imported = new Membrane();
|
|
||||||
readonly peer: Ref | null;
|
readonly peer: Ref | null;
|
||||||
nextLocalOid: IO.Oid = 0;
|
|
||||||
pendingTurn: IO.Turn<Embedded<WireRef>> = [];
|
pendingTurn: IO.Turn<Embedded<WireRef>> = [];
|
||||||
debug: boolean;
|
debug: boolean;
|
||||||
trustPeer: boolean;
|
|
||||||
|
|
||||||
readonly decoder = new Decoder(void 0, {
|
readonly decoder = new Decoder(void 0, {
|
||||||
includeAnnotations: false,
|
includeAnnotations: false,
|
||||||
|
@ -166,11 +36,12 @@ export class Relay {
|
||||||
});
|
});
|
||||||
|
|
||||||
constructor(options: RelayOptions) {
|
constructor(options: RelayOptions) {
|
||||||
|
super(options.trustPeer, options.nextLocalOid);
|
||||||
|
|
||||||
this.facet = Turn.activeFacet;
|
this.facet = Turn.activeFacet;
|
||||||
this.selfRef = Turn.ref(this);
|
this.selfRef = Turn.ref(this);
|
||||||
this.w = options.packetWriter;
|
this.w = options.packetWriter;
|
||||||
this.debug = options.debug ?? false;
|
this.debug = options.debug ?? false;
|
||||||
this.trustPeer = options.trustPeer ?? true;
|
|
||||||
|
|
||||||
this.facet.preventInertCheck();
|
this.facet.preventInertCheck();
|
||||||
options.setup(this);
|
options.setup(this);
|
||||||
|
@ -182,109 +53,11 @@ export class Relay {
|
||||||
this.peer = (options.initialOid !== void 0)
|
this.peer = (options.initialOid !== void 0)
|
||||||
? this.rewriteRefIn(new Embedded<WireRef>(WireRef.mine(options.initialOid)), [])
|
? this.rewriteRefIn(new Embedded<WireRef>(WireRef.mine(options.initialOid)), [])
|
||||||
: null;
|
: null;
|
||||||
|
|
||||||
if (options.nextLocalOid !== void 0) {
|
|
||||||
this.nextLocalOid = (options.nextLocalOid === 0) ? 1 : options.nextLocalOid;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
register(targetRemoteOid: IO.Oid, assertion: Assertion, handle: Handle): Value<Embedded<WireRef>>;
|
|
||||||
register(targetRemoteOid: null, assertion: Assertion, handle: null): Value<Embedded<WireRef>>;
|
|
||||||
register(targetRemoteOid: IO.Oid | null, assertion: Assertion, handle: Handle | null): Value<Embedded<WireRef>> {
|
|
||||||
const transient = (handle === null);
|
|
||||||
const pins: Array<WireSymbol> = [];
|
|
||||||
const rewritten = mapEmbeddeds(assertion, r => this.rewriteRefOut(r, transient, pins));
|
|
||||||
if (handle !== null) {
|
|
||||||
if (targetRemoteOid !== null /* belt and suspenders */) {
|
|
||||||
this.grabImportedOid(targetRemoteOid, pins);
|
|
||||||
}
|
|
||||||
this.outboundAssertions.set(handle, pins);
|
|
||||||
}
|
|
||||||
return rewritten;
|
|
||||||
}
|
|
||||||
|
|
||||||
deregister(handle: Handle): void {
|
|
||||||
(this.outboundAssertions.get(handle) ?? []).forEach(e => e.drop());
|
|
||||||
this.outboundAssertions.delete(handle);
|
|
||||||
}
|
|
||||||
|
|
||||||
grabImportedOid(oid: IO.Oid, pins: Array<WireSymbol>) {
|
|
||||||
const e = this.imported.grab("byOid", oid, false, null);
|
|
||||||
if (e === null) {
|
|
||||||
throw new Error("Internal error: import table missing entry for oid " + oid);
|
|
||||||
}
|
|
||||||
pins.push(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
grabExportedOid(oid: IO.Oid, pins: Array<WireSymbol>): Ref {
|
|
||||||
const e = this.exported.grab("byOid", oid, false, null);
|
|
||||||
if (e === null) return INERT_REF;
|
|
||||||
pins.push(e);
|
|
||||||
return e.ref;
|
|
||||||
}
|
|
||||||
|
|
||||||
rewriteRefOut(r: Ref, transient: boolean, pins: Array<WireSymbol>): Embedded<WireRef> {
|
|
||||||
if (r.target instanceof RelayEntity && r.target.relay === this) {
|
|
||||||
if (r.attenuation === void 0 || r.attenuation.length === 0) {
|
|
||||||
// No extra conditions on this reference since it was sent to us.
|
|
||||||
this.grabImportedOid(r.target.oid, pins);
|
|
||||||
return new Embedded<WireRef>(WireRef.yours({ oid: r.target.oid, attenuation: [] }));
|
|
||||||
} else {
|
|
||||||
// This reference has been attenuated since it was sent to us.
|
|
||||||
// Do we trust the peer to enforce such attenuation on our behalf?
|
|
||||||
if (this.trustPeer) {
|
|
||||||
this.grabImportedOid(r.target.oid, pins);
|
|
||||||
return new Embedded<WireRef>(WireRef.yours({ oid: r.target.oid, attenuation: r.attenuation }));
|
|
||||||
} else {
|
|
||||||
// fall through: treat the attenuated ref as a local ref, and re-export it.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const e = this.exported.grab(
|
|
||||||
"byRef", r, transient, () => {
|
|
||||||
if (transient) throw new Error("Cannot send transient reference");
|
|
||||||
return new WireSymbol(this.exported, this.nextLocalOid++, r);
|
|
||||||
});
|
|
||||||
pins.push(e);
|
|
||||||
return new Embedded<WireRef>(WireRef.mine(e.oid));
|
|
||||||
}
|
|
||||||
|
|
||||||
rewriteRefIn(nw: Embedded<WireRef>, pins: Array<WireSymbol>): Ref {
|
|
||||||
const n = nw.value;
|
|
||||||
switch (n._variant) {
|
|
||||||
case 'yours': {
|
|
||||||
const e = this.exported.grab("byOid", n.oid, false, null);
|
|
||||||
if (e === null) {
|
|
||||||
return INERT_REF;
|
|
||||||
} else {
|
|
||||||
pins.push(e);
|
|
||||||
const r = e.ref;
|
|
||||||
if (n.attenuation.length === 0) {
|
|
||||||
return r;
|
|
||||||
} else {
|
|
||||||
type AttenuatedRef = Ref & { __attenuations?: Dictionary<any, Ref> };
|
|
||||||
const ar = r as AttenuatedRef;
|
|
||||||
if (ar.__attenuations === void 0) {
|
|
||||||
ar.__attenuations = new Dictionary();
|
|
||||||
}
|
|
||||||
return ar.__attenuations.getOrSet(n.attenuation.map(fromCaveat), () =>
|
|
||||||
attenuate(r, ... n.attenuation));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case 'mine': {
|
|
||||||
const e = this.imported.grab("byOid", n.oid, false, () =>
|
|
||||||
new WireSymbol(this.imported, n.oid, Turn.ref(new RelayEntity(this, n.oid))));
|
|
||||||
pins.push(e);
|
|
||||||
return e.ref;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message(body: Assertion) {
|
message(body: Assertion) {
|
||||||
if (body === FLUSH) {
|
if (body === FLUSH) {
|
||||||
if (this.debug) console.log('OUT', stringify(IO.fromTurn(this.pendingTurn)), this.pendingTurn);
|
if (this.debug) console.log('OUT', stringify(IO.fromTurn(this.pendingTurn)));
|
||||||
this.w(underlying(encode(IO.fromTurn(this.pendingTurn), {
|
this.w(underlying(encode(IO.fromTurn(this.pendingTurn), {
|
||||||
canonical: true,
|
canonical: true,
|
||||||
embeddedEncode: wireRefEmbeddedType,
|
embeddedEncode: wireRefEmbeddedType,
|
||||||
|
@ -301,63 +74,16 @@ export class Relay {
|
||||||
}
|
}
|
||||||
|
|
||||||
accept(bs: BytesLike): void {
|
accept(bs: BytesLike): void {
|
||||||
Turn.for(this.facet, () => {
|
this.facet.turn(() => {
|
||||||
this.decoder.write(bs);
|
this.decoder.write(bs);
|
||||||
while (true) {
|
while (true) {
|
||||||
const rawTurn = this.decoder.try_next();
|
const rawPacket = this.decoder.try_next();
|
||||||
if (rawTurn === void 0) break;
|
if (rawPacket === void 0) break;
|
||||||
const wireTurn = IO.toTurn(rawTurn);
|
const wirePacket = IO.toPacket(rawPacket);
|
||||||
if (wireTurn === void 0) throw new Error("Bad IO.Turn");
|
if (wirePacket === void 0) throw new Error("Bad IO.Packet");
|
||||||
if (this.debug) console.log('IN', stringify(rawTurn));
|
if (this.debug) console.log('IN', stringify(rawPacket));
|
||||||
wireTurn.forEach(v => this.handle(v.oid, v.event));
|
this.proxyPacket(wirePacket);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
rewriteIn(a: Value<Embedded<WireRef>>): [Assertion, Array<WireSymbol>]
|
|
||||||
{
|
|
||||||
const pins: Array<WireSymbol> = [];
|
|
||||||
const rewritten = mapEmbeddeds(a, r => this.rewriteRefIn(r, pins));
|
|
||||||
return [rewritten, pins];
|
|
||||||
}
|
|
||||||
|
|
||||||
handle(localOid: IO.Oid, m: IO.Event<Embedded<WireRef>>) {
|
|
||||||
switch (m._variant) {
|
|
||||||
case 'Assert': {
|
|
||||||
const [a, pins] = this.rewriteIn(m.value.assertion);
|
|
||||||
const r = this.grabExportedOid(localOid, pins);
|
|
||||||
this.inboundAssertions.set(m.value.handle, {
|
|
||||||
localHandle: Turn.active.assert(r, a),
|
|
||||||
pins,
|
|
||||||
});
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case 'Retract': {
|
|
||||||
const remoteHandle = m.value.handle;
|
|
||||||
const h = this.inboundAssertions.get(remoteHandle);
|
|
||||||
if (h === void 0) throw new Error(`Peer retracted invalid handle ${remoteHandle}`);
|
|
||||||
this.inboundAssertions.delete(remoteHandle);
|
|
||||||
h.pins.forEach(e => e.drop());
|
|
||||||
Turn.active.retract(h.localHandle);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case 'Message': {
|
|
||||||
const [a, pins] = this.rewriteIn(m.value.body);
|
|
||||||
if (pins.length > 0) throw new Error("Cannot receive transient reference");
|
|
||||||
const r = this.exported.byOid.get(localOid)?.ref;
|
|
||||||
if (r) Turn.active.message(r, a);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case 'Sync': {
|
|
||||||
const pins: Array<WireSymbol> = [];
|
|
||||||
const r = this.grabExportedOid(localOid, pins);
|
|
||||||
const k = this.rewriteRefIn(m.value.peer, pins);
|
|
||||||
Turn.active.sync(r).then(() => {
|
|
||||||
Turn.active.message(k, true);
|
|
||||||
pins.forEach(e => e.drop());
|
|
||||||
});
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue