diff --git a/src/actor.ts b/src/actor.ts index 9ce0ff9..78172fe 100644 --- a/src/actor.ts +++ b/src/actor.ts @@ -1,4 +1,4 @@ -import { Dictionary, IdentitySet, Record, Tuple, Value, is } from 'preserves'; +import { Dictionary, IdentitySet, Record, Tuple, Value, is, IdentityMap } from 'preserves'; import { queueTask } from './task.js'; //--------------------------------------------------------------------------- @@ -195,6 +195,11 @@ export class Turn { assert(ref: Ref, assertion: Assertion): Handle { const h = allocateHandle(); + this._assert(ref, assertion, h); + return h; + } + + _assert(ref: Ref, assertion: Assertion, h: Handle) { const a = runRewrites(ref.attenuation, assertion); if (a !== null) { this.enqueue(ref.relay, t => { @@ -202,7 +207,6 @@ export class Turn { ref.target.assert?.(t, a, h); }); } - return h; } retract(h: Handle): void { @@ -396,3 +400,37 @@ export function rfilter(... patterns: Pattern[]): RewriteStage { export function attenuate(ref: Ref, ... a: Attenuation): Ref { return { ... ref, attenuation: [... a, ... (ref.attenuation ?? [])] }; } + +export function forwarder(t: Turn, ref: Ref): { proxy: Ref, revoker: Ref } { + let underlying: Ref | null = ref; + let handleMap = new IdentityMap(); + let proxy = t.ref({ + assert(turn: Turn, assertion: Assertion, handle: Handle): void { + if (underlying === null) return; + handleMap.set(handle, turn.assert(underlying, assertion)); + }, + retract(turn: Turn, handle: Handle): void { + if (underlying === null) return; + const h = handleMap.get(handle); + if (h !== void 0) { + turn.retract(h); + handleMap.delete(handle); + } + }, + message(turn: Turn, body: Assertion): void { + if (underlying === null) return; + turn.message(underlying, body); + }, + sync(turn: Turn, peer: Ref): void { + if (underlying === null) return; + turn._sync(underlying, peer); + }, + }); + let revoker = t.ref({ + message(turn: Turn, _body: Assertion): void { + underlying = null; + handleMap.forEach(h => turn.retract(h)); + }, + }); + return { proxy, revoker }; +} diff --git a/src/box.ts b/src/box.ts index e8ee880..c81d414 100644 --- a/src/box.ts +++ b/src/box.ts @@ -15,14 +15,14 @@ export default function (t: Turn, [ds, LIMIT, REPORT_EVERY]: [Ref, number, numbe setValue(t, 0); t.assert(ds, Observe(SetBox.constructorInfo.label, t.ref({ message(t: Turn, [newValue]: [number]): void { - // console.log(`Box: got ${newValue}`); + // console.log(`Box ${t.actor.id}: got ${newValue}`); if (newValue % REPORT_EVERY === 0) { const endTime = Date.now(); const delta = (endTime - startTime) / 1000.0; const count = newValue - prevValue; prevValue = newValue; startTime = endTime; - console.log(`Box: got ${newValue} (${count / delta} Hz)`); + console.log(`Box ${t.actor.id}: got ${newValue} (${count / delta} Hz)`); } if (newValue === LIMIT) t.quit(); setValue(t, newValue); diff --git a/src/client.ts b/src/client.ts index d19cc5f..0b4e2b4 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,6 +1,6 @@ import { BoxState, SetBox } from "./box-protocol.js"; import { Observe } from "./dataspace.js"; -import { Assertion, Handle, Ref, Turn } from "./actor.js"; +import { Assertion, Ref, Turn } from "./actor.js"; export default function (t: Turn, ds: Ref) { t.spawn(t => { @@ -8,17 +8,21 @@ export default function (t: Turn, ds: Ref) { let count = 0; t.assert(ds, Observe(BoxState.constructorInfo.label, t.ref({ assert(t: Turn, [currentValue]: [number]): void { - // console.log(`Client: got ${currentValue}`); + // console.log(`Client ${t.actor.id}: got ${currentValue}`); t.message(ds, SetBox(currentValue + 1)); } }))); t.assert(ds, Observe(BoxState.constructorInfo.label, t.ref({ - assert(_t: Turn, _assertion: Assertion): void { count++; }, - retract(t: Turn, _handle: Handle) { + assert(_t: Turn, _assertion: Assertion): void { + count++; + // console.log('inc to', count, _assertion); + }, + retract(t: Turn) { if (--count === 0) { - console.log('Client: detected box termination'); + console.log(`Client ${t.actor.id}: detected box termination`); t.quit(); } + // console.log('dec to', count); }, }))); }); diff --git a/src/main.ts b/src/main.ts index 5006b15..d47ef43 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,4 +1,4 @@ -import { Actor, Assertion, attenuate, CRec, Lit, Pattern, PCompound, Ref, rfilter, Turn } from './actor.js'; +import { Actor, Assertion, attenuate, CRec, forwarder, Lit, Pattern, PCompound, Ref, rfilter, Turn } from './actor.js'; import { Dictionary, Record } from 'preserves'; import { Dataspace, Observe } from './dataspace.js'; import { Worker } from 'worker_threads'; @@ -38,7 +38,20 @@ function spawnModule(t: Turn, moduleName: string, arg: Assertion) { // __setNextActorId(1000); Turn.for(new Actor(), async (t: Turn) => { - const ds = t.ref(new Dataspace()); + const ds_unproxied = t.ref(new Dataspace()); + + const ds = ds_unproxied; + // const { proxy: ds, revoker } = forwarder(t, ds_unproxied); + // t.spawn(t => { + // t.assert(ds, Observe(SetBox.constructorInfo.label, t.ref({ + // message(t: Turn, [newValue]: [number]): void { + // if (newValue === 30000) { + // console.log('BOOM!'); + // t.message(revoker, true); + // } + // } + // }))); + // }); const ds_for_box = attenuate( ds, @@ -51,7 +64,7 @@ Turn.for(new Actor(), async (t: Turn) => { [0, Lit(SetBox.constructorInfo.label)]])))); const ds_for_client = attenuate( - ds, + ds_unproxied, rfilter(PCompound(CRec(SetBox.constructorInfo.label, SetBox.constructorInfo.arity), new Dictionary()),