Revocable forwarders

This commit is contained in:
Tony Garnock-Jones 2021-03-03 11:45:01 +01:00
parent f8f643000a
commit 1f5389b9cd
4 changed files with 67 additions and 12 deletions

View File

@ -1,4 +1,4 @@
import { Dictionary, IdentitySet, Record, Tuple, Value, is } from 'preserves';
import { Dictionary, IdentitySet, Record, Tuple, Value, is, IdentityMap } from 'preserves';
import { queueTask } from './task.js';
//---------------------------------------------------------------------------
@ -195,6 +195,11 @@ export class Turn {
assert(ref: Ref, assertion: Assertion): Handle {
const h = allocateHandle();
this._assert(ref, assertion, h);
return h;
}
_assert(ref: Ref, assertion: Assertion, h: Handle) {
const a = runRewrites(ref.attenuation, assertion);
if (a !== null) {
this.enqueue(ref.relay, t => {
@ -202,7 +207,6 @@ export class Turn {
ref.target.assert?.(t, a, h);
});
}
return h;
}
retract(h: Handle): void {
@ -396,3 +400,37 @@ export function rfilter(... patterns: Pattern[]): RewriteStage {
export function attenuate(ref: Ref, ... a: Attenuation): Ref {
return { ... ref, attenuation: [... a, ... (ref.attenuation ?? [])] };
}
export function forwarder(t: Turn, ref: Ref): { proxy: Ref, revoker: Ref } {
let underlying: Ref | null = ref;
let handleMap = new IdentityMap<Handle, Handle>();
let proxy = t.ref({
assert(turn: Turn, assertion: Assertion, handle: Handle): void {
if (underlying === null) return;
handleMap.set(handle, turn.assert(underlying, assertion));
},
retract(turn: Turn, handle: Handle): void {
if (underlying === null) return;
const h = handleMap.get(handle);
if (h !== void 0) {
turn.retract(h);
handleMap.delete(handle);
}
},
message(turn: Turn, body: Assertion): void {
if (underlying === null) return;
turn.message(underlying, body);
},
sync(turn: Turn, peer: Ref): void {
if (underlying === null) return;
turn._sync(underlying, peer);
},
});
let revoker = t.ref({
message(turn: Turn, _body: Assertion): void {
underlying = null;
handleMap.forEach(h => turn.retract(h));
},
});
return { proxy, revoker };
}

View File

@ -15,14 +15,14 @@ export default function (t: Turn, [ds, LIMIT, REPORT_EVERY]: [Ref, number, numbe
setValue(t, 0);
t.assert(ds, Observe(SetBox.constructorInfo.label, t.ref({
message(t: Turn, [newValue]: [number]): void {
// console.log(`Box: got ${newValue}`);
// console.log(`Box ${t.actor.id}: got ${newValue}`);
if (newValue % REPORT_EVERY === 0) {
const endTime = Date.now();
const delta = (endTime - startTime) / 1000.0;
const count = newValue - prevValue;
prevValue = newValue;
startTime = endTime;
console.log(`Box: got ${newValue} (${count / delta} Hz)`);
console.log(`Box ${t.actor.id}: got ${newValue} (${count / delta} Hz)`);
}
if (newValue === LIMIT) t.quit();
setValue(t, newValue);

View File

@ -1,6 +1,6 @@
import { BoxState, SetBox } from "./box-protocol.js";
import { Observe } from "./dataspace.js";
import { Assertion, Handle, Ref, Turn } from "./actor.js";
import { Assertion, Ref, Turn } from "./actor.js";
export default function (t: Turn, ds: Ref) {
t.spawn(t => {
@ -8,17 +8,21 @@ export default function (t: Turn, ds: Ref) {
let count = 0;
t.assert(ds, Observe(BoxState.constructorInfo.label, t.ref({
assert(t: Turn, [currentValue]: [number]): void {
// console.log(`Client: got ${currentValue}`);
// console.log(`Client ${t.actor.id}: got ${currentValue}`);
t.message(ds, SetBox(currentValue + 1));
}
})));
t.assert(ds, Observe(BoxState.constructorInfo.label, t.ref({
assert(_t: Turn, _assertion: Assertion): void { count++; },
retract(t: Turn, _handle: Handle) {
assert(_t: Turn, _assertion: Assertion): void {
count++;
// console.log('inc to', count, _assertion);
},
retract(t: Turn) {
if (--count === 0) {
console.log('Client: detected box termination');
console.log(`Client ${t.actor.id}: detected box termination`);
t.quit();
}
// console.log('dec to', count);
},
})));
});

View File

@ -1,4 +1,4 @@
import { Actor, Assertion, attenuate, CRec, Lit, Pattern, PCompound, Ref, rfilter, Turn } from './actor.js';
import { Actor, Assertion, attenuate, CRec, forwarder, Lit, Pattern, PCompound, Ref, rfilter, Turn } from './actor.js';
import { Dictionary, Record } from 'preserves';
import { Dataspace, Observe } from './dataspace.js';
import { Worker } from 'worker_threads';
@ -38,7 +38,20 @@ function spawnModule(t: Turn, moduleName: string, arg: Assertion) {
// __setNextActorId(1000);
Turn.for(new Actor(), async (t: Turn) => {
const ds = t.ref(new Dataspace());
const ds_unproxied = t.ref(new Dataspace());
const ds = ds_unproxied;
// const { proxy: ds, revoker } = forwarder(t, ds_unproxied);
// t.spawn(t => {
// t.assert(ds, Observe(SetBox.constructorInfo.label, t.ref({
// message(t: Turn, [newValue]: [number]): void {
// if (newValue === 30000) {
// console.log('BOOM!');
// t.message(revoker, true);
// }
// }
// })));
// });
const ds_for_box = attenuate(
ds,
@ -51,7 +64,7 @@ Turn.for(new Actor(), async (t: Turn) => {
[0, Lit(SetBox.constructorInfo.label)]]))));
const ds_for_client = attenuate(
ds,
ds_unproxied,
rfilter(PCompound(CRec(SetBox.constructorInfo.label,
SetBox.constructorInfo.arity),
new Dictionary()),