syndicate-nim/src/syndicate.nim

640 lines
19 KiB
Nim

# SPDX-License-Identifier: ISC
import ./syndicate/bags, ./syndicate/dataflow, ./syndicate/events, ./syndicate/skeletons
import preserves
import asyncdispatch, deques, hashes, macros, options, sets, strutils, tables
export dataflow.defineObservableProperty
export dataflow.recordObservation
export dataflow.recordDamage
template generateIdType(T: untyped) =
type T* = distinct Natural
proc `==`*(x, y: T): bool {.borrow.}
proc `$`*(id: T): string {.borrow.}
generateIdType(ActorId)
generateIdType(FacetId)
generateIdType(EndpointId)
generateIdType(FieldId)
type
Value* = Preserve
Bag = bags.Bag[Value]
Task[T] = proc (): T
Script[T] = proc (facet: Facet): T
ActivationScript* = Script[void]
ActionKind = enum
patchAction, messageAction, spawnAction, quitAction, deferredTurnAction, activationAction
Action = object
impl: proc (action: Action; ds: Dataspace; actor: Option[Actor]) {.gcsafe.}
case kind: ActionKind
of patchAction:
changes: Bag
else:
discard
Priority = enum
pQueryHigh = 0,
pQuery,
pQueryHandler,
pNormal,
pGC,
pIdle,
len
Actor = ref object
id: ActorId
name: string
dataspace*: Dataspace
rootFacet: ParentFacet
pendingTasks: array[Priority.len, Deque[Task[void]]]
pendingActions: seq[Action]
adhocAssertions: Bag
cleanupChanges: Bag
parentId: ActorId
isRunnable: bool
EndpointSpec* = object
assertion*: Option[Value]
analysis*: Option[Analysis]
Endpoint = ref object
id: EndpointId
facet: Facet
updateProc: Script[EndpointSpec]
spec: EndpointSpec
Fields = seq[Preserve]
# TODO: compile-time tuples
Turn = object
actions: seq[Action]
actor: Option[Actor]
Dataspace* = ref object
ground*: Ground
index: Index
dataflow*: Graph[Endpoint, FieldId]
runnable: seq[Actor]
pendingTurns: seq[Turn]
actors: Table[ActorId, Actor]
activations: seq[ActivationScript]
nextId: Natural
StopHandler = proc (ds: Dataspace) {.gcsafe.}
Ground = ref object
dataspace: Dataspace
stopHandlers: seq[StopHandler]
future: Future[void]
ParentFacet = Option[Facet]
Facet* = ref FacetObj
FacetObj = object
id: FacetId
actor*: Actor
parent: ParentFacet
endpoints: Table[EndpointId, Endpoint]
stopScripts: seq[Script[void]]
children: Table[FacetId, Facet]
fields*: Fields
isLive, inScript: bool
# FacetImpl[Fields] = ref FacetImplObj[Fields]
# FacetImplObj[Fields] {.final.} = object of FacetBaseObj
using
dataspace: Dataspace
actor: Actor
facet: Facet
proc `$`*(spec: EndpointSpec): string =
result.add "{assertion: "
if spec.assertion.isSome:
result.add $(spec.assertion.get)
else:
result.add "nil"
result.add ", analysis: "
if spec.analysis.isSome:
result.add $spec.analysis.get
else:
result.add "nil"
result.add " }"
proc `$`*(actor): string =
result.add "Actor("
result.add $actor.id
result.add ','
result.add actor.name
result.add ')'
proc `$`*(facet): string =
result.add "Facet("
result.add $facet.actor.id
result.add ','
result.add facet.actor.name
result.add ','
result.add $facet.id
result.add ')'
proc hash*(ep: Endpoint): Hash =
!$(hash(ep.id) !& hash(ep.facet.id))
proc generateId*(ds: Dataspace): Natural =
# TODO: used by declareField, but should be hidden.
inc(ds.nextId)
ds.nextId
proc newActor(ds: Dataspace; name: string; initialAssertions: Value; parentId: ActorId): Actor =
assert(initialAssertions.kind == pkSet)
result = Actor(
id: ds.generateId.ActorId,
name: name,
dataspace: ds,
parentId: parentId)
for v in initialAssertions.set:
discard result.adhocAssertions.change(v, 1)
ds.actors[result.id] = result
proc newFacet(actor; parent: ParentFacet): Facet =
result = Facet(
id: actor.dataspace.generateId.FacetId,
actor: actor,
parent: parent,
isLive: true,
inScript: true)
if parent.isSome:
parent.get.children[result.id] = result
else:
actor.rootFacet = some result
proc applyPatch(ds: Dataspace; actor: Option[Actor]; changes: Bag) =
type Pair = tuple[val: Value; count: int]
var removals: seq[Pair]
for a, count in changes.pairs:
if count > 0:
# echo "applyPatch +", a
discard ds.index.adjustAssertion(a, count)
else:
removals.add((a, count))
actor.map do (ac: Actor):
discard ac.cleanupChanges.change(a, -count)
for (a, count) in removals:
# echo "applyPatch -", a
discard ds.index.adjustAssertion(a, count)
proc initPatch(): Action =
proc impl(patch: Action; ds: Dataspace; actor: Option[Actor]) {.gcsafe.} =
ds.applyPatch(actor, patch.changes)
Action(impl: impl, kind: patchAction)
proc pendingPatch(actor): var Action =
for a in actor.pendingActions.mitems:
if a.kind == patchAction: return a
actor.pendingActions.add(initPatch())
actor.pendingActions[actor.pendingActions.high]
proc adjust(patch: var Action; v: Value; delta: int) =
discard patch.changes.change(v, delta)
proc subscribe(ds: Dataspace; handler: Analysis) =
ds.index.addHandler(handler, handler.callback.get)
proc unsubscribe(ds: Dataspace; handler: Analysis) =
ds.index.removeHandler(handler, handler.callback.get)
proc assert(actor; a: Value) = actor.pendingPatch.adjust(a, +1)
proc retract(actor; a: Value) = actor.pendingPatch.adjust(a, -1)
proc install(ep: Endpoint; spec: EndpointSpec) =
ep.spec = spec
ep.spec.assertion.map do (a: Value):
ep.facet.actor.assert(a)
ep.spec.analysis.map do (a: Analysis):
ep.facet.actor.dataspace.subscribe(a)
proc scheduleTask(actor; prio: Priority; task: Task[void]) =
if not actor.isRunnable:
actor.isRunnable = true
actor.dataspace.runnable.add(actor)
actor.pendingTasks[prio].addLast(task)
proc scheduleTask(actor; task: Task[void]) =
scheduleTask(actor, pNormal, task)
proc abandonQueuedWork(actor) =
actor.pendingActions = @[]
for q in actor.pendingTasks.mitems: clear(q)
proc uninstall(ep: Endpoint; emitPatches: bool) =
if emitPatches:
ep.spec.assertion.map do (a: Value):
ep.facet.actor.retract(a)
ep.spec.analysis.map do (a: Analysis):
ep.facet.actor.dataspace.unsubscribe(a)
proc destroy(ep: Endpoint; emitPatches: bool) =
ep.facet.actor.dataspace.dataflow.forgetSubject(ep)
ep.uninstall(emitPatches)
ep.facet.actor.scheduleTask(pGC) do ():
ep.facet.endpoints.del(ep.id)
# TODO: cannot remove from ep.facet.endpoints during
# its iteration, defering remove is probably unecessary
# because the facet is going down.
proc retractAssertionsAndSubscriptions(facet; emitPatches: bool) =
facet.actor.scheduleTask do ():
for ep in facet.endpoints.values:
ep.destroy(emitPatches)
clear(facet.endpoints)
proc abort(facet; emitPatches: bool) =
facet.isLive = false
for child in facet.children.values:
child.abort(emitPatches)
facet.retractAssertionsAndSubscriptions(emitPatches)
proc enqueueScriptAction(actor; action: Action) =
actor.pendingActions.add(action)
proc enqueueScriptAction(facet; action: Action) =
enqueueScriptAction(facet.actor, action)
proc initQuitAction(): Action =
proc impl(action: Action; ds: Dataspace; actor: Option[Actor]) =
assert(actor.isSome)
ds.applyPatch(actor, actor.get.cleanupChanges)
ds.actors.del(actor.get.id)
Action(impl: impl, kind: quitAction)
proc terminate(actor; emitPatches: bool) =
if emitPatches:
actor.scheduleTask do ():
for a in actor.adhocAssertions.keys:
actor.retract(a)
actor.rootFacet.map do (root: Facet):
root.abort(emitPatches)
actor.scheduleTask do ():
actor.enqueueScriptAction(initQuitAction())
proc invokeScript(facet; script: Script[void]) =
try: script(facet)
except:
let e = getCurrentException()
# TODO: install an error handling callback at the facet?
facet.actor.abandonQueuedWork()
facet.actor.terminate(false)
raise e
func isInert(facet): bool =
facet.endpoints.len == 0 and facet.children.len == 0
proc terminate(facet) =
if facet.isLive:
let
actor = facet.actor
parent = facet.parent
if parent.isSome:
parent.get.children.del(facet.id)
else:
reset actor.rootFacet
facet.isLive = false
for child in facet.children.values:
child.terminate()
actor.scheduleTask do ():
facet.invokeScript do (facet: Facet):
for s in facet.stopScripts:
s(facet)
facet.retractAssertionsAndSubscriptions(true)
actor.scheduleTask(pGC) do ():
if parent.isSome:
if parent.get.isInert:
parent.get.terminate()
else:
actor.terminate(true)
template withNonScriptContext(facet; body: untyped) =
let inScriptPrev = facet.inScript
facet.inScript = false
try: body
finally: facet.inScript = inScriptPrev
proc ensureFacetSetup(facet; s: string) =
assert(not facet.inScript, "Cannot " & s & "ouside facet setup")
proc ensureNonFacetSetup(facet; s: string) =
assert(facet.inScript, "Cannot " & s & " during facet setup")
proc wrap(facet; script: Script[void]): Task[void] =
proc task() = facet.invokeScript(script)
task
proc wrap*(facet; cb: proc(facet: Facet; event: EventKind; bindings: seq[Value]) {.gcsafe.}): HandlerCallback =
proc wrapper(event: EventKind; bindings: seq[Value]) =
facet.invokeScript do (facet: Facet):
cb(facet, event, bindings)
wrapper
proc scheduleScript*(facet; prio: Priority; script: Script[void]) =
facet.actor.scheduleTask(prio, facet.wrap(script))
proc scheduleScript*(facet; script: Script[void]) =
facet.actor.scheduleTask(pNormal, facet.wrap(script))
proc addStartScript(facet; s: Script[void]) =
facet.ensureFacetSetup("onStart")
facet.scheduleScript(pNormal, s)
proc addFacet(actor; parentFacet: Option[Facet]; bootScript: Script[void]; checkInScript = false) =
if checkInScript and parentFacet.isSome:
assert parentFacet.get.inScript
let f = Facet(
id: actor.dataspace.generateId.FacetId,
actor: actor,
parent: parentFacet,
isLive: true,
inScript: true)
if parentFacet.isSome:
parentFacet.get.children[f.id] = f
f.fields = parentFacet.get.fields
# inherit scope by copying fields of the parent
else:
actor.rootFacet = some f
f.invokeScript do (facet: Facet):
facet.withNonScriptContext:
bootScript(facet)
actor.scheduleTask do ():
if ((parentFacet.isSome) and (not parentFacet.get.isLive)) or f.isInert:
f.terminate()
proc deliverMessage(ds: Dataspace; msg: Value; ac: Option[Actor]) =
ds.index.deliverMessage(msg)
proc adhocRetract(actor; a: Value) =
if actor.adhocAssertions.change(a, -1, true) == cdPresentToAbsent:
actor.retract(a)
proc refresh(ep: Endpoint) =
let newSpec = ep.updateProc(ep.facet)
if newSpec.assertion != ep.spec.assertion:
ep.uninstall(true)
ep.install(newSpec)
proc refreshAssertions(ds: Dataspace) =
ds.dataflow.repairDamage do (ep: Endpoint):
let facet = ep.facet
assert(facet.isLive)
facet.invokeScript do (f: Facet):
f.withNonScriptContext:
refresh(ep)
proc addActor(ds: Dataspace; name: string; bootProc: Script[void]; initialAssertions: Value; parent: Option[Actor]) =
var parentId: ActorId
parent.map do (p: Actor): parentId = p.id
let ac = newActor(ds, name, initialAssertions, parentId)
ds.applyPatch(some ac, ac.adhocAssertions)
ac.addFacet(none Facet) do (systemFacet: Facet):
# Root facet is a dummy "system" facet that exists to hold
# one-or-more "user" "root" facets.
ac.addFacet(some systemFacet, bootProc)
# ^ The "true root", user-visible facet.
for a in initialAssertions.set:
ac.adhocRetract(a)
proc send*(facet; body: Value) =
facet.ensureNonFacetSetup("send")
proc impl(action: Action; ds: Dataspace; actor: Option[Actor]) =
ds.deliverMessage(body, actor)
facet.enqueueScriptAction(Action(impl: impl, kind: messageAction))
proc initSpawnAction(name: string; bootProc: Script[void], initialAssertions: Value): Action =
proc impl(action: Action; ds: Dataspace; actor: Option[Actor]) =
ds.addActor(name, bootProc, initialAssertions, actor)
Action(impl: impl, kind: spawnAction)
proc spawn*(facet; name: string; bootProc: Script[void], initialAssertions: Value) =
facet.ensureNonFacetSetup("spawn")
facet.enqueueScriptAction(initSpawnAction(name, bootProc, initialAssertions))
proc spawn*(facet; name: string; bootProc: Script[void]) =
spawn(facet, name, bootProc, Value(kind: pkSet))
#[
template spawn*(facet; name: string; fields: untyped; bootProc: Script[void]): untyped =
type Fields = typeof(fields)
spawn[Fields](facet, name, bootProc, Value(kind: pkSet))
]#
proc initActivationAction(script: ActivationScript; name: string): Action =
proc impl(action: Action; ds: Dataspace; actor: Option[Actor]) =
for s in ds.activations:
if s == script: return
ds.activations.add(script)
proc boot(root: Facet) =
root.addStartScript(script)
ds.addActor(name, boot, Preserve(kind: pkSet), actor)
Action(impl: impl, kind: activationAction)
proc activate(facet; script: ActivationScript; name = "") =
facet.ensureNonFacetSetup "`activate`"
facet.enqueueScriptAction(initActivationAction(script, name))
proc newDataspace(ground: Ground; bootProc: ActivationScript): Dataspace =
let turn = Turn(actions: @[initSpawnAction("", bootProc, Value(kind: pkSet))])
Dataspace(ground: ground, index: initIndex(), pendingTurns: @[turn])
proc addEndpoint*(facet; updateScript: Script[EndpointSpec], isDynamic = true): Endpoint =
facet.ensureFacetSetup("add endpoint")
let
actor = facet.actor
dataspace = actor.dataspace
result = Endpoint(
id: dataspace.generateId.EndpointId,
facet: facet,
updateProc: updateScript)
dataspace.dataflow.addSubject(result)
let
dyn = if isDynamic: some result else: none Endpoint
initialSpec = dataspace.dataflow.withSubject(dyn) do () -> EndpointSpec:
updateScript(facet)
result.install(initialSpec)
facet.endpoints[result.id] = result
# dataspace.endpointHook(facet, result)
# Subclasses may override
proc addDataflow*(facet; prio: Priority; subjectProc: Script[void]): Endpoint =
facet.addEndpoint do (fa: Facet) -> EndpointSpec:
let subjectId = facet.actor.dataspace.dataflow.currentSubjectId
facet.scheduleScript(prio) do (fa: Facet):
if facet.isLive:
facet.actor.dataspace.dataflow.withSubject(subjectId):
subjectProc(facet)
# result is the default EndpointSpec
proc addDataflow*(facet; subjectProc: Script[void]): Endpoint =
addDataflow(facet, pNormal, subjectProc)
proc commitActions(dataspace; actor; pending: seq[Action]) =
dataspace.pendingTurns.add(Turn(actor: some actor, actions: pending))
proc runPendingTask(actor): bool =
for deque in actor.pendingTasks.mitems:
if deque.len > 0:
let task = deque.popFirst()
task()
actor.dataspace.refreshAssertions()
return true
proc runPendingTasks(actor) =
while actor.runPendingTask(): discard
actor.isRunnable = false
if actor.pendingActions.len > 0:
var pending = move actor.pendingActions
actor.dataspace.commitActions(actor, pending)
proc runPendingTasks(ds: Dataspace) =
var runnable = move ds.runnable
for actor in runnable:
runPendingTasks(actor)
proc performPendingActions(ds: Dataspace) =
var turns = move ds.pendingTurns
for turn in turns:
for action in turn.actions:
action.impl(action, ds, turn.actor)
runPendingTasks(ds)
proc runTasks(ds: Dataspace): bool =
ds.runPendingTasks()
ds.performPendingActions()
result = ds.runnable.len > 0 or ds.pendingTurns.len > 0
proc stop*(facet; continuation: Script[void]) =
facet.parent.map do (parent: Facet):
facet.actor.scheduleTask do ():
facet.terminate()
parent.scheduleScript do (parent: Facet):
continuation(parent)
# ^ TODO: is this the correct scope to use??
proc stop*(facet) =
facet.parent.map do (parent: Facet):
facet.actor.scheduleTask do ():
facet.terminate()
proc addStopHandler*(g: Ground; h: StopHandler) =
g.stopHandlers.add(h)
proc step(g: Ground) =
if g.dataspace.runTasks():
asyncdispatch.callSoon: step(g)
else:
for sh in g.stopHandlers:
sh(g.dataspace)
reset g.stopHandlers
complete(g.future)
proc bootModule*(bootProc: ActivationScript): Future[void] =
# TODO: better integration with the async dispatcher
let g = Ground(future: newFuture[void]"bootModule")
g.dataspace = newDataspace(g) do (rootFacet: Facet):
rootFacet.addStartScript do (rootFacet: Facet):
rootFacet.activate(bootProc)
addTimer(1, true) do (fd: AsyncFD) -> bool:
step(g)
true
return g.future
template declareField*(facet: Facet; F: untyped; T: typedesc; init: T): untyped =
## Declare getter and setter procs for field `F` of type `T` initalized with `init`.
# TODO: do more at compile-time, use a tuple rather than a sequence of Preserves.
facet.fields.add(toPreserve(init))
let
`F FieldOff` = facet.fields.high
`F FieldId` = facet.actor.dataspace.generateId.FieldId
facet.actor.dataspace.dataflow.defineObservableProperty(`F FieldId`)
proc `F`(fields: Fields): T =
facet.actor.dataspace.dataflow.recordObservation(`F FieldId`)
fromPreserve[T](result, facet.fields[`F FieldOff`])
proc `F =`(fields: var Fields; x: T) =
facet.actor.dataspace.dataflow.recordDamage(`F FieldId`)
facet.fields[`F FieldOff`] = toPreserve[T](x)
proc `F =`(fields: var Fields; x: Preserve) =
assert(x.kind == facet.fields[`F FieldOff`].kind, "invalid Preserves item for field type")
facet.actor.dataspace.dataflow.recordDamage(`F FieldId`)
facet.fields[`F FieldOff`] = x
#[
# Some early macros
proc send() =
discard
proc generateField(stmt: NimNode): NimNode =
stmt.expectLen 3
let l = stmt[1]
let r = stmt[2]
case r.kind
of nnkStmtList:
r.expectLen 1
result = newNimNode(nnkVarSection).add(newIdentDefs(l, r[0]))
else:
raiseAssert("unhandled field " & r.treeRepr)
proc generateStopOnTuple(output: var NimNode; stmt: NimNode): NimNode =
stmt.expectLen 3
let
cond = stmt[1]
body = stmt[2]
# symCond = genSym(ident="stopOnCond")
symStop = genSym(ident="stopOnCb")
# output.add(newProc(name = symCond, params = [ident"bool"], body = cond))
output.add(newProc(name = symStop, body = body))
newPar(newColonExpr(ident"cond", cond), newColonExpr(ident"body", symStop))
macro spawn*(label: string; body: untyped) =
## Spawn actor.
result = newNimNode(nnkStmtList)
let blockBody = newNimNode(nnkStmtList)
let actorSym = genSym(nskVar, label.strVal)
result.add(newNimNode(nnkVarSection).add(
newIdentDefs(actorSym, ident"Actor")))
var stopOnSeq = newNimNode(nnkBracket)
body.expectKind nnkStmtList
for stmt in body:
case stmt.kind
of nnkCommentStmt:
result.add(stmt)
of nnkCommand:
let cmd = stmt[0]
cmd.expectKind nnkIdent
if eqIdent(cmd, "field"):
result.add(generateField(stmt))
elif eqIdent(cmd, "stopOn"):
let stop = generateStopOnTuple(result, stmt)
stopOnSeq.add(stop)
else:
raiseAssert("unhandled spawn command " & cmd.repr)
else:
raiseAssert("unhandled statment " & stmt.treeRepr)
result.add(
newAssignment(newDotExpr(actorSym, ident"stops"), prefix(stopOnSeq, "@")))
#result.add(newBlockStmt(blockBody))
echo result.repr
# echo result.treeRepr
macro dumpStuff*(body: untyped): untyped =
echo body.treeRepr
]#