diff --git a/packages/core/package.json b/packages/core/package.json index a9b23f4..9d71551 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -9,7 +9,9 @@ }, "repository": "github:syndicate-lang/syndicate-js", "scripts": { - "prepare": "yarn compile && yarn rollup", + "prepare": "yarn regenerate && yarn compile && yarn rollup", + "regenerate": "rm -rf ./src/gen && preserves-schema-ts --module EntityRef=./src/runtime/actor.ts --output ./src/gen './protocols/schemas/**/*.prs'", + "regenerate:watch": "yarn regenerate --watch", "compile": "../../node_modules/.bin/tsc", "compile:watch": "../../node_modules/.bin/tsc -w", "rollup": "../../node_modules/.bin/rollup -c", @@ -22,6 +24,7 @@ "types": "lib/index.d.ts", "author": "Tony Garnock-Jones ", "dependencies": { - "@preserves/core": "0.15.0" + "@preserves/core": "0.17.0", + "@preserves/schema": "0.18.0" } } diff --git a/packages/core/src/.gitignore b/packages/core/src/.gitignore new file mode 100644 index 0000000..e8e450b --- /dev/null +++ b/packages/core/src/.gitignore @@ -0,0 +1 @@ +gen/ diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index f5bea2a..978be87 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -19,14 +19,11 @@ export * from '@preserves/core'; export * from './runtime/randomid.js'; -export * from './runtime/assertions.js'; export * from './runtime/bag.js'; export * as API from './runtime/api.js'; export * as Skeleton from './runtime/skeleton.js'; +export * from './runtime/actor.js'; export * from './runtime/dataspace.js'; -export * from './runtime/ground.js'; -export * from './runtime/relay.js'; -// export * as Worker from './runtime/worker.js'; import { randomId } from './runtime/randomid.js'; diff --git a/packages/core/src/runtime/actor.ts b/packages/core/src/runtime/actor.ts index 30d19f2..6653203 100644 --- a/packages/core/src/runtime/actor.ts +++ b/packages/core/src/runtime/actor.ts @@ -30,7 +30,7 @@ if ('stackTraceLimit' in Error) { export type Assertion = Value; export type Handle = number; -export type ExitReason = null | { ok: true } | { ok: false, err: Error }; +export type ExitReason = null | { ok: true } | { ok: false, err: unknown }; export type LocalAction = (t: Turn) => void; export interface Entity { diff --git a/packages/core/src/runtime/bag.ts b/packages/core/src/runtime/bag.ts index 12f79d2..d8508a1 100644 --- a/packages/core/src/runtime/bag.ts +++ b/packages/core/src/runtime/bag.ts @@ -18,8 +18,7 @@ // Bags and Deltas (which are Bags where item-counts can be negative). -import { GenericEmbedded } from '@preserves/core'; -import { Value, Set, Dictionary } from '@preserves/core'; +import { Value, Set, Dictionary, GenericEmbedded } from '@preserves/core'; export enum ChangeDescription { PRESENT_TO_ABSENT = -1, @@ -28,19 +27,19 @@ export enum ChangeDescription { PRESENT_TO_PRESENT = 2, } -export class Bag { - _items: Dictionary; +export class Bag { + _items: Dictionary; - constructor(s?: Set) { + constructor(s?: Set) { this._items = new Dictionary(); if (s) s.forEach((v) => this._items.set(v, 1)); } - get(key: Value): number { + get(key: Value): number { return this._items.get(key, 0) as number; } - change(key: Value, delta: number, clamp: boolean = false): ChangeDescription { + change(key: Value, delta: number, clamp: boolean = false): ChangeDescription { let oldCount = this.get(key); let newCount = oldCount + delta; if (clamp) { @@ -64,7 +63,7 @@ export class Bag { this._items = new Dictionary(); } - includes(key: Value): boolean { + includes(key: Value): boolean { return this._items.has(key); } @@ -72,24 +71,24 @@ export class Bag { return this._items.size; } - keys(): IterableIterator { + keys(): IterableIterator> { return this._items.keys(); } - entries(): IterableIterator<[Value, number]> { + entries(): IterableIterator<[Value, number]> { return this._items.entries(); } - forEach(f: (count: number, value: Value) => void) { + forEach(f: (count: number, value: Value) => void) { this._items.forEach(f); } - snapshot(): Dictionary { + snapshot(): Dictionary { return this._items.clone(); } - clone(): Bag { - const b = new Bag(); + clone(): Bag { + const b = new Bag(); b._items = this._items.clone(); return b; } diff --git a/packages/core/src/runtime/dataspace.ts b/packages/core/src/runtime/dataspace.ts index 70afe17..1b59e3d 100644 --- a/packages/core/src/runtime/dataspace.ts +++ b/packages/core/src/runtime/dataspace.ts @@ -20,773 +20,36 @@ import { Value, is, Set } from '@preserves/core'; import * as Skeleton from './skeleton.js'; import { Bag, ChangeDescription } from './bag.js'; -import { Observe } from './assertions.js'; import * as Dataflow from './dataflow.js'; import { IdentitySet, IdentityMap } from './idcoll.js'; -import { Ground } from './ground.js'; -export enum Priority { - QUERY_HIGH = 0, - QUERY, - QUERY_HANDLER, - NORMAL, - GC, - IDLE, - _count -} - -export type ActorId = number; -export type FacetId = ActorId; -export type EndpointId = ActorId; - -export type Task = () => T; -export type Script = (this: Fields & DataflowObservableObject, f: Facet) => T; - -export type MaybeValue = Value | undefined; -export type EndpointSpec = { assertion: MaybeValue, analysis: Skeleton.Analysis | null }; - -export type ObserverCallback = - (this: Fields, facet: Facet, bindings: Array>) => void; - -export type ObserverCallbacks = { - add?: ObserverCallback; - del?: ObserverCallback; - msg?: ObserverCallback; -} - -export const DataflowObservableObjectId = Symbol.for('DataflowObservableObjectId'); -export interface DataflowObservableObject { - [DataflowObservableObjectId](): number; -} - -export type DataflowObservable = [DataflowObservableObject, string]; -export function _canonicalizeDataflowObservable(i: DataflowObservable): string { - return i[0][DataflowObservableObjectId]() + ',' + i[1]; -} - -export type DataflowDependent = Endpoint; -export function _canonicalizeDataflowDependent(i: DataflowDependent): string { - return '' + i.id; -} - -export type ActivationScript = Script; - -export abstract class Dataspace { - nextId: ActorId = 0; +export class Dataspace implements Partial { index = new Skeleton.Index(); - dataflow = new Dataflow.Graph( - _canonicalizeDataflowDependent, - _canonicalizeDataflowObservable); - runnable: Array = []; - pendingTurns: Array; - actors: IdentityMap = new IdentityMap(); - activations: IdentitySet = new IdentitySet(); - constructor(bootProc: Script) { - this.pendingTurns = [new Turn(null, [new Spawn(null, bootProc, new Set())])]; - } - - abstract start(): this; - abstract ground(): Ground; - - backgroundTask(): () => void { - return this.ground().backgroundTask(); - } - - runTasks(): boolean { // TODO: rename? - this.runPendingTasks(); - this.performPendingActions(); - return this.runnable.length > 0 || this.pendingTurns.length > 0; - } - - runPendingTasks() { - let runnable = this.runnable; - this.runnable = []; - runnable.forEach((ac) => { ac.runPendingTasks(); /* TODO: rename? */ }); - } - - performPendingActions() { - let turns = this.pendingTurns; - this.pendingTurns = []; - turns.forEach((turn) => { - turn.actions.forEach((action) => { - // console.log('[DATASPACE]', turn.actor && turn.actor.toString(), action); - action.perform(this, turn.actor); - this.runPendingTasks(); - }); - }); - } - - commitActions(ac: Actor, pending: Array) { - this.pendingTurns.push(new Turn(ac, pending)); - } - - refreshAssertions() { - this.dataflow.repairDamage((ep) => { - let facet = ep.facet; - if (facet.isLive) { // TODO: necessary test, or tautological? - facet.invokeScript(f => f.withNonScriptContext(() => ep.refresh())); - } - }); - } - - addActor( - name: any, - bootProc: Script, - initialAssertions: Set, - parentActor: Actor | null) - { - let ac = new Actor(this, name, initialAssertions, parentActor?.id); - // debug('Spawn', ac && ac.toString()); - this.applyPatch(ac, ac.adhocAssertions); - ac.addFacet<{}, {}>(null, systemFacet => { - // Root facet is a dummy "system" facet that exists to hold - // one-or-more "user" "root" facets. - ac.addFacet<{}, SpawnFields>(systemFacet, bootProc); - // ^ The "true root", user-visible facet. - initialAssertions.forEach((a) => { ac.adhocRetract(a); }); - }); - } - - applyPatch(ac: Actor, delta: Bag) { - // if (!delta.isEmpty()) debug('applyPatch BEGIN', ac && ac.toString()); - let removals: Array<[number, Value]> = []; - delta.forEach((count, a) => { - if (count > 0) { - // debug('applyPatch +', a && a.toString()); - this.adjustIndex(a, count); - } else { - removals.push([count, a]); - } - if (ac) ac.cleanupChanges.change(a, -count); - }); - removals.forEach(([count, a]) => { - // debug('applyPatch -', a && a.toString()); - this.adjustIndex(a, count); - }); - // if (!delta.isEmpty()) debug('applyPatch END'); - } - - deliverMessage(m: Value, _sendingActor: Actor | null) { - // debug('deliverMessage', sendingActor && sendingActor.toString(), m.toString()); - this.index.deliverMessage(m); - // this.index.deliverMessage(m, (leaf, _m) => { - // sendingActor.touchedTopics = sendingActor.touchedTopics.add(leaf); - // }); - } - - adjustIndex(a: Value, count: number) { - return this.index.adjustAssertion(a, count); - } - - subscribe(handler: Skeleton.Analysis) { - this.index.addHandler(handler, handler.callback!); - } - - unsubscribe(handler: Skeleton.Analysis) { - this.index.removeHandler(handler, handler.callback!); - } - - endpointHook(_facet: Facet, _endpoint: Endpoint) { - // Subclasses may override - } -} - -export class Actor { - readonly id: ActorId; - readonly dataspace: Dataspace; - readonly name: any; - rootFacet: Facet | null = null; - isRunnable: boolean = false; - readonly pendingTasks: Array>>; - pendingActions: Array; - adhocAssertions: Bag; - cleanupChanges = new Bag(); // negative counts allowed! - parentId: ActorId | undefined; - - constructor(dataspace: Dataspace, - name: any, - initialAssertions: Set, - parentActorId: ActorId | undefined) - { - this.id = dataspace.nextId++; - this.dataspace = dataspace; - this.name = name; - this.isRunnable = false; - this.pendingTasks = []; - for (let i = 0; i < Priority._count; i++) { this.pendingTasks.push([]); } - this.pendingActions = []; - this.adhocAssertions = new Bag(initialAssertions); // no negative counts allowed - this.parentId = parentActorId; - dataspace.actors.set(this.id, this); - } - - runPendingTasks() { - while (true) { - let task = this.popNextTask(); - if (!task) break; - task(); - this.dataspace.refreshAssertions(); - } - - this.isRunnable = false; - let pending = this.pendingActions; - if (pending.length > 0) { - this.pendingActions = []; - this.dataspace.commitActions(this, pending); - } - } - - popNextTask(): Task | null { - let tasks = this.pendingTasks; - for (let i = 0; i < Priority._count; i++) { - let q = tasks[i]; - if (q.length > 0) return q.shift()!; - } - return null; - } - - abandonQueuedWork() { - this.pendingActions = []; - for (let i = 0; i < Priority._count; i++) { this.pendingTasks[i] = []; } - } - - scheduleTask(task: Task, priority: Priority = Priority.NORMAL) { - if (!this.isRunnable) { - this.isRunnable = true; - this.dataspace.runnable.push(this); - } - this.pendingTasks[priority].push(task); - } - - addFacet( - parentFacet: Facet | null, - bootProc: Script, - checkInScript: boolean = false) - { - if (checkInScript && parentFacet && !parentFacet.inScript) { - throw new Error("Cannot add facet outside script; are you missing a `react { ... }`?"); - } - let f = new Facet(this, parentFacet); - f.invokeScript(f => f.withNonScriptContext(() => bootProc.call(f.fields, f))); - this.scheduleTask(() => { - if ((parentFacet && !parentFacet.isLive) || f.isInert()) { - f._terminate(); - } - }); - } - - _terminate(emitPatches: boolean) { - // Abruptly terminates an entire actor, without running stop-scripts etc. - if (emitPatches) { - this.scheduleTask(() => { - this.adhocAssertions.snapshot().forEach((_count, a) => { this.retract(a); }); - }); - } - if (this.rootFacet) { - this.rootFacet._abort(emitPatches); - } - this.scheduleTask(() => { this.enqueueScriptAction(new Quit()); }); - } - - enqueueScriptAction(action: Action) { - this.pendingActions.push(action); - } - - pendingPatch(): Patch { - if (this.pendingActions.length > 0) { - let p = this.pendingActions[this.pendingActions.length - 1]; - if (p instanceof Patch) return p; - } - let p = new Patch(new Bag()); - this.enqueueScriptAction(p); - return p; - } - - assert(a: Value) { this.pendingPatch().adjust(a, +1); } - retract(a: Value) { this.pendingPatch().adjust(a, -1); } - - adhocRetract(a: Value) { - if (this.adhocAssertions.change(a, -1, true) === ChangeDescription.PRESENT_TO_ABSENT) { - this.retract(a); - } - } - - adhocAssert(a: Value) { - if (this.adhocAssertions.change(a, +1) === ChangeDescription.ABSENT_TO_PRESENT) { - this.assert(a); - } - } - - toString(): string { - let s = 'Actor(' + this.id; - if (this.name !== void 0 && this.name !== null) s = s + ',' + this.name.toString(); - return s + ')'; - } -} - -abstract class Action { - abstract perform(ds: Dataspace, ac: Actor | null): void; -} - -class Patch extends Action { - readonly changes: Bag; - - constructor(changes: Bag) { - super(); - this.changes = changes; - } - - perform(ds: Dataspace, ac: Actor | null): void { - ds.applyPatch(ac!, this.changes); - } - - adjust(a: Value, count: number) { - this.changes.change(a, count); - } -} - -class Message extends Action { - readonly body: Value; - - constructor(body: Value) { - super(); - this.body = body; - } - - perform(ds: Dataspace, ac: Actor | null): void { - ds.deliverMessage(this.body, ac); - } -} - -class Spawn extends Action { - readonly name: any; - readonly bootProc: Script; - readonly initialAssertions: Set; - - constructor(name: any, bootProc: Script, initialAssertions: Set = new Set()) { - super(); - this.name = name; - this.bootProc = bootProc; - this.initialAssertions = initialAssertions; - } - - perform(ds: Dataspace, ac: Actor | null): void { - ds.addActor(this.name, this.bootProc, this.initialAssertions, ac); - } -} - -class Quit extends Action { // TODO: rename? Perhaps to Cleanup? - // Pseudo-action - not for userland use. - - perform(ds: Dataspace, ac: Actor | null): void { - if (ac === null) throw new Error("Internal error: Quit action with null actor"); - ds.applyPatch(ac, ac.cleanupChanges); - ds.actors.delete(ac.id); - // debug('Quit', ac && ac.toString()); - } -} - -class DeferredTurn extends Action { - readonly continuation: Task; - - constructor(continuation: Task) { - super(); - this.continuation = continuation; - } - - perform(_ds: Dataspace, ac: Actor | null): void { - // debug('DeferredTurn', ac && ac.toString()); - ac!.scheduleTask(this.continuation); - } -} - -class Activation extends Action { - readonly script: ActivationScript; - readonly name: any; - - constructor(script: ActivationScript, name: any) { - super(); - this.script = script; - this.name = name; - } - - perform(ds: Dataspace, ac: Actor | null): void { - if (ds.activations.has(this.script)) return; - ds.activations.add(this.script); - ds.addActor<{}>(this.name, rootFacet => rootFacet.addStartScript(this.script), new Set(), ac); - } -} - -export class Turn { - readonly actor: Actor | null; - readonly actions: Array; - - constructor(actor: Actor | null, actions: Array = []) { - this.actor = actor; - this.actions = actions; - } - - enqueueScriptAction(a: Action) { - this.actions.push(a); - } -} - -export class Facet { - readonly id: FacetId; - isLive = true; - readonly actor: Actor; - readonly parent: Facet | null; - readonly endpoints = new IdentityMap>(); - readonly stopScripts: Array> = []; - readonly children = new IdentitySet>(); - readonly fields: Fields & DataflowObservableObject; - inScript = true; - - constructor(actor: Actor, parent: Facet | null) { - this.id = actor.dataspace.nextId++; - this.actor = actor; - this.parent = parent; - if (parent) { - parent.children.add(this); - this.fields = Dataflow.Graph.newScope(parent.fields); - } else { - if (actor.rootFacet) { - throw new Error("INVARIANT VIOLATED: Attempt to add second root facet"); - } - actor.rootFacet = this; - this.fields = Dataflow.Graph.newScope({}); - } - this.fields[DataflowObservableObjectId] = () => this.id; - } - - withNonScriptContext(task: Task): T { - let savedInScript = this.inScript; - this.inScript = false; - try { - return task(); - } finally { - this.inScript = savedInScript; - } - } - - _abort(emitPatches: boolean) { - this.isLive = false; - this.children.forEach(child => child._abort(emitPatches)); - this.retractAssertionsAndSubscriptions(emitPatches); - } - - retractAssertionsAndSubscriptions(emitPatches: boolean) { - this.actor.scheduleTask(() => { - this.endpoints.forEach((ep) => ep.destroy(emitPatches)); - this.endpoints.clear(); - }); - } - - isInert(): boolean { - return this.endpoints.size === 0 && this.children.size === 0; - } - - _terminate() { - if (!this.isLive) return; - - let ac = this.actor; - let parent = this.parent; - if (parent) { - parent.children.delete(this); - } else { - ac.rootFacet = null; - } - this.isLive = false; - - this.children.forEach((child) => { child._terminate(); }); - - // Run stop-scripts after terminating children. This means - // that children's stop-scripts run before ours. - ac.scheduleTask(() => - this.invokeScript(() => - this.stopScripts.forEach(s => - s.call(this.fields, this)))); - - this.retractAssertionsAndSubscriptions(true); - ac.scheduleTask(() => { - if (parent) { - if (parent.isInert()) { - parent._terminate(); - } - } else { - ac._terminate(true); - } - }, Priority.GC); - } - - // This alias exists because of the naive expansion done by the parser. - _stop(continuation?: Script) { - this.stop(continuation); - } - - stop(continuation?: Script) { - this.parent!.invokeScript(() => { - this.actor.scheduleTask(() => { - this._terminate(); - if (continuation) { - this.parent!.scheduleScript(parent => continuation.call(this.fields, parent)); - // ^ TODO: is this the correct scope to use?? - } - }); - }); - } - - addStartScript(s: Script) { - this.ensureFacetSetup('`on start`'); - this.scheduleScript(s); - } - - addStopScript(s: Script) { - this.ensureFacetSetup('`on stop`'); - this.stopScripts.push(s); - } - - addEndpoint(updateFun: Script, isDynamic: boolean = true): Endpoint { - const ep = new Endpoint(this, isDynamic, updateFun); - this.actor.dataspace.endpointHook(this, ep); - return ep; - } - - _addRawObserverEndpoint(specScript: Script, callbacks: ObserverCallbacks): Endpoint - { - return this.addEndpoint(() => { - const spec = specScript.call(this.fields, this); - if (spec === void 0) { - return { assertion: void 0, analysis: null }; - } else { - const analysis = Skeleton.analyzeAssertion(spec); - analysis.callback = this.wrap((facet, evt, vs) => { - switch (evt) { - case Skeleton.EventType.ADDED: callbacks.add?.call(facet.fields, facet, vs); break; - case Skeleton.EventType.REMOVED: callbacks.del?.call(facet.fields, facet, vs); break; - case Skeleton.EventType.MESSAGE: callbacks.msg?.call(facet.fields, facet, vs); break; - } - }); - return { assertion: Observe(spec), analysis }; - } - }); - } - - addObserverEndpoint(specThunk: (facet: Facet) => MaybeValue, callbacks: ObserverCallbacks): Endpoint { - const scriptify = (f?: ObserverCallback) => - f && ((facet: Facet, vs: Array>) => - facet.scheduleScript(() => f.call(facet.fields, facet, vs))); - return this._addRawObserverEndpoint(specThunk, { - add: scriptify(callbacks.add), - del: scriptify(callbacks.del), - msg: scriptify(callbacks.msg), - }); - } - - addDataflow(subjectFun: Script, priority?: Priority): Endpoint { - return this.addEndpoint(() => { - let subjectId = this.actor.dataspace.dataflow.currentSubjectId; - this.scheduleScript(() => { - if (this.isLive) { - this.actor.dataspace.dataflow.withSubject(subjectId, () => - subjectFun.call(this.fields, this)); - } - }, priority); - return { assertion: void 0, analysis: null }; - }); - } - - enqueueScriptAction(action: Action) { - this.actor.enqueueScriptAction(action); - } - - toString(): string { - let s = 'Facet(' + this.actor.id; - if (this.actor.name !== void 0 && this.actor.name !== null) { - s = s + ',' + this.actor.name.toString(); - } - s = s + ',' + this.id; - let f = this.parent; - while (f != null) { - s = s + ':' + f.id; - f = f.parent; - } - return s + ')'; - } - - invokeScript(script: Script, propagateErrors = false): T | undefined { - try { - // console.group('Facet', facet && facet.toString()); - return script.call(this.fields, this); - } catch (e) { - let a = this.actor; - a.abandonQueuedWork(); - a._terminate(false); - console.error('Actor ' + a.toString() + ' exited with exception:', e); - if (propagateErrors) throw e; - return undefined; - } finally { - // console.groupEnd(); - } - } - - wrap, R>( - fn: (this: Fields & DataflowObservableObject, - f: Facet, ... args: T) => R - ): (... args: T) => R - { - return (... actuals) => this.invokeScript(f => fn.call(f.fields, f, ... actuals), true)!; - } - - wrapExternal>( - fn: (this: Fields & DataflowObservableObject, - f: Facet, ... args: T) => void - ): (... args: T) => void { - const ac = this.actor; - return (... actuals) => { - if (this.isLive) { - ac.dataspace.start(); - ac.scheduleTask(() => this.invokeScript(f => fn.call(f.fields, f, ... actuals))); - } - }; - } - - ensureFacetSetup(what: string) { - if (this.inScript) { - throw new Error(`Cannot ${what} outside facet setup; are you missing \`react { ... }\`?`); - } - } - - ensureNonFacetSetup(what: string) { - if (!this.inScript) { - throw new Error(`Cannot ${what} during facet setup; are you missing \`on start { ... }\`?`); - } - } - - // This alias exists because of the naive expansion done by the parser. - _send(body: Value) { - this.send(body); - } - - send(body: Value) { - this.ensureNonFacetSetup('`send`'); - this.enqueueScriptAction(new Message(body)); - } - - // This alias exists because of the naive expansion done by the parser. - _spawn(name: any, bootProc: Script, initialAssertions?: Set) { - this.spawn(name, bootProc, initialAssertions); - } - - spawn(name: any, bootProc: Script, initialAssertions?: Set) { - this.ensureNonFacetSetup('`spawn`'); - this.enqueueScriptAction(new Spawn(name, bootProc, initialAssertions)); - } - - deferTurn(continuation: Script) { - this.ensureNonFacetSetup('`deferTurn`'); - this.enqueueScriptAction(new DeferredTurn(this.wrap(continuation))); - } - - activate(script: ActivationScript, name?: any) { - this.ensureNonFacetSetup('`activate`'); - this.enqueueScriptAction(new Activation(script, name ?? null)); - } - - scheduleScript(script: Script, priority?: Priority) { - this.actor.scheduleTask(this.wrap(script), priority); - } - - declareField(obj: T, prop: K, init: T[K]) { - if (prop in obj) { - obj[prop] = init; - } else { - this.actor.dataspace.dataflow.defineObservableProperty(obj, prop, init, { - objectId: [obj, prop], - noopGuard: is - }); - } - } - - // referenceField(obj: DataflowObservableObject, prop: string) { - // if (!(prop in obj)) { - // this.actor.dataspace.dataflow.recordObservation([obj, prop]); - // } - // return obj[prop]; + // adjustIndex(a: Value, count: number) { + // return this.index.adjustAssertion(a, count); // } - // deleteField(obj: DataflowObservableObject, prop: string) { - // this.actor.dataspace.dataflow.recordDamage([obj, prop]); - // delete obj[prop]; + // subscribe(handler: Skeleton.Analysis) { + // this.index.addHandler(handler, handler.callback!); // } - addChildFacet(bootProc: Script) { - this.actor.addFacet(this, bootProc, true); + // unsubscribe(handler: Skeleton.Analysis) { + // this.index.removeHandler(handler, handler.callback!); + // } + + assert(turn: Turn, rec: Assertion, handle: Handle): void { + console.log(preserves`ds ${turn.activeFacet.id} assert ${rec} ${handle}`); + throw new Error("Full dataspaces not implemented"); } - withSelfDo(t: Script) { - t.call(this.fields, this); - } -} - -export class Endpoint { - readonly id: EndpointId; - readonly facet: Facet; - readonly updateFun: Script; - spec: EndpointSpec; - - constructor(facet: Facet, isDynamic: boolean, updateFun: Script) { - facet.ensureFacetSetup('add endpoint'); - let ac = facet.actor; - let ds = ac.dataspace; - this.id = ds.nextId++; - this.facet = facet; - this.updateFun = updateFun; - let initialSpec = ds.dataflow.withSubject(isDynamic ? this : undefined, - () => updateFun.call(facet.fields, facet)); - this._install(initialSpec); - this.spec = initialSpec; // keeps TypeScript's undefinedness-checker happy - facet.endpoints.set(this.id, this); - } - - _install(spec: EndpointSpec) { - this.spec = spec; - const ac = this.facet.actor; - if (this.spec.assertion !== void 0) { - ac.assert(this.spec.assertion); - } - if (this.spec.analysis) ac.dataspace.subscribe(this.spec.analysis); - } - - _uninstall(emitPatches: boolean) { - if (emitPatches) { - if (this.spec.assertion !== void 0) { - this.facet.actor.retract(this.spec.assertion); - } - } - if (this.spec.analysis) this.facet.actor.dataspace.unsubscribe(this.spec.analysis); - } - - refresh() { - let newSpec = this.updateFun.call(this.facet.fields, this.facet); - if (!is(newSpec.assertion, this.spec.assertion)) { - this._uninstall(true); - this._install(newSpec); - } - } - - destroy(emitPatches: boolean) { - const facet = this.facet; - facet.actor.dataspace.dataflow.forgetSubject(this); - // ^ TODO: this won't work because of object identity problems! Why - // does the Racket implementation do this, when the old JS - // implementation doesn't? - facet.endpoints.delete(this.id); - this._uninstall(emitPatches); - } - - toString(): string { - return 'Endpoint(' + this.id + ')'; + retract(turn: Turn, upstreamHandle: Handle): void { + console.log(preserves`ds ${turn.activeFacet.id} retract ${upstreamHandle}`); + throw new Error("Full dataspaces not implemented"); + } + + message(turn: Turn, rec: Assertion): void { + console.log(preserves`ds ${turn.activeFacet.id} message ${rec}`); + throw new Error("Full dataspaces not implemented"); } } diff --git a/packages/core/src/runtime/ground.ts b/packages/core/src/runtime/ground.ts deleted file mode 100644 index fd34d41..0000000 --- a/packages/core/src/runtime/ground.ts +++ /dev/null @@ -1,131 +0,0 @@ -//--------------------------------------------------------------------------- -// @syndicate-lang/core, an implementation of Syndicate dataspaces for JS. -// Copyright (C) 2016-2021 Tony Garnock-Jones -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . -//--------------------------------------------------------------------------- - -import { ActivationScript, Dataspace } from './dataspace.js'; - -export type StopHandler = (ds: D) => void; - -declare global { - interface Window { - _ground: Ground; - } -} - -export class Ground extends Dataspace { - stepScheduled = false; - stepping = false; - startingFuel: number = 1000; - stopHandlers: Array> = []; - backgroundTaskCount = 0; - - constructor(bootProc: ActivationScript) { - super(function (rootFacet) { rootFacet.addStartScript(() => rootFacet.activate(bootProc)); }); - if (typeof window !== 'undefined') { - window._ground = this; - } - } - - static laterCall(thunk: () => void): void { - queueMicrotask(() => { - if ('stackTraceLimit' in Error) { - (Error as any).stackTraceLimit = 100; - } - try { - thunk(); - } catch (e) { - console.error("SYNDICATE/JS INTERNAL ERROR", e); - } - }); - } - - backgroundTask(): () => void { - let active = true; - this.backgroundTaskCount++; - return () => { - if (active) { - this.backgroundTaskCount--; - active = false; - } - }; - } - - start(): this { - if (!this.stepScheduled) { - Ground.laterCall(() => { - this.stepScheduled = false; - this._step(); - }); - } - return this; // allows chaining start() immediately after construction - } - - ground(): Ground { - return this; - } - - _step() { - this.stepping = true; - try { - let stillBusy = false; - for (var fuel = this.startingFuel; fuel > 0; fuel--) { - stillBusy = this.runTasks(); - if (!stillBusy) break; - } - if (stillBusy) { - this.start(); - } else { - if (!this.backgroundTaskCount) { - this.stopHandlers.forEach((h) => h(this)); - this.stopHandlers = []; - } - } - } finally { - this.stepping = false; - } - } - - addStopHandler(h: StopHandler): this { - this.stopHandlers.push(h); - return this; - } -} - - // let g = new Ground(() => { - // Worker.spawnWorkerRelay(); - // }); - // if (typeof document !== 'undefined') { - // document.addEventListener("DOMContentLoaded", () => { - // g.start(); - // if (k) k(g); - // }); - // } else { - // g.start(); - // if (k) k(g); - // } - -export function bootModule(bootProc: ActivationScript): Ground { - const g = new Ground(bootProc); - Ground.laterCall(() => { - if (typeof document !== 'undefined') { - document.addEventListener('DOMContentLoaded', () => g.start()); - } else { - g.start(); - } - }); - return g; -} diff --git a/packages/core/src/runtime/pattern.ts b/packages/core/src/runtime/pattern.ts new file mode 100644 index 0000000..16c9efe --- /dev/null +++ b/packages/core/src/runtime/pattern.ts @@ -0,0 +1,181 @@ +//--------------------------------------------------------------------------- +// @syndicate-lang/core, an implementation of Syndicate dataspaces for JS. +// Copyright (C) 2016-2021 Tony Garnock-Jones +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +//--------------------------------------------------------------------------- + +use crate::schemas::dataspace_patterns::*; + +use preserves::value::NestedValue; + +use std::convert::TryFrom; +use std::convert::TryInto; + +#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq)] +pub enum PathStep { + Index(usize), + Key(_Any), +} + +pub type Path = Vec; +pub type Paths = Vec; + +struct Analyzer { + pub const_paths: Paths, + pub const_values: Vec<_Any>, + pub capture_paths: Paths, +} + +pub struct PatternAnalysis { + pub const_paths: Paths, + pub const_values: _Any, + pub capture_paths: Paths, +} + +struct PatternMatcher { + captures: Vec, +} + +impl PatternAnalysis { + pub fn new(p: &Pattern) -> Self { + let mut analyzer = Analyzer { + const_paths: Vec::new(), + const_values: Vec::new(), + capture_paths: Vec::new(), + }; + analyzer.walk(&mut Vec::new(), p); + PatternAnalysis { + const_paths: analyzer.const_paths, + const_values: _Any::new(analyzer.const_values), + capture_paths: analyzer.capture_paths, + } + } +} + +impl Analyzer { + fn walk_step(&mut self, path: &mut Path, s: PathStep, p: &Pattern) { + path.push(s); + self.walk(path, p); + path.pop(); + } + + fn walk(&mut self, path: &mut Path, p: &Pattern) { + match p { + Pattern::DCompound(b) => match &**b { + DCompound::Rec { members, .. } | + DCompound::Arr { members, .. } => { + for (i, p) in members { + self.walk_step(path, PathStep::Index(usize::try_from(i).unwrap_or(0)), p); + } + } + DCompound::Dict { members, .. } => { + for (k, p) in members { + self.walk_step(path, PathStep::Key(k.clone()), p); + } + } + } + Pattern::DBind(b) => { + let DBind { pattern, .. } = &**b; + self.capture_paths.push(path.clone()); + self.walk(path, pattern) + } + Pattern::DDiscard(_) => + (), + Pattern::DLit(b) => { + let DLit { value } = &**b; + self.const_paths.push(path.clone()); + self.const_values.push(value.clone()); + } + } + } +} + +impl Pattern { + pub fn match_value(&self, value: &N) -> Option> { + let mut matcher = PatternMatcher::new(); + if matcher.run(self, value) { + Some(matcher.captures) + } else { + None + } + } +} + +impl PatternMatcher { + fn new() -> Self { + PatternMatcher { + captures: Vec::new(), + } + } + + fn run(&mut self, pattern: &Pattern, value: &N) -> bool { + match pattern { + Pattern::DDiscard(_) => true, + Pattern::DBind(b) => { + self.captures.push(value.clone()); + self.run(&b.pattern, value) + } + Pattern::DLit(b) => value == &b.value, + Pattern::DCompound(b) => match &**b { + DCompound::Rec { ctor, members } => { + let arity = (&ctor.arity).try_into().expect("reasonable arity"); + match value.value().as_record(Some(arity)) { + None => false, + Some(r) => { + for (i, p) in members.iter() { + let i: usize = i.try_into().expect("reasonable index"); + if !self.run(p, &r.fields()[i]) { + return false; + } + } + true + } + } + } + DCompound::Arr { ctor, members } => { + let arity: usize = (&ctor.arity).try_into().expect("reasonable arity"); + match value.value().as_sequence() { + None => false, + Some(vs) => { + if vs.len() != arity { + return false; + } + for (i, p) in members.iter() { + let i: usize = i.try_into().expect("reasonable index"); + if !self.run(p, &vs[i]) { + return false; + } + } + true + } + } + } + DCompound::Dict { ctor: _, members } => { + match value.value().as_dictionary() { + None => false, + Some(entries) => { + for (k, p) in members.iter() { + if !entries.get(k).map(|v| self.run(p, v)).unwrap_or(false) { + return false; + } + } + true + } + } + } + } + } + } +} diff --git a/packages/core/src/runtime/relay.ts b/packages/core/src/runtime/relay.ts deleted file mode 100644 index 05ebc98..0000000 --- a/packages/core/src/runtime/relay.ts +++ /dev/null @@ -1,151 +0,0 @@ -//--------------------------------------------------------------------------- -// @syndicate-lang/core, an implementation of Syndicate dataspaces for JS. -// Copyright (C) 2016-2021 Tony Garnock-Jones -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . -//--------------------------------------------------------------------------- - -import { embed, Value } from '@preserves/core'; - -import { $Special } from './special.js'; -import { Dataspace, Facet, Actor, Endpoint, Script } from './dataspace.js'; -import { Observe, Inbound, Outbound } from './assertions.js'; -import { ChangeDescription } from './bag.js'; -import { EventType, Analysis } from './skeleton.js'; -import { Ground } from './ground.js'; - -export const $QuitDataspace = embed(new $Special("quit-dataspace")); - -export class NestedDataspace extends Dataspace { - readonly outerFacet: Facet<{}>; - - constructor(outerFacet: Facet<{}>, bootProc: Script) { - super(bootProc); - this.outerFacet = outerFacet; - } - - deliverMessage(m: any, _sendingActor: Actor) { - super.deliverMessage(m, _sendingActor); - if (m === $QuitDataspace) { - this.outerFacet.stop(); - } - } - - endpointHook(facet: Facet, innerEp: Endpoint) { - super.endpointHook(facet, innerEp); - - const innerAssertion = innerEp.spec.assertion; - if (!Observe.isClassOf(innerAssertion)) return; - const wrapper = innerAssertion[0]; - if (!Inbound.isClassOf(wrapper)) return; - - // We know that innerAssertion is an Observe(Inbound(...)). Also, if - // innerEp.spec.analysis exists, it will be consonant with innerAssertion. Beware of - // completely-constant patterns, which cause skeleton to be null! - - const innerDs = this; - this.hookEndpointLifecycle(innerEp, this.outerFacet.withNonScriptContext(() => - this.outerFacet.addEndpoint(() => { - const assertion = Observe(wrapper[0]); - const h = innerEp.spec.analysis!; - const innerCallback = h.callback; - const callback = (innerCallback === void 0) ? void 0 : - function (evt: EventType, captures: Array) { - innerCallback.call(null, evt, captures); - innerDs.start(); - }; - const analysis: Analysis | null = (h === null) ? null : - (h.skeleton === null - ? { - skeleton: null, - constPaths: h.constPaths, - constVals: h.constVals.map(v => (v as ReturnType)[0]), - capturePaths: h.capturePaths.map(p => p.slice(1)), - callback - } - : { - skeleton: h.skeleton.members[0], - constPaths: h.constPaths.map(p => p.slice(1)), - constVals: h.constVals, - capturePaths: h.capturePaths.map(p => p.slice(1)), - callback - }); - return { assertion, analysis }; - }, false))); - } - - adjustIndex(a: Value, count: number) { - const net = super.adjustIndex(a, count); - if (Outbound.isClassOf(a)) { - switch (net) { - case ChangeDescription.ABSENT_TO_PRESENT: - this.outerFacet.actor.scheduleTask(() => { - this.outerFacet.actor.adhocAssert(a[0]); - }); - this.outerFacet.actor.dataspace.start(); - break; - case ChangeDescription.PRESENT_TO_ABSENT: - this.outerFacet.actor.scheduleTask(() => { - this.outerFacet.actor.adhocRetract(a[0]); - }); - this.outerFacet.actor.dataspace.start(); - break; - } - } - return net; - } - - hookEndpointLifecycle(innerEp: Endpoint, outerEp: Endpoint<{}>) { - const _refresh = innerEp.refresh; - innerEp.refresh = function () { - _refresh.call(this); - outerEp.refresh(); - }; - - const _destroy = innerEp.destroy; - innerEp.destroy = function (emitPatches: boolean) { - _destroy.call(this, emitPatches); - outerEp.destroy(true); - }; - } - - start(): this { - this.outerFacet.actor.dataspace.start(); - this.outerFacet.scheduleScript(outerFacet => { - outerFacet.invokeScript(() => { - if (this.outerFacet.isLive) { - this.outerFacet.deferTurn(() => { - const stillBusy = this.runTasks(); - if (stillBusy) this.start(); - }); - } - }); - }); - return this; - } - - ground(): Ground { - return this.outerFacet.actor.dataspace.ground(); - } -} - -export function inNestedDataspace(bootProc: Script): Script { - return outerFacet => { - outerFacet.addDataflow(function () {}); - // ^ eww! Dummy endpoint to keep the root facet of the relay alive. - const innerDs = new NestedDataspace(outerFacet, innerFacet => - innerFacet.addStartScript(f => bootProc.call(f.fields, f))); - innerDs.start(); - }; -} diff --git a/packages/core/src/runtime/skeleton.ts b/packages/core/src/runtime/skeleton.ts index 050c888..49b2efe 100644 --- a/packages/core/src/runtime/skeleton.ts +++ b/packages/core/src/runtime/skeleton.ts @@ -20,7 +20,6 @@ import { IdentitySet } from './idcoll.js'; import { is, Value, Record, Set, Dictionary, canonicalString, RecordConstructorInfo, GenericEmbedded } from '@preserves/core'; import { Bag, ChangeDescription } from './bag.js'; -import { Discard, Capture } from './assertions.js'; import * as Stack from './stack.js'; diff --git a/packages/core/src/runtime/special.ts b/packages/core/src/runtime/task.ts similarity index 68% rename from packages/core/src/runtime/special.ts rename to packages/core/src/runtime/task.ts index 4aa8b5a..bc36de1 100644 --- a/packages/core/src/runtime/special.ts +++ b/packages/core/src/runtime/task.ts @@ -16,12 +16,22 @@ // along with this program. If not, see . //--------------------------------------------------------------------------- -// $Special: Builder of singleton "atoms". +const LIMIT = 25000; -export class $Special { - readonly name: string; - - constructor(name: string) { - this.name = name; +let taskCounter = 0; +let delayedTasks: Array<() => void> = []; +export function queueTask(f: () => void) { + taskCounter++; + if (taskCounter === LIMIT) { + setTimeout(() => { + taskCounter = 0; + delayedTasks.forEach(queueMicrotask); + delayedTasks = []; + }, 0); + } + if (taskCounter >= LIMIT) { + delayedTasks.push(f); + } else { + queueMicrotask(f); } } diff --git a/packages/core/src/runtime/worker.ts b/packages/core/src/runtime/worker.ts deleted file mode 100644 index 989ae16..0000000 --- a/packages/core/src/runtime/worker.ts +++ /dev/null @@ -1,304 +0,0 @@ -//--------------------------------------------------------------------------- -// @syndicate-lang/core, an implementation of Syndicate dataspaces for JS. -// Copyright (C) 2016-2021 Tony Garnock-Jones -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . -//--------------------------------------------------------------------------- - -// import { Dataspace, Facet } from './dataspace.js'; -// import { Observe, Outbound, Inbound, Capture, Discard } from './assertions.js'; -// import * as Skeleton from './skeleton.js'; - -// import { preserves, Value, Bytes, Record, Dictionary, encode, decode } from '@preserves/core'; - -// type MessageHandler = (e: Bytes) => void; -// type ImplementationType = 'node.js' | 'browser' | 'none'; - -// type WorkerConstructor = { -// new (stringUrl: string | URL, options?: WorkerOptions): Worker; -// }; - -// function extractBytes(h: MessageHandler): (e: MessageEvent | Bytes) => void { -// return (e) => { -// const bs = (e instanceof MessageEvent) ? e.data : e; -// return h(bs); -// }; -// } - -// const WorkerEvent = Record.makeConstructor('--worker-event', ['epId', 'event', 'vs']); -// const { implementationType, _Worker, postMessage, onMessage, isMainThread }: { -// implementationType: ImplementationType, -// _Worker?: WorkerConstructor, -// postMessage?: (m: any) => void, -// onMessage?: (handler: MessageHandler) => void, -// isMainThread: boolean, -// } = (function () { -// try { -// // Let's see if we're in node.js with the web worker extension enabled. -// const { Worker, parentPort, isMainThread } = require('worker_threads'); -// return { -// implementationType: 'node.js' as ImplementationType, -// _Worker: Worker, -// postMessage: (m: any) => parentPort.postMessage(m), -// onMessage: (handler: MessageHandler) => { -// parentPort.removeAllListeners('message'); -// parentPort.on('message', extractBytes(handler)); -// }, -// isMainThread, -// }; -// } catch (_e) { -// // Well, something didn't work there. Could we be in the browser? -// if (typeof window !== 'undefined' && 'Worker' in window) { -// // Yep. -// return { -// implementationType: 'browser' as ImplementationType, -// _Worker: Worker, -// postMessage, -// onMessage: (handler: MessageHandler) => onmessage = extractBytes(handler), -// isMainThread: (typeof self === 'undefined'), -// }; -// } else { -// // Nope. No support, then. -// return { -// implementationType: 'none' as ImplementationType, -// isMainThread: true, -// }; -// } -// } -// })(); - -// function encodePacket(p: Value) { -// return Bytes.toIO(encode(p)); -// } - -// function decodePacket(m: Bytes) { -// return decode(m); -// } - -// function sendPacket(ch: Worker, p: Value) { -// ch.postMessage(encodePacket(p)); -// } - -// function listen(w: Worker, eventType: string, handler: (... args: any[]) => any) { -// if ('on' in w) { -// (w as any).on(eventType, handler); -// return; -// } - -// const k = 'on' + eventType; -// if (k in w) { -// w[k] = handler; -// } -// } - -// export function spawnWorker(workerSourceFilename: string) { -// if (implementationType === 'none') { -// // In older versions of node.js, try adding --experimental-worker flag to the command line. -// throw new Error("Cannot spawnWorker without a web worker implementation available"); -// } - -// Dataspace.spawn(workerSourceFilename, function () { -// const outerFacet = Dataspace.currentFacet; -// outerFacet.addDataflow(function () {}); -// // ^ eww! Dummy endpoint to keep the root facet of the relay alive. - -// let endpoints = new Dictionary(); - -// const w = new _Worker(workerSourceFilename); -// listen(w, 'error', Dataspace.wrapExternal((err) => { -// throw err; -// })); -// listen(w, 'exit', Dataspace.wrapExternal(() => { -// outerFacet.stop(); -// })); -// listen(w, 'message', Dataspace.wrapExternal(extractBytes((msg: Bytes) => { -// const m = decodePacket(msg) as Array; -// switch (m[0]) { -// case 'assert': { -// const [ep, a] = m.slice(1); -// if (!endpoints.has(ep)) { -// outerFacet.actor.addFacet(outerFacet, function () { -// const epFacet = Dataspace.currentFacet; -// endpoints = endpoints.set(ep, epFacet); -// epFacet.addStopScript(() => { endpoints.delete(ep); }); -// Dataspace.declareField(this, 'assertion', a); -// epFacet.addEndpoint(() => { -// if (Observe.isClassOf(this.assertion)) { -// const spec = this.assertion.get(0); -// const analysis = Skeleton.analyzeAssertion(spec); -// analysis.callback = Dataspace.wrap((evt, vs) => { -// epFacet.actor.scheduleScript(() => { -// sendPacket(w, ['event', ep, evt, vs]); -// }); -// }); -// return { assertion: Observe(spec), analysis }; -// } else { -// return { assertion: this.assertion, analysis: null }; -// } -// }, true); -// }, true); -// } else { -// endpoints.get(ep).fields.assertion = a; -// } -// break; -// } -// case 'clear': { -// const ep = m[1]; -// const epFacet = endpoints.get(ep); -// if (epFacet) epFacet.stop(() => { endpoints.delete(ep); }); -// break; -// } -// case 'message': { -// const body = m[1]; -// Dataspace.send(body); -// break; -// } -// default: { -// throw new Error( -// preserves`Invalid Worker protocol message from Worker: ${m}`); -// } -// } -// }))); -// }, null); -// } - -// export function spawnWorkerRelay() { -// if (implementationType === 'none') return; -// if (isMainThread) return; - -// Dataspace.currentFacet.actor.dataspace.ground().addStopHandler(() => { -// process.exit(); -// }); - -// Dataspace.currentFacet.addStartScript(function () { -// Dataspace.spawn('WorkerRelay', function () { -// const outerFacet = Dataspace.currentFacet; - -// const finish = Dataspace.backgroundTask(); -// outerFacet.addStopScript(finish); - -// const outboundEndpoints = new Dictionary(); -// const inboundEndpoints = new Dictionary<{ epId: number, facet: Facet }>(); -// let nextId = 0; - -// function sendToParent(m: Value) { -// postMessage(encodePacket(m)); -// } - -// onMessage(Dataspace.wrapExternal(function (msg: Bytes) { -// const m = decodePacket(msg); -// if (Array.isArray(m) && m[0] === 'event') { -// const [epId, evt, vs] = m.slice(1); -// Dataspace.send(WorkerEvent(epId, evt, vs)); -// } else { -// throw new Error( -// preserves`Invalid Worker protocol message from parent: ${m}`); -// } -// })); - -// outerFacet.addEndpoint(function () { -// const analysis = Skeleton.analyzeAssertion(Outbound(Capture(Discard._instance))); -// analysis.callback = Dataspace.wrap(function (evt, vs) { -// outerFacet.actor.scheduleScript(function () { -// switch (evt) { -// case Skeleton.EventType.ADDED: { -// const epId = nextId++; -// outboundEndpoints.set(vs[0], epId); -// sendToParent(['assert', epId, vs[0]]); -// break; -// } -// case Skeleton.EventType.REMOVED: { -// const epId = outboundEndpoints.get(vs[0]); -// outboundEndpoints.delete(vs[0]); -// sendToParent(['clear', epId]); -// break; -// } -// case Skeleton.EventType.MESSAGE: { -// sendToParent(['message', vs[0]]); -// break; -// } -// } -// }); -// }); -// return { assertion: analysis.assertion, analysis }; -// }, false); - -// outerFacet.addEndpoint(function () { -// const analysis = -// Skeleton.analyzeAssertion(Observe(Inbound(Capture(Discard._instance)))); -// analysis.callback = Dataspace.wrap(function (evt, vs) { -// outerFacet.actor.scheduleScript(function () { -// const spec = vs[0]; -// switch (evt) { -// case Skeleton.EventType.ADDED: { -// const epId = nextId++; -// outerFacet.actor.addFacet(outerFacet, function () { -// const innerFacet = Dataspace.currentFacet; -// inboundEndpoints.set(spec, { epId, facet: innerFacet }); -// innerFacet.addEndpoint(function () { -// const analysis = Skeleton.analyzeAssertion( -// WorkerEvent(epId, Capture(Discard._instance), Capture(Discard._instance))); -// analysis.callback = Dataspace.wrap(function (evt, vs) { -// if (evt === Skeleton.EventType.MESSAGE) { -// evt = vs[0] as Skeleton.EventType; -// vs = vs[1] as Array; -// const a = Skeleton.instantiateAssertion(Inbound(spec), vs); -// innerFacet.actor.scheduleScript(function () { -// switch (evt) { -// case Skeleton.EventType.ADDED: -// innerFacet.actor.addFacet(innerFacet, function () { -// const assertionFacet = Dataspace.currentFacet; -// assertionFacet.addEndpoint(function () { -// return { assertion: a, analysis: null }; -// }, false); -// assertionFacet.addEndpoint(function () { -// const analysis = Skeleton.analyzeAssertion( -// WorkerEvent(epId, Skeleton.EventType.REMOVED, vs)); -// analysis.callback = Dataspace.wrap(() => { -// assertionFacet.actor.scheduleScript(function () { -// assertionFacet.stop(); -// }); -// }); -// return { assertion: analysis.assertion, analysis }; -// }, false); -// }, true); -// break; -// case Skeleton.EventType.MESSAGE: -// Dataspace.send(a); -// break; -// } -// }); -// } -// }); -// return { assertion: analysis.assertion, analysis }; -// }, false); -// }, true); -// sendToParent(['assert', epId, Observe(spec)]); -// break; -// } -// case Skeleton.EventType.REMOVED: { -// const { epId, facet } = inboundEndpoints.get(spec); -// inboundEndpoints.delete(spec); -// facet.stop(); -// sendToParent(['clear', epId]); -// break; -// } -// } -// }); -// }); -// return { assertion: analysis.assertion, analysis }; -// }, false); -// }, null); -// }); -// } diff --git a/packages/core/src/transport/protocol.ts b/packages/core/src/transport/protocol.ts new file mode 100644 index 0000000..76e6dca --- /dev/null +++ b/packages/core/src/transport/protocol.ts @@ -0,0 +1,42 @@ +//--------------------------------------------------------------------------- +// @syndicate-lang/core, an implementation of Syndicate dataspaces for JS. +// Copyright (C) 2016-2021 Tony Garnock-Jones +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +//--------------------------------------------------------------------------- + +import * as S from '../gen/sturdy.js'; +import { Oid } from '../gen/protocol.js'; +import { Ref } from '../runtime/actor.js'; +import { Decoder, DecoderState, Encoder, EncoderState, GenericEmbedded, neverEmbeddedType, EmbeddedType, Value } from '@preserves/core'; + +export type WireSymbol = { oid: Oid, ref: Ref, count: number }; + +export const wireRefEmbeddedType: EmbeddedType = { + decode(s: DecoderState): S.WireRef { + return S.asWireRef(new Decoder(s).next()); + }, + + encode(s: EncoderState, v: S.WireRef): void { + new Encoder(s, neverEmbeddedType).push(S.fromWireRef(v)); + }, + + fromValue(v: Value): S.WireRef { + return S.asWireRef(v as Value); + }, + + toValue(v: S.WireRef): Value { + return S.fromWireRef(v) as Value; + } +}; diff --git a/packages/core/src/transport/relay.ts b/packages/core/src/transport/relay.ts new file mode 100644 index 0000000..1cdc13c --- /dev/null +++ b/packages/core/src/transport/relay.ts @@ -0,0 +1,349 @@ +//--------------------------------------------------------------------------- +// @syndicate-lang/core, an implementation of Syndicate dataspaces for JS. +// Copyright (C) 2016-2021 Tony Garnock-Jones +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +//--------------------------------------------------------------------------- + +import { Actor, Assertion, Entity, Facet, Handle, Ref, Turn } from '../runtime/actor.js'; +import { BytesLike, Decoder, Dictionary, embed, encode, IdentityMap, mapEmbeddeds, underlying, Value } from '@preserves/core'; +import * as IO from '../gen/protocol.js'; +import { wireRefEmbeddedType, WireSymbol } from './protocol.js'; +import { queueTask } from '../runtime/task.js'; +import { attenuate } from '../runtime/rewrite.js'; +import { fromAttenuation, WireRef } from '../gen/sturdy.js'; + +export class SyncPeerEntity implements Entity { + readonly relay: Relay; + readonly peer: Ref; + readonly handleMap = new IdentityMap(); + e: WireSymbol | null = null; + + constructor(relay: Relay, peer: Ref) { + this.relay = relay; + this.peer = peer; + } + + assert(turn: Turn, assertion: Assertion, handle: Handle): void { + this.handleMap.set(handle, turn.assert(this.peer, assertion)); + } + + retract(turn: Turn, handle: Handle): void { + turn.retract(this.handleMap.get(handle)!); + this.handleMap.delete(handle); + } + + message(turn: Turn, body: Assertion): void { + // We get to vanish from the indexes now + this.relay.releaseRefOut(this.e!); + turn.message(this.peer, body); + } + + sync(turn: Turn, peer: Ref): void { + turn._sync(this.peer, peer); + } +} + +export class RelayEntity implements Entity { + readonly relay: Relay; + readonly oid: IO.Oid; + + constructor(relay: Relay, oid: IO.Oid) { + this.relay = relay; + this.oid = oid; + } + + send(m: IO.Event): void { + this.relay.send(this.oid, m); + } + + assert(_turn: Turn, assertion: Assertion, handle: Handle): void { + this.send(IO.Event.Assert(IO.Assert({ + assertion: this.relay.register(assertion, handle), + handle + }))) + } + + retract(_turn: Turn, handle: Handle): void { + this.relay.deregister(handle); + this.send(IO.Event.Retract(IO.Retract(handle))); + } + + message(_turn: Turn, body: Assertion): void { + this.send(IO.Event.Message(IO.Message(this.relay.register(body, null)))); + } + + sync(turn: Turn, peer: Ref): void { + const peerEntity = new SyncPeerEntity(this.relay, peer); + const exported: Array = []; + const ior = this.relay.rewriteRefOut(turn.ref(peerEntity), false, exported); + peerEntity.e = exported[0]; + this.send(IO.Event.Sync(IO.Sync(ior))); + } +} + +export class Membrane { + readonly byOid = new IdentityMap(); + readonly byRef = new IdentityMap(); + + grab(table: Table, + key: Parameters[0], + transient: boolean, + f: () => WireSymbol): WireSymbol + { + let e = this[table].get(key as any); + if (e === void 0) { + e = f(); + this.byRef.set(e.ref, e); + this.byOid.set(e.oid, e); + } + if (!transient) e.count++; + return e; + } + + drop(e: WireSymbol): void { + e.count--; + if (e.count === 0) { + this.byOid.delete(e.oid); + this.byRef.delete(e.ref); + } + } +} + +export const INERT_REF: Ref = { + relay: (() => { + const a = new Actor(t => t.stop()); + return a.root; + })(), + target: {}, +}; + +export type PacketWriter = (bs: Uint8Array) => void; + +export interface RelayOptions { + packetWriter: PacketWriter; + setup(t: Turn, r: Relay): void; + debug?: boolean; + trustPeer?: boolean; +} + +export class Relay { + readonly facet: Facet; + readonly w: PacketWriter; + readonly inboundAssertions = new IdentityMap, + }>(); + readonly outboundAssertions = new IdentityMap>(); + readonly exported = new Membrane(); + readonly imported = new Membrane(); + nextLocalOid: IO.Oid = 0; + pendingTurn: IO.Turn = []; + debug: boolean; + trustPeer: boolean; + + readonly decoder = new Decoder(void 0, { + includeAnnotations: false, + embeddedDecode: wireRefEmbeddedType, + }); + + constructor(t: Turn, options: RelayOptions) { + this.facet = t.activeFacet; + this.w = options.packetWriter; + this.debug = options.debug ?? false; + this.trustPeer = options.trustPeer ?? true; + + this.facet.preventInertCheck(); + options.setup(t, this); + } + + rewriteOut(assertion: Assertion, transient: boolean): [Value, Array] + { + const exported: Array = []; + const rewritten = mapEmbeddeds(assertion, r => embed(this.rewriteRefOut(r, transient, exported))); + return [rewritten, exported]; + } + + rewriteIn(t: Turn, a: Value): [Assertion, Array] + { + const imported: Array = []; + const rewritten = mapEmbeddeds(a, r => embed(this.rewriteRefIn(t, r, imported))); + return [rewritten, imported]; + } + + register(assertion: Assertion, handle: Handle | null): Value { + const [rewritten, exported] = this.rewriteOut(assertion, handle === null); + if (handle !== null) this.outboundAssertions.set(handle, exported); + return rewritten; + } + + deregister(handle: Handle): void { + (this.outboundAssertions.get(handle) ?? []).forEach(e => this.releaseRefOut(e)); + this.outboundAssertions.delete(handle); + } + + rewriteRefOut(r: Ref, transient: boolean, exported: Array): WireRef { + if (r.target instanceof RelayEntity && r.target.relay === this) { + if (r.attenuation === void 0 || r.attenuation.length === 0) { + // No extra conditions on this reference since it was sent to us. + return WireRef.yours({ oid: r.target.oid, attenuation: [] }); + } else { + // This reference has been attenuated since it was sent to us. + // Do we trust the peer to enforce such attenuation on our behalf? + if (this.trustPeer) { + return WireRef.yours({ oid: r.target.oid, attenuation: r.attenuation }); + } else { + // fall through: treat the attenuated ref as a local ref, and re-export it. + } + } + } + + const e = this.exported.grab( + "byRef", r, transient, () => { + if (transient) throw new Error("Cannot send transient reference"); + return { oid: this.nextLocalOid++, ref: r, count: 0 }; + }); + exported.push(e); + return WireRef.mine(e.oid); + } + + releaseRefOut(e: WireSymbol) { + this.exported.drop(e); + } + + rewriteRefIn(t: Turn, n: WireRef, imported: Array): Ref { + switch (n._variant) { + case 'yours': { + const r = this.lookupLocal(n.oid); + if (n.attenuation.length === 0 || r === INERT_REF) { + return r; + } else { + type AttenuatedRef = Ref & { __attenuations?: Dictionary }; + const ar = r as AttenuatedRef; + if (ar.__attenuations === void 0) { + ar.__attenuations = new Dictionary(); + } + return ar.__attenuations.getOrSet(fromAttenuation(n.attenuation), () => + attenuate(r, ... n.attenuation)); + } + } + case 'mine': { + const e = this.imported.grab("byOid", n.oid, false, () => + ({ oid: n.oid, ref: t.ref(new RelayEntity(this, n.oid)), count: 0 })); + imported.push(e); + return e.ref; + } + } + } + + send(remoteOid: IO.Oid, m: IO.Event): void { + if (this.pendingTurn.length === 0) { + queueTask(() => { + if (this.debug) console.log('OUT', IO.fromTurn(this.pendingTurn).asPreservesText()); + this.w(underlying(encode(IO.fromTurn(this.pendingTurn), { + canonical: true, + embeddedEncode: wireRefEmbeddedType, + }))); + this.pendingTurn = []; + }); + } + this.pendingTurn.push(IO.TurnEvent({ oid: remoteOid, event: m })); + } + + lookupLocal(localOid: IO.Oid): Ref { + return this.exported.byOid.get(localOid)?.ref ?? INERT_REF; + } + + accept(bs: BytesLike): void { + Turn.for(this.facet, t => { + this.decoder.write(bs); + while (true) { + const rawTurn = this.decoder.try_next(); + if (rawTurn === void 0) break; + const wireTurn = IO.toTurn(rawTurn); + if (wireTurn === void 0) throw new Error("Bad IO.Turn"); + if (this.debug) console.log('IN', rawTurn.asPreservesText()); + wireTurn.forEach(v => { + const { oid: localOid, event: m } = v; + this.handle(t, this.lookupLocal(localOid), m); + }); + } + }); + } + + handle(t: Turn, r: Ref, m: IO.Event) { + switch (m._variant) { + case 'Assert': { + const [a, imported] = this.rewriteIn(t, m.value.assertion); + this.inboundAssertions.set(m.value.handle, { + localHandle: t.assert(r, a), + imported, + }); + break; + } + case 'Retract': { + const remoteHandle = m.value.handle; + const h = this.inboundAssertions.get(remoteHandle); + if (h === void 0) throw new Error(`Peer retracted invalid handle ${remoteHandle}`); + this.inboundAssertions.delete(remoteHandle); + h.imported.forEach(e => this.imported.drop(e)); + t.retract(h.localHandle); + break; + } + case 'Message': { + const [a, imported] = this.rewriteIn(t, m.value.body); + if (imported.length > 0) throw new Error("Cannot receive transient reference"); + t.message(r, a); + break; + } + case 'Sync': { + const imported: Array = []; + const k = this.rewriteRefIn(t, m.value.peer, imported); + t.sync(r).then(t => { + t.message(k, true); + imported.forEach(e => this.imported.drop(e)); + }); + break; + } + } + } +} + +export interface RelayActorOptions extends RelayOptions { + initialOid?: IO.Oid; + initialRef?: Ref; + nextLocalOid?: IO.Oid; +} + +export function spawnRelay(t: Turn, options: RelayActorOptions & {initialOid: IO.Oid}): Promise; +export function spawnRelay(t: Turn, options: Omit): Promise; +export function spawnRelay(t: Turn, options: RelayActorOptions): Promise +{ + return new Promise(resolve => { + t.spawn(t => { + const relay = new Relay(t, options); + if (options.initialRef !== void 0) { + relay.rewriteRefOut(options.initialRef, false, []); + } + if (options.initialOid !== void 0) { + resolve(relay.rewriteRefIn(t, WireRef.mine(options.initialOid), [])); + } else { + resolve(null); + } + if (options.nextLocalOid !== void 0) { + relay.nextLocalOid = (options.nextLocalOid === 0) ? 1 : options.nextLocalOid; + } + }); + }); +} diff --git a/packages/core/src/transport/sturdy.ts b/packages/core/src/transport/sturdy.ts index d57cf22..84018bb 100644 --- a/packages/core/src/transport/sturdy.ts +++ b/packages/core/src/transport/sturdy.ts @@ -24,7 +24,7 @@ // In Network and Distributed System Security Symposium. San Diego, // California: Internet Society, 2014. -import { mac } from './cryptography'; +import { mac } from './cryptography.js'; import { Bytes, decode, encode, is, neverEmbeddedType, Value } from '@preserves/core'; import * as S from '../gen/sturdy.js'; export * from '../gen/sturdy.js'; diff --git a/yarn.lock b/yarn.lock index 5e9932a..d398c91 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1366,10 +1366,24 @@ dependencies: "@octokit/openapi-types" "^6.1.1" -"@preserves/core@0.15.0": - version "0.15.0" - resolved "https://registry.yarnpkg.com/@preserves/core/-/core-0.15.0.tgz#55a14288442d404d20a2906b92b7a7cc9e522a18" - integrity sha512-PoEvwlqNNXpYykwkiD7KyjT6kfo78XXEMwJ5yOhOiEF6nVD167NVv801/DR7xIINtPDaDjdqBtqY/tamyzi7vA== +"@preserves/core@0.17.0", "@preserves/core@^0.17.0": + version "0.17.0" + resolved "https://registry.yarnpkg.com/@preserves/core/-/core-0.17.0.tgz#23e7edb0a1bf5e602d210ea2ae474113d0cc5b12" + integrity sha512-ZyqsC8f08yvOn9UjaBekIhKDY44k7sWwyW2xTLfVCffFHGCPidgq6uUGKcuKAbr3ibGB7kQ6KQr/xgVAUpDC0Q== + +"@preserves/schema@0.18.0": + version "0.18.0" + resolved "https://registry.yarnpkg.com/@preserves/schema/-/schema-0.18.0.tgz#593a33a41eccc62a72a620b75379676b7b72b17c" + integrity sha512-L2IygPGYHZSxfDkHdcohuNCjYbj7BI+5+RJFrPL3vLmT2kj0IYJi6lZu1ReumALbBTx7RW8Mn9IorH8qVUZ39A== + dependencies: + "@preserves/core" "^0.17.0" + "@types/glob" "^7.1.3" + "@types/minimatch" "^3.0.3" + chalk "^4.1.0" + chokidar "^3.5.1" + commander "^7.2.0" + glob "^7.1.6" + minimatch "^3.0.4" "@rollup/plugin-node-resolve@^11.0.1": version "11.2.1" @@ -1452,6 +1466,14 @@ "@types/minimatch" "*" "@types/node" "*" +"@types/glob@^7.1.3": + version "7.2.0" + resolved "https://registry.yarnpkg.com/@types/glob/-/glob-7.2.0.tgz#bc1b5bf3aa92f25bd5dd39f35c57361bdce5b2eb" + integrity sha512-ZUxbzKl0IfJILTS6t7ip5fQQM/J3TJYubDm3nMbgubNNYS62eXeUpoLUC8/7fJNiFYHTrGPQn7hspDUzIHX3UA== + dependencies: + "@types/minimatch" "*" + "@types/node" "*" + "@types/graceful-fs@^4.1.2": version "4.1.5" resolved "https://registry.yarnpkg.com/@types/graceful-fs/-/graceful-fs-4.1.5.tgz#21ffba0d98da4350db64891f92a9e5db3cdb4e15" @@ -1491,6 +1513,11 @@ resolved "https://registry.yarnpkg.com/@types/minimatch/-/minimatch-3.0.4.tgz#f0ec25dbf2f0e4b18647313ac031134ca5b24b21" integrity sha512-1z8k4wzFnNjVK/tlxvrWuK5WMt6mydWWP7+zvH5eFep4oj+UkrfiJTRtjCeBXNpwaA/FYqqtb4/QS4ianFpIRA== +"@types/minimatch@^3.0.3": + version "3.0.5" + resolved "https://registry.yarnpkg.com/@types/minimatch/-/minimatch-3.0.5.tgz#1001cc5e6a3704b83c236027e77f2f58ea010f40" + integrity sha512-Klz949h02Gz2uZCMGwDUSDS1YBlTdDDgbWHi+81l29tQALUtvz4rAYi5uoVhE5Lagoq6DeqAUlbrHvW/mXDgdQ== + "@types/minimist@^1.2.0": version "1.2.1" resolved "https://registry.yarnpkg.com/@types/minimist/-/minimist-1.2.1.tgz#283f669ff76d7b8260df8ab7a4262cc83d988256" @@ -2167,6 +2194,14 @@ chalk@^4.0.0: ansi-styles "^4.1.0" supports-color "^7.1.0" +chalk@^4.1.0: + version "4.1.2" + resolved "https://registry.yarnpkg.com/chalk/-/chalk-4.1.2.tgz#aac4e2b7734a740867aeb16bf02aad556a1e7a01" + integrity sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA== + dependencies: + ansi-styles "^4.1.0" + supports-color "^7.1.0" + char-regex@^1.0.2: version "1.0.2" resolved "https://registry.yarnpkg.com/char-regex/-/char-regex-1.0.2.tgz#d744358226217f981ed58f479b1d6bcc29545dcf" @@ -2342,6 +2377,11 @@ commander@^2.20.0: resolved "https://registry.yarnpkg.com/commander/-/commander-2.20.3.tgz#fd485e84c03eb4881c20722ba48035e8531aeb33" integrity sha512-GpVkmM8vF2vQUkj2LvZmD35JxeJOLCwJ9cUkugyk2nuhbv3+mJvpLYYt+0+USMxE+oj+ey/lJEnhZw75x/OMcQ== +commander@^7.2.0: + version "7.2.0" + resolved "https://registry.yarnpkg.com/commander/-/commander-7.2.0.tgz#a36cb57d0b501ce108e4d20559a150a391d97ab7" + integrity sha512-QrWXB+ZQSVPmIWIhtEO9H+gwHaMGYiF5ChvoJ+K9ZGHG/sVsa6yiesAD1GC/x46sET00Xlwo1u49RVVVzvcSkw== + compare-func@^2.0.0: version "2.0.0" resolved "https://registry.yarnpkg.com/compare-func/-/compare-func-2.0.0.tgz#fb65e75edbddfd2e568554e8b5b05fff7a51fcb3"