First stab at attenuation

This commit is contained in:
Tony Garnock-Jones 2021-02-23 16:16:15 +01:00
parent 77fbab89dd
commit f043bc13b9
2 changed files with 73 additions and 18 deletions

View File

@ -6,13 +6,13 @@ export type ExitReason = null | { ok: true } | { ok: false, err: Error };
export type LocalAction = (t: Turn) => void;
export interface Entity {
assert?(turn: Turn, assertion: Assertion, handle: Handle): void;
retract?(turn: Turn, handle: Handle): void;
message?(turn: Turn, body: Assertion): void;
sync?(turn: Turn, peer: Ref): void;
assert(turn: Turn, assertion: Assertion, handle: Handle): void;
retract(turn: Turn, handle: Handle): void;
message(turn: Turn, body: Assertion): void;
sync(turn: Turn, peer: Ref): void;
}
export interface Ref { readonly relay: Actor, readonly target: Entity };
export interface Ref { readonly relay: Actor, readonly target: Partial<Entity> };
export class Actor {
readonly outbound: Map<Handle, Ref>;
@ -43,6 +43,10 @@ export class Actor {
let nextHandle = 0;
export function _sync(turn: Turn, e: Partial<Entity>, peer: Ref): void {
e.sync ? e.sync!(turn, peer) : turn.message(peer, true);
}
export class Turn {
readonly actor: Actor;
readonly queues = new Map<Actor, LocalAction[]>();
@ -57,7 +61,7 @@ export class Turn {
this.actor = actor;
}
ref(e: Entity): Ref {
ref(e: Partial<Entity>): Ref {
return { relay: this.actor, target: e };
}
@ -104,11 +108,8 @@ export class Turn {
}
sync(ref: Ref): Promise<Turn> {
return new Promise(resolve => {
const k = this.ref({ message: resolve });
this.enqueue(ref.relay, t =>
ref.target.sync ? ref.target.sync!(t, k) : t.message(k, true));
});
return new Promise(resolve => this.enqueue(ref.relay, t =>
_sync(t, ref.target, this.ref({ message: resolve }))));
}
message(ref: Ref, assertion: Assertion): void {
@ -119,3 +120,38 @@ export class Turn {
this.queues.get(relay)?.push(a) ?? this.queues.set(relay, [a]);
}
}
export type AttenuationFilter = (assertion: Assertion) => Assertion | null;
export class Attenuation implements Entity {
readonly target: Partial<Entity>;
readonly filter: AttenuationFilter;
constructor(target: Partial<Entity>, filter: AttenuationFilter) {
this.target = target;
this.filter = filter;
}
assert(turn: Turn, assertion: Assertion, handle: Handle): void {
const filtered = this.filter(assertion);
if (filtered !== null) this.target.assert?.(turn, filtered, handle);
}
retract(turn: Turn, handle: Handle): void {
// TODO: consider whether we want targets to even see blocked handles
this.target.retract?.(turn, handle);
}
message(turn: Turn, body: Assertion): void {
const filtered = this.filter(body);
if (filtered !== null) this.target.message?.(turn, filtered);
}
sync(turn: Turn, peer: Ref): void {
_sync(turn, this.target, peer);
}
}
export function attenuate(ref: Ref, filter: AttenuationFilter): Ref {
return { relay: ref.relay, target: new Attenuation(ref.target, filter) };
}

33
main.ts
View File

@ -1,10 +1,10 @@
import { Actor, Assertion, Entity, Handle, Ref, Turn } from './actor.js';
import { Dictionary, IdentityMap, is, Record } from 'preserves';
import { Actor, Assertion, attenuate, Entity, Handle, Ref, Turn } from './actor.js';
import { Dictionary, IdentityMap, is, Record, RecordConstructor } from 'preserves';
import { Bag, ChangeDescription } from './bag';
const Observe = Record.makeConstructor<Ref>('Observe', ['label', 'observer']);
class Dataspace implements Entity {
class Dataspace implements Partial<Entity> {
readonly handleMap: IdentityMap<Handle, Record<Ref>> = new IdentityMap();
readonly assertions = new Bag<Ref>();
readonly subscriptions: Dictionary<Map<Ref, Dictionary<Handle>>> = new Dictionary();
@ -52,16 +52,16 @@ class Dataspace implements Entity {
}
}
//---------------------------------------------------------------------------
const BoxState = Record.makeConstructor<Ref>('BoxState', ['value']);
const SetBox = Record.makeConstructor<Ref>('SetBox', ['newValue']);
let startTime = Date.now();
let prevValue = 0;
const LIMIT = 500000;
Turn.for(new Actor(), async (t: Turn) => {
const ds = t.ref(new Dataspace());
// Box
function spawnBox(t: Turn, ds: Ref) {
t.spawn(t => {
console.log('Spawning Box');
let valueHandle: Handle | undefined;
@ -85,8 +85,9 @@ Turn.for(new Actor(), async (t: Turn) => {
}
})));
});
}
// Client
function spawnClient(t: Turn, ds: Ref) {
t.spawn(t => {
console.log('Spawning Client');
let count = 0;
@ -111,4 +112,22 @@ Turn.for(new Actor(), async (t: Turn) => {
},
})));
});
}
function isObserverOf(a: Assertion, ctor: RecordConstructor<Ref>): boolean {
return Observe.isClassOf(a) && is(Observe._.label(a), ctor.constructorInfo.label);
}
Turn.for(new Actor(), async (t: Turn) => {
const ds = t.ref(new Dataspace());
spawnBox(t, attenuate(ds, a => {
if (BoxState.isClassOf(a)) return a;
if (isObserverOf(a, SetBox)) return a;
return null;
}));
spawnClient(t, attenuate(ds, a => {
if (SetBox.isClassOf(a)) return a;
if (isObserverOf(a, BoxState)) return a;
return null;
}));
});