Cross-Relay attenuation

This commit is contained in:
Tony Garnock-Jones 2021-03-03 15:45:35 +01:00
parent 1cf0d9f89f
commit 94831bd140
4 changed files with 202 additions and 132 deletions

View File

@ -30,68 +30,68 @@ export type RewriteStage = Array<Rewrite>;
export type Rewrite = { pattern: Pattern, template: Template };
export const _CRec = Symbol.for('rec');
export const CRec = Record.makeConstructor<{label: Assertion, arity: number}, Ref>()(
export const CRec = Record.makeConstructor<{label: Value<never>, arity: number}, never>()(
_CRec, ['label', 'arity']);
export const _CArr = Symbol.for('arr');
export const CArr = Record.makeConstructor<{arity: number}, Ref>()(
export const CArr = Record.makeConstructor<{arity: number}, never>()(
_CArr, ['arity']);
export const _CDict = Symbol.for('dict');
export const CDict = Record.makeConstructor<{}, Ref>()(
export const CDict = Record.makeConstructor<{}, never>()(
_CDict, []);
export type ConstructorSpec =
| Record<typeof _CRec, [Assertion, number], Ref>
| Record<typeof _CArr, [number], Ref>
| Record<typeof _CDict, [], Ref>;
| Record<typeof _CRec, [Value<never>, number], never>
| Record<typeof _CArr, [number], never>
| Record<typeof _CDict, [], never>;
export const _PDiscard = Symbol.for('_');
export const PDiscard = Record.makeConstructor<{}, Ref>()(
export const PDiscard = Record.makeConstructor<{}, never>()(
_PDiscard, []);
export const _PBind = Symbol.for('bind');
export const PBind = Record.makeConstructor<{name: string, pattern: Pattern}, Ref>()(
export const PBind = Record.makeConstructor<{name: string, pattern: Pattern}, never>()(
_PBind, ['name', 'pattern']);
export const _PAnd = Symbol.for('and');
export const PAnd = Record.makeConstructor<{patterns: Array<Pattern>}, Ref>()(
export const PAnd = Record.makeConstructor<{patterns: Array<Pattern>}, never>()(
_PAnd, ['patterns']);
export const _PNot = Symbol.for('not');
export const PNot = Record.makeConstructor<{pattern: Pattern}, Ref>()(
export const PNot = Record.makeConstructor<{pattern: Pattern}, never>()(
_PNot, ['pattern']);
export const _Lit = Symbol.for('lit');
export const Lit = Record.makeConstructor<{value: Assertion}, Ref>()(
export const Lit = Record.makeConstructor<{value: Value<never>}, never>()(
_Lit, ['value']);
export const _PCompound = Symbol.for('compound');
export const PCompound =
Record.makeConstructor<{ctor: ConstructorSpec, members: Dictionary<Pattern, Ref>}, Ref>()(
Record.makeConstructor<{ctor: ConstructorSpec, members: Dictionary<Pattern, never>}, never>()(
_PCompound, ['ctor', 'members']);
export type Pattern =
| Record<typeof _PDiscard, [], Ref>
| Record<typeof _PBind, [string, Pattern], Ref>
| Record<typeof _PAnd, [Pattern[]], Ref>
| Record<typeof _PNot, [Pattern], Ref>
| Record<typeof _Lit, [Assertion], Ref>
| Record<typeof _PCompound, [ConstructorSpec, Dictionary<Pattern, Ref>], Ref>;
| Record<typeof _PDiscard, [], never>
| Record<typeof _PBind, [string, Pattern], never>
| Record<typeof _PAnd, [Pattern[]], never>
| Record<typeof _PNot, [Pattern], never>
| Record<typeof _Lit, [Value<never>], never>
| Record<typeof _PCompound, [ConstructorSpec, Dictionary<Pattern, never>], never>;
export const _TRef = Symbol.for('ref');
export const TRef = Record.makeConstructor<{name: string}, Ref>()(
export const TRef = Record.makeConstructor<{name: string}, never>()(
_TRef, ['name']);
export const _TCompound = Symbol.for('compound');
export const TCompound =
Record.makeConstructor<{ctor: ConstructorSpec, members: Dictionary<Template, Ref>}, Ref>()(
Record.makeConstructor<{ctor: ConstructorSpec, members: Dictionary<Template, never>}, never>()(
_TCompound, ['ctor', 'members']);
export type Template =
| Record<typeof _TRef, [string], Ref>
| Record<typeof _Lit, [Assertion], Ref>
| Record<typeof _TCompound, [ConstructorSpec, Dictionary<Template, Ref>], Ref>;
| Record<typeof _TRef, [string], never>
| Record<typeof _Lit, [Value<never>], never>
| Record<typeof _TCompound, [ConstructorSpec, Dictionary<Template, never>], never>;
export type Bindings = { [name: string]: Assertion };
@ -312,7 +312,7 @@ export function match(p: Pattern, v: Assertion): Bindings | null {
case _CDict:
if (!Dictionary.isDictionary<Assertion, Ref>(v)) return false;
for (const [key, pp] of members) {
const vv = v.get(key);
const vv = v.get(key as Assertion);
if (vv === void 0) return false;
if (!walk(pp, vv)) return false;
}
@ -334,13 +334,16 @@ export function instantiate(t: Template, b: Bindings): Assertion {
return v;
}
case _Lit:
return Lit._.value(t);
return Lit._.value(t) as Assertion;
case _TCompound: {
const ctor = TCompound._.ctor(t);
const members = TCompound._.members(t);
switch (ctor.label) {
case _CRec: {
const v = Record<Assertion, any, Ref>(CRec._.label(ctor), []);
const v = Record<Assertion, any, Ref>(
CRec._.label(ctor) as Assertion,
[],
);
v.length = CRec._.arity(ctor);
for (const [key, tt] of members) {
v[key as number] = walk(tt);
@ -358,7 +361,7 @@ export function instantiate(t: Template, b: Bindings): Assertion {
case _CDict: {
const v = new Dictionary<Assertion, Ref>();
for (const [key, tt] of members) {
v.set(key, walk(tt));
v.set(key as Assertion, walk(tt));
}
return v;
}

View File

@ -1,4 +1,4 @@
import { Actor, Assertion, attenuate, CRec, forwarder, Lit, Pattern, PCompound, Ref, rfilter, Turn } from './actor.js';
import { Actor, Assertion, attenuate, CRec, Lit, Pattern, PCompound, rfilter, Turn } from './actor.js';
import { Dictionary, Record } from 'preserves';
import { Dataspace, Observe } from './dataspace.js';
import { Worker } from 'worker_threads';
@ -60,7 +60,7 @@ Turn.for(new Actor(), async (t: Turn) => {
new Dictionary()),
PCompound(CRec(Observe.constructorInfo.label,
Observe.constructorInfo.arity),
new Dictionary<Pattern, Ref>([
new Dictionary<Pattern, never>([
[0, Lit(SetBox.constructorInfo.label)]]))));
const ds_for_client = attenuate(
@ -70,15 +70,15 @@ Turn.for(new Actor(), async (t: Turn) => {
new Dictionary()),
PCompound(CRec(Observe.constructorInfo.label,
Observe.constructorInfo.arity),
new Dictionary<Pattern, Ref>([
new Dictionary<Pattern, never>([
[0, Lit(BoxState.constructorInfo.label)]]))));
const boxpath = path.join(__dirname, 'box.js');
const clientpath = path.join(__dirname, 'client.js');
spawnModule(t, boxpath, [ds_for_box, 500000, 25000]);
// spawnWorker(t, boxpath, [ds_for_box, 50000, 2500]);
// spawnModule(t, boxpath, [ds_for_box, 500000, 25000]);
spawnWorker(t, boxpath, [ds_for_box, 50000, 2500]);
spawnModule(t, clientpath, ds_for_client);
// spawnWorker(t, clientpath, ds_for_client);

View File

@ -1,31 +1,76 @@
import { Handle, Ref } from 'actor';
import { Attenuation, Handle, Pattern, Ref, Template } from './actor.js';
import { Record, Value } from 'preserves';
export const _Assert = Symbol.for('assert');
export const Assert = Record.makeConstructor<{assertion: Value<WireRef>, handle: Handle}, WireRef>()(
_Assert, ['assertion', 'handle']);
export const _Retract = Symbol.for('retract');
export const Retract = Record.makeConstructor<{handle: Handle}, WireRef>()(
_Retract, ['handle']);
export const _Message = Symbol.for('message');
export const Message = Record.makeConstructor<{body: Value<WireRef>}, WireRef>()(
_Message, ['body']);
export const _Sync = Symbol.for('sync');
export const Sync = Record.makeConstructor<{peer: WireRef}, WireRef>()(
_Sync, ['peer']);
export type EntityMessage =
| Record<typeof _Assert, [Value<WireRef>, Handle], WireRef>
| Record<typeof _Retract, [Handle], WireRef>
| Record<typeof _Message, [Value<WireRef>], WireRef>
| Record<typeof _Sync, [WireRef], WireRef>;
function mk<T extends object>() {
return {
Assert: Record.makeConstructor<{assertion: Value<T>, handle: Handle}, T>()(
_Assert, ['assertion', 'handle']),
Retract: Record.makeConstructor<{handle: Handle}, T>()(
_Retract, ['handle']),
Message: Record.makeConstructor<{body: Value<T>}, T>()(
_Message, ['body']),
Sync: Record.makeConstructor<{peer: T}, T>()(
_Sync, ['peer']),
};
}
export type TurnMessage = Array<[Oid, EntityMessage]>;
export type EntityMessage<T extends object> =
| Record<typeof _Assert, [Value<T>, Handle], T>
| Record<typeof _Retract, [Handle], T>
| Record<typeof _Message, [Value<T>], T>
| Record<typeof _Sync, [T], T>;
export type TurnMessage<T extends object> = Array<[Oid, EntityMessage<T>]>;
export type Oid = number;
export type RefLocation = "mine" | "your";
export type WireRef = { loc: RefLocation, oid: Oid };
export type WireSymbol = { name: WireRef, ref: Ref, count: number };
export type WireSymbol = { oid: Oid, ref: Ref, count: number };
export type WireRef =
| { loc: "mine", oid: Oid }
| { loc: "your", oid: Oid, attenuation: EncodedAttenuation };
export const IO = mk<WireRef>();
export function myRef(oid: Oid): WireRef & { loc: "mine" } {
return { loc: 'mine', oid };
}
export function yourRef(oid: Oid, attenuation: EncodedAttenuation): WireRef & { loc: "your" } {
return { loc: 'your', oid, attenuation };
}
export type EncodedAttenuation = Array<Array<[Value<WireRef>, Value<WireRef>]>>;
export function encodeAttenuation(a: Attenuation | undefined): EncodedAttenuation {
if (a === void 0) return [];
return a.map(s => s.map(({pattern, template}) => [
pattern as Value<WireRef>,
template as Value<WireRef>,
]));
}
export function decodeAttenuation(v: Array<Value<WireRef>>): Attenuation {
function complain(): never {
throw new Error(
`Received invalid attenuation ${v.asPreservesText()} from peer`);
}
if (v.length === 0) return [];
return v.map(s => {
if (!Array.isArray(s)) complain();
return s.map(e => {
if (!(Array.isArray(e) && e.length === 2)) complain();
// TODO: check structure of pattern and template
return {
pattern: e[0] as Pattern,
template: e[1] as Template,
};
});
});
}

View File

@ -1,19 +1,21 @@
import { Actor, Assertion, Entity, Handle, Ref, Turn } from './actor.js';
import { Bytes, decode, encode, IdentityMap, mapPointers, underlying, Value } from 'preserves';
import { Actor, Assertion, attenuate, Entity, Handle, Ref, Turn } from './actor.js';
import { Bytes, canonicalString, decode, encode, FlexMap, IdentityMap, mapPointers, underlying, Value } from 'preserves';
import {
Oid,
Assert,
EncodedAttenuation,
EntityMessage,
Message,
Retract,
Sync,
IO,
Oid,
TurnMessage,
WireRef,
WireSymbol,
_Assert,
_Retract,
_Message,
_Retract,
_Sync,
WireRef,
TurnMessage,
decodeAttenuation,
encodeAttenuation,
myRef,
yourRef,
} from './protocol.js';
import { queueTask } from './task.js';
@ -50,34 +52,36 @@ export class SyncPeerEntity implements Entity {
export class RelayEntity implements Entity {
readonly relay: Relay;
readonly e: WireSymbol;
readonly oid: Oid;
constructor(relay: Relay, e: WireSymbol) {
constructor(relay: Relay, oid: Oid) {
this.relay = relay;
this.e = e;
this.oid = oid;
}
send(m: EntityMessage): void {
this.relay.send(this.e.name.oid, m);
send(m: EntityMessage<WireRef>): void {
this.relay.send(this.oid, m);
}
assert(_turn: Turn, assertion: Assertion, handle: Handle): void {
this.send(Assert(this.relay.register(assertion, handle), handle));
this.send(IO.Assert(this.relay.register(assertion, handle), handle));
}
retract(_turn: Turn, handle: Handle): void {
this.relay.deregister(handle);
this.send(Retract(handle));
this.send(IO.Retract(handle));
}
message(_turn: Turn, body: Assertion): void {
this.send(Message(this.relay.register(body, null)));
this.send(IO.Message(this.relay.register(body, null)));
}
sync(turn: Turn, peer: Ref): void {
const peerEntity = new SyncPeerEntity(this.relay, peer);
peerEntity.e = this.relay.rewriteRefOut(turn.ref(peerEntity), false, null);
this.send(Sync(peerEntity.e.name));
const exported: Array<WireSymbol> = [];
const ior = this.relay.rewriteRefOut(turn.ref(peerEntity), false, exported);
peerEntity.e = exported[0];
this.send(IO.Sync(ior));
}
}
@ -86,17 +90,15 @@ export class Membrane {
readonly byRef = new IdentityMap<Ref, WireSymbol>();
grab<Table extends "byOid" | "byRef">(table: Table,
key: (Table extends "byOid" ? Oid : Ref),
key: Parameters<Membrane[Table]['get']>[0],
transient: boolean,
f: () => WireSymbol): WireSymbol
{
let e =
(this[table] as IdentityMap<Table extends "byOid" ? Oid : Ref, WireSymbol>)
.get(key);
let e = this[table].get(key as any);
if (e === void 0) {
e = f();
this.byRef.set(e.ref, e);
this.byOid.set(e.name.oid, e);
this.byOid.set(e.oid, e);
}
if (!transient) e.count++;
return e;
@ -105,7 +107,7 @@ export class Membrane {
drop(e: WireSymbol): void {
e.count--;
if (e.count === 0) {
this.byOid.delete(e.name.oid);
this.byOid.delete(e.oid);
this.byRef.delete(e.ref);
}
}
@ -126,9 +128,9 @@ export interface RelayOptions {
packetWriter: PacketWriter;
setup(t: Turn, r: Relay): void;
debug?: boolean;
trustPeer?: boolean;
}
export class Relay {
readonly actor: Actor;
readonly w: PacketWriter;
@ -140,21 +142,22 @@ export class Relay {
readonly exported = new Membrane();
readonly imported = new Membrane();
nextLocalOid: Oid = 0;
pendingTurn: TurnMessage = [];
pendingTurn: TurnMessage<WireRef> = [];
debug: boolean;
trustPeer: boolean;
constructor(t: Turn, options: RelayOptions) {
this.actor = t.actor;
this.w = options.packetWriter;
this.debug = options.debug ?? false;
this.trustPeer = options.trustPeer ?? true;
options.setup(t, this);
}
rewriteOut(assertion: Assertion, transient: boolean): [Value<WireRef>, Array<WireSymbol>]
{
const exported: Array<WireSymbol> = [];
const rewritten =
mapPointers(assertion, r => this.rewriteRefOut(r, transient, exported).name);
const rewritten = mapPointers(assertion, r => this.rewriteRefOut(r, transient, exported));
return [rewritten, exported];
}
@ -175,44 +178,61 @@ export class Relay {
(this.outboundAssertions.get(handle) ?? []).forEach(e => this.releaseRefOut(e));
}
rewriteRefOut(r: Ref, transient: boolean, exported: Array<WireSymbol> | null): WireSymbol {
rewriteRefOut(r: Ref, transient: boolean, exported: Array<WireSymbol>): WireRef {
if (r.target instanceof RelayEntity && r.target.relay === this) {
return r.target.e;
} else {
const e = this.exported.grab("byRef", r, transient, () => {
if (transient) throw new Error("Cannot send transient reference");
return { name: { loc: "mine", oid: this.nextLocalOid++ }, ref: r, count: 0 };
});
exported?.push(e);
return e;
if (r.attenuation === void 0 || r.attenuation.length === 0) {
// No extra conditions on this reference since it was sent to us.
return yourRef(r.target.oid, []);
} else {
// This reference has been attenuated since it was sent to us.
// Do we trust the peer to enforce such attenuation on our behalf?
if (this.trustPeer) {
return yourRef(r.target.oid, encodeAttenuation(r.attenuation));
} else {
// fall through: treat the attenuated ref as a local ref, and re-export it.
}
}
}
const e = this.exported.grab(
"byRef", r, transient, () => {
if (transient) throw new Error("Cannot send transient reference");
return { oid: this.nextLocalOid++, ref: r, count: 0 };
});
exported.push(e);
return myRef(e.oid);
}
releaseRefOut(e: WireSymbol) {
this.exported.drop(e);
}
rewriteRefIn(t: Turn, n: WireRef, imported: Array<WireSymbol> | null): Ref {
rewriteRefIn(t: Turn, n: WireRef, imported: Array<WireSymbol>): Ref {
switch (n.loc) {
case 'your':
return this.lookupLocal(n.oid);
case 'your': {
const r = this.lookupLocal(n.oid);
if (n.attenuation.length === 0 || r === INERT_REF) {
return r;
} else {
type AttenuatedRef = Ref & { __attenuations?: FlexMap<EncodedAttenuation, Ref> };
const ar = r as AttenuatedRef;
if (ar.__attenuations === void 0) {
ar.__attenuations = new FlexMap(canonicalString);
}
return ar.__attenuations.getOrSet(n.attenuation, () =>
attenuate(r, ... decodeAttenuation(n.attenuation)));
}
}
case 'mine': {
const e = this.imported.grab("byOid", n.oid, false, () => {
const e: WireSymbol = {
name: { loc: 'your', oid: n.oid },
ref: null as any,
count: 0,
};
e.ref = t.ref(new RelayEntity(this, e as WireSymbol));
return e;
});
imported?.push(e);
const e = this.imported.grab("byOid", n.oid, false, () =>
({ oid: n.oid, ref: t.ref(new RelayEntity(this, n.oid)), count: 0 }));
imported.push(e);
return e.ref;
}
}
}
send(remoteOid: Oid, m: EntityMessage): void {
send(remoteOid: Oid, m: EntityMessage<WireRef>): void {
if (this.pendingTurn.length === 0) {
queueTask(() => {
if (this.debug) console.log('OUT', this.pendingTurn.asPreservesText());
@ -221,7 +241,7 @@ export class Relay {
encodePointer: n => {
switch (n.loc) {
case 'mine': return [0, n.oid];
case 'your': return [1, n.oid];
case 'your': return [1, n.oid, ... n.attenuation];
}
},
})));
@ -240,40 +260,47 @@ export class Relay {
const bs = Bytes.from(bs0);
const wireTurn = decode<WireRef>(bs, {
decodePointer: v => {
if (!Array.isArray(v) || v.length !== 2) {
function complain(): never {
throw new Error(
`Received invalid object reference ${v.asPreservesText()} from peer`);
}
const loc = v[0] === 0 ? 'mine' : 'your';
return { loc, oid: v[1] as Oid };
if (!(Array.isArray(v) && v.length >= 2 && typeof v[1] === 'number')) {
complain();
}
const oid = v[1] as Oid;
switch (v[0]) {
case 0:
if (v.length > 2) complain();
return myRef(oid);
case 1:
// TODO: check EncodedAttenuation
return yourRef(oid, v.slice(2) as EncodedAttenuation);
default:
complain();
}
},
});
}) as TurnMessage<WireRef>;
// ^ TODO: deep check that v is a TurnMessage
if (this.debug) console.log('IN', wireTurn.asPreservesText());
if (!Array.isArray(wireTurn)) invalidTopLevelMessage(wireTurn);
wireTurn.forEach(v => {
if (Array.isArray(v) && v.length === 2 && typeof v[0] === 'number') {
const [localOid, m] = v;
// TODO: deep check that m is EntityMessage
this.handle(t, this.lookupLocal(localOid), m as EntityMessage);
} else {
invalidTopLevelMessage(wireTurn);
}
const [localOid, m] = v;
this.handle(t, this.lookupLocal(localOid), m);
});
});
}
handle(t: Turn, r: Ref, m: EntityMessage) {
handle(t: Turn, r: Ref, m: EntityMessage<WireRef>) {
switch (m.label) {
case _Assert: {
const [a, imported] = this.rewriteIn(t, Assert._.assertion(m));
this.inboundAssertions.set(Assert._.handle(m), {
const [a, imported] = this.rewriteIn(t, IO.Assert._.assertion(m));
this.inboundAssertions.set(IO.Assert._.handle(m), {
localHandle: t.assert(r, a),
imported,
});
break;
}
case _Retract: {
const remoteHandle = Retract._.handle(m);
const remoteHandle = IO.Retract._.handle(m);
const h = this.inboundAssertions.get(remoteHandle);
if (h === void 0) throw new Error(`Peer retracted invalid handle ${remoteHandle}`);
this.inboundAssertions.delete(remoteHandle);
@ -282,14 +309,14 @@ export class Relay {
break;
}
case _Message: {
const [a, imported] = this.rewriteIn(t, Message._.body(m));
const [a, imported] = this.rewriteIn(t, IO.Message._.body(m));
if (imported.length > 0) throw new Error("Cannot receive transient reference");
t.message(r, a);
break;
}
case _Sync: {
const imported: Array<WireSymbol> = [];
const k = this.rewriteRefIn(t, Sync._.peer(m), imported);
const k = this.rewriteRefIn(t, IO.Sync._.peer(m), imported);
t.sync(r).then(t => {
t.message(k, true);
imported.forEach(e => this.imported.drop(e));
@ -300,11 +327,6 @@ export class Relay {
}
}
function invalidTopLevelMessage(m: Value<WireRef>): never {
throw new Error(
`Received invalid top-level protocol message from peer: ${m.asPreservesText()}`);
}
export interface RelayActorOptions extends RelayOptions {
initialOid?: Oid;
initialRef?: Ref;
@ -316,10 +338,10 @@ export function spawnRelay(t: Turn, options: RelayActorOptions): Promise<Ref | n
t.spawn(t => {
const relay = new Relay(t, options);
if (options.initialRef !== void 0) {
relay.rewriteRefOut(options.initialRef, false, null);
relay.rewriteRefOut(options.initialRef, false, []);
}
if (options.initialOid !== void 0) {
resolve(relay.rewriteRefIn(t, { loc: 'mine', oid: options.initialOid }, null));
resolve(relay.rewriteRefIn(t, myRef(options.initialOid), []));
} else {
resolve(null);
}