From f043bc13b9b73cf2d6f3a749345dfef81c2814bb Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Tue, 23 Feb 2021 16:16:15 +0100 Subject: [PATCH] First stab at attenuation --- actor.ts | 58 +++++++++++++++++++++++++++++++++++++++++++++----------- main.ts | 33 +++++++++++++++++++++++++------- 2 files changed, 73 insertions(+), 18 deletions(-) diff --git a/actor.ts b/actor.ts index 0f4894f..b556782 100644 --- a/actor.ts +++ b/actor.ts @@ -6,13 +6,13 @@ export type ExitReason = null | { ok: true } | { ok: false, err: Error }; export type LocalAction = (t: Turn) => void; export interface Entity { - assert?(turn: Turn, assertion: Assertion, handle: Handle): void; - retract?(turn: Turn, handle: Handle): void; - message?(turn: Turn, body: Assertion): void; - sync?(turn: Turn, peer: Ref): void; + assert(turn: Turn, assertion: Assertion, handle: Handle): void; + retract(turn: Turn, handle: Handle): void; + message(turn: Turn, body: Assertion): void; + sync(turn: Turn, peer: Ref): void; } -export interface Ref { readonly relay: Actor, readonly target: Entity }; +export interface Ref { readonly relay: Actor, readonly target: Partial }; export class Actor { readonly outbound: Map; @@ -43,6 +43,10 @@ export class Actor { let nextHandle = 0; +export function _sync(turn: Turn, e: Partial, peer: Ref): void { + e.sync ? e.sync!(turn, peer) : turn.message(peer, true); +} + export class Turn { readonly actor: Actor; readonly queues = new Map(); @@ -57,7 +61,7 @@ export class Turn { this.actor = actor; } - ref(e: Entity): Ref { + ref(e: Partial): Ref { return { relay: this.actor, target: e }; } @@ -104,11 +108,8 @@ export class Turn { } sync(ref: Ref): Promise { - return new Promise(resolve => { - const k = this.ref({ message: resolve }); - this.enqueue(ref.relay, t => - ref.target.sync ? ref.target.sync!(t, k) : t.message(k, true)); - }); + return new Promise(resolve => this.enqueue(ref.relay, t => + _sync(t, ref.target, this.ref({ message: resolve })))); } message(ref: Ref, assertion: Assertion): void { @@ -119,3 +120,38 @@ export class Turn { this.queues.get(relay)?.push(a) ?? this.queues.set(relay, [a]); } } + +export type AttenuationFilter = (assertion: Assertion) => Assertion | null; + +export class Attenuation implements Entity { + readonly target: Partial; + readonly filter: AttenuationFilter; + + constructor(target: Partial, filter: AttenuationFilter) { + this.target = target; + this.filter = filter; + } + + assert(turn: Turn, assertion: Assertion, handle: Handle): void { + const filtered = this.filter(assertion); + if (filtered !== null) this.target.assert?.(turn, filtered, handle); + } + + retract(turn: Turn, handle: Handle): void { + // TODO: consider whether we want targets to even see blocked handles + this.target.retract?.(turn, handle); + } + + message(turn: Turn, body: Assertion): void { + const filtered = this.filter(body); + if (filtered !== null) this.target.message?.(turn, filtered); + } + + sync(turn: Turn, peer: Ref): void { + _sync(turn, this.target, peer); + } +} + +export function attenuate(ref: Ref, filter: AttenuationFilter): Ref { + return { relay: ref.relay, target: new Attenuation(ref.target, filter) }; +} diff --git a/main.ts b/main.ts index e63165e..699818a 100644 --- a/main.ts +++ b/main.ts @@ -1,10 +1,10 @@ -import { Actor, Assertion, Entity, Handle, Ref, Turn } from './actor.js'; -import { Dictionary, IdentityMap, is, Record } from 'preserves'; +import { Actor, Assertion, attenuate, Entity, Handle, Ref, Turn } from './actor.js'; +import { Dictionary, IdentityMap, is, Record, RecordConstructor } from 'preserves'; import { Bag, ChangeDescription } from './bag'; const Observe = Record.makeConstructor('Observe', ['label', 'observer']); -class Dataspace implements Entity { +class Dataspace implements Partial { readonly handleMap: IdentityMap> = new IdentityMap(); readonly assertions = new Bag(); readonly subscriptions: Dictionary>> = new Dictionary(); @@ -52,16 +52,16 @@ class Dataspace implements Entity { } } +//--------------------------------------------------------------------------- + const BoxState = Record.makeConstructor('BoxState', ['value']); const SetBox = Record.makeConstructor('SetBox', ['newValue']); let startTime = Date.now(); let prevValue = 0; const LIMIT = 500000; -Turn.for(new Actor(), async (t: Turn) => { - const ds = t.ref(new Dataspace()); - // Box +function spawnBox(t: Turn, ds: Ref) { t.spawn(t => { console.log('Spawning Box'); let valueHandle: Handle | undefined; @@ -85,8 +85,9 @@ Turn.for(new Actor(), async (t: Turn) => { } }))); }); +} - // Client +function spawnClient(t: Turn, ds: Ref) { t.spawn(t => { console.log('Spawning Client'); let count = 0; @@ -111,4 +112,22 @@ Turn.for(new Actor(), async (t: Turn) => { }, }))); }); +} + +function isObserverOf(a: Assertion, ctor: RecordConstructor): boolean { + return Observe.isClassOf(a) && is(Observe._.label(a), ctor.constructorInfo.label); +} + +Turn.for(new Actor(), async (t: Turn) => { + const ds = t.ref(new Dataspace()); + spawnBox(t, attenuate(ds, a => { + if (BoxState.isClassOf(a)) return a; + if (isObserverOf(a, SetBox)) return a; + return null; + })); + spawnClient(t, attenuate(ds, a => { + if (SetBox.isClassOf(a)) return a; + if (isObserverOf(a, BoxState)) return a; + return null; + })); });