import { Actor, Assertion, Entity, Handle, Ref, Turn } from './actor.js';
import { Dictionary, IdentityMap, is, Record } from 'preserves';
import { Bag, ChangeDescription } from './bag';

/**
 * Record constructor for `Observe(label, observer)`: asserting one of these
 * at a `Dataspace` subscribes `observer` to records whose label is `label`.
 */
const Observe = Record.makeConstructor('Observe', ['label', 'observer']);

/**
 * A minimal dataspace entity that routes record-shaped assertions and
 * messages to observers, keyed by record label.
 *
 * Non-record assertions and messages are ignored.
 *
 * NOTE(review): the generic type arguments on `handleMap`, `subscriptions`
 * and the per-observer `seen` dictionary were missing from the text this
 * file was recovered from. They have been reconstructed from how the fields
 * are used below — confirm against the `preserves` / `./actor.js`
 * declarations.
 */
class Dataspace implements Entity {
    /** Upstream handle -> the record that was asserted under that handle. */
    readonly handleMap: IdentityMap<Handle, Record<Ref>> = new IdentityMap();

    /** Multiset of currently-asserted records. */
    readonly assertions = new Bag();

    /**
     * Record label -> (observer -> (record -> downstream handle)).
     * The innermost dictionary remembers which records have already been
     * asserted to that observer, and under which handle.
     */
    readonly subscriptions: Dictionary<Map<Ref, Dictionary<Handle>>> = new Dictionary();

    /**
     * Handles an incoming assertion.
     *
     * @param turn   the turn in which downstream assertions are made
     * @param rec    the asserted value; ignored unless it is a record
     * @param handle the upstream handle identifying this assertion
     */
    assert(turn: Turn, rec: Assertion, handle: Handle): void {
        if (!Record.isRecord(rec)) return;
        this.handleMap.set(handle, rec);

        // Only the first copy of a record is propagated; duplicates merely
        // bump the count in the bag.
        if (this.assertions.change(rec, +1) !== ChangeDescription.ABSENT_TO_PRESENT) return;

        if (Observe.isClassOf(rec)) {
            const label = Observe._.label(rec)!;
            const observer = Observe._.observer(rec) as Ref;
            const seen = new Dictionary<Handle>();
            if (!this.subscriptions.has(label)) this.subscriptions.set(label, new Map());
            this.subscriptions.get(label)!.set(observer, seen);
            // Replay every already-present record with a matching label to
            // the new observer.
            this.assertions.forEach((_count, prev) =>
                is((prev as Record).label, label)
                    && seen.set(prev, turn.assert(observer, prev)));
        }

        // Deliver the new record to each observer of its label that has not
        // already seen it (the replay above may have delivered it).
        this.subscriptions.get(rec.label)?.forEach((seen, peer) =>
            seen.has(rec) || seen.set(rec, turn.assert(peer, rec)));
    }

    /**
     * Handles retraction of a previously-asserted handle.
     *
     * @param turn           the turn in which downstream retractions are made
     * @param upstreamHandle the handle originally passed to `assert`;
     *                       unknown handles are ignored
     */
    retract(turn: Turn, upstreamHandle: Handle): void {
        const rec = this.handleMap.get(upstreamHandle);
        if (rec === void 0) return;
        this.handleMap.delete(upstreamHandle);

        // Only the removal of the last copy of a record is propagated.
        if (this.assertions.change(rec, -1) !== ChangeDescription.PRESENT_TO_ABSENT) return;

        this.subscriptions.get(rec.label)?.forEach((seen, _peer) => {
            const downstreamHandle = seen.get(rec);
            if (downstreamHandle !== void 0) {
                turn.retract(downstreamHandle);
                seen.delete(rec);
            }
        });

        if (Observe.isClassOf(rec)) {
            // Drop the subscription itself, and the per-label map once it
            // has no observers left.
            const label = Observe._.label(rec)!;
            const peerMap = this.subscriptions.get(label)!;
            peerMap.delete(Observe._.observer(rec) as Ref);
            if (peerMap.size === 0) this.subscriptions.delete(label);
        }
    }

    /**
     * Forwards a record-shaped message to every observer of its label.
     *
     * @param turn the turn in which the message is forwarded
     * @param rec  the message body; ignored unless it is a record
     */
    message(turn: Turn, rec: Assertion): void {
        if (!Record.isRecord(rec)) return;
        this.subscriptions.get(rec.label)?.forEach((_seen, peer) => turn.message(peer, rec));
    }
}

/** Record constructor for `BoxState(value)`, asserted by the Box actor. */
const BoxState = Record.makeConstructor('BoxState', ['value']);

/** Record constructor for `SetBox(newValue)`, sent as a message by the Client. */
const SetBox = Record.makeConstructor('SetBox', ['newValue']);

// State for the throughput report printed by the Box actor.
let startTime = Date.now();
let prevValue = 0;

/** Box value at which the Client logs and quits. */
const LIMIT = 500000;

Turn.for(new Actor(), async (t: Turn) => {
    const ds = t.ref(new Dataspace());

    // Box
    t.spawn(t => {
        console.log('Spawning Box');
        let valueHandle: Handle | undefined;

        // Replaces the current BoxState assertion at the dataspace with one
        // holding `value`, remembering the new handle for next time.
        function setValue(t: Turn, value: number) {
            valueHandle = t.replace(ds, valueHandle, BoxState(value));
        }

        setValue(t, 0);
        t.assert(ds, Observe(SetBox.constructorInfo.label, t.ref({
            message(t: Turn, [newValue]: [number]): void {
                // console.log(`Box: got ${newValue}`);
                if (newValue % 25000 === 0) {
                    // Report the update rate since the previous report.
                    const endTime = Date.now();
                    const delta = (endTime - startTime) / 1000.0;
                    const count = newValue - prevValue;
                    prevValue = newValue;
                    startTime = endTime;
                    console.log(`Box: got ${newValue} (${count / delta} Hz)`);
                }
                if (newValue === LIMIT - 20000) t.quit();
                setValue(t, newValue);
            }
        })));
    });

    // Client
    t.spawn(t => {
        console.log('Spawning Client');
        let count = 0;

        // On each BoxState assertion, either stop at LIMIT or ask the Box
        // to move to the next value.
        t.assert(ds, Observe(BoxState.constructorInfo.label, t.ref({
            assert(t: Turn, [currentValue]: [number]): void {
                // console.log(`Client: got ${currentValue}`);
                if (currentValue === LIMIT) {
                    console.log(`Client: quitting at limit`);
                    t.quit();
                } else {
                    t.message(ds, SetBox(currentValue + 1));
                }
            }
        })));

        // Count live BoxState assertions; when the count returns to zero,
        // log that the box has terminated and quit.
        t.assert(ds, Observe(BoxState.constructorInfo.label, t.ref({
            assert(_t: Turn, _assertion: Assertion): void {
                count++;
            },
            retract(t: Turn, _handle: Handle) {
                if (--count === 0) {
                    console.log('Client: detected box termination');
                    t.quit();
                }
            },
        })));
    });
});