import { Actor, Assertion, Entity, Handle, Ref, Turn, assert, message, retract, } from './actor.js'; import { Dictionary, IdentityMap, is, Record } from 'preserves'; import { Bag, ChangeDescription } from './bag'; const Observe = Record.makeConstructor>('Observe', ['label', 'observer']); function makeDataspace(): Ref { const handleMap: IdentityMap = new IdentityMap(); const assertions = new Bag>(); const subscriptions: Dictionary>> = new Dictionary(); function forEachSubscription(assertion: Assertion, f: (handleMap: Dictionary, peer: Ref) => void): void { if (Record.isRecord(assertion)) { const peerMap = subscriptions.get(assertion.label); if (Dictionary.isDictionary(peerMap)) { peerMap.forEach((handleMap, peer) => { if (peer instanceof Ref) { f(handleMap, peer); } }); } } } const a = new Actor(); return a.ref({ [assert](turn: Turn, assertion: Assertion, handle: Handle): void { // console.log(`DS: assert ${assertion.asPreservesText()} :: ${handle}`); handleMap.set(handle, assertion); if (assertions.change(assertion, +1) === ChangeDescription.ABSENT_TO_PRESENT) { if (Observe.isClassOf(assertion)) { const observedLabel = Observe._.label(assertion)!; const observer = Observe._.observer(assertion) as Ref; let peerMap = subscriptions.get(observedLabel); if (peerMap === void 0) { peerMap = new Dictionary(); subscriptions.set(observedLabel, peerMap); } const handleMap: Dictionary = new Dictionary(); peerMap.set(observer, handleMap); assertions.forEach((_count, assertion) => { if (Record.isRecord(assertion)) { if (is(assertion.label, observedLabel)) { handleMap.set(assertion, turn.assert(observer, assertion)); } } }); } forEachSubscription(assertion, (handleMap, peer) => { if (!handleMap.has(assertion)) { handleMap.set(assertion, turn.assert(peer, assertion)); } }); } }, [retract](turn: Turn, upstreamHandle: Handle): void { const assertion = handleMap.get(upstreamHandle); // console.log(`DS: retract ${(assertion ?? Symbol.for('missing')).asPreservesText()} :: ${upstreamHandle}`); if (assertion !== void 0) { handleMap.delete(upstreamHandle); if (assertions.change(assertion, -1) === ChangeDescription.PRESENT_TO_ABSENT) { forEachSubscription(assertion, (handleMap, _peer) => { const downstreamHandle = handleMap.get(assertion); if (downstreamHandle !== void 0) { turn.retract(downstreamHandle); handleMap.delete(assertion); } }); if (Observe.isClassOf(assertion)) { let peerMap = subscriptions.get(Observe._.label(assertion)!)!; peerMap.delete(Observe._.observer(assertion)!); if (peerMap.size === 0) subscriptions.delete(Observe._.label(assertion)!); } } } }, [message](turn: Turn, message: Assertion): void { // console.log(`DS: message ${message.asPreservesText()}`); forEachSubscription(message, (_handleMap, peer) => turn.message(peer, message)); } }); } const BoxState = Record.makeConstructor>('BoxState', ['value']); const SetBox = Record.makeConstructor>('SetBox', ['newValue']); let startTime = Date.now(); let prevValue = 0; Turn.for(null, async (t: Turn) => { const ds = makeDataspace(); // Box t.spawn(t => { console.log('Spawning Box'); let value: number; let valueHandle: Handle | undefined; function setValue(t: Turn, newValue: number) { value = newValue; valueHandle = t.replace(ds, valueHandle, BoxState(value)); } setValue(t, 0); t.assert(ds, Observe(SetBox.constructorInfo.label, t.entity({ [message](t: Turn, [newValue]: [number]): void { if (newValue % 25000 === 0) { const endTime = Date.now(); const delta = (endTime - startTime) / 1000.0; const count = newValue - prevValue; prevValue = newValue; startTime = endTime; console.log(`Box: got ${newValue} (${count / delta} Hz)`); } if (newValue === 280000) t.quit(); setValue(t, newValue); } }))); }); // Client t.spawn(t => { console.log('Spawning Client'); let count = 0; t.assert(ds, Observe(BoxState.constructorInfo.label, t.entity({ [assert](t: Turn, [currentValue]: [number]): void { // console.log(`Client: got ${currentValue}`); if (currentValue === 300000) { console.log(`Client: quitting at limit`); t.quit(); } else { t.message(ds, SetBox(currentValue + 1)); } } }))); t.assert(ds, Observe(BoxState.constructorInfo.label, t.entity({ [assert](_t: Turn, _assertion: Assertion): void { count++; }, [retract](t: Turn, _handle: Handle) { if (--count === 0) { console.log('Client: detected box termination'); t.quit(); } }, }))); }); });