From dd977991addee30632349d6d9422b7d83d30c754 Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Thu, 24 Jun 2021 17:50:27 +0200 Subject: [PATCH] Initial commit A mostly verbatim translation of syndicate-js. https://git.syndicate-lang.org/syndicate-lang/syndicate-js --- .gitignore | 1 + README.md | 3 + src/syndicate.nim | 649 ++++++++++++++++++++++++++++++++++ src/syndicate/assertions.nim | 11 + src/syndicate/bags.nim | 37 ++ src/syndicate/dataflow.nim | 80 +++++ src/syndicate/events.nim | 3 + src/syndicate/skeletons.nim | 297 ++++++++++++++++ syndicate.nimble | 12 + tests/config.nims | 1 + tests/test_box_and_client.nim | 73 ++++ 11 files changed, 1167 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 src/syndicate.nim create mode 100644 src/syndicate/assertions.nim create mode 100644 src/syndicate/bags.nim create mode 100644 src/syndicate/dataflow.nim create mode 100644 src/syndicate/events.nim create mode 100644 src/syndicate/skeletons.nim create mode 100644 syndicate.nimble create mode 100644 tests/config.nims create mode 100644 tests/test_box_and_client.nim diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a954504 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +tests/test_box_and_client diff --git a/README.md b/README.md new file mode 100644 index 0000000..7e59cc3 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# Syndicate + +Nim implementation of [Syndicate](https://syndicate-lang.org/) dataspaces and actors. diff --git a/src/syndicate.nim b/src/syndicate.nim new file mode 100644 index 0000000..996526f --- /dev/null +++ b/src/syndicate.nim @@ -0,0 +1,649 @@ +# SPDX-License-Identifier: ISC + +import ./syndicate/bags, ./syndicate/dataflow, ./syndicate/events, ./syndicate/skeletons +import preserves +import asyncdispatch, deques, hashes, macros, options, sets, strutils, tables + +export dataflow.defineObservableProperty +export dataflow.recordObservation +export dataflow.recordDamage + +template generateIdType(T: untyped) = + type T* = distinct Natural + proc `==`*(x, y: T): bool {.borrow.} + proc `$`*(id: T): string {.borrow.} + +generateIdType(ActorId) +generateIdType(FacetId) +generateIdType(EndpointId) +generateIdType(FieldId) + +type + Value* = Preserve + Bag = bags.Bag[Value] + + Task[T] = proc (): T + Script[T] = proc (facet: Facet): T + ActivationScript* = Script[void] + + ActionKind = enum + patchAction, messageAction, spawnAction, quitAction, deferredTurnAction, activationAction + + Action = object + impl: proc (action: Action; ds: Dataspace; actor: Option[Actor]) {.gcsafe.} + case kind: ActionKind + of patchAction: + changes: Bag + else: + discard + + Priority = enum + pQueryHigh = 0, + pQuery, + pQueryHandler, + pNormal, + pGC, + pIdle, + len + + Actor = ref object + id: ActorId + name: string + dataspace*: Dataspace + rootFacet: ParentFacet + pendingTasks: array[Priority.len, Deque[Task[void]]] + pendingActions: seq[Action] + adhocAssertions: Bag + cleanupChanges: Bag + parentId: ActorId + isRunnable: bool + + EndpointSpec* = object + assertion*: Option[Value] + analysis*: Option[Analysis] + + Endpoint = ref object + id: EndpointId + facet: Facet + updateProc: Script[EndpointSpec] + spec: EndpointSpec + + Field = tuple[name: string; value: Preserve] + Fields = seq[Field] + # TODO: compile-time tuples + + Turn = object + actions: seq[Action] + actor: Option[Actor] + + Dataspace* = ref object + ground*: Ground + index: Index + dataflow*: Graph[Endpoint, FieldId] + runnable: seq[Actor] + pendingTurns: seq[Turn] + actors: Table[ActorId, Actor] + activations: seq[ActivationScript] + nextId: Natural + + StopHandler = proc (ds: Dataspace) {.gcsafe.} + + Ground = ref object + dataspace: Dataspace + stopHandlers: seq[StopHandler] + future: Future[void] + + ParentFacet = Option[Facet] + + Facet* = ref FacetObj + FacetObj = object + id: FacetId + actor*: Actor + parent: ParentFacet + endpoints: Table[EndpointId, Endpoint] + stopScripts: seq[Script[void]] + children: Table[FacetId, Facet] + fields*: Fields + isLive, inScript: bool + + # FacetImpl[Fields] = ref FacetImplObj[Fields] + # FacetImplObj[Fields] {.final.} = object of FacetBaseObj + +using + dataspace: Dataspace + actor: Actor + facet: Facet + +proc `$`*(spec: EndpointSpec): string = + result.add "{assertion: " + if spec.assertion.isSome: + result.add $(spec.assertion.get) + else: + result.add "nil" + result.add ", analysis: " + if spec.analysis.isSome: + result.add $spec.analysis.get + else: + result.add "nil" + result.add " }" + +proc `$`*(actor): string = + result.add "Actor(" + result.add $actor.id + result.add ',' + result.add actor.name + result.add ')' + +proc `$`*(facet): string = + result.add "Facet(" + result.add $facet.actor.id + result.add ',' + result.add facet.actor.name + result.add ',' + result.add $facet.id + result.add ')' + +proc hash*(ep: Endpoint): Hash = + !$(hash(ep.id) !& hash(ep.facet.id)) + +proc generateId*(ds: Dataspace): Natural = + # TODO: used by declareField, but should be hidden. + inc(ds.nextId) + ds.nextId + +proc newActor(ds: Dataspace; name: string; initialAssertions: Value; parentId: ActorId): Actor = + assert(initialAssertions.kind == pkSet) + result = Actor( + id: ds.generateId.ActorId, + name: name, + dataspace: ds, + parentId: parentId) + for v in initialAssertions.set: + discard result.adhocAssertions.change(v, 1) + ds.actors[result.id] = result + +proc newFacet(actor; parent: ParentFacet): Facet = + result = Facet( + id: actor.dataspace.generateId.FacetId, + actor: actor, + parent: parent, + isLive: true, + inScript: true) + if parent.isSome: + parent.get.children[result.id] = result + else: + actor.rootFacet = some result + +proc applyPatch(ds: Dataspace; actor: Option[Actor]; changes: Bag) = + type Pair = tuple[val: Value; count: int] + var removals: seq[Pair] + for a, count in changes.pairs: + if count > 0: + # echo "applyPatch +", a + discard ds.index.adjustAssertion(a, count) + else: + removals.add((a, count)) + actor.map do (ac: Actor): + discard ac.cleanupChanges.change(a, -count) + for (a, count) in removals: + # echo "applyPatch -", a + discard ds.index.adjustAssertion(a, count) + +proc initPatch(): Action = + proc impl(patch: Action; ds: Dataspace; actor: Option[Actor]) {.gcsafe.} = + ds.applyPatch(actor, patch.changes) + Action(impl: impl, kind: patchAction) + +proc pendingPatch(actor): var Action = + for a in actor.pendingActions.mitems: + if a.kind == patchAction: return a + actor.pendingActions.add(initPatch()) + actor.pendingActions[actor.pendingActions.high] + +proc adjust(patch: var Action; v: Value; delta: int) = + discard patch.changes.change(v, delta) + +proc subscribe(ds: Dataspace; handler: Analysis) = + ds.index.addHandler(handler, handler.callback.get) + +proc unsubscribe(ds: Dataspace; handler: Analysis) = + ds.index.removeHandler(handler, handler.callback.get) + +proc assert(actor; a: Value) = actor.pendingPatch.adjust(a, +1) + +proc retract(actor; a: Value) = actor.pendingPatch.adjust(a, -1) + +proc install(ep: Endpoint; spec: EndpointSpec) = + ep.spec = spec + ep.spec.assertion.map do (a: Value): + ep.facet.actor.assert(a) + ep.spec.analysis.map do (a: Analysis): + ep.facet.actor.dataspace.subscribe(a) + +proc scheduleTask(actor; prio: Priority; task: Task[void]) = + if not actor.isRunnable: + actor.isRunnable = true + actor.dataspace.runnable.add(actor) + actor.pendingTasks[prio].addLast(task) + +proc scheduleTask(actor; task: Task[void]) = + scheduleTask(actor, pNormal, task) + +proc abandonQueuedWork(actor) = + actor.pendingActions = @[] + for q in actor.pendingTasks.mitems: clear(q) + +proc uninstall(ep: Endpoint; emitPatches: bool) = + if emitPatches: + ep.spec.assertion.map do (a: Value): + ep.facet.actor.retract(a) + ep.spec.analysis.map do (a: Analysis): + ep.facet.actor.dataspace.unsubscribe(a) + +proc destroy(ep: Endpoint; emitPatches: bool) = + ep.facet.actor.dataspace.dataflow.forgetSubject(ep) + ep.uninstall(emitPatches) + ep.facet.actor.scheduleTask(pGC) do (): + ep.facet.endpoints.del(ep.id) + # TODO: cannot remove from ep.facet.endpoints during + # its iteration, defering remove is probably unecessary + # because the facet is going down. + +proc retractAssertionsAndSubscriptions(facet; emitPatches: bool) = + facet.actor.scheduleTask do (): + for ep in facet.endpoints.values: + ep.destroy(emitPatches) + clear(facet.endpoints) + +proc abort(facet; emitPatches: bool) = + facet.isLive = false + for child in facet.children.values: + child.abort(emitPatches) + facet.retractAssertionsAndSubscriptions(emitPatches) + +proc enqueueScriptAction(actor; action: Action) = + actor.pendingActions.add(action) + +proc enqueueScriptAction(facet; action: Action) = + enqueueScriptAction(facet.actor, action) + +proc initQuitAction(): Action = + proc impl(action: Action; ds: Dataspace; actor: Option[Actor]) = + assert(actor.isSome) + ds.applyPatch(actor, actor.get.cleanupChanges) + ds.actors.del(actor.get.id) + Action(impl: impl, kind: quitAction) + +proc terminate(actor; emitPatches: bool) = + if emitPatches: + actor.scheduleTask do (): + for a in actor.adhocAssertions.keys: + actor.retract(a) + actor.rootFacet.map do (root: Facet): + root.abort(emitPatches) + actor.scheduleTask do (): + actor.enqueueScriptAction(initQuitAction()) + +proc invokeScript(facet; script: Script[void]) = + try: script(facet) + except: + let e = getCurrentException() + # TODO: install an error handling callback at the facet? + facet.actor.abandonQueuedWork() + facet.actor.terminate(false) + raise e + +func isInert(facet): bool = + facet.endpoints.len == 0 and facet.children.len == 0 + +proc terminate(facet) = + if facet.isLive: + let + actor = facet.actor + parent = facet.parent + if parent.isSome: + parent.get.children.del(facet.id) + else: + reset actor.rootFacet + facet.isLive = false + for child in facet.children.values: + child.terminate() + actor.scheduleTask do (): + facet.invokeScript do (facet: Facet): + for s in facet.stopScripts: + s(facet) + + facet.retractAssertionsAndSubscriptions(true) + actor.scheduleTask(pGC) do (): + if parent.isSome: + if parent.get.isInert: + parent.get.terminate() + else: + actor.terminate(true) + +template withNonScriptContext(facet; body: untyped) = + let inScriptPrev = facet.inScript + facet.inScript = false + try: body + finally: facet.inScript = inScriptPrev + +proc ensureFacetSetup(facet; s: string) = + assert(not facet.inScript, "Cannot " & s & "ouside facet setup") + +proc ensureNonFacetSetup(facet; s: string) = + assert(facet.inScript, "Cannot " & s & " during facet setup") + +proc wrap(facet; script: Script[void]): Task[void] = + proc task() = facet.invokeScript(script) + task + +proc wrap*(facet; cb: proc(facet: Facet; event: EventKind; bindings: seq[Value]) {.gcsafe.}): HandlerCallback = + proc wrapper(event: EventKind; bindings: seq[Value]) = + facet.invokeScript do (facet: Facet): + cb(facet, event, bindings) + wrapper + +proc scheduleScript*(facet; prio: Priority; script: Script[void]) = + facet.actor.scheduleTask(prio, facet.wrap(script)) + +proc scheduleScript*(facet; script: Script[void]) = + facet.actor.scheduleTask(pNormal, facet.wrap(script)) + +proc addStartScript(facet; s: Script[void]) = + facet.ensureFacetSetup("onStart") + facet.scheduleScript(pNormal, s) + +proc addFacet(actor; parentFacet: Option[Facet]; bootScript: Script[void]; checkInScript = false) = + if checkInScript and parentFacet.isSome: + assert parentFacet.get.inScript + let f = Facet( + id: actor.dataspace.generateId.FacetId, + actor: actor, + parent: parentFacet, + isLive: true, + inScript: true) + if parentFacet.isSome: + parentFacet.get.children[f.id] = f + f.fields = parentFacet.get.fields + # inherit scope by copying fields of the parent + else: + actor.rootFacet = some f + f.invokeScript do (facet: Facet): + facet.withNonScriptContext: + bootScript(facet) + actor.scheduleTask do (): + if ((parentFacet.isSome) and (not parentFacet.get.isLive)) or f.isInert: + f.terminate() + +proc deliverMessage(ds: Dataspace; msg: Value; ac: Option[Actor]) = + ds.index.deliverMessage(msg) + +proc adhocRetract(actor; a: Value) = + if actor.adhocAssertions.change(a, -1, true) == cdPresentToAbsent: + actor.retract(a) + +proc refresh(ep: Endpoint) = + let newSpec = ep.updateProc(ep.facet) + if newSpec.assertion != ep.spec.assertion: + ep.uninstall(true) + ep.install(newSpec) + +proc refreshAssertions(ds: Dataspace) = + ds.dataflow.repairDamage do (ep: Endpoint): + let facet = ep.facet + assert(facet.isLive) + facet.invokeScript do (f: Facet): + f.withNonScriptContext: + refresh(ep) + +proc addActor(ds: Dataspace; name: string; bootProc: Script[void]; initialAssertions: Value; parent: Option[Actor]) = + var parentId: ActorId + parent.map do (p: Actor): parentId = p.id + let ac = newActor(ds, name, initialAssertions, parentId) + ds.applyPatch(some ac, ac.adhocAssertions) + ac.addFacet(none Facet) do (systemFacet: Facet): + # Root facet is a dummy "system" facet that exists to hold + # one-or-more "user" "root" facets. + ac.addFacet(some systemFacet, bootProc) + # ^ The "true root", user-visible facet. + for a in initialAssertions.set: + ac.adhocRetract(a) + +proc send*(facet; body: Value) = + facet.ensureNonFacetSetup("send") + proc impl(action: Action; ds: Dataspace; actor: Option[Actor]) = + ds.deliverMessage(body, actor) + facet.enqueueScriptAction(Action(impl: impl, kind: messageAction)) + +proc initSpawnAction(name: string; bootProc: Script[void], initialAssertions: Value): Action = + proc impl(action: Action; ds: Dataspace; actor: Option[Actor]) = + ds.addActor(name, bootProc, initialAssertions, actor) + Action(impl: impl, kind: spawnAction) + +proc spawn*(facet; name: string; bootProc: Script[void], initialAssertions: Value) = + facet.ensureNonFacetSetup("spawn") + facet.enqueueScriptAction(initSpawnAction(name, bootProc, initialAssertions)) + +proc spawn*(facet; name: string; bootProc: Script[void]) = + spawn(facet, name, bootProc, Value(kind: pkSet)) + +#[ +template spawn*(facet; name: string; fields: untyped; bootProc: Script[void]): untyped = + type Fields = typeof(fields) + spawn[Fields](facet, name, bootProc, Value(kind: pkSet)) +]# + +proc initActivationAction(script: ActivationScript; name: string): Action = + proc impl(action: Action; ds: Dataspace; actor: Option[Actor]) = + for s in ds.activations: + if s == script: return + ds.activations.add(script) + proc boot(root: Facet) = + root.addStartScript(script) + ds.addActor(name, boot, Preserve(kind: pkSet), actor) + Action(impl: impl, kind: activationAction) + +proc activate(facet; script: ActivationScript; name = "") = + facet.ensureNonFacetSetup "`activate`" + facet.enqueueScriptAction(initActivationAction(script, name)) + +proc newDataspace(ground: Ground; bootProc: ActivationScript): Dataspace = + let turn = Turn(actions: @[initSpawnAction("", bootProc, Value(kind: pkSet))]) + Dataspace(ground: ground, index: initIndex(), pendingTurns: @[turn]) + +proc addEndpoint*(facet; updateScript: Script[EndpointSpec], isDynamic = true): Endpoint = + facet.ensureFacetSetup("add endpoint") + let + actor = facet.actor + dataspace = actor.dataspace + result = Endpoint( + id: dataspace.generateId.EndpointId, + facet: facet, + updateProc: updateScript) + dataspace.dataflow.addSubject(result) + let + dyn = if isDynamic: some result else: none Endpoint + initialSpec = dataspace.dataflow.withSubject(dyn) do () -> EndpointSpec: + updateScript(facet) + result.install(initialSpec) + facet.endpoints[result.id] = result + # dataspace.endpointHook(facet, result) + # Subclasses may override + +proc addDataflow*(facet; prio: Priority; subjectProc: Script[void]): Endpoint = + facet.addEndpoint do (fa: Facet) -> EndpointSpec: + let subjectId = facet.actor.dataspace.dataflow.currentSubjectId + facet.scheduleScript(prio) do (fa: Facet): + if facet.isLive: + facet.actor.dataspace.dataflow.withSubject(subjectId): + subjectProc(facet) + # result is the default EndpointSpec + +proc addDataflow*(facet; subjectProc: Script[void]): Endpoint = + addDataflow(facet, pNormal, subjectProc) + +proc commitActions(dataspace; actor; pending: seq[Action]) = + dataspace.pendingTurns.add(Turn(actor: some actor, actions: pending)) + +proc runPendingTask(actor): bool = + for deque in actor.pendingTasks.mitems: + if deque.len > 0: + let task = deque.popFirst() + task() + actor.dataspace.refreshAssertions() + return true + +proc runPendingTasks(actor) = + while actor.runPendingTask(): discard + actor.isRunnable = false + if actor.pendingActions.len > 0: + var pending = move actor.pendingActions + actor.dataspace.commitActions(actor, pending) + +proc runPendingTasks(ds: Dataspace) = + var runnable = move ds.runnable + for actor in runnable: + runPendingTasks(actor) + +proc performPendingActions(ds: Dataspace) = + var turns = move ds.pendingTurns + for turn in turns: + for action in turn.actions: + action.impl(action, ds, turn.actor) + runPendingTasks(ds) + +proc runTasks(ds: Dataspace): bool = + ds.runPendingTasks() + ds.performPendingActions() + result = ds.runnable.len > 0 or ds.pendingTurns.len > 0 + +proc stop*(facet; continuation: Script[void]) = + facet.parent.map do (parent: Facet): + facet.actor.scheduleTask do (): + facet.terminate() + parent.scheduleScript do (parent: Facet): + continuation(parent) + # ^ TODO: is this the correct scope to use?? + +proc stop*(facet) = + facet.parent.map do (parent: Facet): + facet.actor.scheduleTask do (): + facet.terminate() + +proc addStopHandler*(g: Ground; h: StopHandler) = + g.stopHandlers.add(h) + +proc step(g: Ground) = + if g.dataspace.runTasks(): + asyncdispatch.callSoon: step(g) + else: + for sh in g.stopHandlers: + sh(g.dataspace) + reset g.stopHandlers + complete(g.future) + +proc bootModule*(bootProc: ActivationScript): Future[void] = + # TODO: better integration with the async dispatcher + let g = Ground(future: newFuture[void]"bootModule") + g.dataspace = newDataspace(g) do (rootFacet: Facet): + rootFacet.addStartScript do (rootFacet: Facet): + rootFacet.activate(bootProc) + addTimer(1, true) do (fd: AsyncFD) -> bool: + step(g) + true + return g.future + +proc registerField*(facet: Facet; field: Field): int = + # TODO: should be hidden, but used by declareField. + for i in 0..facet.fields.high: + if facet.fields[i].name == field.name: + facet.fields[i] = field + return i + facet.fields.add(field) + return facet.fields.high + +template declareField*(facet: Facet; F: untyped; T: typedesc; init: T): untyped = + ## Declare getter and setter procs for field `F` of type `T` initalized with `init`. + # TODO: do more at compile-time, use a tuple rather than a sequence of Preserves. + let + `F FieldId` = facet.actor.dataspace.generateId.FieldId + `F FeildOff` = registerField(facet, + (nimIdentNormalize("`F`"), toPreserve(init), )) + facet.actor.dataspace.dataflow.defineObservableProperty(`F FieldId`) + + proc `F`(fields: Fields): T = + facet.actor.dataspace.dataflow.recordObservation(`F FieldId`) + fromPreserve[T](result, facet.fields[`F FeildOff`].value) + + proc `F =`(fields: var Fields; x: T) = + facet.actor.dataspace.dataflow.recordDamage(`F FieldId`) + facet.fields[`F FeildOff`].value = toPreserve[T](x) + + proc `F =`(fields: var Fields; x: Preserve) = + assert(x.kind == facet.fields[`F FeildOff`].value.kind, "invalid Preserves item for field type") + facet.actor.dataspace.dataflow.recordDamage(`F FieldId`) + facet.fields[`F FeildOff`].value = x + + +#[ +# Some early macros + +proc send() = + discard + +proc generateField(stmt: NimNode): NimNode = + stmt.expectLen 3 + let l = stmt[1] + let r = stmt[2] + case r.kind + of nnkStmtList: + r.expectLen 1 + result = newNimNode(nnkVarSection).add(newIdentDefs(l, r[0])) + else: + raiseAssert("unhandled field " & r.treeRepr) + +proc generateStopOnTuple(output: var NimNode; stmt: NimNode): NimNode = + stmt.expectLen 3 + let + cond = stmt[1] + body = stmt[2] + # symCond = genSym(ident="stopOnCond") + symStop = genSym(ident="stopOnCb") + # output.add(newProc(name = symCond, params = [ident"bool"], body = cond)) + output.add(newProc(name = symStop, body = body)) + newPar(newColonExpr(ident"cond", cond), newColonExpr(ident"body", symStop)) + +macro spawn*(label: string; body: untyped) = + ## Spawn actor. + result = newNimNode(nnkStmtList) + let blockBody = newNimNode(nnkStmtList) + let actorSym = genSym(nskVar, label.strVal) + result.add(newNimNode(nnkVarSection).add( + newIdentDefs(actorSym, ident"Actor"))) + var stopOnSeq = newNimNode(nnkBracket) + body.expectKind nnkStmtList + for stmt in body: + case stmt.kind + of nnkCommentStmt: + result.add(stmt) + of nnkCommand: + let cmd = stmt[0] + cmd.expectKind nnkIdent + if eqIdent(cmd, "field"): + result.add(generateField(stmt)) + elif eqIdent(cmd, "stopOn"): + let stop = generateStopOnTuple(result, stmt) + stopOnSeq.add(stop) + else: + raiseAssert("unhandled spawn command " & cmd.repr) + else: + raiseAssert("unhandled statment " & stmt.treeRepr) + result.add( + newAssignment(newDotExpr(actorSym, ident"stops"), prefix(stopOnSeq, "@"))) + #result.add(newBlockStmt(blockBody)) + echo result.repr + # echo result.treeRepr + +macro dumpStuff*(body: untyped): untyped = + echo body.treeRepr +]# diff --git a/src/syndicate/assertions.nim b/src/syndicate/assertions.nim new file mode 100644 index 0000000..2242c16 --- /dev/null +++ b/src/syndicate/assertions.nim @@ -0,0 +1,11 @@ +# SPDX-License-Identifier: ISC + +import preserves + +const + Discard* = RecordClass(label: symbol"discard", arity: 0) + Capture* = RecordClass(label: symbol"capture", arity: 1) + Observe* = RecordClass(label: symbol"observe", arity: 1) + Inbound* = RecordClass(label: symbol"inbound", arity: 1) + Outbound* = RecordClass(label: symbol"outbound", arity: 1) + Instance* = RecordClass(label: symbol"instance", arity: 1) diff --git a/src/syndicate/bags.nim b/src/syndicate/bags.nim new file mode 100644 index 0000000..f280a19 --- /dev/null +++ b/src/syndicate/bags.nim @@ -0,0 +1,37 @@ +# SPDX-License-Identifier: ISC + +## An unordered association of items to counts. +## An item count may be negative, unlike CountTable. + +import tables + +type + ChangeDescription* = enum + cdPresentToAbsent, + cdAbsentToAbsent, + cdAbsentToPresent, + cdPresentToPresent + + Bag*[T] = Table[T, int] + +proc change(count: var int; delta: int; clamp: bool): ChangeDescription = + var + oldCount = count + newCount = oldCount + delta + if clamp: + newCount = max(0, newCount) + if newCount == 0: + result = + if oldCount == 0: cdAbsentToAbsent + else: cdPresentToAbsent + else: + result = + if oldCount == 0: cdAbsentToPresent + else: cdPresentToPresent + count = newCount + +proc change*[T](bag: var Bag[T]; key: T; delta: int; clamp = false): ChangeDescription = + assert(delta != 0) + result = change(bag.mGetOrPut(key, 0), delta, clamp) + if result in {cdAbsentToAbsent, cdPresentToAbsent}: + bag.del(key) diff --git a/src/syndicate/dataflow.nim b/src/syndicate/dataflow.nim new file mode 100644 index 0000000..c412fdd --- /dev/null +++ b/src/syndicate/dataflow.nim @@ -0,0 +1,80 @@ +# SPDX-License-Identifier: ISC + +import preserves +import options, sets, tables + +type + Set = HashSet + + Graph*[SubjectId, ObjectId] = object + edgesForward: Table[ObjectId, Set[SubjectId]] + edgesReverse: Table[SubjectId, Set[ObjectId]] + damagedNodes: Set[ObjectId] + currentSubjectId*: Option[SubjectId] + +proc withSubject*[Sid, Oid, T](g: var Graph[Sid, Oid]; sid: Option[Sid]; + body: proc (): T): T = + var oldSid = move g.currentSubjectId + g.currentSubjectId = sid + try: result = body() + finally: g.currentSubjectId = move oldSid + +proc withSubject*[Sid, Oid](g: var Graph[Sid, Oid]; sid: Option[Sid]; + body: proc ()) = + var oldSid = move g.currentSubjectId + g.currentSubjectId = sid + try: body() + finally: g.currentSubjectId = move oldSid + +proc recordObservation*[Sid, Oid](g: var Graph[Sid, Oid]; oid: Oid) = + if g.currentSubjectId.isSome: + let sid = g.currentSubjectId.get + if not g.edgesForward.hasKey(oid): + g.edgesForward[oid] = initHashSet[Sid]() + g.edgesForward[oid].incl(sid) + if not g.edgesReverse.hasKey(sid): + g.edgesReverse[sid] = initHashSet[Oid]() + g.edgesReverse[sid].incl(oid) + # TODO: double lookups here + +proc recordDamage*[Sid, Oid](g: var Graph[Sid, Oid]; oid: Oid) = + g.damagedNodes.incl(oid) + +proc forgetSubject*[Sid, Oid](g: var Graph[Sid, Oid]; sid: Sid) = + var subjectObjects: Set[Oid] + if g.edgesReverse.pop(sid, subjectObjects): + for oid in subjectObjects: + g.edgesForward.del(oid) + +iterator observersOf[Sid, Oid](g: Graph[Sid, Oid]; oid: Oid): Sid = + if g.edgesForward.hasKey(oid): + for sid in g.edgesForward[oid]: yield sid + +proc repairDamage*[Sid, Oid](g: var Graph[Sid, Oid]; repairNode: proc (sid: Sid) {.gcsafe.}) = + var repairedThisRound: Set[Oid] + while true: + var workSet = move g.damagedNodes + assert(g.damagedNodes.len == 0) + + var alreadyDamaged = workSet * repairedThisRound + if alreadyDamaged.len > 0: + echo "Cyclic dependencies involving ", alreadyDamaged + + workSet = workSet - repairedThisRound + repairedThisRound = repairedThisRound + workSet + + if workSet.len == 0: break + + for oid in workSet: + for sid in g.observersOf(oid): + g.forgetSubject(sid) + g.withSubject(some sid) do: repairNode(sid) + +proc defineObservableProperty*[Sid, Oid](g: var Graph[Sid, Oid]; oid: Oid) = + assert(not g.edgesForward.hasKey(oid)) + g.edgesForward[oid] = initHashSet[Sid]() + g.recordDamage(oid) + +proc addSubject*[Sid, Oid](g: var Graph[Sid, Oid]; sid: Sid) = + assert(not g.edgesReverse.hasKey(sid)) + g.edgesReverse[sid] = initHashSet[Oid]() diff --git a/src/syndicate/events.nim b/src/syndicate/events.nim new file mode 100644 index 0000000..3beec87 --- /dev/null +++ b/src/syndicate/events.nim @@ -0,0 +1,3 @@ +# SPDX-License-Identifier: ISC + +type EventKind* = enum addedEvent, removedEvent, messageEvent diff --git a/src/syndicate/skeletons.nim b/src/syndicate/skeletons.nim new file mode 100644 index 0000000..e1f4f41 --- /dev/null +++ b/src/syndicate/skeletons.nim @@ -0,0 +1,297 @@ +# SPDX-License-Identifier: ISC + +import ./assertions, ./bags, ./events +import preserves +import lists, options, sets, tables + +type + Value = Preserve + + NonEmptySkeleton*[Shape] = object + shape: Shape + members: seq[Skeleton[Shape]] + Skeleton*[Shape] = Option[NonEmptySkeleton[Shape]] + + Path = seq[Natural] + +proc projectPath(v: Value; path: Path): Value = + result = v + for index in path: + result = result[index] + +proc projectPaths(v: Value; paths: seq[Path]): seq[Value] = + result.setLen(paths.len) + for i, path in paths: result[i] = projectPath(v, path) + +type + Shape = string + + HandlerCallback* = proc (event: EventKind; bindings: seq[Value]) {.gcsafe.} + + Analysis* = object + skeleton: Skeleton[Shape] + constPaths: seq[Path] + constVals: seq[Value] + capturePaths: seq[Path] + callback*: Option[HandlerCallback] + +proc `$`(a: Analysis): string = + result.add "\n\t skeleton: " + result.add $a.skeleton + result.add "\n\t constPaths: " + result.add $a.constPaths + result.add "\n\t constVals: " + result.add $a.constVals + result.add "\n\t capturePaths: " + result.add $a.capturePaths + +proc analyzeAssertion*(a: Value): Analysis = + var path: Path + proc walk(analysis: var Analysis; a: Value): Skeleton[Shape] = + if Capture.isClassOf a: + analysis.capturePaths.add(path) + result = walk(analysis, a.fields[0]) + elif Discard.isClassOf a: + discard + else: + if a.kind == pkRecord: + let class = classOf(a) + result = some NonEmptySkeleton[Shape](shape: $class) + path.add(0) + var i: int + for field in a.fields: + path[path.high] = i + result.get.members.add(walk(analysis, field)) + inc(i) + discard path.pop + else: + analysis.constPaths.add(path) + analysis.constVals.add(a) + result.skeleton = walk(result, a) + +type + Handler = ref object + cachedCaptures: Bag[seq[Value]] + callbacks: HashSet[HandlerCallback] + + Leaf = ref object + cachedAssertions: HashSet[Value] + handlerMap: Table[seq[Path], Handler] + + Continuation = ref object + cachedAssertions: HashSet[Value] + leafMap: Table[seq[Path], TableRef[seq[Value], Leaf]] # TODO: not TableRef? + + Selector = tuple[popCount: int; index: int] + + Node = ref object + edges: Table[Selector, Table[string, Node]] + continuation: Continuation + +using + continuation: Continuation + leaf: Leaf + node: Node + +proc `$`(leaf): string = + result.add "Leaf{ cached: " + result.add $leaf.cachedAssertions + result.add ", handler count: " + result.add $leaf.handlerMap.len + result.add " }" + +proc `$`(continuation): string = + result.add "Continuation{ cached: " + result.add $continuation.cachedAssertions + result.add ", " + result.add $continuation.leafMap + result.add " }" + +proc `$`(node): string = + result.add "Node{ " + result.add $node.continuation + result.add ", edges: " + result.add $node.edges + result.add "}" + +proc isEmpty(leaf): bool = + leaf.cachedAssertions.len == 0 and leaf.handlerMap.len == 0 + +type + ContinuationProc = proc (c: Continuation; v: Value) {.gcsafe.} + LeafProc = proc (l: Leaf; v: Value) {.gcsafe.} + HandlerProc = proc (h: Handler; vs: seq[Value]) {.gcsafe.} + +proc modify(node; operation: EventKind; outerValue: Value; + mCont: ContinuationProc; mLeaf: LeafProc; mHandler: HandlerProc) = + + proc walkContinuation(continuation) {.gcsafe.} + + proc walkNode(node; termStack: SinglyLinkedList[seq[Value]]) = + # TODO: use a seq for the stack? + walkContinuation(node.continuation) + for (selector, table) in node.edges.pairs: + var nextStack = termStack + for _ in 1..selector.popCount: + nextStack.head = nextStack.head.next + let nextValue = nextStack.head.value[selector.index] + if nextValue.isRecord: + let nextClass = classOf(nextValue) + let nextNode = table.getOrDefault($nextClass) + if not nextNode.isNil: + nextStack.prepend(nextValue.record) + walkNode(nextNode, nextStack) + + proc walkContinuation(continuation: Continuation) = + mCont(continuation, outerValue) + for (constPaths, constValMap) in continuation.leafMap.pairs: + let constVals = projectPaths(outerValue, constPaths) + let leaf = constValMap.getOrDefault(constVals) + if leaf.isNil: + if operation == addedEvent: + constValMap[constVals] = Leaf() + else: + mLeaf(leaf, outerValue) + for (capturePaths, handler) in leaf.handlerMap.pairs: + mHandler(handler, projectPaths(outerValue, capturePaths)) + if operation == removedEvent and leaf.isEmpty: + constValMap.del(constVals) + if constValMap.len == 0: + continuation.leafMap.del(constPaths) + var stack: SinglyLinkedList[seq[Value]] + stack.prepend(@[outerValue]) + walkNode(node, stack) + +proc extend*[Shape](node; skeleton: Skeleton[Shape]): Continuation = + var path: Path + proc walkNode(node: Node; popCount, index: int; skeleton: Skeleton[Shape]): tuple[popCount: int, node: Node] = + assert(not node.isNil) + if skeleton.isNone: + return (popCount, node) + else: + var + cls = skeleton.get.shape + table: Table[string, Node] + nextNode: Node + discard node.edges.pop((popCount, index), table) + if not table.pop(cls, nextNode): + nextNode = Node(continuation: Continuation()) + for a in node.continuation.cachedAssertions: + if $classOf(projectPath(a, path)) == cls: + nextNode.continuation.cachedAssertions.incl(a) + block: + var + popCount = 0 + index = 0 + path.add(index) + for member in skeleton.get.members: + (popCount, nextNode) = walkNode(nextNode, popCount, index, member) + inc(index) + discard path.pop() + path.add(index) + discard path.pop() + result = (popCount.succ, nextNode) + table[cls] = nextNode + node.edges[(popCount, index)] = table + walkNode(node, 0, 0, skeleton).node.continuation + +type + Index* = object + allAssertions: Bag[Value] + root: Node + +proc initIndex*(): Index = + result.root = Node(continuation: Continuation()) + +using index: Index + +proc `$`*(index): string = + result.add "Index(" + result.add ")Index" + +proc addHandler*(index; res: Analysis; callback: HandlerCallback) = + assert(not index.root.isNil) + let + constPaths = res.constPaths + constVals = res.constVals + capturePaths = res.capturePaths + continuation = index.root.extend(res.skeleton) + assert(not continuation.isNil) + var constValMap = continuation.leafMap.getOrDefault(constPaths) + if constValMap.isNil: + constValMap = newTable[seq[Value], Leaf]() + for a in continuation.cachedAssertions: + var leaf: Leaf + if not constValMap.pop(a.sequence, leaf): + new leaf + leaf.cachedAssertions.incl(a) + constValMap[a.sequence] = leaf + continuation.leafMap[constPaths] = constValMap + var leaf = constValMap.getOrDefault(constVals) + if leaf.isNil: + new leaf + constValMap[constVals] = leaf + var handler = leaf.handlerMap.getOrDefault(capturePaths) + if handler.isNil: + new handler + for a in leaf.cachedAssertions: + let a = projectPaths(a, capturePaths) + if handler.cachedCaptures.contains(a): + discard handler.cachedCaptures.change(a, +1) + leaf.handlerMap[capturePaths] = handler + handler.callbacks.incl(callback) + for captures, count in handler.cachedCaptures.pairs: + callback(addedEvent, captures) + +proc removeHandler*(index; res: Analysis; callback: HandlerCallback) = + let continuation = index.root.extend(res.skeleton) + try: + let + constValMap = continuation.leafMap[res.constPaths] + leaf = constValMap[res.constVals] + handler = leaf.handlerMap[res.capturePaths] + handler.callbacks.excl(callback) + if handler.callbacks.len == 0: + leaf.handlerMap.del(res.capturePaths) + if leaf.isEmpty: + constValMap.del(res.constVals) + if constValMap.len == 0: + continuation.leafMap.del(res.constPaths) + except KeyError: discard + +proc adjustAssertion*(index: var Index; outerValue: Value; delta: int): ChangeDescription = + result = index.allAssertions.change(outerValue, delta) + case result + of cdAbsentToPresent: + index.root.modify( + addedEvent, + outerValue, + (proc (c: Continuation; v: Value) = c.cachedAssertions.incl(v)), + (proc (l: Leaf; v: Value) = l.cachedAssertions.incl(v)), + (proc (h: Handler; vs: seq[Value]) = + if h.cachedCaptures.change(vs, +1) == cdAbsentToPresent: + for cb in h.callbacks: cb(addedEvent, vs))) + of cdPresentToAbsent: + index.root.modify( + removedEvent, + outerValue, + (proc (c: Continuation; v: Value) = c.cachedAssertions.excl(v)), + (proc (l: Leaf; v: Value) = l.cachedAssertions.excl(v)), + (proc (h: Handler; vs: seq[Value]) = + if h.cachedCaptures.change(vs, -1) == cdPresentToAbsent: + for cb in h.callbacks: cb(removedEvent, vs))) + else: + discard + +proc continuationNoop(c: Continuation; v: Value) = discard +proc leafNoop(l: Leaf; v: Value) = discard + +proc deliverMessage*(index; v: Value; leafCb: proc (l: Leaf; v: Value) {.gcsafe.}) = + proc handlerCb(h: Handler; vs: seq[Value]) = + for cb in h.callbacks: cb(messageEvent, vs) + index.root.modify(messageEvent, v, continuationNoop, leafCb, handlerCb) + +proc deliverMessage*(index; v: Value) = + proc handlerCb(h: Handler; vs: seq[Value]) = + for cb in h.callbacks: cb(messageEvent, vs) + index.root.modify(messageEvent, v, continuationNoop, leafNoop, handlerCb) diff --git a/syndicate.nimble b/syndicate.nimble new file mode 100644 index 0000000..7726681 --- /dev/null +++ b/syndicate.nimble @@ -0,0 +1,12 @@ +# Package + +version = "0.0.0" +author = "Emery Hemingway" +description = "Syndicated actors for conversational concurrency" +license = "ISC" +srcDir = "src" + + +# Dependencies + +requires "nim >= 1.4.8", "preserves >= 0.2.0" diff --git a/tests/config.nims b/tests/config.nims new file mode 100644 index 0000000..3bb69f8 --- /dev/null +++ b/tests/config.nims @@ -0,0 +1 @@ +switch("path", "$projectDir/../src") \ No newline at end of file diff --git a/tests/test_box_and_client.nim b/tests/test_box_and_client.nim new file mode 100644 index 0000000..2f256c7 --- /dev/null +++ b/tests/test_box_and_client.nim @@ -0,0 +1,73 @@ +# SPDX-License-Identifier: ISC + +import syndicate, syndicate/assertions, syndicate/events, syndicate/skeletons +import preserves +import asyncdispatch, tables, options + +const N = 100000 + +const + `?_` = init(Discard) + `?$` = init(Capture, `?_`) + BoxState = RecordClass(label: symbol"BoxState", arity: 1) + SetBox = RecordClass(label: symbol"SetBox", arity: 1) + + +proc boot(facet: Facet) = + + facet.spawn("box") do (facet: Facet): + facet.declareField(value, int, 0) + + discard facet.addEndpoint do (facet: Facet) -> EndpointSpec: + # echo "recomputing published BoxState ", facet.fields.value + let a = BoxState.init(facet.fields.value.toPreserve) + result.assertion = some a + + discard facet.addDataflow do (facet: Facet): + # echo "box dataflow saw new value ", facet.fields.value + if facet.fields.value == N: + facet.stop do (facet: Facet): + echo "terminated box root facet" + + discard facet.addEndpoint do (facet: Facet) -> EndpointSpec: + const a = SetBox.init(`?$`) + result.analysis = some analyzeAssertion(a) + proc cb(facet: Facet; evt: EventKind; vs: seq[Value]) = + if evt == messageEvent: + facet.scheduleScript do (facet: Facet): + facet.fields.value = vs[0] + # echo "box updated value ", vs[0] + result.analysis.get.callback = some (facet.wrap cb) + const o = Observe.init(SetBox.init(`?$`)) + result.assertion = some o + + facet.spawn("client") do (facet: Facet): + + discard facet.addEndpoint do (facet: Facet) -> EndpointSpec: + const a = BoxState.init(`?$`) + result.analysis = some analyzeAssertion(a) + proc cb(facet: Facet; evt: EventKind; vs: seq[Value]) = + if evt == addedEvent: + facet.scheduleScript do (facet: Facet): + let v = SetBox.init(vs[0].int.succ.toPreserve) + # echo "client sending ", v + facet.send(v) + result.analysis.get.callback = some (facet.wrap cb) + const o = Observe.init(BoxState.init(`?$`)) + result.assertion = some o + + discard facet.addEndpoint do (facet: Facet) -> EndpointSpec: + const a = BoxState.init(`?_`) + result.analysis = some analyzeAssertion(a) + proc cb(facet: Facet; evt: EventKind; vs: seq[Value]) = + if evt == removedEvent: + facet.scheduleScript do (facet: Facet): + echo "box gone" + result.analysis.get.callback = some (facet.wrap cb) + const o = Observe.init(BoxState.init(`?_`)) + result.assertion = some o + + facet.actor.dataspace.ground.addStopHandler do (_: Dataspace): + echo "stopping box-and-client" + +waitFor bootModule(boot)