novy-syndicate/main.ts

115 lines
4.7 KiB
TypeScript
Raw Normal View History

2021-02-23 07:48:10 +00:00
import { Actor, Assertion, Entity, Handle, Ref, Turn, assert, message, retract } from './actor.js';
2021-02-22 18:37:47 +00:00
import { Dictionary, IdentityMap, is, Record } from 'preserves';
import { Bag, ChangeDescription } from './bag';
2021-02-22 09:12:10 +00:00
2021-02-22 18:37:47 +00:00
const Observe = Record.makeConstructor<Ref<Entity>>('Observe', ['label', 'observer']);
2021-02-22 19:45:19 +00:00
function makeDataspace(): Entity {
2021-02-22 21:06:02 +00:00
const handleMap: IdentityMap<Handle, Record<Ref<Entity>>> = new IdentityMap();
2021-02-22 18:37:47 +00:00
const assertions = new Bag<Ref<Entity>>();
2021-02-22 20:38:04 +00:00
const subscriptions: Dictionary<Map<Ref<Entity>, Dictionary<Handle>>> = new Dictionary();
2021-02-22 18:37:47 +00:00
2021-02-22 19:45:19 +00:00
return {
2021-02-22 21:06:02 +00:00
[assert](turn: Turn, rec: Assertion, handle: Handle): void {
if (!Record.isRecord<Ref<Entity>>(rec)) return;
handleMap.set(handle, rec);
if (assertions.change(rec, +1) !== ChangeDescription.ABSENT_TO_PRESENT) return;
if (Observe.isClassOf(rec)) {
const label = Observe._.label(rec)!;
const observer = Observe._.observer(rec) as Ref<Entity>;
const seen = new Dictionary<Handle>();
if (!subscriptions.has(label)) subscriptions.set(label, new Map());
subscriptions.get(label)!.set(observer, seen);
assertions.forEach((_count, prev) =>
is((prev as Record<Ref<Entity>>).label, label)
&& seen.set(prev, turn.assert(observer, prev)));
2021-02-22 18:37:47 +00:00
}
2021-02-22 21:30:36 +00:00
subscriptions.get(rec.label)?.forEach((seen, peer) =>
2021-02-22 21:06:02 +00:00
seen.has(rec) || seen.set(rec, turn.assert(peer, rec)));
2021-02-22 18:37:47 +00:00
},
[retract](turn: Turn, upstreamHandle: Handle): void {
2021-02-22 21:06:02 +00:00
const rec = handleMap.get(upstreamHandle);
if (rec === void 0) return;
handleMap.delete(upstreamHandle);
if (assertions.change(rec, -1) !== ChangeDescription.PRESENT_TO_ABSENT) return;
2021-02-22 21:30:36 +00:00
subscriptions.get(rec.label)?.forEach((seen, _peer) => {
2021-02-22 21:06:02 +00:00
const downstreamHandle = seen.get(rec);
if (downstreamHandle !== void 0) {
turn.retract(downstreamHandle);
seen.delete(rec);
2021-02-22 18:37:47 +00:00
}
2021-02-22 21:06:02 +00:00
});
if (Observe.isClassOf(rec)) {
let peerMap = subscriptions.get(Observe._.label(rec)!)!;
peerMap.delete(Observe._.observer(rec) as Ref<Entity>);
if (peerMap.size === 0) subscriptions.delete(Observe._.label(rec)!);
2021-02-22 18:37:47 +00:00
}
},
2021-02-22 21:06:02 +00:00
[message](turn: Turn, rec: Assertion): void {
if (!Record.isRecord<Ref<Entity>>(rec)) return;
2021-02-22 21:30:36 +00:00
subscriptions.get(rec.label)?.forEach((_seen, peer) => turn.message(peer, rec));
2021-02-22 18:37:47 +00:00
}
2021-02-22 19:45:19 +00:00
};
2021-02-22 18:37:47 +00:00
}
const BoxState = Record.makeConstructor<Ref<Entity>>('BoxState', ['value']);
const SetBox = Record.makeConstructor<Ref<Entity>>('SetBox', ['newValue']);
let startTime = Date.now();
let prevValue = 0;
2021-02-22 09:12:10 +00:00
Turn.for(null, async (t: Turn) => {
2021-02-22 19:45:19 +00:00
const ds = new Ref(new Actor(), makeDataspace());
2021-02-22 18:37:47 +00:00
// Box
t.spawn(t => {
console.log('Spawning Box');
let valueHandle: Handle | undefined;
2021-02-22 21:30:36 +00:00
function setValue(t: Turn, value: number) {
2021-02-22 18:37:47 +00:00
valueHandle = t.replace(ds, valueHandle, BoxState(value));
}
setValue(t, 0);
t.assert(ds, Observe(SetBox.constructorInfo.label, t.ref({
2021-02-22 18:37:47 +00:00
[message](t: Turn, [newValue]: [number]): void {
if (newValue % 25000 === 0) {
const endTime = Date.now();
const delta = (endTime - startTime) / 1000.0;
const count = newValue - prevValue;
prevValue = newValue;
startTime = endTime;
console.log(`Box: got ${newValue} (${count / delta} Hz)`);
}
if (newValue === 280000) t.quit();
setValue(t, newValue);
}
})));
});
// Client
t.spawn(t => {
console.log('Spawning Client');
let count = 0;
t.assert(ds, Observe(BoxState.constructorInfo.label, t.ref({
2021-02-22 18:37:47 +00:00
[assert](t: Turn, [currentValue]: [number]): void {
// console.log(`Client: got ${currentValue}`);
if (currentValue === 300000) {
console.log(`Client: quitting at limit`);
t.quit();
} else {
t.message(ds, SetBox(currentValue + 1));
}
2021-02-22 19:08:11 +00:00
}
2021-02-22 18:37:47 +00:00
})));
t.assert(ds, Observe(BoxState.constructorInfo.label, t.ref({
2021-02-22 18:37:47 +00:00
[assert](_t: Turn, _assertion: Assertion): void { count++; },
[retract](t: Turn, _handle: Handle) {
if (--count === 0) {
console.log('Client: detected box termination');
t.quit();
}
},
})));
2021-02-22 09:12:10 +00:00
});
});