More WIP novy

This commit is contained in:
Tony Garnock-Jones 2021-12-01 17:13:00 +01:00
parent d9bd57d643
commit 1563398dd8
16 changed files with 674 additions and 1376 deletions

View File

@ -9,7 +9,9 @@
},
"repository": "github:syndicate-lang/syndicate-js",
"scripts": {
"prepare": "yarn compile && yarn rollup",
"prepare": "yarn regenerate && yarn compile && yarn rollup",
"regenerate": "rm -rf ./src/gen && preserves-schema-ts --module EntityRef=./src/runtime/actor.ts --output ./src/gen './protocols/schemas/**/*.prs'",
"regenerate:watch": "yarn regenerate --watch",
"compile": "../../node_modules/.bin/tsc",
"compile:watch": "../../node_modules/.bin/tsc -w",
"rollup": "../../node_modules/.bin/rollup -c",
@ -22,6 +24,7 @@
"types": "lib/index.d.ts",
"author": "Tony Garnock-Jones <tonyg@leastfixedpoint.com>",
"dependencies": {
"@preserves/core": "0.15.0"
"@preserves/core": "0.17.0",
"@preserves/schema": "0.18.0"
}
}

1
packages/core/src/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
gen/

View File

@ -19,14 +19,11 @@
export * from '@preserves/core';
export * from './runtime/randomid.js';
export * from './runtime/assertions.js';
export * from './runtime/bag.js';
export * as API from './runtime/api.js';
export * as Skeleton from './runtime/skeleton.js';
export * from './runtime/actor.js';
export * from './runtime/dataspace.js';
export * from './runtime/ground.js';
export * from './runtime/relay.js';
// export * as Worker from './runtime/worker.js';
import { randomId } from './runtime/randomid.js';

View File

@ -30,7 +30,7 @@ if ('stackTraceLimit' in Error) {
export type Assertion = Value<Ref>;
export type Handle = number;
export type ExitReason = null | { ok: true } | { ok: false, err: Error };
export type ExitReason = null | { ok: true } | { ok: false, err: unknown };
export type LocalAction = (t: Turn) => void;
export interface Entity {

View File

@ -18,8 +18,7 @@
// Bags and Deltas (which are Bags where item-counts can be negative).
import { GenericEmbedded } from '@preserves/core';
import { Value, Set, Dictionary } from '@preserves/core';
import { Value, Set, Dictionary, GenericEmbedded } from '@preserves/core';
export enum ChangeDescription {
PRESENT_TO_ABSENT = -1,
@ -28,19 +27,19 @@ export enum ChangeDescription {
PRESENT_TO_PRESENT = 2,
}
export class Bag {
_items: Dictionary<GenericEmbedded, number>;
export class Bag<T extends object = GenericEmbedded> {
_items: Dictionary<T, number>;
constructor(s?: Set) {
constructor(s?: Set<T>) {
this._items = new Dictionary();
if (s) s.forEach((v) => this._items.set(v, 1));
}
get(key: Value): number {
get(key: Value<T>): number {
return this._items.get(key, 0) as number;
}
change(key: Value, delta: number, clamp: boolean = false): ChangeDescription {
change(key: Value<T>, delta: number, clamp: boolean = false): ChangeDescription {
let oldCount = this.get(key);
let newCount = oldCount + delta;
if (clamp) {
@ -64,7 +63,7 @@ export class Bag {
this._items = new Dictionary();
}
includes(key: Value): boolean {
includes(key: Value<T>): boolean {
return this._items.has(key);
}
@ -72,24 +71,24 @@ export class Bag {
return this._items.size;
}
keys(): IterableIterator<Value> {
keys(): IterableIterator<Value<T>> {
return this._items.keys();
}
entries(): IterableIterator<[Value, number]> {
entries(): IterableIterator<[Value<T>, number]> {
return this._items.entries();
}
forEach(f: (count: number, value: Value) => void) {
forEach(f: (count: number, value: Value<T>) => void) {
this._items.forEach(f);
}
snapshot(): Dictionary<GenericEmbedded, number> {
snapshot(): Dictionary<T, number> {
return this._items.clone();
}
clone(): Bag {
const b = new Bag();
clone(): Bag<T> {
const b = new Bag<T>();
b._items = this._items.clone();
return b;
}

View File

@ -20,773 +20,36 @@ import { Value, is, Set } from '@preserves/core';
import * as Skeleton from './skeleton.js';
import { Bag, ChangeDescription } from './bag.js';
import { Observe } from './assertions.js';
import * as Dataflow from './dataflow.js';
import { IdentitySet, IdentityMap } from './idcoll.js';
import { Ground } from './ground.js';
export enum Priority {
QUERY_HIGH = 0,
QUERY,
QUERY_HANDLER,
NORMAL,
GC,
IDLE,
_count
}
export type ActorId = number;
export type FacetId = ActorId;
export type EndpointId = ActorId;
export type Task<T> = () => T;
export type Script<T, Fields> = (this: Fields & DataflowObservableObject, f: Facet<Fields>) => T;
export type MaybeValue = Value<any> | undefined;
export type EndpointSpec = { assertion: MaybeValue, analysis: Skeleton.Analysis | null };
export type ObserverCallback<Fields> =
(this: Fields, facet: Facet<Fields>, bindings: Array<Value<any>>) => void;
export type ObserverCallbacks<Fields> = {
add?: ObserverCallback<Fields>;
del?: ObserverCallback<Fields>;
msg?: ObserverCallback<Fields>;
}
export const DataflowObservableObjectId = Symbol.for('DataflowObservableObjectId');
export interface DataflowObservableObject {
[DataflowObservableObjectId](): number;
}
export type DataflowObservable = [DataflowObservableObject, string];
export function _canonicalizeDataflowObservable(i: DataflowObservable): string {
return i[0][DataflowObservableObjectId]() + ',' + i[1];
}
export type DataflowDependent = Endpoint<any>;
export function _canonicalizeDataflowDependent(i: DataflowDependent): string {
return '' + i.id;
}
export type ActivationScript = Script<void, {}>;
export abstract class Dataspace {
nextId: ActorId = 0;
export class Dataspace implements Partial<Entity> {
index = new Skeleton.Index();
dataflow = new Dataflow.Graph<DataflowDependent, DataflowObservable>(
_canonicalizeDataflowDependent,
_canonicalizeDataflowObservable);
runnable: Array<Actor> = [];
pendingTurns: Array<Turn>;
actors: IdentityMap<number, Actor> = new IdentityMap();
activations: IdentitySet<ActivationScript> = new IdentitySet();
constructor(bootProc: Script<void, {}>) {
this.pendingTurns = [new Turn(null, [new Spawn(null, bootProc, new Set())])];
}
abstract start(): this;
abstract ground(): Ground;
backgroundTask(): () => void {
return this.ground().backgroundTask();
}
runTasks(): boolean { // TODO: rename?
this.runPendingTasks();
this.performPendingActions();
return this.runnable.length > 0 || this.pendingTurns.length > 0;
}
runPendingTasks() {
let runnable = this.runnable;
this.runnable = [];
runnable.forEach((ac) => { ac.runPendingTasks(); /* TODO: rename? */ });
}
performPendingActions() {
let turns = this.pendingTurns;
this.pendingTurns = [];
turns.forEach((turn) => {
turn.actions.forEach((action) => {
// console.log('[DATASPACE]', turn.actor && turn.actor.toString(), action);
action.perform(this, turn.actor);
this.runPendingTasks();
});
});
}
commitActions(ac: Actor, pending: Array<Action>) {
this.pendingTurns.push(new Turn(ac, pending));
}
refreshAssertions() {
this.dataflow.repairDamage((ep) => {
let facet = ep.facet;
if (facet.isLive) { // TODO: necessary test, or tautological?
facet.invokeScript(f => f.withNonScriptContext(() => ep.refresh()));
}
});
}
addActor<SpawnFields>(
name: any,
bootProc: Script<void, SpawnFields>,
initialAssertions: Set,
parentActor: Actor | null)
{
let ac = new Actor(this, name, initialAssertions, parentActor?.id);
// debug('Spawn', ac && ac.toString());
this.applyPatch(ac, ac.adhocAssertions);
ac.addFacet<{}, {}>(null, systemFacet => {
// Root facet is a dummy "system" facet that exists to hold
// one-or-more "user" "root" facets.
ac.addFacet<{}, SpawnFields>(systemFacet, bootProc);
// ^ The "true root", user-visible facet.
initialAssertions.forEach((a) => { ac.adhocRetract(a); });
});
}
applyPatch(ac: Actor, delta: Bag) {
// if (!delta.isEmpty()) debug('applyPatch BEGIN', ac && ac.toString());
let removals: Array<[number, Value<any>]> = [];
delta.forEach((count, a) => {
if (count > 0) {
// debug('applyPatch +', a && a.toString());
this.adjustIndex(a, count);
} else {
removals.push([count, a]);
}
if (ac) ac.cleanupChanges.change(a, -count);
});
removals.forEach(([count, a]) => {
// debug('applyPatch -', a && a.toString());
this.adjustIndex(a, count);
});
// if (!delta.isEmpty()) debug('applyPatch END');
}
deliverMessage(m: Value<any>, _sendingActor: Actor | null) {
// debug('deliverMessage', sendingActor && sendingActor.toString(), m.toString());
this.index.deliverMessage(m);
// this.index.deliverMessage(m, (leaf, _m) => {
// sendingActor.touchedTopics = sendingActor.touchedTopics.add(leaf);
// });
}
adjustIndex(a: Value<any>, count: number) {
return this.index.adjustAssertion(a, count);
}
subscribe(handler: Skeleton.Analysis) {
this.index.addHandler(handler, handler.callback!);
}
unsubscribe(handler: Skeleton.Analysis) {
this.index.removeHandler(handler, handler.callback!);
}
endpointHook<Fields>(_facet: Facet<Fields>, _endpoint: Endpoint<Fields>) {
// Subclasses may override
}
}
export class Actor {
readonly id: ActorId;
readonly dataspace: Dataspace;
readonly name: any;
rootFacet: Facet<any> | null = null;
isRunnable: boolean = false;
readonly pendingTasks: Array<Array<Task<void>>>;
pendingActions: Array<Action>;
adhocAssertions: Bag;
cleanupChanges = new Bag(); // negative counts allowed!
parentId: ActorId | undefined;
constructor(dataspace: Dataspace,
name: any,
initialAssertions: Set,
parentActorId: ActorId | undefined)
{
this.id = dataspace.nextId++;
this.dataspace = dataspace;
this.name = name;
this.isRunnable = false;
this.pendingTasks = [];
for (let i = 0; i < Priority._count; i++) { this.pendingTasks.push([]); }
this.pendingActions = [];
this.adhocAssertions = new Bag(initialAssertions); // no negative counts allowed
this.parentId = parentActorId;
dataspace.actors.set(this.id, this);
}
runPendingTasks() {
while (true) {
let task = this.popNextTask();
if (!task) break;
task();
this.dataspace.refreshAssertions();
}
this.isRunnable = false;
let pending = this.pendingActions;
if (pending.length > 0) {
this.pendingActions = [];
this.dataspace.commitActions(this, pending);
}
}
popNextTask(): Task<void> | null {
let tasks = this.pendingTasks;
for (let i = 0; i < Priority._count; i++) {
let q = tasks[i];
if (q.length > 0) return q.shift()!;
}
return null;
}
abandonQueuedWork() {
this.pendingActions = [];
for (let i = 0; i < Priority._count; i++) { this.pendingTasks[i] = []; }
}
scheduleTask(task: Task<void>, priority: Priority = Priority.NORMAL) {
if (!this.isRunnable) {
this.isRunnable = true;
this.dataspace.runnable.push(this);
}
this.pendingTasks[priority].push(task);
}
addFacet<ParentFields, ChildFields>(
parentFacet: Facet<ParentFields> | null,
bootProc: Script<void, ChildFields & ParentFields>,
checkInScript: boolean = false)
{
if (checkInScript && parentFacet && !parentFacet.inScript) {
throw new Error("Cannot add facet outside script; are you missing a `react { ... }`?");
}
let f = new Facet<ChildFields & ParentFields>(this, parentFacet);
f.invokeScript(f => f.withNonScriptContext(() => bootProc.call(f.fields, f)));
this.scheduleTask(() => {
if ((parentFacet && !parentFacet.isLive) || f.isInert()) {
f._terminate();
}
});
}
_terminate(emitPatches: boolean) {
// Abruptly terminates an entire actor, without running stop-scripts etc.
if (emitPatches) {
this.scheduleTask(() => {
this.adhocAssertions.snapshot().forEach((_count, a) => { this.retract(a); });
});
}
if (this.rootFacet) {
this.rootFacet._abort(emitPatches);
}
this.scheduleTask(() => { this.enqueueScriptAction(new Quit()); });
}
enqueueScriptAction(action: Action) {
this.pendingActions.push(action);
}
pendingPatch(): Patch {
if (this.pendingActions.length > 0) {
let p = this.pendingActions[this.pendingActions.length - 1];
if (p instanceof Patch) return p;
}
let p = new Patch(new Bag());
this.enqueueScriptAction(p);
return p;
}
assert(a: Value<any>) { this.pendingPatch().adjust(a, +1); }
retract(a: Value<any>) { this.pendingPatch().adjust(a, -1); }
adhocRetract(a: Value<any>) {
if (this.adhocAssertions.change(a, -1, true) === ChangeDescription.PRESENT_TO_ABSENT) {
this.retract(a);
}
}
adhocAssert(a: Value<any>) {
if (this.adhocAssertions.change(a, +1) === ChangeDescription.ABSENT_TO_PRESENT) {
this.assert(a);
}
}
toString(): string {
let s = 'Actor(' + this.id;
if (this.name !== void 0 && this.name !== null) s = s + ',' + this.name.toString();
return s + ')';
}
}
abstract class Action {
abstract perform(ds: Dataspace, ac: Actor | null): void;
}
class Patch extends Action {
readonly changes: Bag;
constructor(changes: Bag) {
super();
this.changes = changes;
}
perform(ds: Dataspace, ac: Actor | null): void {
ds.applyPatch(ac!, this.changes);
}
adjust(a: Value<any>, count: number) {
this.changes.change(a, count);
}
}
class Message extends Action {
readonly body: Value<any>;
constructor(body: Value<any>) {
super();
this.body = body;
}
perform(ds: Dataspace, ac: Actor | null): void {
ds.deliverMessage(this.body, ac);
}
}
class Spawn<Fields> extends Action {
readonly name: any;
readonly bootProc: Script<void, Fields>;
readonly initialAssertions: Set;
constructor(name: any, bootProc: Script<void, Fields>, initialAssertions: Set = new Set()) {
super();
this.name = name;
this.bootProc = bootProc;
this.initialAssertions = initialAssertions;
}
perform(ds: Dataspace, ac: Actor | null): void {
ds.addActor(this.name, this.bootProc, this.initialAssertions, ac);
}
}
class Quit extends Action { // TODO: rename? Perhaps to Cleanup?
// Pseudo-action - not for userland use.
perform(ds: Dataspace, ac: Actor | null): void {
if (ac === null) throw new Error("Internal error: Quit action with null actor");
ds.applyPatch(ac, ac.cleanupChanges);
ds.actors.delete(ac.id);
// debug('Quit', ac && ac.toString());
}
}
class DeferredTurn extends Action {
readonly continuation: Task<void>;
constructor(continuation: Task<void>) {
super();
this.continuation = continuation;
}
perform(_ds: Dataspace, ac: Actor | null): void {
// debug('DeferredTurn', ac && ac.toString());
ac!.scheduleTask(this.continuation);
}
}
class Activation extends Action {
readonly script: ActivationScript;
readonly name: any;
constructor(script: ActivationScript, name: any) {
super();
this.script = script;
this.name = name;
}
perform(ds: Dataspace, ac: Actor | null): void {
if (ds.activations.has(this.script)) return;
ds.activations.add(this.script);
ds.addActor<{}>(this.name, rootFacet => rootFacet.addStartScript(this.script), new Set(), ac);
}
}
export class Turn {
readonly actor: Actor | null;
readonly actions: Array<Action>;
constructor(actor: Actor | null, actions: Array<Action> = []) {
this.actor = actor;
this.actions = actions;
}
enqueueScriptAction(a: Action) {
this.actions.push(a);
}
}
export class Facet<Fields> {
readonly id: FacetId;
isLive = true;
readonly actor: Actor;
readonly parent: Facet<any> | null;
readonly endpoints = new IdentityMap<EndpointId, Endpoint<Fields>>();
readonly stopScripts: Array<Script<void, Fields>> = [];
readonly children = new IdentitySet<Facet<any>>();
readonly fields: Fields & DataflowObservableObject;
inScript = true;
constructor(actor: Actor, parent: Facet<any> | null) {
this.id = actor.dataspace.nextId++;
this.actor = actor;
this.parent = parent;
if (parent) {
parent.children.add(this);
this.fields = Dataflow.Graph.newScope(parent.fields);
} else {
if (actor.rootFacet) {
throw new Error("INVARIANT VIOLATED: Attempt to add second root facet");
}
actor.rootFacet = this;
this.fields = Dataflow.Graph.newScope({});
}
this.fields[DataflowObservableObjectId] = () => this.id;
}
withNonScriptContext<T>(task: Task<T>): T {
let savedInScript = this.inScript;
this.inScript = false;
try {
return task();
} finally {
this.inScript = savedInScript;
}
}
_abort(emitPatches: boolean) {
this.isLive = false;
this.children.forEach(child => child._abort(emitPatches));
this.retractAssertionsAndSubscriptions(emitPatches);
}
retractAssertionsAndSubscriptions(emitPatches: boolean) {
this.actor.scheduleTask(() => {
this.endpoints.forEach((ep) => ep.destroy(emitPatches));
this.endpoints.clear();
});
}
isInert(): boolean {
return this.endpoints.size === 0 && this.children.size === 0;
}
_terminate() {
if (!this.isLive) return;
let ac = this.actor;
let parent = this.parent;
if (parent) {
parent.children.delete(this);
} else {
ac.rootFacet = null;
}
this.isLive = false;
this.children.forEach((child) => { child._terminate(); });
// Run stop-scripts after terminating children. This means
// that children's stop-scripts run before ours.
ac.scheduleTask(() =>
this.invokeScript(() =>
this.stopScripts.forEach(s =>
s.call(this.fields, this))));
this.retractAssertionsAndSubscriptions(true);
ac.scheduleTask(() => {
if (parent) {
if (parent.isInert()) {
parent._terminate();
}
} else {
ac._terminate(true);
}
}, Priority.GC);
}
// This alias exists because of the naive expansion done by the parser.
_stop(continuation?: Script<void, Fields>) {
this.stop(continuation);
}
stop(continuation?: Script<void, Fields>) {
this.parent!.invokeScript(() => {
this.actor.scheduleTask(() => {
this._terminate();
if (continuation) {
this.parent!.scheduleScript(parent => continuation.call(this.fields, parent));
// ^ TODO: is this the correct scope to use??
}
});
});
}
addStartScript(s: Script<void, Fields>) {
this.ensureFacetSetup('`on start`');
this.scheduleScript(s);
}
addStopScript(s: Script<void, Fields>) {
this.ensureFacetSetup('`on stop`');
this.stopScripts.push(s);
}
addEndpoint(updateFun: Script<EndpointSpec, Fields>, isDynamic: boolean = true): Endpoint<Fields> {
const ep = new Endpoint(this, isDynamic, updateFun);
this.actor.dataspace.endpointHook(this, ep);
return ep;
}
_addRawObserverEndpoint(specScript: Script<MaybeValue, Fields>, callbacks: ObserverCallbacks<Fields>): Endpoint<Fields>
{
return this.addEndpoint(() => {
const spec = specScript.call(this.fields, this);
if (spec === void 0) {
return { assertion: void 0, analysis: null };
} else {
const analysis = Skeleton.analyzeAssertion(spec);
analysis.callback = this.wrap((facet, evt, vs) => {
switch (evt) {
case Skeleton.EventType.ADDED: callbacks.add?.call(facet.fields, facet, vs); break;
case Skeleton.EventType.REMOVED: callbacks.del?.call(facet.fields, facet, vs); break;
case Skeleton.EventType.MESSAGE: callbacks.msg?.call(facet.fields, facet, vs); break;
}
});
return { assertion: Observe(spec), analysis };
}
});
}
addObserverEndpoint(specThunk: (facet: Facet<Fields>) => MaybeValue, callbacks: ObserverCallbacks<Fields>): Endpoint<Fields> {
const scriptify = (f?: ObserverCallback<Fields>) =>
f && ((facet: Facet<Fields>, vs: Array<Value<any>>) =>
facet.scheduleScript(() => f.call(facet.fields, facet, vs)));
return this._addRawObserverEndpoint(specThunk, {
add: scriptify(callbacks.add),
del: scriptify(callbacks.del),
msg: scriptify(callbacks.msg),
});
}
addDataflow(subjectFun: Script<void, Fields>, priority?: Priority): Endpoint<Fields> {
return this.addEndpoint(() => {
let subjectId = this.actor.dataspace.dataflow.currentSubjectId;
this.scheduleScript(() => {
if (this.isLive) {
this.actor.dataspace.dataflow.withSubject(subjectId, () =>
subjectFun.call(this.fields, this));
}
}, priority);
return { assertion: void 0, analysis: null };
});
}
enqueueScriptAction(action: Action) {
this.actor.enqueueScriptAction(action);
}
toString(): string {
let s = 'Facet(' + this.actor.id;
if (this.actor.name !== void 0 && this.actor.name !== null) {
s = s + ',' + this.actor.name.toString();
}
s = s + ',' + this.id;
let f = this.parent;
while (f != null) {
s = s + ':' + f.id;
f = f.parent;
}
return s + ')';
}
invokeScript<T>(script: Script<T, Fields>, propagateErrors = false): T | undefined {
try {
// console.group('Facet', facet && facet.toString());
return script.call(this.fields, this);
} catch (e) {
let a = this.actor;
a.abandonQueuedWork();
a._terminate(false);
console.error('Actor ' + a.toString() + ' exited with exception:', e);
if (propagateErrors) throw e;
return undefined;
} finally {
// console.groupEnd();
}
}
wrap<T extends Array<any>, R>(
fn: (this: Fields & DataflowObservableObject,
f: Facet<Fields>, ... args: T) => R
): (... args: T) => R
{
return (... actuals) => this.invokeScript(f => fn.call(f.fields, f, ... actuals), true)!;
}
wrapExternal<T extends Array<any>>(
fn: (this: Fields & DataflowObservableObject,
f: Facet<Fields>, ... args: T) => void
): (... args: T) => void {
const ac = this.actor;
return (... actuals) => {
if (this.isLive) {
ac.dataspace.start();
ac.scheduleTask(() => this.invokeScript(f => fn.call(f.fields, f, ... actuals)));
}
};
}
ensureFacetSetup(what: string) {
if (this.inScript) {
throw new Error(`Cannot ${what} outside facet setup; are you missing \`react { ... }\`?`);
}
}
ensureNonFacetSetup(what: string) {
if (!this.inScript) {
throw new Error(`Cannot ${what} during facet setup; are you missing \`on start { ... }\`?`);
}
}
// This alias exists because of the naive expansion done by the parser.
_send(body: Value<any>) {
this.send(body);
}
send(body: Value<any>) {
this.ensureNonFacetSetup('`send`');
this.enqueueScriptAction(new Message(body));
}
// This alias exists because of the naive expansion done by the parser.
_spawn<SpawnFields>(name: any, bootProc: Script<void, SpawnFields>, initialAssertions?: Set) {
this.spawn(name, bootProc, initialAssertions);
}
spawn<SpawnFields>(name: any, bootProc: Script<void, SpawnFields>, initialAssertions?: Set) {
this.ensureNonFacetSetup('`spawn`');
this.enqueueScriptAction(new Spawn(name, bootProc, initialAssertions));
}
deferTurn(continuation: Script<void, Fields>) {
this.ensureNonFacetSetup('`deferTurn`');
this.enqueueScriptAction(new DeferredTurn(this.wrap(continuation)));
}
activate(script: ActivationScript, name?: any) {
this.ensureNonFacetSetup('`activate`');
this.enqueueScriptAction(new Activation(script, name ?? null));
}
scheduleScript(script: Script<void, Fields>, priority?: Priority) {
this.actor.scheduleTask(this.wrap(script), priority);
}
declareField<T extends DataflowObservableObject, K extends keyof T & string>(obj: T, prop: K, init: T[K]) {
if (prop in obj) {
obj[prop] = init;
} else {
this.actor.dataspace.dataflow.defineObservableProperty(obj, prop, init, {
objectId: [obj, prop],
noopGuard: is
});
}
}
// referenceField(obj: DataflowObservableObject, prop: string) {
// if (!(prop in obj)) {
// this.actor.dataspace.dataflow.recordObservation([obj, prop]);
// }
// return obj[prop];
// adjustIndex(a: Value<any>, count: number) {
// return this.index.adjustAssertion(a, count);
// }
// deleteField(obj: DataflowObservableObject, prop: string) {
// this.actor.dataspace.dataflow.recordDamage([obj, prop]);
// delete obj[prop];
// subscribe(handler: Skeleton.Analysis) {
// this.index.addHandler(handler, handler.callback!);
// }
addChildFacet<ChildFields>(bootProc: Script<void, ChildFields & Fields>) {
this.actor.addFacet<Fields, ChildFields>(this, bootProc, true);
// unsubscribe(handler: Skeleton.Analysis) {
// this.index.removeHandler(handler, handler.callback!);
// }
assert(turn: Turn, rec: Assertion, handle: Handle): void {
console.log(preserves`ds ${turn.activeFacet.id} assert ${rec} ${handle}`);
throw new Error("Full dataspaces not implemented");
}
withSelfDo(t: Script<void, Fields>) {
t.call(this.fields, this);
}
}
export class Endpoint<Fields> {
readonly id: EndpointId;
readonly facet: Facet<Fields>;
readonly updateFun: Script<EndpointSpec, Fields>;
spec: EndpointSpec;
constructor(facet: Facet<Fields>, isDynamic: boolean, updateFun: Script<EndpointSpec, Fields>) {
facet.ensureFacetSetup('add endpoint');
let ac = facet.actor;
let ds = ac.dataspace;
this.id = ds.nextId++;
this.facet = facet;
this.updateFun = updateFun;
let initialSpec = ds.dataflow.withSubject(isDynamic ? this : undefined,
() => updateFun.call(facet.fields, facet));
this._install(initialSpec);
this.spec = initialSpec; // keeps TypeScript's undefinedness-checker happy
facet.endpoints.set(this.id, this);
}
_install(spec: EndpointSpec) {
this.spec = spec;
const ac = this.facet.actor;
if (this.spec.assertion !== void 0) {
ac.assert(this.spec.assertion);
}
if (this.spec.analysis) ac.dataspace.subscribe(this.spec.analysis);
}
_uninstall(emitPatches: boolean) {
if (emitPatches) {
if (this.spec.assertion !== void 0) {
this.facet.actor.retract(this.spec.assertion);
}
}
if (this.spec.analysis) this.facet.actor.dataspace.unsubscribe(this.spec.analysis);
}
refresh() {
let newSpec = this.updateFun.call(this.facet.fields, this.facet);
if (!is(newSpec.assertion, this.spec.assertion)) {
this._uninstall(true);
this._install(newSpec);
}
}
destroy(emitPatches: boolean) {
const facet = this.facet;
facet.actor.dataspace.dataflow.forgetSubject(this);
// ^ TODO: this won't work because of object identity problems! Why
// does the Racket implementation do this, when the old JS
// implementation doesn't?
facet.endpoints.delete(this.id);
this._uninstall(emitPatches);
}
toString(): string {
return 'Endpoint(' + this.id + ')';
retract(turn: Turn, upstreamHandle: Handle): void {
console.log(preserves`ds ${turn.activeFacet.id} retract ${upstreamHandle}`);
throw new Error("Full dataspaces not implemented");
}
message(turn: Turn, rec: Assertion): void {
console.log(preserves`ds ${turn.activeFacet.id} message ${rec}`);
throw new Error("Full dataspaces not implemented");
}
}

View File

@ -1,131 +0,0 @@
//---------------------------------------------------------------------------
// @syndicate-lang/core, an implementation of Syndicate dataspaces for JS.
// Copyright (C) 2016-2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//---------------------------------------------------------------------------
import { ActivationScript, Dataspace } from './dataspace.js';
export type StopHandler<D extends Dataspace> = (ds: D) => void;
declare global {
interface Window {
_ground: Ground;
}
}
export class Ground extends Dataspace {
stepScheduled = false;
stepping = false;
startingFuel: number = 1000;
stopHandlers: Array<StopHandler<this>> = [];
backgroundTaskCount = 0;
constructor(bootProc: ActivationScript) {
super(function (rootFacet) { rootFacet.addStartScript(() => rootFacet.activate(bootProc)); });
if (typeof window !== 'undefined') {
window._ground = this;
}
}
static laterCall(thunk: () => void): void {
queueMicrotask(() => {
if ('stackTraceLimit' in Error) {
(Error as any).stackTraceLimit = 100;
}
try {
thunk();
} catch (e) {
console.error("SYNDICATE/JS INTERNAL ERROR", e);
}
});
}
backgroundTask(): () => void {
let active = true;
this.backgroundTaskCount++;
return () => {
if (active) {
this.backgroundTaskCount--;
active = false;
}
};
}
start(): this {
if (!this.stepScheduled) {
Ground.laterCall(() => {
this.stepScheduled = false;
this._step();
});
}
return this; // allows chaining start() immediately after construction
}
ground(): Ground {
return this;
}
_step() {
this.stepping = true;
try {
let stillBusy = false;
for (var fuel = this.startingFuel; fuel > 0; fuel--) {
stillBusy = this.runTasks();
if (!stillBusy) break;
}
if (stillBusy) {
this.start();
} else {
if (!this.backgroundTaskCount) {
this.stopHandlers.forEach((h) => h(this));
this.stopHandlers = [];
}
}
} finally {
this.stepping = false;
}
}
addStopHandler(h: StopHandler<this>): this {
this.stopHandlers.push(h);
return this;
}
}
// let g = new Ground(() => {
// Worker.spawnWorkerRelay();
// });
// if (typeof document !== 'undefined') {
// document.addEventListener("DOMContentLoaded", () => {
// g.start();
// if (k) k(g);
// });
// } else {
// g.start();
// if (k) k(g);
// }
export function bootModule(bootProc: ActivationScript): Ground {
const g = new Ground(bootProc);
Ground.laterCall(() => {
if (typeof document !== 'undefined') {
document.addEventListener('DOMContentLoaded', () => g.start());
} else {
g.start();
}
});
return g;
}

View File

@ -0,0 +1,181 @@
//---------------------------------------------------------------------------
// @syndicate-lang/core, an implementation of Syndicate dataspaces for JS.
// Copyright (C) 2016-2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//---------------------------------------------------------------------------
use crate::schemas::dataspace_patterns::*;
use preserves::value::NestedValue;
use std::convert::TryFrom;
use std::convert::TryInto;
#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq)]
pub enum PathStep {
Index(usize),
Key(_Any),
}
pub type Path = Vec<PathStep>;
pub type Paths = Vec<Path>;
struct Analyzer {
pub const_paths: Paths,
pub const_values: Vec<_Any>,
pub capture_paths: Paths,
}
pub struct PatternAnalysis {
pub const_paths: Paths,
pub const_values: _Any,
pub capture_paths: Paths,
}
struct PatternMatcher<N: NestedValue> {
captures: Vec<N>,
}
impl PatternAnalysis {
pub fn new(p: &Pattern) -> Self {
let mut analyzer = Analyzer {
const_paths: Vec::new(),
const_values: Vec::new(),
capture_paths: Vec::new(),
};
analyzer.walk(&mut Vec::new(), p);
PatternAnalysis {
const_paths: analyzer.const_paths,
const_values: _Any::new(analyzer.const_values),
capture_paths: analyzer.capture_paths,
}
}
}
impl Analyzer {
fn walk_step(&mut self, path: &mut Path, s: PathStep, p: &Pattern) {
path.push(s);
self.walk(path, p);
path.pop();
}
fn walk(&mut self, path: &mut Path, p: &Pattern) {
match p {
Pattern::DCompound(b) => match &**b {
DCompound::Rec { members, .. } |
DCompound::Arr { members, .. } => {
for (i, p) in members {
self.walk_step(path, PathStep::Index(usize::try_from(i).unwrap_or(0)), p);
}
}
DCompound::Dict { members, .. } => {
for (k, p) in members {
self.walk_step(path, PathStep::Key(k.clone()), p);
}
}
}
Pattern::DBind(b) => {
let DBind { pattern, .. } = &**b;
self.capture_paths.push(path.clone());
self.walk(path, pattern)
}
Pattern::DDiscard(_) =>
(),
Pattern::DLit(b) => {
let DLit { value } = &**b;
self.const_paths.push(path.clone());
self.const_values.push(value.clone());
}
}
}
}
impl<N: NestedValue> Pattern<N> {
pub fn match_value(&self, value: &N) -> Option<Vec<N>> {
let mut matcher = PatternMatcher::new();
if matcher.run(self, value) {
Some(matcher.captures)
} else {
None
}
}
}
impl<N: NestedValue> PatternMatcher<N> {
fn new() -> Self {
PatternMatcher {
captures: Vec::new(),
}
}
fn run(&mut self, pattern: &Pattern<N>, value: &N) -> bool {
match pattern {
Pattern::DDiscard(_) => true,
Pattern::DBind(b) => {
self.captures.push(value.clone());
self.run(&b.pattern, value)
}
Pattern::DLit(b) => value == &b.value,
Pattern::DCompound(b) => match &**b {
DCompound::Rec { ctor, members } => {
let arity = (&ctor.arity).try_into().expect("reasonable arity");
match value.value().as_record(Some(arity)) {
None => false,
Some(r) => {
for (i, p) in members.iter() {
let i: usize = i.try_into().expect("reasonable index");
if !self.run(p, &r.fields()[i]) {
return false;
}
}
true
}
}
}
DCompound::Arr { ctor, members } => {
let arity: usize = (&ctor.arity).try_into().expect("reasonable arity");
match value.value().as_sequence() {
None => false,
Some(vs) => {
if vs.len() != arity {
return false;
}
for (i, p) in members.iter() {
let i: usize = i.try_into().expect("reasonable index");
if !self.run(p, &vs[i]) {
return false;
}
}
true
}
}
}
DCompound::Dict { ctor: _, members } => {
match value.value().as_dictionary() {
None => false,
Some(entries) => {
for (k, p) in members.iter() {
if !entries.get(k).map(|v| self.run(p, v)).unwrap_or(false) {
return false;
}
}
true
}
}
}
}
}
}
}

View File

@ -1,151 +0,0 @@
//---------------------------------------------------------------------------
// @syndicate-lang/core, an implementation of Syndicate dataspaces for JS.
// Copyright (C) 2016-2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//---------------------------------------------------------------------------
import { embed, Value } from '@preserves/core';
import { $Special } from './special.js';
import { Dataspace, Facet, Actor, Endpoint, Script } from './dataspace.js';
import { Observe, Inbound, Outbound } from './assertions.js';
import { ChangeDescription } from './bag.js';
import { EventType, Analysis } from './skeleton.js';
import { Ground } from './ground.js';
export const $QuitDataspace = embed(new $Special("quit-dataspace"));
export class NestedDataspace extends Dataspace {
readonly outerFacet: Facet<{}>;
constructor(outerFacet: Facet<{}>, bootProc: Script<void, {}>) {
super(bootProc);
this.outerFacet = outerFacet;
}
deliverMessage(m: any, _sendingActor: Actor) {
super.deliverMessage(m, _sendingActor);
if (m === $QuitDataspace) {
this.outerFacet.stop();
}
}
endpointHook<InnerFields>(facet: Facet<InnerFields>, innerEp: Endpoint<InnerFields>) {
super.endpointHook(facet, innerEp);
const innerAssertion = innerEp.spec.assertion;
if (!Observe.isClassOf(innerAssertion)) return;
const wrapper = innerAssertion[0];
if (!Inbound.isClassOf(wrapper)) return;
// We know that innerAssertion is an Observe(Inbound(...)). Also, if
// innerEp.spec.analysis exists, it will be consonant with innerAssertion. Beware of
// completely-constant patterns, which cause skeleton to be null!
const innerDs = this;
this.hookEndpointLifecycle(innerEp, this.outerFacet.withNonScriptContext(() =>
this.outerFacet.addEndpoint(() => {
const assertion = Observe(wrapper[0]);
const h = innerEp.spec.analysis!;
const innerCallback = h.callback;
const callback = (innerCallback === void 0) ? void 0 :
function (evt: EventType, captures: Array<Value>) {
innerCallback.call(null, evt, captures);
innerDs.start();
};
const analysis: Analysis | null = (h === null) ? null :
(h.skeleton === null
? {
skeleton: null,
constPaths: h.constPaths,
constVals: h.constVals.map(v => (v as ReturnType<typeof Observe>)[0]),
capturePaths: h.capturePaths.map(p => p.slice(1)),
callback
}
: {
skeleton: h.skeleton.members[0],
constPaths: h.constPaths.map(p => p.slice(1)),
constVals: h.constVals,
capturePaths: h.capturePaths.map(p => p.slice(1)),
callback
});
return { assertion, analysis };
}, false)));
}
adjustIndex(a: Value, count: number) {
const net = super.adjustIndex(a, count);
if (Outbound.isClassOf(a)) {
switch (net) {
case ChangeDescription.ABSENT_TO_PRESENT:
this.outerFacet.actor.scheduleTask(() => {
this.outerFacet.actor.adhocAssert(a[0]);
});
this.outerFacet.actor.dataspace.start();
break;
case ChangeDescription.PRESENT_TO_ABSENT:
this.outerFacet.actor.scheduleTask(() => {
this.outerFacet.actor.adhocRetract(a[0]);
});
this.outerFacet.actor.dataspace.start();
break;
}
}
return net;
}
hookEndpointLifecycle<InnerFields>(innerEp: Endpoint<InnerFields>, outerEp: Endpoint<{}>) {
const _refresh = innerEp.refresh;
innerEp.refresh = function () {
_refresh.call(this);
outerEp.refresh();
};
const _destroy = innerEp.destroy;
innerEp.destroy = function (emitPatches: boolean) {
_destroy.call(this, emitPatches);
outerEp.destroy(true);
};
}
start(): this {
this.outerFacet.actor.dataspace.start();
this.outerFacet.scheduleScript(outerFacet => {
outerFacet.invokeScript(() => {
if (this.outerFacet.isLive) {
this.outerFacet.deferTurn(() => {
const stillBusy = this.runTasks();
if (stillBusy) this.start();
});
}
});
});
return this;
}
ground(): Ground {
return this.outerFacet.actor.dataspace.ground();
}
}
export function inNestedDataspace(bootProc: Script<void, {}>): Script<void, {}> {
return outerFacet => {
outerFacet.addDataflow(function () {});
// ^ eww! Dummy endpoint to keep the root facet of the relay alive.
const innerDs = new NestedDataspace(outerFacet, innerFacet =>
innerFacet.addStartScript(f => bootProc.call(f.fields, f)));
innerDs.start();
};
}

View File

@ -20,7 +20,6 @@ import { IdentitySet } from './idcoll.js';
import { is, Value, Record, Set, Dictionary, canonicalString, RecordConstructorInfo, GenericEmbedded } from '@preserves/core';
import { Bag, ChangeDescription } from './bag.js';
import { Discard, Capture } from './assertions.js';
import * as Stack from './stack.js';

View File

@ -16,12 +16,22 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//---------------------------------------------------------------------------
// $Special: Builder of singleton "atoms".
const LIMIT = 25000;
export class $Special {
readonly name: string;
constructor(name: string) {
this.name = name;
let taskCounter = 0;
let delayedTasks: Array<() => void> = [];
export function queueTask(f: () => void) {
taskCounter++;
if (taskCounter === LIMIT) {
setTimeout(() => {
taskCounter = 0;
delayedTasks.forEach(queueMicrotask);
delayedTasks = [];
}, 0);
}
if (taskCounter >= LIMIT) {
delayedTasks.push(f);
} else {
queueMicrotask(f);
}
}

View File

@ -1,304 +0,0 @@
//---------------------------------------------------------------------------
// @syndicate-lang/core, an implementation of Syndicate dataspaces for JS.
// Copyright (C) 2016-2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//---------------------------------------------------------------------------
// import { Dataspace, Facet } from './dataspace.js';
// import { Observe, Outbound, Inbound, Capture, Discard } from './assertions.js';
// import * as Skeleton from './skeleton.js';
// import { preserves, Value, Bytes, Record, Dictionary, encode, decode } from '@preserves/core';
// type MessageHandler = (e: Bytes) => void;
// type ImplementationType = 'node.js' | 'browser' | 'none';
// type WorkerConstructor = {
// new (stringUrl: string | URL, options?: WorkerOptions): Worker;
// };
// function extractBytes(h: MessageHandler): (e: MessageEvent<Bytes> | Bytes) => void {
// return (e) => {
// const bs = (e instanceof MessageEvent) ? e.data : e;
// return h(bs);
// };
// }
// const WorkerEvent = Record.makeConstructor('--worker-event', ['epId', 'event', 'vs']);
// const { implementationType, _Worker, postMessage, onMessage, isMainThread }: {
// implementationType: ImplementationType,
// _Worker?: WorkerConstructor,
// postMessage?: (m: any) => void,
// onMessage?: (handler: MessageHandler) => void,
// isMainThread: boolean,
// } = (function () {
// try {
// // Let's see if we're in node.js with the web worker extension enabled.
// const { Worker, parentPort, isMainThread } = require('worker_threads');
// return {
// implementationType: 'node.js' as ImplementationType,
// _Worker: Worker,
// postMessage: (m: any) => parentPort.postMessage(m),
// onMessage: (handler: MessageHandler) => {
// parentPort.removeAllListeners('message');
// parentPort.on('message', extractBytes(handler));
// },
// isMainThread,
// };
// } catch (_e) {
// // Well, something didn't work there. Could we be in the browser?
// if (typeof window !== 'undefined' && 'Worker' in window) {
// // Yep.
// return {
// implementationType: 'browser' as ImplementationType,
// _Worker: Worker,
// postMessage,
// onMessage: (handler: MessageHandler) => onmessage = extractBytes(handler),
// isMainThread: (typeof self === 'undefined'),
// };
// } else {
// // Nope. No support, then.
// return {
// implementationType: 'none' as ImplementationType,
// isMainThread: true,
// };
// }
// }
// })();
// function encodePacket(p: Value) {
// return Bytes.toIO(encode(p));
// }
// function decodePacket(m: Bytes) {
// return decode(m);
// }
// function sendPacket(ch: Worker, p: Value) {
// ch.postMessage(encodePacket(p));
// }
// function listen(w: Worker, eventType: string, handler: (... args: any[]) => any) {
// if ('on' in w) {
// (w as any).on(eventType, handler);
// return;
// }
// const k = 'on' + eventType;
// if (k in w) {
// w[k] = handler;
// }
// }
// export function spawnWorker(workerSourceFilename: string) {
// if (implementationType === 'none') {
// // In older versions of node.js, try adding --experimental-worker flag to the command line.
// throw new Error("Cannot spawnWorker without a web worker implementation available");
// }
// Dataspace.spawn(workerSourceFilename, function () {
// const outerFacet = Dataspace.currentFacet;
// outerFacet.addDataflow(function () {});
// // ^ eww! Dummy endpoint to keep the root facet of the relay alive.
// let endpoints = new Dictionary<Facet>();
// const w = new _Worker(workerSourceFilename);
// listen(w, 'error', Dataspace.wrapExternal((err) => {
// throw err;
// }));
// listen(w, 'exit', Dataspace.wrapExternal(() => {
// outerFacet.stop();
// }));
// listen(w, 'message', Dataspace.wrapExternal(extractBytes((msg: Bytes) => {
// const m = decodePacket(msg) as Array<Value>;
// switch (m[0]) {
// case 'assert': {
// const [ep, a] = m.slice(1);
// if (!endpoints.has(ep)) {
// outerFacet.actor.addFacet(outerFacet, function () {
// const epFacet = Dataspace.currentFacet;
// endpoints = endpoints.set(ep, epFacet);
// epFacet.addStopScript(() => { endpoints.delete(ep); });
// Dataspace.declareField(this, 'assertion', a);
// epFacet.addEndpoint(() => {
// if (Observe.isClassOf(this.assertion)) {
// const spec = this.assertion.get(0);
// const analysis = Skeleton.analyzeAssertion(spec);
// analysis.callback = Dataspace.wrap((evt, vs) => {
// epFacet.actor.scheduleScript(() => {
// sendPacket(w, ['event', ep, evt, vs]);
// });
// });
// return { assertion: Observe(spec), analysis };
// } else {
// return { assertion: this.assertion, analysis: null };
// }
// }, true);
// }, true);
// } else {
// endpoints.get(ep).fields.assertion = a;
// }
// break;
// }
// case 'clear': {
// const ep = m[1];
// const epFacet = endpoints.get(ep);
// if (epFacet) epFacet.stop(() => { endpoints.delete(ep); });
// break;
// }
// case 'message': {
// const body = m[1];
// Dataspace.send(body);
// break;
// }
// default: {
// throw new Error(
// preserves`Invalid Worker protocol message from Worker: ${m}`);
// }
// }
// })));
// }, null);
// }
// export function spawnWorkerRelay() {
// if (implementationType === 'none') return;
// if (isMainThread) return;
// Dataspace.currentFacet.actor.dataspace.ground().addStopHandler(() => {
// process.exit();
// });
// Dataspace.currentFacet.addStartScript(function () {
// Dataspace.spawn('WorkerRelay', function () {
// const outerFacet = Dataspace.currentFacet;
// const finish = Dataspace.backgroundTask();
// outerFacet.addStopScript(finish);
// const outboundEndpoints = new Dictionary<number>();
// const inboundEndpoints = new Dictionary<{ epId: number, facet: Facet }>();
// let nextId = 0;
// function sendToParent(m: Value) {
// postMessage(encodePacket(m));
// }
// onMessage(Dataspace.wrapExternal(function (msg: Bytes) {
// const m = decodePacket(msg);
// if (Array.isArray(m) && m[0] === 'event') {
// const [epId, evt, vs] = m.slice(1);
// Dataspace.send(WorkerEvent(epId, evt, vs));
// } else {
// throw new Error(
// preserves`Invalid Worker protocol message from parent: ${m}`);
// }
// }));
// outerFacet.addEndpoint(function () {
// const analysis = Skeleton.analyzeAssertion(Outbound(Capture(Discard._instance)));
// analysis.callback = Dataspace.wrap(function (evt, vs) {
// outerFacet.actor.scheduleScript(function () {
// switch (evt) {
// case Skeleton.EventType.ADDED: {
// const epId = nextId++;
// outboundEndpoints.set(vs[0], epId);
// sendToParent(['assert', epId, vs[0]]);
// break;
// }
// case Skeleton.EventType.REMOVED: {
// const epId = outboundEndpoints.get(vs[0]);
// outboundEndpoints.delete(vs[0]);
// sendToParent(['clear', epId]);
// break;
// }
// case Skeleton.EventType.MESSAGE: {
// sendToParent(['message', vs[0]]);
// break;
// }
// }
// });
// });
// return { assertion: analysis.assertion, analysis };
// }, false);
// outerFacet.addEndpoint(function () {
// const analysis =
// Skeleton.analyzeAssertion(Observe(Inbound(Capture(Discard._instance))));
// analysis.callback = Dataspace.wrap(function (evt, vs) {
// outerFacet.actor.scheduleScript(function () {
// const spec = vs[0];
// switch (evt) {
// case Skeleton.EventType.ADDED: {
// const epId = nextId++;
// outerFacet.actor.addFacet(outerFacet, function () {
// const innerFacet = Dataspace.currentFacet;
// inboundEndpoints.set(spec, { epId, facet: innerFacet });
// innerFacet.addEndpoint(function () {
// const analysis = Skeleton.analyzeAssertion(
// WorkerEvent(epId, Capture(Discard._instance), Capture(Discard._instance)));
// analysis.callback = Dataspace.wrap(function (evt, vs) {
// if (evt === Skeleton.EventType.MESSAGE) {
// evt = vs[0] as Skeleton.EventType;
// vs = vs[1] as Array<Value>;
// const a = Skeleton.instantiateAssertion(Inbound(spec), vs);
// innerFacet.actor.scheduleScript(function () {
// switch (evt) {
// case Skeleton.EventType.ADDED:
// innerFacet.actor.addFacet(innerFacet, function () {
// const assertionFacet = Dataspace.currentFacet;
// assertionFacet.addEndpoint(function () {
// return { assertion: a, analysis: null };
// }, false);
// assertionFacet.addEndpoint(function () {
// const analysis = Skeleton.analyzeAssertion(
// WorkerEvent(epId, Skeleton.EventType.REMOVED, vs));
// analysis.callback = Dataspace.wrap(() => {
// assertionFacet.actor.scheduleScript(function () {
// assertionFacet.stop();
// });
// });
// return { assertion: analysis.assertion, analysis };
// }, false);
// }, true);
// break;
// case Skeleton.EventType.MESSAGE:
// Dataspace.send(a);
// break;
// }
// });
// }
// });
// return { assertion: analysis.assertion, analysis };
// }, false);
// }, true);
// sendToParent(['assert', epId, Observe(spec)]);
// break;
// }
// case Skeleton.EventType.REMOVED: {
// const { epId, facet } = inboundEndpoints.get(spec);
// inboundEndpoints.delete(spec);
// facet.stop();
// sendToParent(['clear', epId]);
// break;
// }
// }
// });
// });
// return { assertion: analysis.assertion, analysis };
// }, false);
// }, null);
// });
// }

View File

@ -0,0 +1,42 @@
//---------------------------------------------------------------------------
// @syndicate-lang/core, an implementation of Syndicate dataspaces for JS.
// Copyright (C) 2016-2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//---------------------------------------------------------------------------
import * as S from '../gen/sturdy.js';
import { Oid } from '../gen/protocol.js';
import { Ref } from '../runtime/actor.js';
import { Decoder, DecoderState, Encoder, EncoderState, GenericEmbedded, neverEmbeddedType, EmbeddedType, Value } from '@preserves/core';
export type WireSymbol = { oid: Oid, ref: Ref, count: number };
export const wireRefEmbeddedType: EmbeddedType<S.WireRef> = {
decode(s: DecoderState): S.WireRef {
return S.asWireRef(new Decoder<any>(s).next());
},
encode(s: EncoderState, v: S.WireRef): void {
new Encoder<any>(s, neverEmbeddedType).push(S.fromWireRef(v));
},
fromValue(v: Value<GenericEmbedded>): S.WireRef {
return S.asWireRef(v as Value<S._embedded>);
},
toValue(v: S.WireRef): Value<GenericEmbedded> {
return S.fromWireRef(v) as Value<GenericEmbedded>;
}
};

View File

@ -0,0 +1,349 @@
//---------------------------------------------------------------------------
// @syndicate-lang/core, an implementation of Syndicate dataspaces for JS.
// Copyright (C) 2016-2021 Tony Garnock-Jones <tonyg@leastfixedpoint.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//---------------------------------------------------------------------------
import { Actor, Assertion, Entity, Facet, Handle, Ref, Turn } from '../runtime/actor.js';
import { BytesLike, Decoder, Dictionary, embed, encode, IdentityMap, mapEmbeddeds, underlying, Value } from '@preserves/core';
import * as IO from '../gen/protocol.js';
import { wireRefEmbeddedType, WireSymbol } from './protocol.js';
import { queueTask } from '../runtime/task.js';
import { attenuate } from '../runtime/rewrite.js';
import { fromAttenuation, WireRef } from '../gen/sturdy.js';
export class SyncPeerEntity implements Entity {
readonly relay: Relay;
readonly peer: Ref;
readonly handleMap = new IdentityMap<Handle, Handle>();
e: WireSymbol | null = null;
constructor(relay: Relay, peer: Ref) {
this.relay = relay;
this.peer = peer;
}
assert(turn: Turn, assertion: Assertion, handle: Handle): void {
this.handleMap.set(handle, turn.assert(this.peer, assertion));
}
retract(turn: Turn, handle: Handle): void {
turn.retract(this.handleMap.get(handle)!);
this.handleMap.delete(handle);
}
message(turn: Turn, body: Assertion): void {
// We get to vanish from the indexes now
this.relay.releaseRefOut(this.e!);
turn.message(this.peer, body);
}
sync(turn: Turn, peer: Ref): void {
turn._sync(this.peer, peer);
}
}
export class RelayEntity implements Entity {
readonly relay: Relay;
readonly oid: IO.Oid;
constructor(relay: Relay, oid: IO.Oid) {
this.relay = relay;
this.oid = oid;
}
send(m: IO.Event<WireRef>): void {
this.relay.send(this.oid, m);
}
assert(_turn: Turn, assertion: Assertion, handle: Handle): void {
this.send(IO.Event.Assert(IO.Assert({
assertion: this.relay.register(assertion, handle),
handle
})))
}
retract(_turn: Turn, handle: Handle): void {
this.relay.deregister(handle);
this.send(IO.Event.Retract(IO.Retract(handle)));
}
message(_turn: Turn, body: Assertion): void {
this.send(IO.Event.Message(IO.Message(this.relay.register(body, null))));
}
sync(turn: Turn, peer: Ref): void {
const peerEntity = new SyncPeerEntity(this.relay, peer);
const exported: Array<WireSymbol> = [];
const ior = this.relay.rewriteRefOut(turn.ref(peerEntity), false, exported);
peerEntity.e = exported[0];
this.send(IO.Event.Sync(IO.Sync(ior)));
}
}
export class Membrane {
readonly byOid = new IdentityMap<IO.Oid, WireSymbol>();
readonly byRef = new IdentityMap<Ref, WireSymbol>();
grab<Table extends "byOid" | "byRef">(table: Table,
key: Parameters<Membrane[Table]['get']>[0],
transient: boolean,
f: () => WireSymbol): WireSymbol
{
let e = this[table].get(key as any);
if (e === void 0) {
e = f();
this.byRef.set(e.ref, e);
this.byOid.set(e.oid, e);
}
if (!transient) e.count++;
return e;
}
drop(e: WireSymbol): void {
e.count--;
if (e.count === 0) {
this.byOid.delete(e.oid);
this.byRef.delete(e.ref);
}
}
}
export const INERT_REF: Ref = {
relay: (() => {
const a = new Actor(t => t.stop());
return a.root;
})(),
target: {},
};
export type PacketWriter = (bs: Uint8Array) => void;
export interface RelayOptions {
packetWriter: PacketWriter;
setup(t: Turn, r: Relay): void;
debug?: boolean;
trustPeer?: boolean;
}
export class Relay {
readonly facet: Facet;
readonly w: PacketWriter;
readonly inboundAssertions = new IdentityMap<Handle, {
localHandle: Handle,
imported: Array<WireSymbol>,
}>();
readonly outboundAssertions = new IdentityMap<Handle, Array<WireSymbol>>();
readonly exported = new Membrane();
readonly imported = new Membrane();
nextLocalOid: IO.Oid = 0;
pendingTurn: IO.Turn<WireRef> = [];
debug: boolean;
trustPeer: boolean;
readonly decoder = new Decoder(void 0, {
includeAnnotations: false,
embeddedDecode: wireRefEmbeddedType,
});
constructor(t: Turn, options: RelayOptions) {
this.facet = t.activeFacet;
this.w = options.packetWriter;
this.debug = options.debug ?? false;
this.trustPeer = options.trustPeer ?? true;
this.facet.preventInertCheck();
options.setup(t, this);
}
rewriteOut(assertion: Assertion, transient: boolean): [Value<WireRef>, Array<WireSymbol>]
{
const exported: Array<WireSymbol> = [];
const rewritten = mapEmbeddeds(assertion, r => embed(this.rewriteRefOut(r, transient, exported)));
return [rewritten, exported];
}
rewriteIn(t: Turn, a: Value<WireRef>): [Assertion, Array<WireSymbol>]
{
const imported: Array<WireSymbol> = [];
const rewritten = mapEmbeddeds(a, r => embed(this.rewriteRefIn(t, r, imported)));
return [rewritten, imported];
}
register(assertion: Assertion, handle: Handle | null): Value<WireRef> {
const [rewritten, exported] = this.rewriteOut(assertion, handle === null);
if (handle !== null) this.outboundAssertions.set(handle, exported);
return rewritten;
}
deregister(handle: Handle): void {
(this.outboundAssertions.get(handle) ?? []).forEach(e => this.releaseRefOut(e));
this.outboundAssertions.delete(handle);
}
rewriteRefOut(r: Ref, transient: boolean, exported: Array<WireSymbol>): WireRef {
if (r.target instanceof RelayEntity && r.target.relay === this) {
if (r.attenuation === void 0 || r.attenuation.length === 0) {
// No extra conditions on this reference since it was sent to us.
return WireRef.yours({ oid: r.target.oid, attenuation: [] });
} else {
// This reference has been attenuated since it was sent to us.
// Do we trust the peer to enforce such attenuation on our behalf?
if (this.trustPeer) {
return WireRef.yours({ oid: r.target.oid, attenuation: r.attenuation });
} else {
// fall through: treat the attenuated ref as a local ref, and re-export it.
}
}
}
const e = this.exported.grab(
"byRef", r, transient, () => {
if (transient) throw new Error("Cannot send transient reference");
return { oid: this.nextLocalOid++, ref: r, count: 0 };
});
exported.push(e);
return WireRef.mine(e.oid);
}
releaseRefOut(e: WireSymbol) {
this.exported.drop(e);
}
rewriteRefIn(t: Turn, n: WireRef, imported: Array<WireSymbol>): Ref {
switch (n._variant) {
case 'yours': {
const r = this.lookupLocal(n.oid);
if (n.attenuation.length === 0 || r === INERT_REF) {
return r;
} else {
type AttenuatedRef = Ref & { __attenuations?: Dictionary<any, Ref> };
const ar = r as AttenuatedRef;
if (ar.__attenuations === void 0) {
ar.__attenuations = new Dictionary();
}
return ar.__attenuations.getOrSet(fromAttenuation(n.attenuation), () =>
attenuate(r, ... n.attenuation));
}
}
case 'mine': {
const e = this.imported.grab("byOid", n.oid, false, () =>
({ oid: n.oid, ref: t.ref(new RelayEntity(this, n.oid)), count: 0 }));
imported.push(e);
return e.ref;
}
}
}
send(remoteOid: IO.Oid, m: IO.Event<WireRef>): void {
if (this.pendingTurn.length === 0) {
queueTask(() => {
if (this.debug) console.log('OUT', IO.fromTurn(this.pendingTurn).asPreservesText());
this.w(underlying(encode(IO.fromTurn(this.pendingTurn), {
canonical: true,
embeddedEncode: wireRefEmbeddedType,
})));
this.pendingTurn = [];
});
}
this.pendingTurn.push(IO.TurnEvent({ oid: remoteOid, event: m }));
}
lookupLocal(localOid: IO.Oid): Ref {
return this.exported.byOid.get(localOid)?.ref ?? INERT_REF;
}
accept(bs: BytesLike): void {
Turn.for(this.facet, t => {
this.decoder.write(bs);
while (true) {
const rawTurn = this.decoder.try_next();
if (rawTurn === void 0) break;
const wireTurn = IO.toTurn(rawTurn);
if (wireTurn === void 0) throw new Error("Bad IO.Turn");
if (this.debug) console.log('IN', rawTurn.asPreservesText());
wireTurn.forEach(v => {
const { oid: localOid, event: m } = v;
this.handle(t, this.lookupLocal(localOid), m);
});
}
});
}
handle(t: Turn, r: Ref, m: IO.Event<WireRef>) {
switch (m._variant) {
case 'Assert': {
const [a, imported] = this.rewriteIn(t, m.value.assertion);
this.inboundAssertions.set(m.value.handle, {
localHandle: t.assert(r, a),
imported,
});
break;
}
case 'Retract': {
const remoteHandle = m.value.handle;
const h = this.inboundAssertions.get(remoteHandle);
if (h === void 0) throw new Error(`Peer retracted invalid handle ${remoteHandle}`);
this.inboundAssertions.delete(remoteHandle);
h.imported.forEach(e => this.imported.drop(e));
t.retract(h.localHandle);
break;
}
case 'Message': {
const [a, imported] = this.rewriteIn(t, m.value.body);
if (imported.length > 0) throw new Error("Cannot receive transient reference");
t.message(r, a);
break;
}
case 'Sync': {
const imported: Array<WireSymbol> = [];
const k = this.rewriteRefIn(t, m.value.peer, imported);
t.sync(r).then(t => {
t.message(k, true);
imported.forEach(e => this.imported.drop(e));
});
break;
}
}
}
}
export interface RelayActorOptions extends RelayOptions {
initialOid?: IO.Oid;
initialRef?: Ref;
nextLocalOid?: IO.Oid;
}
export function spawnRelay(t: Turn, options: RelayActorOptions & {initialOid: IO.Oid}): Promise<Ref>;
export function spawnRelay(t: Turn, options: Omit<RelayActorOptions, 'initialOid'>): Promise<null>;
export function spawnRelay(t: Turn, options: RelayActorOptions): Promise<Ref | null>
{
return new Promise(resolve => {
t.spawn(t => {
const relay = new Relay(t, options);
if (options.initialRef !== void 0) {
relay.rewriteRefOut(options.initialRef, false, []);
}
if (options.initialOid !== void 0) {
resolve(relay.rewriteRefIn(t, WireRef.mine(options.initialOid), []));
} else {
resolve(null);
}
if (options.nextLocalOid !== void 0) {
relay.nextLocalOid = (options.nextLocalOid === 0) ? 1 : options.nextLocalOid;
}
});
});
}

View File

@ -24,7 +24,7 @@
// In Network and Distributed System Security Symposium. San Diego,
// California: Internet Society, 2014.
import { mac } from './cryptography';
import { mac } from './cryptography.js';
import { Bytes, decode, encode, is, neverEmbeddedType, Value } from '@preserves/core';
import * as S from '../gen/sturdy.js';
export * from '../gen/sturdy.js';

View File

@ -1366,10 +1366,24 @@
dependencies:
"@octokit/openapi-types" "^6.1.1"
"@preserves/core@0.15.0":
version "0.15.0"
resolved "https://registry.yarnpkg.com/@preserves/core/-/core-0.15.0.tgz#55a14288442d404d20a2906b92b7a7cc9e522a18"
integrity sha512-PoEvwlqNNXpYykwkiD7KyjT6kfo78XXEMwJ5yOhOiEF6nVD167NVv801/DR7xIINtPDaDjdqBtqY/tamyzi7vA==
"@preserves/core@0.17.0", "@preserves/core@^0.17.0":
version "0.17.0"
resolved "https://registry.yarnpkg.com/@preserves/core/-/core-0.17.0.tgz#23e7edb0a1bf5e602d210ea2ae474113d0cc5b12"
integrity sha512-ZyqsC8f08yvOn9UjaBekIhKDY44k7sWwyW2xTLfVCffFHGCPidgq6uUGKcuKAbr3ibGB7kQ6KQr/xgVAUpDC0Q==
"@preserves/schema@0.18.0":
version "0.18.0"
resolved "https://registry.yarnpkg.com/@preserves/schema/-/schema-0.18.0.tgz#593a33a41eccc62a72a620b75379676b7b72b17c"
integrity sha512-L2IygPGYHZSxfDkHdcohuNCjYbj7BI+5+RJFrPL3vLmT2kj0IYJi6lZu1ReumALbBTx7RW8Mn9IorH8qVUZ39A==
dependencies:
"@preserves/core" "^0.17.0"
"@types/glob" "^7.1.3"
"@types/minimatch" "^3.0.3"
chalk "^4.1.0"
chokidar "^3.5.1"
commander "^7.2.0"
glob "^7.1.6"
minimatch "^3.0.4"
"@rollup/plugin-node-resolve@^11.0.1":
version "11.2.1"
@ -1452,6 +1466,14 @@
"@types/minimatch" "*"
"@types/node" "*"
"@types/glob@^7.1.3":
version "7.2.0"
resolved "https://registry.yarnpkg.com/@types/glob/-/glob-7.2.0.tgz#bc1b5bf3aa92f25bd5dd39f35c57361bdce5b2eb"
integrity sha512-ZUxbzKl0IfJILTS6t7ip5fQQM/J3TJYubDm3nMbgubNNYS62eXeUpoLUC8/7fJNiFYHTrGPQn7hspDUzIHX3UA==
dependencies:
"@types/minimatch" "*"
"@types/node" "*"
"@types/graceful-fs@^4.1.2":
version "4.1.5"
resolved "https://registry.yarnpkg.com/@types/graceful-fs/-/graceful-fs-4.1.5.tgz#21ffba0d98da4350db64891f92a9e5db3cdb4e15"
@ -1491,6 +1513,11 @@
resolved "https://registry.yarnpkg.com/@types/minimatch/-/minimatch-3.0.4.tgz#f0ec25dbf2f0e4b18647313ac031134ca5b24b21"
integrity sha512-1z8k4wzFnNjVK/tlxvrWuK5WMt6mydWWP7+zvH5eFep4oj+UkrfiJTRtjCeBXNpwaA/FYqqtb4/QS4ianFpIRA==
"@types/minimatch@^3.0.3":
version "3.0.5"
resolved "https://registry.yarnpkg.com/@types/minimatch/-/minimatch-3.0.5.tgz#1001cc5e6a3704b83c236027e77f2f58ea010f40"
integrity sha512-Klz949h02Gz2uZCMGwDUSDS1YBlTdDDgbWHi+81l29tQALUtvz4rAYi5uoVhE5Lagoq6DeqAUlbrHvW/mXDgdQ==
"@types/minimist@^1.2.0":
version "1.2.1"
resolved "https://registry.yarnpkg.com/@types/minimist/-/minimist-1.2.1.tgz#283f669ff76d7b8260df8ab7a4262cc83d988256"
@ -2167,6 +2194,14 @@ chalk@^4.0.0:
ansi-styles "^4.1.0"
supports-color "^7.1.0"
chalk@^4.1.0:
version "4.1.2"
resolved "https://registry.yarnpkg.com/chalk/-/chalk-4.1.2.tgz#aac4e2b7734a740867aeb16bf02aad556a1e7a01"
integrity sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==
dependencies:
ansi-styles "^4.1.0"
supports-color "^7.1.0"
char-regex@^1.0.2:
version "1.0.2"
resolved "https://registry.yarnpkg.com/char-regex/-/char-regex-1.0.2.tgz#d744358226217f981ed58f479b1d6bcc29545dcf"
@ -2342,6 +2377,11 @@ commander@^2.20.0:
resolved "https://registry.yarnpkg.com/commander/-/commander-2.20.3.tgz#fd485e84c03eb4881c20722ba48035e8531aeb33"
integrity sha512-GpVkmM8vF2vQUkj2LvZmD35JxeJOLCwJ9cUkugyk2nuhbv3+mJvpLYYt+0+USMxE+oj+ey/lJEnhZw75x/OMcQ==
commander@^7.2.0:
version "7.2.0"
resolved "https://registry.yarnpkg.com/commander/-/commander-7.2.0.tgz#a36cb57d0b501ce108e4d20559a150a391d97ab7"
integrity sha512-QrWXB+ZQSVPmIWIhtEO9H+gwHaMGYiF5ChvoJ+K9ZGHG/sVsa6yiesAD1GC/x46sET00Xlwo1u49RVVVzvcSkw==
compare-func@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/compare-func/-/compare-func-2.0.0.tgz#fb65e75edbddfd2e568554e8b5b05fff7a51fcb3"