From 252e82a8875aa5977485e171dfa57b53c1ea31a4 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 22 Feb 2021 19:37:47 +0100 Subject: [PATCH] Progress --- actor.ts | 34 +++++++++--- main.ts | 155 +++++++++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 178 insertions(+), 11 deletions(-) diff --git a/actor.ts b/actor.ts index b92ef52..8d9e074 100644 --- a/actor.ts +++ b/actor.ts @@ -2,7 +2,7 @@ import { IdentitySet, Value } from 'preserves'; export type Assertion = Value>; -export type Handle = any; +export type Handle = number; export type ExitReason = null | { ok: true } | { ok: false, err: Error }; @@ -49,14 +49,14 @@ export class Actor implements SyncTarget { return this.exitReason === null; } - stop() { - this.terminateWith({ ok: true }); + stop(t: Turn) { + this.terminateWith(t, { ok: true }); } - terminateWith(reason: Exclude) { + terminateWith(t: Turn, reason: Exclude) { if (this.alive) { this.exitReason = reason; - Turn.for(this, t => this.outbound.forEach(([peer, _a], h) => t._retract(peer, h))); + this.outbound.forEach(([peer, _a], h) => t._retract(peer, h)); } } @@ -70,7 +70,8 @@ export class Actor implements SyncTarget { try { proc(); } catch (err) { - this.terminateWith({ ok: false, err }); + console.error(Actor, err); + Turn.for(this, t => this.terminateWith(t, { ok: false, err })); } } }); @@ -108,6 +109,10 @@ export class Turn { return this.actor; } + ref(t: T, what: string = "ref"): Ref { + return this._ensureActor(what).ref(t); + } + spawn(bootProc: (t: Turn) => void, initialAssertions?: IdentitySet): void { if ((initialAssertions !== void 0) && (initialAssertions.size > 0)) { this._ensureActor("spawn with initialAssertions"); @@ -121,6 +126,11 @@ export class Turn { }); } + quit(): void { + const actor = this._ensureActor("quit"); + this.localActions.push(t => actor.stop(t)); + } + assert(location: Ref, assertion: Assertion): Handle { this._ensureActor("assert"); const h = nextHandle++; @@ -135,6 +145,12 @@ export class Turn { this._retract(this._ensureActor("retract").outbound.get(h)![0], h); } + replace(location: Ref, h: Handle | undefined, assertion: Assertion): Handle { + const newHandle = this.assert(location, assertion); + if (h !== void 0) this.retract(h); + return newHandle; + } + _retract(location: Ref, handle: Handle): void { this.enqueue(location.actor, t => { this.actor!.outbound.delete(handle); @@ -144,7 +160,7 @@ export class Turn { sync(location: Ref): Promise { return new Promise(resolve => { - const k = this._ensureActor("sync").ref({ [synced]: resolve }); + const k = this.ref({ [synced]: resolve }, "sync"); this.enqueue(location.actor, t => location.target[sync](t, k)); }); } @@ -173,7 +189,9 @@ export class Turn { this.completed = true; this.queues.forEach((queue, actor) => actor.execute(() => queue.forEach(f => Turn.for(actor, f)))); - this.localActions.forEach(f => Turn.for(this.actor, f)); + if (this.localActions.length > 0) { + queueMicrotask(() => this.localActions.forEach(f => Turn.for(this.actor, f))); + } } } diff --git a/main.ts b/main.ts index ec3f6fc..4d14ccd 100644 --- a/main.ts +++ b/main.ts @@ -1,7 +1,156 @@ -import { Actor, Turn } from './actor.js'; +import { + Actor, + Assertion, + Entity, + Handle, + Ref, + Turn, + assert, + message, + retract, +} from './actor.js'; +import { Dictionary, IdentityMap, is, Record } from 'preserves'; +import { Bag, ChangeDescription } from './bag'; +const Observe = Record.makeConstructor>('Observe', ['label', 'observer']); + +function makeDataspace(): Ref { + const handleMap: IdentityMap = new IdentityMap(); + const assertions = new Bag>(); + const subscriptions: Dictionary>> = new Dictionary(); + + function forEachSubscription(assertion: Assertion, f: (handleMap: Dictionary, peer: Ref) => void): void { + if (Record.isRecord(assertion)) { + const peerMap = subscriptions.get(assertion.label); + if (Dictionary.isDictionary(peerMap)) { + peerMap.forEach((handleMap, peer) => { + if (peer instanceof Ref) { + f(handleMap, peer); + } + }); + } + } + } + + const a = new Actor(); + return a.ref({ + [assert](turn: Turn, assertion: Assertion, handle: Handle): void { + // console.log(`DS: assert ${assertion.asPreservesText()} :: ${handle}`); + handleMap.set(handle, assertion); + if (assertions.change(assertion, +1) === ChangeDescription.ABSENT_TO_PRESENT) { + if (Observe.isClassOf(assertion)) { + const observedLabel = Observe._.label(assertion)!; + const observer = Observe._.observer(assertion) as Ref; + let peerMap = subscriptions.get(observedLabel); + if (peerMap === void 0) { + peerMap = new Dictionary(); + subscriptions.set(observedLabel, peerMap); + } + const handleMap: Dictionary = new Dictionary(); + peerMap.set(observer, handleMap); + assertions.forEach((_count, assertion) => { + if (Record.isRecord(assertion)) { + if (is(assertion.label, observedLabel)) { + handleMap.set(assertion, turn.assert(observer, assertion)); + } + } + }); + } + forEachSubscription(assertion, (handleMap, peer) => { + if (!handleMap.has(assertion)) { + handleMap.set(assertion, turn.assert(peer, assertion)); + } + }); + } + }, + + [retract](turn: Turn, upstreamHandle: Handle): void { + const assertion = handleMap.get(upstreamHandle); + // console.log(`DS: retract ${(assertion ?? Symbol.for('missing')).asPreservesText()} :: ${upstreamHandle}`); + if (assertion !== void 0) { + handleMap.delete(upstreamHandle); + if (assertions.change(assertion, -1) === ChangeDescription.PRESENT_TO_ABSENT) { + forEachSubscription(assertion, (handleMap, _peer) => { + const downstreamHandle = handleMap.get(assertion); + if (downstreamHandle !== void 0) { + turn.retract(downstreamHandle); + handleMap.delete(assertion); + } + }); + if (Observe.isClassOf(assertion)) { + let peerMap = subscriptions.get(Observe._.label(assertion)!)!; + peerMap.delete(Observe._.observer(assertion)!); + if (peerMap.size === 0) subscriptions.delete(Observe._.label(assertion)!); + } + } + } + }, + + [message](turn: Turn, message: Assertion): void { + // console.log(`DS: message ${message.asPreservesText()}`); + forEachSubscription(message, (_handleMap, peer) => turn.message(peer, message)); + } + }); +} + +const BoxState = Record.makeConstructor>('BoxState', ['value']); +const SetBox = Record.makeConstructor>('SetBox', ['newValue']); + +let startTime = Date.now(); +let prevValue = 0; Turn.for(null, async (t: Turn) => { - const a = new Actor().ref({ - + const ds = makeDataspace(); + + // Box + t.spawn(t => { + console.log('Spawning Box'); + let value: number; + let valueHandle: Handle | undefined; + function setValue(t: Turn, newValue: number) { + value = newValue; + valueHandle = t.replace(ds, valueHandle, BoxState(value)); + } + setValue(t, 0); + t.assert(ds, Observe(SetBox.constructorInfo.label, t.ref({ + [message](t: Turn, [newValue]: [number]): void { + if (newValue % 25000 === 0) { + const endTime = Date.now(); + const delta = (endTime - startTime) / 1000.0; + const count = newValue - prevValue; + prevValue = newValue; + startTime = endTime; + console.log(`Box: got ${newValue} (${count / delta} Hz)`); + } + if (newValue === 280000) t.quit(); + setValue(t, newValue); + } + }))); + }); + + // Client + t.spawn(t => { + console.log('Spawning Client'); + let count = 0; + t.assert(ds, Observe(BoxState.constructorInfo.label, t.ref({ + [assert](t: Turn, [currentValue]: [number]): void { + // console.log(`Client: got ${currentValue}`); + if (currentValue === 300000) { + console.log(`Client: quitting at limit`); + t.quit(); + } else { + t.message(ds, SetBox(currentValue + 1)); + } + }, + [retract]() {} + }))); + t.assert(ds, Observe(BoxState.constructorInfo.label, t.ref({ + [assert](_t: Turn, _assertion: Assertion): void { count++; }, + [retract](t: Turn, _handle: Handle) { + if (--count === 0) { + console.log('Client: detected box termination'); + t.quit(); + } + }, + }))); }); });