syndicate-nim/src/syndicate/dataspaces.nim

578 lines
18 KiB
Nim

# SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway
# SPDX-License-Identifier: Unlicense
import ./bags, ./dataflow, ./events, ./skeletons
import preserves
import std/[asyncdispatch, deques, hashes, macros, options, sets, tables]
export dataflow.defineObservableProperty
export dataflow.recordObservation
export dataflow.recordDamage
template generateIdType(T: untyped) =
type T* = distinct Natural
proc `==`*(x, y: T): bool {.borrow.}
proc `$`*(id: T): string {.borrow.}
generateIdType(ActorId)
generateIdType(FacetId)
generateIdType(EndpointId)
generateIdType(FieldId)
type
Value* = Preserve
Bag = bags.Bag[Value]
Task[T] = proc (): T
Script[T] = proc (facet: Facet): T
ActivationScript* = Script[void]
ActionKind = enum
patchAction, messageAction, spawnAction, quitAction, deferredTurnAction, activationAction
Action = object
impl: proc (action: Action; ds: Dataspace; actor: Option[Actor]) {.gcsafe.}
case kind: ActionKind
of patchAction:
changes: Bag
else:
discard
Priority = enum
pQueryHigh = 0,
pQuery,
pQueryHandler,
pNormal,
pGC,
pIdle,
len
Actor = ref object
id: ActorId
name: string
dataspace*: Dataspace
rootFacet: ParentFacet
pendingTasks: array[Priority.len, Deque[Task[void]]]
pendingActions: seq[Action]
adhocAssertions: Bag
cleanupChanges: Bag
parentId: ActorId
EndpointSpec* = tuple
callback: HandlerCallback
assertion: Value
analysis: Option[Analysis]
Endpoint = ref object
id: EndpointId
facet: Facet
updateProc: Script[EndpointSpec]
spec: EndpointSpec
Field* = object of RootObj
id*: FieldId
Fields* = seq[Value]
# TODO: compile-time tuples
Turn = object
actions: seq[Action]
actor: Option[Actor]
Dataspace* = ref object
ground*: Ground
index: Index
dataflow*: Graph[Endpoint, FieldId]
runnable: seq[Actor]
pendingTurns: seq[Turn]
actors: Table[ActorId, Actor]
activations: seq[ActivationScript]
nextId: Natural
StopHandler = proc (ds: Dataspace) {.gcsafe.}
Ground = ref object
dataspace: Dataspace
stopHandlers: seq[StopHandler]
future: Future[void]
externalTaskCount: int
stepScheduled: bool
ParentFacet = Option[Facet]
Facet* = ref FacetObj
FacetObj = object
id: FacetId
actor*: Actor
parent: ParentFacet
endpoints: Table[EndpointId, Endpoint]
stopScripts: seq[Script[void]]
children: Table[FacetId, Facet]
fields*: Fields
isLive, inScript: bool
# FacetImpl[Fields] = ref FacetImplObj[Fields]
# FacetImplObj[Fields] {.final.} = object of FacetBaseObj
using
dataspace: Dataspace
actor: Actor
facet: Facet
proc hash*(ep: Endpoint): Hash =
!$(hash(ep.id) !& hash(ep.facet.id))
proc generateId*(ds: Dataspace): Natural =
# TODO: used by declareField, but should be hidden.
inc(ds.nextId)
ds.nextId
proc newActor(ds: Dataspace; name: string; initialAssertions: Value; parentId: ActorId): Actor =
assert(initialAssertions.kind == pkSet)
result = Actor(
id: ds.generateId.ActorId,
name: name,
dataspace: ds,
parentId: parentId)
for v in initialAssertions.set:
discard result.adhocAssertions.change(v, 1)
ds.actors[result.id] = result
proc applyPatch(ds: Dataspace; actor: Option[Actor]; changes: Bag) =
type Pair = tuple[val: Value; count: int]
var removals: seq[Pair]
for a, count in changes.pairs:
if count > 0:
# debugEcho "applyPatch +", a
discard ds.index.adjustAssertion(a, count)
else:
removals.add((a, count))
actor.map do (ac: Actor):
discard ac.cleanupChanges.change(a, -count)
for (a, count) in removals:
# debugEcho "applyPatch -", a
discard ds.index.adjustAssertion(a, count)
proc initPatch(): Action =
proc impl(patch: Action; ds: Dataspace; actor: Option[Actor]) {.gcsafe.} =
ds.applyPatch(actor, patch.changes)
Action(impl: impl, kind: patchAction)
proc pendingPatch(actor): var Action =
for a in actor.pendingActions.mitems:
if a.kind == patchAction: return a
actor.pendingActions.add(initPatch())
actor.pendingActions[actor.pendingActions.high]
proc adjust(patch: var Action; v: Value; delta: int) =
discard patch.changes.change(v, delta)
proc assert(actor; a: Value) = actor.pendingPatch.adjust(a, +1)
proc retract(actor; a: Value) = actor.pendingPatch.adjust(a, -1)
proc install(ep: Endpoint; spec: EndpointSpec) =
ep.spec = spec
if not ep.spec.assertion.isNil:
ep.facet.actor.assert(ep.spec.assertion)
ep.spec.analysis.map do (a: Analysis):
assert(not ep.spec.callback.isNil)
ep.facet.actor.dataspace.index.addHandler(a, ep.spec.callback)
proc isRunnable(actor): bool =
for tasks in actor.pendingTasks:
if tasks.len > 0: return true
proc scheduleTask(actor; prio: Priority; task: Task[void]) =
if not actor.isRunnable:
actor.dataspace.runnable.add(actor)
actor.pendingTasks[prio].addLast(task)
proc scheduleTask(actor; task: Task[void]) =
scheduleTask(actor, pNormal, task)
proc abandonQueuedWork(actor) =
reset actor.pendingActions
for q in actor.pendingTasks.mitems: clear(q)
proc uninstall(ep: Endpoint; emitPatches: bool) =
if emitPatches:
if not ep.spec.assertion.isNil:
ep.facet.actor.retract(ep.spec.assertion)
ep.spec.analysis.map do (a: Analysis):
assert(not ep.spec.callback.isNil)
ep.facet.actor.dataspace.index.removeHandler(a, ep.spec.callback)
proc destroy(ep: Endpoint; emitPatches: bool) =
ep.facet.actor.dataspace.dataflow.forgetSubject(ep)
ep.uninstall(emitPatches)
ep.facet.actor.scheduleTask(pGC) do ():
ep.facet.endpoints.del(ep.id)
# TODO: cannot remove from ep.facet.endpoints during
# its iteration, defering remove is probably unecessary
# because the facet is going down.
proc retractAssertionsAndSubscriptions(facet; emitPatches: bool) =
facet.actor.scheduleTask do ():
for ep in facet.endpoints.values:
ep.destroy(emitPatches)
clear(facet.endpoints)
proc abort(facet; emitPatches: bool) =
facet.isLive = false
for child in facet.children.values:
child.abort(emitPatches)
facet.retractAssertionsAndSubscriptions(emitPatches)
for s in facet.stopScripts: s(facet)
# call stopScripts immediately
proc enqueueScriptAction(actor; action: Action) =
actor.pendingActions.add(action)
proc enqueueScriptAction(facet; action: Action) =
enqueueScriptAction(facet.actor, action)
proc initQuitAction(): Action =
proc impl(action: Action; ds: Dataspace; actor: Option[Actor]) =
assert(actor.isSome)
ds.applyPatch(actor, actor.get.cleanupChanges)
ds.actors.del(actor.get.id)
Action(impl: impl, kind: quitAction)
proc terminate(actor; emitPatches: bool) =
if emitPatches:
actor.scheduleTask do ():
for a in actor.adhocAssertions.keys:
actor.retract(a)
actor.rootFacet.map do (root: Facet):
root.abort(emitPatches)
actor.scheduleTask do ():
actor.enqueueScriptAction(initQuitAction())
proc invokeScript(facet; script: Script[void]) =
try: script(facet)
except:
let e = getCurrentException()
# TODO: install an error handling callback at the facet?
facet.actor.abandonQueuedWork()
facet.actor.terminate(false)
raise e
func isInert(facet): bool =
facet.endpoints.len == 0 and facet.children.len == 0
proc terminate(facet) =
if facet.isLive:
let
actor = facet.actor
parent = facet.parent
if parent.isNone:
reset actor.rootFacet
facet.isLive = false
for child in facet.children.values:
child.terminate()
reset facet.children
actor.scheduleTask do ():
facet.invokeScript do (facet: Facet):
for s in facet.stopScripts:
s(facet)
facet.retractAssertionsAndSubscriptions(true)
actor.scheduleTask(pGC) do ():
if parent.isSome:
if parent.get.isInert:
parent.get.terminate()
else:
actor.terminate(true)
template withNonScriptContext(facet; body: untyped) =
let inScriptPrev = facet.inScript
facet.inScript = false
try: body
finally: facet.inScript = inScriptPrev
proc ensureFacetSetup(facet; s: string) =
assert(not facet.inScript, "Cannot " & s & " ouside facet setup")
proc ensureNonFacetSetup(facet; s: string) =
assert(facet.inScript, "Cannot " & s & " during facet setup")
proc wrap(facet; script: Script[void]): Task[void] =
proc task() = facet.invokeScript(script)
task
proc scheduleScript*(facet; prio: Priority; script: Script[void]) =
facet.actor.scheduleTask(prio, facet.wrap(script))
proc scheduleScript*(facet; script: Script[void]) =
facet.actor.scheduleTask(pNormal, facet.wrap(script))
proc addStartScript*(facet; s: Script[void]) =
facet.ensureFacetSetup("onStart")
facet.scheduleScript(pNormal, s)
proc addStopScript*(facet; s: Script[void]) =
facet.stopScripts.add(s)
proc addFacet(actor; parentFacet: Option[Facet]; bootScript: Script[void]; checkInScript = false) =
if checkInScript and parentFacet.isSome:
assert parentFacet.get.inScript
let f = Facet(
id: actor.dataspace.generateId.FacetId,
actor: actor,
parent: parentFacet,
isLive: true,
inScript: true)
if parentFacet.isSome:
parentFacet.get.children[f.id] = f
f.fields = parentFacet.get.fields
# inherit scope by copying fields of the parent
else:
actor.rootFacet = some f
f.invokeScript do (facet: Facet):
facet.withNonScriptContext:
bootScript(facet)
actor.scheduleTask do ():
if ((parentFacet.isSome) and (not parentFacet.get.isLive)) or f.isInert:
f.terminate()
proc addChildFacet*(facet; bootProc: Script[void]) =
facet.actor.addFacet(some facet, bootProc, true)
proc deliverMessage(ds: Dataspace; msg: Value; ac: Option[Actor]) =
ds.index.deliverMessage(msg)
proc adhocRetract(actor; a: Value) =
if actor.adhocAssertions.change(a, -1, true) == cdPresentToAbsent:
actor.retract(a)
proc refresh(ep: Endpoint) =
let newSpec = ep.updateProc(ep.facet)
if newSpec.assertion != ep.spec.assertion:
ep.uninstall(true)
ep.install(newSpec)
proc refreshAssertions(ds: Dataspace) =
ds.dataflow.repairDamage do (ep: Endpoint):
let facet = ep.facet
assert(facet.isLive)
facet.invokeScript do (f: Facet):
f.withNonScriptContext:
refresh(ep)
proc addActor(ds: Dataspace; name: string; bootProc: Script[void]; initialAssertions: Value; parent: Option[Actor]) =
var parentId: ActorId
parent.map do (p: Actor): parentId = p.id
let ac = newActor(ds, name, initialAssertions, parentId)
ds.applyPatch(some ac, ac.adhocAssertions)
ac.addFacet(none Facet) do (systemFacet: Facet):
# Root facet is a dummy "system" facet that exists to hold
# one-or-more "user" "root" facets.
ac.addFacet(some systemFacet, bootProc)
# ^ The "true root", user-visible facet.
for a in initialAssertions.set:
ac.adhocRetract(a)
proc send*(facet; body: Value) =
## Send a message into the dataspace.
facet.ensureNonFacetSetup("send")
proc impl(_: Action; ds: Dataspace; actor: Option[Actor]) =
ds.deliverMessage(body, actor)
facet.enqueueScriptAction(Action(impl: impl, kind: messageAction))
proc initSpawnAction(name: string; bootProc: Script[void], initialAssertions: Value): Action =
proc impl(action: Action; ds: Dataspace; actor: Option[Actor]) =
ds.addActor(name, bootProc, initialAssertions, actor)
Action(impl: impl, kind: spawnAction)
proc spawn*(facet; name: string; bootProc: Script[void], initialAssertions: Value) =
facet.ensureNonFacetSetup("spawn")
facet.enqueueScriptAction(initSpawnAction(name, bootProc, initialAssertions))
proc spawn*(facet; name: string; bootProc: Script[void]) =
spawn(facet, name, bootProc, Value(kind: pkSet))
#[
template spawn*(facet; name: string; fields: untyped; bootProc: Script[void]): untyped =
type Fields = typeof(fields)
spawn[Fields](facet, name, bootProc, Value(kind: pkSet))
]#
proc initActivationAction(script: ActivationScript; name: string): Action =
proc impl(action: Action; ds: Dataspace; actor: Option[Actor]) =
for s in ds.activations:
if s == script: return
ds.activations.add(script)
proc boot(root: Facet) =
root.addStartScript(script)
ds.addActor(name, boot, Value(kind: pkSet), actor)
Action(impl: impl, kind: activationAction)
proc activate(facet; name: string; script: ActivationScript) =
facet.ensureNonFacetSetup "`activate`"
facet.enqueueScriptAction(initActivationAction(script, name))
proc newDataspace(ground: Ground; name: string; bootProc: ActivationScript): Dataspace =
let turn = Turn(actions: @[initSpawnAction(name, bootProc, Value(kind: pkSet))])
Dataspace(ground: ground, index: initIndex(), pendingTurns: @[turn])
proc addEndpoint*(facet; updateScript: Script[EndpointSpec], isDynamic = true) =
facet.ensureFacetSetup("addEndpoint")
let
actor = facet.actor
dataspace = actor.dataspace
ep = Endpoint(
id: dataspace.generateId.EndpointId,
facet: facet,
updateProc: updateScript)
dataspace.dataflow.addSubject(ep)
let
dyn = if isDynamic: some ep else: none Endpoint
initialSpec = dataspace.dataflow.withSubject(dyn) do () -> EndpointSpec:
updateScript(facet)
assert:
(initialSpec.analysis.isNone and initialSpec.callback.isNil) or
(initialSpec.analysis.isSome and (not initialSpec.callback.isNil))
ep.install(initialSpec)
facet.endpoints[ep.id] = ep
proc addDataflow*(facet; prio: Priority; subjectProc: Script[void]) =
facet.addEndpoint do (fa: Facet) -> EndpointSpec:
let subjectId = facet.actor.dataspace.dataflow.currentSubjectId
facet.scheduleScript(prio) do (fa: Facet):
if facet.isLive:
facet.actor.dataspace.dataflow.withSubject(subjectId):
subjectProc(facet)
proc addDataflow*(facet; subjectProc: Script[void]) =
addDataflow(facet, pNormal, subjectProc)
proc commitActions(dataspace; actor; pending: seq[Action]) =
dataspace.pendingTurns.add(Turn(actor: some actor, actions: pending))
proc runPendingTask(actor): bool =
for deque in actor.pendingTasks.mitems:
if deque.len > 0:
let task = deque.popFirst()
task()
actor.dataspace.refreshAssertions()
return true
proc runPendingTasks(actor) =
while actor.runPendingTask(): discard
if actor.pendingActions.len > 0:
var pending = move actor.pendingActions
actor.dataspace.commitActions(actor, pending)
proc runPendingTasks(ds: Dataspace) =
var runnable = move ds.runnable
for actor in runnable:
runPendingTasks(actor)
proc performPendingActions(ds: Dataspace) =
var turns = move ds.pendingTurns
for turn in turns:
for action in turn.actions:
action.impl(action, ds, turn.actor)
runPendingTasks(ds)
proc runTasks(ds: Dataspace): bool =
ds.runPendingTasks()
ds.performPendingActions()
result = ds.runnable.len > 0 or ds.pendingTurns.len > 0
proc stop*(facet; continuation: Script[void] = nil) =
facet.parent.map do (parent: Facet):
parent.invokeScript do (_: Facet):
facet.actor.scheduleTask do ():
facet.terminate()
if not continuation.isNil:
parent.scheduleScript do (parent: Facet):
continuation(parent)
# ^ TODO: is this the correct scope to use??
proc addStopHandler*(g: Ground; h: StopHandler) =
g.stopHandlers.add(h)
proc step(g: Ground) {.gcsafe.}
proc scheduleStep(g: Ground) =
if not g.stepScheduled:
g.stepScheduled = true
asyncdispatch.callSoon: step(g)
proc beginExternalTask*(facet) =
## Inform the ``Ground`` dataspace of a pending external task.
## The dataspace will continue to operate until all internal
## and external tasks have completed. See ``endExternalTask``.
inc facet.actor.dataspace.ground.externalTaskCount
proc endExternalTask*(facet) =
## Inform the ``Ground`` dataspace that an external task has completed.
# TODO: automatically do this when the facet stops?
let g = facet.actor.dataspace.ground
dec g.externalTaskCount
scheduleStep g
proc step(g: Ground) =
# TODO: backgroundtasks
g.stepScheduled = false
if g.dataspace.runTasks():
scheduleStep g
else:
if g.externalTaskCount < 1:
for actor in g.dataspace.actors.values:
terminate(actor, false)
for sh in g.stopHandlers:
sh(g.dataspace)
reset g.stopHandlers
complete(g.future)
proc bootModule*(name: string; bootProc: ActivationScript): Future[void] =
# TODO: better integration with the async dispatcher
let g = Ground(future: newFuture[void]"bootModule")
g.dataspace = newDataspace(g, name) do (rootFacet: Facet):
rootFacet.addStartScript do (rootFacet: Facet):
rootFacet.activate(name, bootProc)
addTimer(1, true) do (fd: AsyncFD) -> bool:
step(g)
true
return g.future
template declareField*(facet: Facet; F: untyped; T: typedesc; initial: T): untyped =
## Declare getter and setter procs for field `F` of type `T` initalized with `initial`.
type DistinctField {.final, unpreservable.} = object of Field
discard
let `F` {.inject.} = DistinctField(id: facet.actor.dataspace.generateId.FieldId)
facet.actor.dataspace.dataflow.defineObservableProperty(`F`.id)
facet.fields.add(toPreserve(initial))
let fieldOff = facet.fields.high
proc set(f: DistinctField; x: T) {.used.} =
facet.actor.dataspace.dataflow.recordDamage(f.id)
facet.fields[fieldOff] = toPreserve(x)
proc set(f: DistinctField; x: Value) {.used.} =
facet.actor.dataspace.dataflow.recordDamage(f.id)
facet.fields[fieldOff] = x
proc get(f: DistinctField): T {.used.} =
facet.actor.dataspace.dataflow.recordObservation(f.id)
if not fromPreserve(result, facet.fields[fieldOff]):
raise newException(ValueError, "cannot convert field " & $F & " to " & $T)
proc getPreserve(f: DistinctField): Value {.used.} =
facet.actor.dataspace.dataflow.recordObservation(f.id)
facet.fields[fieldOff]
template stopIf*(facet: Facet; cond: untyped; continuation: Script[void]): untyped =
## Stop the current facet if `cond` is true and
## invoke `body` after the facet has stopped.
discard facet.addDataflow do (facet: Facet):
if cond: facet.stop(continuation)
type EventHandler* = proc (facet: Facet; bindings: seq[Value]) {.gcsafe.}
proc wrap*(facet: Facet; onEvent: EventKind; cb: EventHandler): HandlerCallback =
proc wrapper(event: EventKind; bindings: seq[Value]) =
facet.invokeScript do (facet: Facet):
if event == onEvent:
facet.scheduleScript do (facet: Facet):
cb(facet, bindings)
wrapper