# SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway # SPDX-License-Identifier: Unlicense import ./bags, ./dataflow, ./events, ./skeletons import preserves import std/[asyncdispatch, deques, hashes, macros, options, sets, tables] export dataflow.defineObservableProperty export dataflow.recordObservation export dataflow.recordDamage template generateIdType(T: untyped) = type T* = distinct Natural proc `==`*(x, y: T): bool {.borrow.} proc `$`*(id: T): string {.borrow.} generateIdType(ActorId) generateIdType(FacetId) generateIdType(EndpointId) generateIdType(FieldId) type Value* = Preserve Bag = bags.Bag[Value] Task[T] = proc (): T Script[T] = proc (facet: Facet): T ActivationScript* = Script[void] ActionKind = enum patchAction, messageAction, spawnAction, quitAction, deferredTurnAction, activationAction Action = object impl: proc (action: Action; ds: Dataspace; actor: Option[Actor]) {.gcsafe.} case kind: ActionKind of patchAction: changes: Bag else: discard Priority = enum pQueryHigh = 0, pQuery, pQueryHandler, pNormal, pGC, pIdle, len Actor = ref object id: ActorId name: string dataspace*: Dataspace rootFacet: ParentFacet pendingTasks: array[Priority.len, Deque[Task[void]]] pendingActions: seq[Action] adhocAssertions: Bag cleanupChanges: Bag parentId: ActorId EndpointSpec* = tuple callback: HandlerCallback assertion: Value analysis: Option[Analysis] Endpoint = ref object id: EndpointId facet: Facet updateProc: Script[EndpointSpec] spec: EndpointSpec Field* = object of RootObj id*: FieldId Fields* = seq[Value] # TODO: compile-time tuples Turn = object actions: seq[Action] actor: Option[Actor] Dataspace* = ref object ground*: Ground index: Index dataflow*: Graph[Endpoint, FieldId] runnable: seq[Actor] pendingTurns: seq[Turn] actors: Table[ActorId, Actor] activations: seq[ActivationScript] nextId: Natural StopHandler = proc (ds: Dataspace) {.gcsafe.} Ground = ref object dataspace: Dataspace stopHandlers: seq[StopHandler] future: Future[void] externalTaskCount: int stepScheduled: bool ParentFacet = Option[Facet] Facet* = ref FacetObj FacetObj = object id: FacetId actor*: Actor parent: ParentFacet endpoints: Table[EndpointId, Endpoint] stopScripts: seq[Script[void]] children: Table[FacetId, Facet] fields*: Fields isLive, inScript: bool # FacetImpl[Fields] = ref FacetImplObj[Fields] # FacetImplObj[Fields] {.final.} = object of FacetBaseObj using dataspace: Dataspace actor: Actor facet: Facet proc hash*(ep: Endpoint): Hash = !$(hash(ep.id) !& hash(ep.facet.id)) proc generateId*(ds: Dataspace): Natural = # TODO: used by declareField, but should be hidden. inc(ds.nextId) ds.nextId proc newActor(ds: Dataspace; name: string; initialAssertions: Value; parentId: ActorId): Actor = assert(initialAssertions.kind == pkSet) result = Actor( id: ds.generateId.ActorId, name: name, dataspace: ds, parentId: parentId) for v in initialAssertions.set: discard result.adhocAssertions.change(v, 1) ds.actors[result.id] = result proc applyPatch(ds: Dataspace; actor: Option[Actor]; changes: Bag) = type Pair = tuple[val: Value; count: int] var removals: seq[Pair] for a, count in changes.pairs: if count > 0: # debugEcho "applyPatch +", a discard ds.index.adjustAssertion(a, count) else: removals.add((a, count)) actor.map do (ac: Actor): discard ac.cleanupChanges.change(a, -count) for (a, count) in removals: # debugEcho "applyPatch -", a discard ds.index.adjustAssertion(a, count) proc initPatch(): Action = proc impl(patch: Action; ds: Dataspace; actor: Option[Actor]) {.gcsafe.} = ds.applyPatch(actor, patch.changes) Action(impl: impl, kind: patchAction) proc pendingPatch(actor): var Action = for a in actor.pendingActions.mitems: if a.kind == patchAction: return a actor.pendingActions.add(initPatch()) actor.pendingActions[actor.pendingActions.high] proc adjust(patch: var Action; v: Value; delta: int) = discard patch.changes.change(v, delta) proc assert(actor; a: Value) = actor.pendingPatch.adjust(a, +1) proc retract(actor; a: Value) = actor.pendingPatch.adjust(a, -1) proc install(ep: Endpoint; spec: EndpointSpec) = ep.spec = spec if not ep.spec.assertion.isFalse: ep.facet.actor.assert(ep.spec.assertion) ep.spec.analysis.map do (a: Analysis): assert(not ep.spec.callback.isNil) ep.facet.actor.dataspace.index.addHandler(a, ep.spec.callback) proc isRunnable(actor): bool = for tasks in actor.pendingTasks: if tasks.len > 0: return true proc scheduleTask(actor; prio: Priority; task: Task[void]) = if not actor.isRunnable: actor.dataspace.runnable.add(actor) actor.pendingTasks[prio].addLast(task) proc scheduleTask(actor; task: Task[void]) = scheduleTask(actor, pNormal, task) proc abandonQueuedWork(actor) = reset actor.pendingActions for q in actor.pendingTasks.mitems: clear(q) proc uninstall(ep: Endpoint; emitPatches: bool) = if emitPatches: if not ep.spec.assertion.isFalse: ep.facet.actor.retract(ep.spec.assertion) ep.spec.analysis.map do (a: Analysis): assert(not ep.spec.callback.isNil) ep.facet.actor.dataspace.index.removeHandler(a, ep.spec.callback) proc destroy(ep: Endpoint; emitPatches: bool) = ep.facet.actor.dataspace.dataflow.forgetSubject(ep) ep.uninstall(emitPatches) ep.facet.actor.scheduleTask(pGC) do (): ep.facet.endpoints.del(ep.id) # TODO: cannot remove from ep.facet.endpoints during # its iteration, defering remove is probably unecessary # because the facet is going down. proc retractAssertionsAndSubscriptions(facet; emitPatches: bool) = facet.actor.scheduleTask do (): for ep in facet.endpoints.values: ep.destroy(emitPatches) clear(facet.endpoints) proc abort(facet; emitPatches: bool) = facet.isLive = false for child in facet.children.values: child.abort(emitPatches) facet.retractAssertionsAndSubscriptions(emitPatches) for s in facet.stopScripts: s(facet) # call stopScripts immediately proc enqueueScriptAction(actor; action: Action) = actor.pendingActions.add(action) proc enqueueScriptAction(facet; action: Action) = enqueueScriptAction(facet.actor, action) proc initQuitAction(): Action = proc impl(action: Action; ds: Dataspace; actor: Option[Actor]) = assert(actor.isSome) ds.applyPatch(actor, actor.get.cleanupChanges) ds.actors.del(actor.get.id) Action(impl: impl, kind: quitAction) proc terminate(actor; emitPatches: bool) = if emitPatches: actor.scheduleTask do (): for a in actor.adhocAssertions.keys: actor.retract(a) actor.rootFacet.map do (root: Facet): root.abort(emitPatches) actor.scheduleTask do (): actor.enqueueScriptAction(initQuitAction()) proc invokeScript(facet; script: Script[void]) = try: script(facet) except: let e = getCurrentException() # TODO: install an error handling callback at the facet? facet.actor.abandonQueuedWork() facet.actor.terminate(false) raise e func isInert(facet): bool = facet.endpoints.len == 0 and facet.children.len == 0 proc terminate(facet) = if facet.isLive: let actor = facet.actor parent = facet.parent if parent.isNone: reset actor.rootFacet facet.isLive = false for child in facet.children.values: child.terminate() reset facet.children actor.scheduleTask do (): facet.invokeScript do (facet: Facet): for s in facet.stopScripts: s(facet) facet.retractAssertionsAndSubscriptions(true) actor.scheduleTask(pGC) do (): if parent.isSome: if parent.get.isInert: parent.get.terminate() else: actor.terminate(true) template withNonScriptContext(facet; body: untyped) = let inScriptPrev = facet.inScript facet.inScript = false try: body finally: facet.inScript = inScriptPrev proc ensureFacetSetup(facet; s: string) = assert(not facet.inScript, "Cannot " & s & " ouside facet setup") proc ensureNonFacetSetup(facet; s: string) = assert(facet.inScript, "Cannot " & s & " during facet setup") proc wrap(facet; script: Script[void]): Task[void] = proc task() = facet.invokeScript(script) task proc scheduleScript*(facet; prio: Priority; script: Script[void]) = facet.actor.scheduleTask(prio, facet.wrap(script)) proc scheduleScript*(facet; script: Script[void]) = facet.actor.scheduleTask(pNormal, facet.wrap(script)) proc addStartScript*(facet; s: Script[void]) = facet.ensureFacetSetup("onStart") facet.scheduleScript(pNormal, s) proc addStopScript*(facet; s: Script[void]) = facet.stopScripts.add(s) proc addFacet(actor; parentFacet: Option[Facet]; bootScript: Script[void]; checkInScript = false) = if checkInScript and parentFacet.isSome: assert parentFacet.get.inScript let f = Facet( id: actor.dataspace.generateId.FacetId, actor: actor, parent: parentFacet, isLive: true, inScript: true) if parentFacet.isSome: parentFacet.get.children[f.id] = f f.fields = parentFacet.get.fields # inherit scope by copying fields of the parent else: actor.rootFacet = some f f.invokeScript do (facet: Facet): facet.withNonScriptContext: bootScript(facet) actor.scheduleTask do (): if ((parentFacet.isSome) and (not parentFacet.get.isLive)) or f.isInert: f.terminate() proc addChildFacet*(facet; bootProc: Script[void]) = facet.actor.addFacet(some facet, bootProc, true) proc deliverMessage(ds: Dataspace; msg: Value; ac: Option[Actor]) = ds.index.deliverMessage(msg) proc adhocRetract(actor; a: Value) = if actor.adhocAssertions.change(a, -1, true) == cdPresentToAbsent: actor.retract(a) proc refresh(ep: Endpoint) = let newSpec = ep.updateProc(ep.facet) if newSpec.assertion != ep.spec.assertion: ep.uninstall(true) ep.install(newSpec) proc refreshAssertions(ds: Dataspace) = ds.dataflow.repairDamage do (ep: Endpoint): let facet = ep.facet assert(facet.isLive) facet.invokeScript do (f: Facet): f.withNonScriptContext: refresh(ep) proc addActor(ds: Dataspace; name: string; bootProc: Script[void]; initialAssertions: Value; parent: Option[Actor]) = var parentId: ActorId parent.map do (p: Actor): parentId = p.id let ac = newActor(ds, name, initialAssertions, parentId) ds.applyPatch(some ac, ac.adhocAssertions) ac.addFacet(none Facet) do (systemFacet: Facet): # Root facet is a dummy "system" facet that exists to hold # one-or-more "user" "root" facets. ac.addFacet(some systemFacet, bootProc) # ^ The "true root", user-visible facet. for a in initialAssertions.set: ac.adhocRetract(a) proc send*(facet; body: Value) = ## Send a message into the dataspace. facet.ensureNonFacetSetup("send") proc impl(_: Action; ds: Dataspace; actor: Option[Actor]) = ds.deliverMessage(body, actor) facet.enqueueScriptAction(Action(impl: impl, kind: messageAction)) proc initSpawnAction(name: string; bootProc: Script[void], initialAssertions: Value): Action = proc impl(action: Action; ds: Dataspace; actor: Option[Actor]) = ds.addActor(name, bootProc, initialAssertions, actor) Action(impl: impl, kind: spawnAction) proc spawn*(facet; name: string; bootProc: Script[void], initialAssertions: Value) = facet.ensureNonFacetSetup("spawn") facet.enqueueScriptAction(initSpawnAction(name, bootProc, initialAssertions)) proc spawn*(facet; name: string; bootProc: Script[void]) = spawn(facet, name, bootProc, Value(kind: pkSet)) #[ template spawn*(facet; name: string; fields: untyped; bootProc: Script[void]): untyped = type Fields = typeof(fields) spawn[Fields](facet, name, bootProc, Value(kind: pkSet)) ]# proc initActivationAction(script: ActivationScript; name: string): Action = proc impl(action: Action; ds: Dataspace; actor: Option[Actor]) = for s in ds.activations: if s == script: return ds.activations.add(script) proc boot(root: Facet) = root.addStartScript(script) ds.addActor(name, boot, Value(kind: pkSet), actor) Action(impl: impl, kind: activationAction) proc activate(facet; name: string; script: ActivationScript) = facet.ensureNonFacetSetup "`activate`" facet.enqueueScriptAction(initActivationAction(script, name)) proc newDataspace(ground: Ground; name: string; bootProc: ActivationScript): Dataspace = let turn = Turn(actions: @[initSpawnAction(name, bootProc, Value(kind: pkSet))]) Dataspace(ground: ground, index: initIndex(), pendingTurns: @[turn]) proc addEndpoint*(facet; updateScript: Script[EndpointSpec], isDynamic = true) = facet.ensureFacetSetup("addEndpoint") let actor = facet.actor dataspace = actor.dataspace ep = Endpoint( id: dataspace.generateId.EndpointId, facet: facet, updateProc: updateScript) dataspace.dataflow.addSubject(ep) let dyn = if isDynamic: some ep else: none Endpoint initialSpec = dataspace.dataflow.withSubject(dyn) do () -> EndpointSpec: updateScript(facet) assert: (initialSpec.analysis.isNone and initialSpec.callback.isNil) or (initialSpec.analysis.isSome and (not initialSpec.callback.isNil)) ep.install(initialSpec) facet.endpoints[ep.id] = ep proc addDataflow*(facet; prio: Priority; subjectProc: Script[void]) = facet.addEndpoint do (fa: Facet) -> EndpointSpec: let subjectId = facet.actor.dataspace.dataflow.currentSubjectId facet.scheduleScript(prio) do (fa: Facet): if facet.isLive: facet.actor.dataspace.dataflow.withSubject(subjectId): subjectProc(facet) proc addDataflow*(facet; subjectProc: Script[void]) = addDataflow(facet, pNormal, subjectProc) proc commitActions(dataspace; actor; pending: seq[Action]) = dataspace.pendingTurns.add(Turn(actor: some actor, actions: pending)) proc runPendingTask(actor): bool = for deque in actor.pendingTasks.mitems: if deque.len > 0: let task = deque.popFirst() task() actor.dataspace.refreshAssertions() return true proc runPendingTasks(actor) = while actor.runPendingTask(): discard if actor.pendingActions.len > 0: var pending = move actor.pendingActions actor.dataspace.commitActions(actor, pending) proc runPendingTasks(ds: Dataspace) = var runnable = move ds.runnable for actor in runnable: runPendingTasks(actor) proc performPendingActions(ds: Dataspace) = var turns = move ds.pendingTurns for turn in turns: for action in turn.actions: action.impl(action, ds, turn.actor) runPendingTasks(ds) proc runTasks(ds: Dataspace): bool = ds.runPendingTasks() ds.performPendingActions() result = ds.runnable.len > 0 or ds.pendingTurns.len > 0 proc stop*(facet; continuation: Script[void] = nil) = facet.parent.map do (parent: Facet): parent.invokeScript do (_: Facet): facet.actor.scheduleTask do (): facet.terminate() if not continuation.isNil: parent.scheduleScript do (parent: Facet): continuation(parent) # ^ TODO: is this the correct scope to use?? proc addStopHandler*(g: Ground; h: StopHandler) = g.stopHandlers.add(h) proc step(g: Ground) {.gcsafe.} proc scheduleStep(g: Ground) = if not g.stepScheduled: g.stepScheduled = true asyncdispatch.callSoon: step(g) proc beginExternalTask*(facet) = ## Inform the ``Ground`` dataspace of a pending external task. ## The dataspace will continue to operate until all internal ## and external tasks have completed. See ``endExternalTask``. inc facet.actor.dataspace.ground.externalTaskCount proc endExternalTask*(facet) = ## Inform the ``Ground`` dataspace that an external task has completed. # TODO: automatically do this when the facet stops? let g = facet.actor.dataspace.ground dec g.externalTaskCount scheduleStep g proc step(g: Ground) = # TODO: backgroundtasks g.stepScheduled = false if g.dataspace.runTasks(): scheduleStep g else: if g.externalTaskCount < 1: for actor in g.dataspace.actors.values: terminate(actor, false) for sh in g.stopHandlers: sh(g.dataspace) reset g.stopHandlers complete(g.future) proc bootModule*(name: string; bootProc: ActivationScript): Future[void] = # TODO: better integration with the async dispatcher let g = Ground(future: newFuture[void]"bootModule") g.dataspace = newDataspace(g, name) do (rootFacet: Facet): rootFacet.addStartScript do (rootFacet: Facet): rootFacet.activate(name, bootProc) addTimer(1, true) do (fd: AsyncFD) -> bool: step(g) true return g.future template declareField*(facet: Facet; F: untyped; T: typedesc; initial: T): untyped = ## Declare getter and setter procs for field `F` of type `T` initalized with `initial`. type DistinctField {.final, unpreservable.} = object of Field discard let `F` {.inject.} = DistinctField(id: facet.actor.dataspace.generateId.FieldId) facet.actor.dataspace.dataflow.defineObservableProperty(`F`.id) facet.fields.add(toPreserve(initial)) let fieldOff = facet.fields.high proc set(f: DistinctField; x: T) {.used.} = facet.actor.dataspace.dataflow.recordDamage(f.id) facet.fields[fieldOff] = toPreserve(x) proc set(f: DistinctField; x: Value) {.used.} = facet.actor.dataspace.dataflow.recordDamage(f.id) facet.fields[fieldOff] = x proc get(f: DistinctField): T {.used.} = facet.actor.dataspace.dataflow.recordObservation(f.id) if not fromPreserve(result, facet.fields[fieldOff]): raise newException(ValueError, "cannot convert field " & $F & " to " & $T) proc getPreserve(f: DistinctField): Value {.used.} = facet.actor.dataspace.dataflow.recordObservation(f.id) facet.fields[fieldOff] template stopIf*(facet: Facet; cond: untyped; continuation: Script[void]): untyped = ## Stop the current facet if `cond` is true and ## invoke `body` after the facet has stopped. discard facet.addDataflow do (facet: Facet): if cond: facet.stop(continuation) type EventHandler* = proc (facet: Facet; bindings: seq[Value]) {.gcsafe.} proc wrap*(facet: Facet; onEvent: EventKind; cb: EventHandler): HandlerCallback = proc wrapper(event: EventKind; bindings: seq[Value]) = facet.invokeScript do (facet: Facet): if event == onEvent: facet.scheduleScript do (facet: Facet): cb(facet, bindings) wrapper