115 lines
4.6 KiB
TypeScript
115 lines
4.6 KiB
TypeScript
import { Actor, Assertion, Entity, Handle, Ref, Turn, assert, message, retract } from './actor.js';
|
|
import { Dictionary, IdentityMap, is, Record } from 'preserves';
|
|
import { Bag, ChangeDescription } from './bag';
|
|
|
|
const Observe = Record.makeConstructor<Ref>('Observe', ['label', 'observer']);
|
|
|
|
class Dataspace implements Entity {
|
|
readonly handleMap: IdentityMap<Handle, Record<Ref>> = new IdentityMap();
|
|
readonly assertions = new Bag<Ref>();
|
|
readonly subscriptions: Dictionary<Map<Ref, Dictionary<Handle>>> = new Dictionary();
|
|
|
|
[assert](turn: Turn, rec: Assertion, handle: Handle): void {
|
|
if (!Record.isRecord<Ref>(rec)) return;
|
|
this.handleMap.set(handle, rec);
|
|
if (this.assertions.change(rec, +1) !== ChangeDescription.ABSENT_TO_PRESENT) return;
|
|
if (Observe.isClassOf(rec)) {
|
|
const label = Observe._.label(rec)!;
|
|
const observer = Observe._.observer(rec) as Ref;
|
|
const seen = new Dictionary<Handle>();
|
|
if (!this.subscriptions.has(label)) this.subscriptions.set(label, new Map());
|
|
this.subscriptions.get(label)!.set(observer, seen);
|
|
this.assertions.forEach((_count, prev) =>
|
|
is((prev as Record<Ref>).label, label)
|
|
&& seen.set(prev, turn.assert(observer, prev)));
|
|
}
|
|
this.subscriptions.get(rec.label)?.forEach((seen, peer) =>
|
|
seen.has(rec) || seen.set(rec, turn.assert(peer, rec)));
|
|
}
|
|
|
|
[retract](turn: Turn, upstreamHandle: Handle): void {
|
|
const rec = this.handleMap.get(upstreamHandle);
|
|
if (rec === void 0) return;
|
|
this.handleMap.delete(upstreamHandle);
|
|
if (this.assertions.change(rec, -1) !== ChangeDescription.PRESENT_TO_ABSENT) return;
|
|
this.subscriptions.get(rec.label)?.forEach((seen, _peer) => {
|
|
const downstreamHandle = seen.get(rec);
|
|
if (downstreamHandle !== void 0) {
|
|
turn.retract(downstreamHandle);
|
|
seen.delete(rec);
|
|
}
|
|
});
|
|
if (Observe.isClassOf(rec)) {
|
|
let peerMap = this.subscriptions.get(Observe._.label(rec)!)!;
|
|
peerMap.delete(Observe._.observer(rec) as Ref);
|
|
if (peerMap.size === 0) this.subscriptions.delete(Observe._.label(rec)!);
|
|
}
|
|
}
|
|
|
|
[message](turn: Turn, rec: Assertion): void {
|
|
if (!Record.isRecord<Ref>(rec)) return;
|
|
this.subscriptions.get(rec.label)?.forEach((_seen, peer) => turn.message(peer, rec));
|
|
}
|
|
}
|
|
|
|
const BoxState = Record.makeConstructor<Ref>('BoxState', ['value']);
|
|
const SetBox = Record.makeConstructor<Ref>('SetBox', ['newValue']);
|
|
|
|
let startTime = Date.now();
|
|
let prevValue = 0;
|
|
const LIMIT = 500000;
|
|
Turn.for(new Actor(), async (t: Turn) => {
|
|
const ds = t.ref(new Dataspace());
|
|
|
|
// Box
|
|
t.spawn(t => {
|
|
console.log('Spawning Box');
|
|
let valueHandle: Handle | undefined;
|
|
function setValue(t: Turn, value: number) {
|
|
valueHandle = t.replace(ds, valueHandle, BoxState(value));
|
|
}
|
|
setValue(t, 0);
|
|
t.assert(ds, Observe(SetBox.constructorInfo.label, t.ref({
|
|
[message](t: Turn, [newValue]: [number]): void {
|
|
// console.log(`Box: got ${newValue}`);
|
|
if (newValue % 25000 === 0) {
|
|
const endTime = Date.now();
|
|
const delta = (endTime - startTime) / 1000.0;
|
|
const count = newValue - prevValue;
|
|
prevValue = newValue;
|
|
startTime = endTime;
|
|
console.log(`Box: got ${newValue} (${count / delta} Hz)`);
|
|
}
|
|
if (newValue === LIMIT - 20000) t.quit();
|
|
setValue(t, newValue);
|
|
}
|
|
})));
|
|
});
|
|
|
|
// Client
|
|
t.spawn(t => {
|
|
console.log('Spawning Client');
|
|
let count = 0;
|
|
t.assert(ds, Observe(BoxState.constructorInfo.label, t.ref({
|
|
[assert](t: Turn, [currentValue]: [number]): void {
|
|
// console.log(`Client: got ${currentValue}`);
|
|
if (currentValue === LIMIT) {
|
|
console.log(`Client: quitting at limit`);
|
|
t.quit();
|
|
} else {
|
|
t.message(ds, SetBox(currentValue + 1));
|
|
}
|
|
}
|
|
})));
|
|
t.assert(ds, Observe(BoxState.constructorInfo.label, t.ref({
|
|
[assert](_t: Turn, _assertion: Assertion): void { count++; },
|
|
[retract](t: Turn, _handle: Handle) {
|
|
if (--count === 0) {
|
|
console.log('Client: detected box termination');
|
|
t.quit();
|
|
}
|
|
},
|
|
})));
|
|
});
|
|
});
|