This commit is contained in:
Tony Garnock-Jones 2021-02-22 19:37:47 +01:00
parent c746340e15
commit 252e82a887
2 changed files with 178 additions and 11 deletions

View File

@ -2,7 +2,7 @@ import { IdentitySet, Value } from 'preserves';
export type Assertion = Value<Ref<Entity>>;
export type Handle = any;
export type Handle = number;
export type ExitReason = null | { ok: true } | { ok: false, err: Error };
@ -49,14 +49,14 @@ export class Actor implements SyncTarget {
return this.exitReason === null;
}
stop() {
this.terminateWith({ ok: true });
stop(t: Turn) {
this.terminateWith(t, { ok: true });
}
terminateWith(reason: Exclude<ExitReason, null>) {
terminateWith(t: Turn, reason: Exclude<ExitReason, null>) {
if (this.alive) {
this.exitReason = reason;
Turn.for(this, t => this.outbound.forEach(([peer, _a], h) => t._retract(peer, h)));
this.outbound.forEach(([peer, _a], h) => t._retract(peer, h));
}
}
@ -70,7 +70,8 @@ export class Actor implements SyncTarget {
try {
proc();
} catch (err) {
this.terminateWith({ ok: false, err });
console.error(Actor, err);
Turn.for(this, t => this.terminateWith(t, { ok: false, err }));
}
}
});
@ -108,6 +109,10 @@ export class Turn {
return this.actor;
}
ref<T>(t: T, what: string = "ref"): Ref<T> {
return this._ensureActor(what).ref(t);
}
spawn(bootProc: (t: Turn) => void, initialAssertions?: IdentitySet<Handle>): void {
if ((initialAssertions !== void 0) && (initialAssertions.size > 0)) {
this._ensureActor("spawn with initialAssertions");
@ -121,6 +126,11 @@ export class Turn {
});
}
quit(): void {
const actor = this._ensureActor("quit");
this.localActions.push(t => actor.stop(t));
}
assert(location: Ref, assertion: Assertion): Handle {
this._ensureActor("assert");
const h = nextHandle++;
@ -135,6 +145,12 @@ export class Turn {
this._retract(this._ensureActor("retract").outbound.get(h)![0], h);
}
replace(location: Ref, h: Handle | undefined, assertion: Assertion): Handle {
const newHandle = this.assert(location, assertion);
if (h !== void 0) this.retract(h);
return newHandle;
}
_retract(location: Ref, handle: Handle): void {
this.enqueue(location.actor, t => {
this.actor!.outbound.delete(handle);
@ -144,7 +160,7 @@ export class Turn {
sync(location: Ref<SyncTarget>): Promise<Turn> {
return new Promise(resolve => {
const k = this._ensureActor("sync").ref({ [synced]: resolve });
const k = this.ref({ [synced]: resolve }, "sync");
this.enqueue(location.actor, t => location.target[sync](t, k));
});
}
@ -173,7 +189,9 @@ export class Turn {
this.completed = true;
this.queues.forEach((queue, actor) =>
actor.execute(() => queue.forEach(f => Turn.for(actor, f))));
this.localActions.forEach(f => Turn.for(this.actor, f));
if (this.localActions.length > 0) {
queueMicrotask(() => this.localActions.forEach(f => Turn.for(this.actor, f)));
}
}
}

155
main.ts
View File

@ -1,7 +1,156 @@
import { Actor, Turn } from './actor.js';
import {
Actor,
Assertion,
Entity,
Handle,
Ref,
Turn,
assert,
message,
retract,
} from './actor.js';
import { Dictionary, IdentityMap, is, Record } from 'preserves';
import { Bag, ChangeDescription } from './bag';
const Observe = Record.makeConstructor<Ref<Entity>>('Observe', ['label', 'observer']);
function makeDataspace(): Ref<Entity> {
const handleMap: IdentityMap<Handle, Assertion> = new IdentityMap();
const assertions = new Bag<Ref<Entity>>();
const subscriptions: Dictionary<Dictionary<Dictionary<Handle>>> = new Dictionary();
function forEachSubscription(assertion: Assertion, f: (handleMap: Dictionary<Handle>, peer: Ref<Entity>) => void): void {
if (Record.isRecord(assertion)) {
const peerMap = subscriptions.get(assertion.label);
if (Dictionary.isDictionary(peerMap)) {
peerMap.forEach((handleMap, peer) => {
if (peer instanceof Ref) {
f(handleMap, peer);
}
});
}
}
}
const a = new Actor();
return a.ref<Entity>({
[assert](turn: Turn, assertion: Assertion, handle: Handle): void {
// console.log(`DS: assert ${assertion.asPreservesText()} :: ${handle}`);
handleMap.set(handle, assertion);
if (assertions.change(assertion, +1) === ChangeDescription.ABSENT_TO_PRESENT) {
if (Observe.isClassOf(assertion)) {
const observedLabel = Observe._.label(assertion)!;
const observer = Observe._.observer(assertion) as Ref<Entity>;
let peerMap = subscriptions.get(observedLabel);
if (peerMap === void 0) {
peerMap = new Dictionary();
subscriptions.set(observedLabel, peerMap);
}
const handleMap: Dictionary<Handle> = new Dictionary();
peerMap.set(observer, handleMap);
assertions.forEach((_count, assertion) => {
if (Record.isRecord(assertion)) {
if (is(assertion.label, observedLabel)) {
handleMap.set(assertion, turn.assert(observer, assertion));
}
}
});
}
forEachSubscription(assertion, (handleMap, peer) => {
if (!handleMap.has(assertion)) {
handleMap.set(assertion, turn.assert(peer, assertion));
}
});
}
},
[retract](turn: Turn, upstreamHandle: Handle): void {
const assertion = handleMap.get(upstreamHandle);
// console.log(`DS: retract ${(assertion ?? Symbol.for('missing')).asPreservesText()} :: ${upstreamHandle}`);
if (assertion !== void 0) {
handleMap.delete(upstreamHandle);
if (assertions.change(assertion, -1) === ChangeDescription.PRESENT_TO_ABSENT) {
forEachSubscription(assertion, (handleMap, _peer) => {
const downstreamHandle = handleMap.get(assertion);
if (downstreamHandle !== void 0) {
turn.retract(downstreamHandle);
handleMap.delete(assertion);
}
});
if (Observe.isClassOf(assertion)) {
let peerMap = subscriptions.get(Observe._.label(assertion)!)!;
peerMap.delete(Observe._.observer(assertion)!);
if (peerMap.size === 0) subscriptions.delete(Observe._.label(assertion)!);
}
}
}
},
[message](turn: Turn, message: Assertion): void {
// console.log(`DS: message ${message.asPreservesText()}`);
forEachSubscription(message, (_handleMap, peer) => turn.message(peer, message));
}
});
}
const BoxState = Record.makeConstructor<Ref<Entity>>('BoxState', ['value']);
const SetBox = Record.makeConstructor<Ref<Entity>>('SetBox', ['newValue']);
let startTime = Date.now();
let prevValue = 0;
Turn.for(null, async (t: Turn) => {
const a = new Actor().ref({
const ds = makeDataspace();
// Box
t.spawn(t => {
console.log('Spawning Box');
let value: number;
let valueHandle: Handle | undefined;
function setValue(t: Turn, newValue: number) {
value = newValue;
valueHandle = t.replace(ds, valueHandle, BoxState(value));
}
setValue(t, 0);
t.assert(ds, Observe(SetBox.constructorInfo.label, t.ref({
[message](t: Turn, [newValue]: [number]): void {
if (newValue % 25000 === 0) {
const endTime = Date.now();
const delta = (endTime - startTime) / 1000.0;
const count = newValue - prevValue;
prevValue = newValue;
startTime = endTime;
console.log(`Box: got ${newValue} (${count / delta} Hz)`);
}
if (newValue === 280000) t.quit();
setValue(t, newValue);
}
})));
});
// Client
t.spawn(t => {
console.log('Spawning Client');
let count = 0;
t.assert(ds, Observe(BoxState.constructorInfo.label, t.ref({
[assert](t: Turn, [currentValue]: [number]): void {
// console.log(`Client: got ${currentValue}`);
if (currentValue === 300000) {
console.log(`Client: quitting at limit`);
t.quit();
} else {
t.message(ds, SetBox(currentValue + 1));
}
},
[retract]() {}
})));
t.assert(ds, Observe(BoxState.constructorInfo.label, t.ref({
[assert](_t: Turn, _assertion: Assertion): void { count++; },
[retract](t: Turn, _handle: Handle) {
if (--count === 0) {
console.log('Client: detected box termination');
t.quit();
}
},
})));
});
});