diff --git a/src/actor.ts b/src/actor.ts index a449c39..2cc484b 100644 --- a/src/actor.ts +++ b/src/actor.ts @@ -30,68 +30,68 @@ export type RewriteStage = Array; export type Rewrite = { pattern: Pattern, template: Template }; export const _CRec = Symbol.for('rec'); -export const CRec = Record.makeConstructor<{label: Assertion, arity: number}, Ref>()( +export const CRec = Record.makeConstructor<{label: Value, arity: number}, never>()( _CRec, ['label', 'arity']); export const _CArr = Symbol.for('arr'); -export const CArr = Record.makeConstructor<{arity: number}, Ref>()( +export const CArr = Record.makeConstructor<{arity: number}, never>()( _CArr, ['arity']); export const _CDict = Symbol.for('dict'); -export const CDict = Record.makeConstructor<{}, Ref>()( +export const CDict = Record.makeConstructor<{}, never>()( _CDict, []); export type ConstructorSpec = - | Record - | Record - | Record; + | Record, number], never> + | Record + | Record; export const _PDiscard = Symbol.for('_'); -export const PDiscard = Record.makeConstructor<{}, Ref>()( +export const PDiscard = Record.makeConstructor<{}, never>()( _PDiscard, []); export const _PBind = Symbol.for('bind'); -export const PBind = Record.makeConstructor<{name: string, pattern: Pattern}, Ref>()( +export const PBind = Record.makeConstructor<{name: string, pattern: Pattern}, never>()( _PBind, ['name', 'pattern']); export const _PAnd = Symbol.for('and'); -export const PAnd = Record.makeConstructor<{patterns: Array}, Ref>()( +export const PAnd = Record.makeConstructor<{patterns: Array}, never>()( _PAnd, ['patterns']); export const _PNot = Symbol.for('not'); -export const PNot = Record.makeConstructor<{pattern: Pattern}, Ref>()( +export const PNot = Record.makeConstructor<{pattern: Pattern}, never>()( _PNot, ['pattern']); export const _Lit = Symbol.for('lit'); -export const Lit = Record.makeConstructor<{value: Assertion}, Ref>()( +export const Lit = Record.makeConstructor<{value: Value}, never>()( _Lit, ['value']); export const _PCompound = Symbol.for('compound'); export const PCompound = - Record.makeConstructor<{ctor: ConstructorSpec, members: Dictionary}, Ref>()( + Record.makeConstructor<{ctor: ConstructorSpec, members: Dictionary}, never>()( _PCompound, ['ctor', 'members']); export type Pattern = - | Record - | Record - | Record - | Record - | Record - | Record], Ref>; + | Record + | Record + | Record + | Record + | Record], never> + | Record], never>; export const _TRef = Symbol.for('ref'); -export const TRef = Record.makeConstructor<{name: string}, Ref>()( +export const TRef = Record.makeConstructor<{name: string}, never>()( _TRef, ['name']); export const _TCompound = Symbol.for('compound'); export const TCompound = - Record.makeConstructor<{ctor: ConstructorSpec, members: Dictionary}, Ref>()( + Record.makeConstructor<{ctor: ConstructorSpec, members: Dictionary}, never>()( _TCompound, ['ctor', 'members']); export type Template = - | Record - | Record - | Record], Ref>; + | Record + | Record], never> + | Record], never>; export type Bindings = { [name: string]: Assertion }; @@ -312,7 +312,7 @@ export function match(p: Pattern, v: Assertion): Bindings | null { case _CDict: if (!Dictionary.isDictionary(v)) return false; for (const [key, pp] of members) { - const vv = v.get(key); + const vv = v.get(key as Assertion); if (vv === void 0) return false; if (!walk(pp, vv)) return false; } @@ -334,13 +334,16 @@ export function instantiate(t: Template, b: Bindings): Assertion { return v; } case _Lit: - return Lit._.value(t); + return Lit._.value(t) as Assertion; case _TCompound: { const ctor = TCompound._.ctor(t); const members = TCompound._.members(t); switch (ctor.label) { case _CRec: { - const v = Record(CRec._.label(ctor), []); + const v = Record( + CRec._.label(ctor) as Assertion, + [], + ); v.length = CRec._.arity(ctor); for (const [key, tt] of members) { v[key as number] = walk(tt); @@ -358,7 +361,7 @@ export function instantiate(t: Template, b: Bindings): Assertion { case _CDict: { const v = new Dictionary(); for (const [key, tt] of members) { - v.set(key, walk(tt)); + v.set(key as Assertion, walk(tt)); } return v; } diff --git a/src/main.ts b/src/main.ts index d47ef43..ef625e6 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,4 +1,4 @@ -import { Actor, Assertion, attenuate, CRec, forwarder, Lit, Pattern, PCompound, Ref, rfilter, Turn } from './actor.js'; +import { Actor, Assertion, attenuate, CRec, Lit, Pattern, PCompound, rfilter, Turn } from './actor.js'; import { Dictionary, Record } from 'preserves'; import { Dataspace, Observe } from './dataspace.js'; import { Worker } from 'worker_threads'; @@ -60,7 +60,7 @@ Turn.for(new Actor(), async (t: Turn) => { new Dictionary()), PCompound(CRec(Observe.constructorInfo.label, Observe.constructorInfo.arity), - new Dictionary([ + new Dictionary([ [0, Lit(SetBox.constructorInfo.label)]])))); const ds_for_client = attenuate( @@ -70,15 +70,15 @@ Turn.for(new Actor(), async (t: Turn) => { new Dictionary()), PCompound(CRec(Observe.constructorInfo.label, Observe.constructorInfo.arity), - new Dictionary([ + new Dictionary([ [0, Lit(BoxState.constructorInfo.label)]])))); const boxpath = path.join(__dirname, 'box.js'); const clientpath = path.join(__dirname, 'client.js'); - spawnModule(t, boxpath, [ds_for_box, 500000, 25000]); - // spawnWorker(t, boxpath, [ds_for_box, 50000, 2500]); + // spawnModule(t, boxpath, [ds_for_box, 500000, 25000]); + spawnWorker(t, boxpath, [ds_for_box, 50000, 2500]); spawnModule(t, clientpath, ds_for_client); // spawnWorker(t, clientpath, ds_for_client); diff --git a/src/protocol.ts b/src/protocol.ts index 87608c9..6abe1ab 100644 --- a/src/protocol.ts +++ b/src/protocol.ts @@ -1,31 +1,76 @@ -import { Handle, Ref } from 'actor'; +import { Attenuation, Handle, Pattern, Ref, Template } from './actor.js'; import { Record, Value } from 'preserves'; export const _Assert = Symbol.for('assert'); -export const Assert = Record.makeConstructor<{assertion: Value, handle: Handle}, WireRef>()( - _Assert, ['assertion', 'handle']); - export const _Retract = Symbol.for('retract'); -export const Retract = Record.makeConstructor<{handle: Handle}, WireRef>()( - _Retract, ['handle']); - export const _Message = Symbol.for('message'); -export const Message = Record.makeConstructor<{body: Value}, WireRef>()( - _Message, ['body']); - export const _Sync = Symbol.for('sync'); -export const Sync = Record.makeConstructor<{peer: WireRef}, WireRef>()( - _Sync, ['peer']); -export type EntityMessage = - | Record, Handle], WireRef> - | Record - | Record], WireRef> - | Record; +function mk() { + return { + Assert: Record.makeConstructor<{assertion: Value, handle: Handle}, T>()( + _Assert, ['assertion', 'handle']), + Retract: Record.makeConstructor<{handle: Handle}, T>()( + _Retract, ['handle']), + Message: Record.makeConstructor<{body: Value}, T>()( + _Message, ['body']), + Sync: Record.makeConstructor<{peer: T}, T>()( + _Sync, ['peer']), + }; +} -export type TurnMessage = Array<[Oid, EntityMessage]>; +export type EntityMessage = + | Record, Handle], T> + | Record + | Record], T> + | Record; + +export type TurnMessage = Array<[Oid, EntityMessage]>; export type Oid = number; -export type RefLocation = "mine" | "your"; -export type WireRef = { loc: RefLocation, oid: Oid }; -export type WireSymbol = { name: WireRef, ref: Ref, count: number }; + +export type WireSymbol = { oid: Oid, ref: Ref, count: number }; + +export type WireRef = + | { loc: "mine", oid: Oid } + | { loc: "your", oid: Oid, attenuation: EncodedAttenuation }; + +export const IO = mk(); + +export function myRef(oid: Oid): WireRef & { loc: "mine" } { + return { loc: 'mine', oid }; +} + +export function yourRef(oid: Oid, attenuation: EncodedAttenuation): WireRef & { loc: "your" } { + return { loc: 'your', oid, attenuation }; +} + +export type EncodedAttenuation = Array, Value]>>; + +export function encodeAttenuation(a: Attenuation | undefined): EncodedAttenuation { + if (a === void 0) return []; + return a.map(s => s.map(({pattern, template}) => [ + pattern as Value, + template as Value, + ])); +} + +export function decodeAttenuation(v: Array>): Attenuation { + function complain(): never { + throw new Error( + `Received invalid attenuation ${v.asPreservesText()} from peer`); + } + + if (v.length === 0) return []; + return v.map(s => { + if (!Array.isArray(s)) complain(); + return s.map(e => { + if (!(Array.isArray(e) && e.length === 2)) complain(); + // TODO: check structure of pattern and template + return { + pattern: e[0] as Pattern, + template: e[1] as Template, + }; + }); + }); +} diff --git a/src/relay.ts b/src/relay.ts index 8af4a0e..626e67b 100644 --- a/src/relay.ts +++ b/src/relay.ts @@ -1,19 +1,21 @@ -import { Actor, Assertion, Entity, Handle, Ref, Turn } from './actor.js'; -import { Bytes, decode, encode, IdentityMap, mapPointers, underlying, Value } from 'preserves'; +import { Actor, Assertion, attenuate, Entity, Handle, Ref, Turn } from './actor.js'; +import { Bytes, canonicalString, decode, encode, FlexMap, IdentityMap, mapPointers, underlying, Value } from 'preserves'; import { - Oid, - Assert, + EncodedAttenuation, EntityMessage, - Message, - Retract, - Sync, + IO, + Oid, + TurnMessage, + WireRef, WireSymbol, _Assert, - _Retract, _Message, + _Retract, _Sync, - WireRef, - TurnMessage, + decodeAttenuation, + encodeAttenuation, + myRef, + yourRef, } from './protocol.js'; import { queueTask } from './task.js'; @@ -50,34 +52,36 @@ export class SyncPeerEntity implements Entity { export class RelayEntity implements Entity { readonly relay: Relay; - readonly e: WireSymbol; + readonly oid: Oid; - constructor(relay: Relay, e: WireSymbol) { + constructor(relay: Relay, oid: Oid) { this.relay = relay; - this.e = e; + this.oid = oid; } - send(m: EntityMessage): void { - this.relay.send(this.e.name.oid, m); + send(m: EntityMessage): void { + this.relay.send(this.oid, m); } assert(_turn: Turn, assertion: Assertion, handle: Handle): void { - this.send(Assert(this.relay.register(assertion, handle), handle)); + this.send(IO.Assert(this.relay.register(assertion, handle), handle)); } retract(_turn: Turn, handle: Handle): void { this.relay.deregister(handle); - this.send(Retract(handle)); + this.send(IO.Retract(handle)); } message(_turn: Turn, body: Assertion): void { - this.send(Message(this.relay.register(body, null))); + this.send(IO.Message(this.relay.register(body, null))); } sync(turn: Turn, peer: Ref): void { const peerEntity = new SyncPeerEntity(this.relay, peer); - peerEntity.e = this.relay.rewriteRefOut(turn.ref(peerEntity), false, null); - this.send(Sync(peerEntity.e.name)); + const exported: Array = []; + const ior = this.relay.rewriteRefOut(turn.ref(peerEntity), false, exported); + peerEntity.e = exported[0]; + this.send(IO.Sync(ior)); } } @@ -86,17 +90,15 @@ export class Membrane { readonly byRef = new IdentityMap(); grab(table: Table, - key: (Table extends "byOid" ? Oid : Ref), + key: Parameters[0], transient: boolean, f: () => WireSymbol): WireSymbol { - let e = - (this[table] as IdentityMap
) - .get(key); + let e = this[table].get(key as any); if (e === void 0) { e = f(); this.byRef.set(e.ref, e); - this.byOid.set(e.name.oid, e); + this.byOid.set(e.oid, e); } if (!transient) e.count++; return e; @@ -105,7 +107,7 @@ export class Membrane { drop(e: WireSymbol): void { e.count--; if (e.count === 0) { - this.byOid.delete(e.name.oid); + this.byOid.delete(e.oid); this.byRef.delete(e.ref); } } @@ -126,9 +128,9 @@ export interface RelayOptions { packetWriter: PacketWriter; setup(t: Turn, r: Relay): void; debug?: boolean; + trustPeer?: boolean; } - export class Relay { readonly actor: Actor; readonly w: PacketWriter; @@ -140,21 +142,22 @@ export class Relay { readonly exported = new Membrane(); readonly imported = new Membrane(); nextLocalOid: Oid = 0; - pendingTurn: TurnMessage = []; + pendingTurn: TurnMessage = []; debug: boolean; + trustPeer: boolean; constructor(t: Turn, options: RelayOptions) { this.actor = t.actor; this.w = options.packetWriter; this.debug = options.debug ?? false; + this.trustPeer = options.trustPeer ?? true; options.setup(t, this); } rewriteOut(assertion: Assertion, transient: boolean): [Value, Array] { const exported: Array = []; - const rewritten = - mapPointers(assertion, r => this.rewriteRefOut(r, transient, exported).name); + const rewritten = mapPointers(assertion, r => this.rewriteRefOut(r, transient, exported)); return [rewritten, exported]; } @@ -175,44 +178,61 @@ export class Relay { (this.outboundAssertions.get(handle) ?? []).forEach(e => this.releaseRefOut(e)); } - rewriteRefOut(r: Ref, transient: boolean, exported: Array | null): WireSymbol { + rewriteRefOut(r: Ref, transient: boolean, exported: Array): WireRef { if (r.target instanceof RelayEntity && r.target.relay === this) { - return r.target.e; - } else { - const e = this.exported.grab("byRef", r, transient, () => { - if (transient) throw new Error("Cannot send transient reference"); - return { name: { loc: "mine", oid: this.nextLocalOid++ }, ref: r, count: 0 }; - }); - exported?.push(e); - return e; + if (r.attenuation === void 0 || r.attenuation.length === 0) { + // No extra conditions on this reference since it was sent to us. + return yourRef(r.target.oid, []); + } else { + // This reference has been attenuated since it was sent to us. + // Do we trust the peer to enforce such attenuation on our behalf? + if (this.trustPeer) { + return yourRef(r.target.oid, encodeAttenuation(r.attenuation)); + } else { + // fall through: treat the attenuated ref as a local ref, and re-export it. + } + } } + + const e = this.exported.grab( + "byRef", r, transient, () => { + if (transient) throw new Error("Cannot send transient reference"); + return { oid: this.nextLocalOid++, ref: r, count: 0 }; + }); + exported.push(e); + return myRef(e.oid); } releaseRefOut(e: WireSymbol) { this.exported.drop(e); } - rewriteRefIn(t: Turn, n: WireRef, imported: Array | null): Ref { + rewriteRefIn(t: Turn, n: WireRef, imported: Array): Ref { switch (n.loc) { - case 'your': - return this.lookupLocal(n.oid); + case 'your': { + const r = this.lookupLocal(n.oid); + if (n.attenuation.length === 0 || r === INERT_REF) { + return r; + } else { + type AttenuatedRef = Ref & { __attenuations?: FlexMap }; + const ar = r as AttenuatedRef; + if (ar.__attenuations === void 0) { + ar.__attenuations = new FlexMap(canonicalString); + } + return ar.__attenuations.getOrSet(n.attenuation, () => + attenuate(r, ... decodeAttenuation(n.attenuation))); + } + } case 'mine': { - const e = this.imported.grab("byOid", n.oid, false, () => { - const e: WireSymbol = { - name: { loc: 'your', oid: n.oid }, - ref: null as any, - count: 0, - }; - e.ref = t.ref(new RelayEntity(this, e as WireSymbol)); - return e; - }); - imported?.push(e); + const e = this.imported.grab("byOid", n.oid, false, () => + ({ oid: n.oid, ref: t.ref(new RelayEntity(this, n.oid)), count: 0 })); + imported.push(e); return e.ref; } } } - send(remoteOid: Oid, m: EntityMessage): void { + send(remoteOid: Oid, m: EntityMessage): void { if (this.pendingTurn.length === 0) { queueTask(() => { if (this.debug) console.log('OUT', this.pendingTurn.asPreservesText()); @@ -221,7 +241,7 @@ export class Relay { encodePointer: n => { switch (n.loc) { case 'mine': return [0, n.oid]; - case 'your': return [1, n.oid]; + case 'your': return [1, n.oid, ... n.attenuation]; } }, }))); @@ -240,40 +260,47 @@ export class Relay { const bs = Bytes.from(bs0); const wireTurn = decode(bs, { decodePointer: v => { - if (!Array.isArray(v) || v.length !== 2) { + function complain(): never { throw new Error( `Received invalid object reference ${v.asPreservesText()} from peer`); } - const loc = v[0] === 0 ? 'mine' : 'your'; - return { loc, oid: v[1] as Oid }; + if (!(Array.isArray(v) && v.length >= 2 && typeof v[1] === 'number')) { + complain(); + } + const oid = v[1] as Oid; + switch (v[0]) { + case 0: + if (v.length > 2) complain(); + return myRef(oid); + case 1: + // TODO: check EncodedAttenuation + return yourRef(oid, v.slice(2) as EncodedAttenuation); + default: + complain(); + } }, - }); + }) as TurnMessage; + // ^ TODO: deep check that v is a TurnMessage if (this.debug) console.log('IN', wireTurn.asPreservesText()); - if (!Array.isArray(wireTurn)) invalidTopLevelMessage(wireTurn); wireTurn.forEach(v => { - if (Array.isArray(v) && v.length === 2 && typeof v[0] === 'number') { - const [localOid, m] = v; - // TODO: deep check that m is EntityMessage - this.handle(t, this.lookupLocal(localOid), m as EntityMessage); - } else { - invalidTopLevelMessage(wireTurn); - } + const [localOid, m] = v; + this.handle(t, this.lookupLocal(localOid), m); }); }); } - handle(t: Turn, r: Ref, m: EntityMessage) { + handle(t: Turn, r: Ref, m: EntityMessage) { switch (m.label) { case _Assert: { - const [a, imported] = this.rewriteIn(t, Assert._.assertion(m)); - this.inboundAssertions.set(Assert._.handle(m), { + const [a, imported] = this.rewriteIn(t, IO.Assert._.assertion(m)); + this.inboundAssertions.set(IO.Assert._.handle(m), { localHandle: t.assert(r, a), imported, }); break; } case _Retract: { - const remoteHandle = Retract._.handle(m); + const remoteHandle = IO.Retract._.handle(m); const h = this.inboundAssertions.get(remoteHandle); if (h === void 0) throw new Error(`Peer retracted invalid handle ${remoteHandle}`); this.inboundAssertions.delete(remoteHandle); @@ -282,14 +309,14 @@ export class Relay { break; } case _Message: { - const [a, imported] = this.rewriteIn(t, Message._.body(m)); + const [a, imported] = this.rewriteIn(t, IO.Message._.body(m)); if (imported.length > 0) throw new Error("Cannot receive transient reference"); t.message(r, a); break; } case _Sync: { const imported: Array = []; - const k = this.rewriteRefIn(t, Sync._.peer(m), imported); + const k = this.rewriteRefIn(t, IO.Sync._.peer(m), imported); t.sync(r).then(t => { t.message(k, true); imported.forEach(e => this.imported.drop(e)); @@ -300,11 +327,6 @@ export class Relay { } } -function invalidTopLevelMessage(m: Value): never { - throw new Error( - `Received invalid top-level protocol message from peer: ${m.asPreservesText()}`); -} - export interface RelayActorOptions extends RelayOptions { initialOid?: Oid; initialRef?: Ref; @@ -316,10 +338,10 @@ export function spawnRelay(t: Turn, options: RelayActorOptions): Promise { const relay = new Relay(t, options); if (options.initialRef !== void 0) { - relay.rewriteRefOut(options.initialRef, false, null); + relay.rewriteRefOut(options.initialRef, false, []); } if (options.initialOid !== void 0) { - resolve(relay.rewriteRefIn(t, { loc: 'mine', oid: options.initialOid }, null)); + resolve(relay.rewriteRefIn(t, myRef(options.initialOid), [])); } else { resolve(null); }