Initial commit

A mostly verbatim translation of syndicate-js.

https://git.syndicate-lang.org/syndicate-lang/syndicate-js
This commit is contained in:
Emery Hemingway 2021-06-24 17:50:27 +02:00
commit dd977991ad
11 changed files with 1167 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
tests/test_box_and_client

3
README.md Normal file
View File

@ -0,0 +1,3 @@
# Syndicate
Nim implementation of [Syndicate](https://syndicate-lang.org/) dataspaces and actors.

649
src/syndicate.nim Normal file
View File

@ -0,0 +1,649 @@
# SPDX-License-Identifier: ISC
import ./syndicate/bags, ./syndicate/dataflow, ./syndicate/events, ./syndicate/skeletons
import preserves
import asyncdispatch, deques, hashes, macros, options, sets, strutils, tables
export dataflow.defineObservableProperty
export dataflow.recordObservation
export dataflow.recordDamage
template generateIdType(T: untyped) =
type T* = distinct Natural
proc `==`*(x, y: T): bool {.borrow.}
proc `$`*(id: T): string {.borrow.}
generateIdType(ActorId)
generateIdType(FacetId)
generateIdType(EndpointId)
generateIdType(FieldId)
type
Value* = Preserve
Bag = bags.Bag[Value]
Task[T] = proc (): T
Script[T] = proc (facet: Facet): T
ActivationScript* = Script[void]
ActionKind = enum
patchAction, messageAction, spawnAction, quitAction, deferredTurnAction, activationAction
Action = object
impl: proc (action: Action; ds: Dataspace; actor: Option[Actor]) {.gcsafe.}
case kind: ActionKind
of patchAction:
changes: Bag
else:
discard
Priority = enum
pQueryHigh = 0,
pQuery,
pQueryHandler,
pNormal,
pGC,
pIdle,
len
Actor = ref object
id: ActorId
name: string
dataspace*: Dataspace
rootFacet: ParentFacet
pendingTasks: array[Priority.len, Deque[Task[void]]]
pendingActions: seq[Action]
adhocAssertions: Bag
cleanupChanges: Bag
parentId: ActorId
isRunnable: bool
EndpointSpec* = object
assertion*: Option[Value]
analysis*: Option[Analysis]
Endpoint = ref object
id: EndpointId
facet: Facet
updateProc: Script[EndpointSpec]
spec: EndpointSpec
Field = tuple[name: string; value: Preserve]
Fields = seq[Field]
# TODO: compile-time tuples
Turn = object
actions: seq[Action]
actor: Option[Actor]
Dataspace* = ref object
ground*: Ground
index: Index
dataflow*: Graph[Endpoint, FieldId]
runnable: seq[Actor]
pendingTurns: seq[Turn]
actors: Table[ActorId, Actor]
activations: seq[ActivationScript]
nextId: Natural
StopHandler = proc (ds: Dataspace) {.gcsafe.}
Ground = ref object
dataspace: Dataspace
stopHandlers: seq[StopHandler]
future: Future[void]
ParentFacet = Option[Facet]
Facet* = ref FacetObj
FacetObj = object
id: FacetId
actor*: Actor
parent: ParentFacet
endpoints: Table[EndpointId, Endpoint]
stopScripts: seq[Script[void]]
children: Table[FacetId, Facet]
fields*: Fields
isLive, inScript: bool
# FacetImpl[Fields] = ref FacetImplObj[Fields]
# FacetImplObj[Fields] {.final.} = object of FacetBaseObj
using
dataspace: Dataspace
actor: Actor
facet: Facet
proc `$`*(spec: EndpointSpec): string =
result.add "{assertion: "
if spec.assertion.isSome:
result.add $(spec.assertion.get)
else:
result.add "nil"
result.add ", analysis: "
if spec.analysis.isSome:
result.add $spec.analysis.get
else:
result.add "nil"
result.add " }"
proc `$`*(actor): string =
result.add "Actor("
result.add $actor.id
result.add ','
result.add actor.name
result.add ')'
proc `$`*(facet): string =
result.add "Facet("
result.add $facet.actor.id
result.add ','
result.add facet.actor.name
result.add ','
result.add $facet.id
result.add ')'
proc hash*(ep: Endpoint): Hash =
!$(hash(ep.id) !& hash(ep.facet.id))
proc generateId*(ds: Dataspace): Natural =
# TODO: used by declareField, but should be hidden.
inc(ds.nextId)
ds.nextId
proc newActor(ds: Dataspace; name: string; initialAssertions: Value; parentId: ActorId): Actor =
assert(initialAssertions.kind == pkSet)
result = Actor(
id: ds.generateId.ActorId,
name: name,
dataspace: ds,
parentId: parentId)
for v in initialAssertions.set:
discard result.adhocAssertions.change(v, 1)
ds.actors[result.id] = result
proc newFacet(actor; parent: ParentFacet): Facet =
result = Facet(
id: actor.dataspace.generateId.FacetId,
actor: actor,
parent: parent,
isLive: true,
inScript: true)
if parent.isSome:
parent.get.children[result.id] = result
else:
actor.rootFacet = some result
proc applyPatch(ds: Dataspace; actor: Option[Actor]; changes: Bag) =
type Pair = tuple[val: Value; count: int]
var removals: seq[Pair]
for a, count in changes.pairs:
if count > 0:
# echo "applyPatch +", a
discard ds.index.adjustAssertion(a, count)
else:
removals.add((a, count))
actor.map do (ac: Actor):
discard ac.cleanupChanges.change(a, -count)
for (a, count) in removals:
# echo "applyPatch -", a
discard ds.index.adjustAssertion(a, count)
proc initPatch(): Action =
proc impl(patch: Action; ds: Dataspace; actor: Option[Actor]) {.gcsafe.} =
ds.applyPatch(actor, patch.changes)
Action(impl: impl, kind: patchAction)
proc pendingPatch(actor): var Action =
for a in actor.pendingActions.mitems:
if a.kind == patchAction: return a
actor.pendingActions.add(initPatch())
actor.pendingActions[actor.pendingActions.high]
proc adjust(patch: var Action; v: Value; delta: int) =
discard patch.changes.change(v, delta)
proc subscribe(ds: Dataspace; handler: Analysis) =
ds.index.addHandler(handler, handler.callback.get)
proc unsubscribe(ds: Dataspace; handler: Analysis) =
ds.index.removeHandler(handler, handler.callback.get)
proc assert(actor; a: Value) = actor.pendingPatch.adjust(a, +1)
proc retract(actor; a: Value) = actor.pendingPatch.adjust(a, -1)
proc install(ep: Endpoint; spec: EndpointSpec) =
ep.spec = spec
ep.spec.assertion.map do (a: Value):
ep.facet.actor.assert(a)
ep.spec.analysis.map do (a: Analysis):
ep.facet.actor.dataspace.subscribe(a)
proc scheduleTask(actor; prio: Priority; task: Task[void]) =
if not actor.isRunnable:
actor.isRunnable = true
actor.dataspace.runnable.add(actor)
actor.pendingTasks[prio].addLast(task)
proc scheduleTask(actor; task: Task[void]) =
scheduleTask(actor, pNormal, task)
proc abandonQueuedWork(actor) =
actor.pendingActions = @[]
for q in actor.pendingTasks.mitems: clear(q)
proc uninstall(ep: Endpoint; emitPatches: bool) =
if emitPatches:
ep.spec.assertion.map do (a: Value):
ep.facet.actor.retract(a)
ep.spec.analysis.map do (a: Analysis):
ep.facet.actor.dataspace.unsubscribe(a)
proc destroy(ep: Endpoint; emitPatches: bool) =
ep.facet.actor.dataspace.dataflow.forgetSubject(ep)
ep.uninstall(emitPatches)
ep.facet.actor.scheduleTask(pGC) do ():
ep.facet.endpoints.del(ep.id)
# TODO: cannot remove from ep.facet.endpoints during
# its iteration, defering remove is probably unecessary
# because the facet is going down.
proc retractAssertionsAndSubscriptions(facet; emitPatches: bool) =
facet.actor.scheduleTask do ():
for ep in facet.endpoints.values:
ep.destroy(emitPatches)
clear(facet.endpoints)
proc abort(facet; emitPatches: bool) =
facet.isLive = false
for child in facet.children.values:
child.abort(emitPatches)
facet.retractAssertionsAndSubscriptions(emitPatches)
proc enqueueScriptAction(actor; action: Action) =
actor.pendingActions.add(action)
proc enqueueScriptAction(facet; action: Action) =
enqueueScriptAction(facet.actor, action)
proc initQuitAction(): Action =
proc impl(action: Action; ds: Dataspace; actor: Option[Actor]) =
assert(actor.isSome)
ds.applyPatch(actor, actor.get.cleanupChanges)
ds.actors.del(actor.get.id)
Action(impl: impl, kind: quitAction)
proc terminate(actor; emitPatches: bool) =
if emitPatches:
actor.scheduleTask do ():
for a in actor.adhocAssertions.keys:
actor.retract(a)
actor.rootFacet.map do (root: Facet):
root.abort(emitPatches)
actor.scheduleTask do ():
actor.enqueueScriptAction(initQuitAction())
proc invokeScript(facet; script: Script[void]) =
try: script(facet)
except:
let e = getCurrentException()
# TODO: install an error handling callback at the facet?
facet.actor.abandonQueuedWork()
facet.actor.terminate(false)
raise e
func isInert(facet): bool =
facet.endpoints.len == 0 and facet.children.len == 0
proc terminate(facet) =
if facet.isLive:
let
actor = facet.actor
parent = facet.parent
if parent.isSome:
parent.get.children.del(facet.id)
else:
reset actor.rootFacet
facet.isLive = false
for child in facet.children.values:
child.terminate()
actor.scheduleTask do ():
facet.invokeScript do (facet: Facet):
for s in facet.stopScripts:
s(facet)
facet.retractAssertionsAndSubscriptions(true)
actor.scheduleTask(pGC) do ():
if parent.isSome:
if parent.get.isInert:
parent.get.terminate()
else:
actor.terminate(true)
template withNonScriptContext(facet; body: untyped) =
let inScriptPrev = facet.inScript
facet.inScript = false
try: body
finally: facet.inScript = inScriptPrev
proc ensureFacetSetup(facet; s: string) =
assert(not facet.inScript, "Cannot " & s & "ouside facet setup")
proc ensureNonFacetSetup(facet; s: string) =
assert(facet.inScript, "Cannot " & s & " during facet setup")
proc wrap(facet; script: Script[void]): Task[void] =
proc task() = facet.invokeScript(script)
task
proc wrap*(facet; cb: proc(facet: Facet; event: EventKind; bindings: seq[Value]) {.gcsafe.}): HandlerCallback =
proc wrapper(event: EventKind; bindings: seq[Value]) =
facet.invokeScript do (facet: Facet):
cb(facet, event, bindings)
wrapper
proc scheduleScript*(facet; prio: Priority; script: Script[void]) =
facet.actor.scheduleTask(prio, facet.wrap(script))
proc scheduleScript*(facet; script: Script[void]) =
facet.actor.scheduleTask(pNormal, facet.wrap(script))
proc addStartScript(facet; s: Script[void]) =
facet.ensureFacetSetup("onStart")
facet.scheduleScript(pNormal, s)
proc addFacet(actor; parentFacet: Option[Facet]; bootScript: Script[void]; checkInScript = false) =
if checkInScript and parentFacet.isSome:
assert parentFacet.get.inScript
let f = Facet(
id: actor.dataspace.generateId.FacetId,
actor: actor,
parent: parentFacet,
isLive: true,
inScript: true)
if parentFacet.isSome:
parentFacet.get.children[f.id] = f
f.fields = parentFacet.get.fields
# inherit scope by copying fields of the parent
else:
actor.rootFacet = some f
f.invokeScript do (facet: Facet):
facet.withNonScriptContext:
bootScript(facet)
actor.scheduleTask do ():
if ((parentFacet.isSome) and (not parentFacet.get.isLive)) or f.isInert:
f.terminate()
proc deliverMessage(ds: Dataspace; msg: Value; ac: Option[Actor]) =
ds.index.deliverMessage(msg)
proc adhocRetract(actor; a: Value) =
if actor.adhocAssertions.change(a, -1, true) == cdPresentToAbsent:
actor.retract(a)
proc refresh(ep: Endpoint) =
let newSpec = ep.updateProc(ep.facet)
if newSpec.assertion != ep.spec.assertion:
ep.uninstall(true)
ep.install(newSpec)
proc refreshAssertions(ds: Dataspace) =
ds.dataflow.repairDamage do (ep: Endpoint):
let facet = ep.facet
assert(facet.isLive)
facet.invokeScript do (f: Facet):
f.withNonScriptContext:
refresh(ep)
proc addActor(ds: Dataspace; name: string; bootProc: Script[void]; initialAssertions: Value; parent: Option[Actor]) =
var parentId: ActorId
parent.map do (p: Actor): parentId = p.id
let ac = newActor(ds, name, initialAssertions, parentId)
ds.applyPatch(some ac, ac.adhocAssertions)
ac.addFacet(none Facet) do (systemFacet: Facet):
# Root facet is a dummy "system" facet that exists to hold
# one-or-more "user" "root" facets.
ac.addFacet(some systemFacet, bootProc)
# ^ The "true root", user-visible facet.
for a in initialAssertions.set:
ac.adhocRetract(a)
proc send*(facet; body: Value) =
facet.ensureNonFacetSetup("send")
proc impl(action: Action; ds: Dataspace; actor: Option[Actor]) =
ds.deliverMessage(body, actor)
facet.enqueueScriptAction(Action(impl: impl, kind: messageAction))
proc initSpawnAction(name: string; bootProc: Script[void], initialAssertions: Value): Action =
proc impl(action: Action; ds: Dataspace; actor: Option[Actor]) =
ds.addActor(name, bootProc, initialAssertions, actor)
Action(impl: impl, kind: spawnAction)
proc spawn*(facet; name: string; bootProc: Script[void], initialAssertions: Value) =
facet.ensureNonFacetSetup("spawn")
facet.enqueueScriptAction(initSpawnAction(name, bootProc, initialAssertions))
proc spawn*(facet; name: string; bootProc: Script[void]) =
spawn(facet, name, bootProc, Value(kind: pkSet))
#[
template spawn*(facet; name: string; fields: untyped; bootProc: Script[void]): untyped =
type Fields = typeof(fields)
spawn[Fields](facet, name, bootProc, Value(kind: pkSet))
]#
proc initActivationAction(script: ActivationScript; name: string): Action =
proc impl(action: Action; ds: Dataspace; actor: Option[Actor]) =
for s in ds.activations:
if s == script: return
ds.activations.add(script)
proc boot(root: Facet) =
root.addStartScript(script)
ds.addActor(name, boot, Preserve(kind: pkSet), actor)
Action(impl: impl, kind: activationAction)
proc activate(facet; script: ActivationScript; name = "") =
facet.ensureNonFacetSetup "`activate`"
facet.enqueueScriptAction(initActivationAction(script, name))
proc newDataspace(ground: Ground; bootProc: ActivationScript): Dataspace =
let turn = Turn(actions: @[initSpawnAction("", bootProc, Value(kind: pkSet))])
Dataspace(ground: ground, index: initIndex(), pendingTurns: @[turn])
proc addEndpoint*(facet; updateScript: Script[EndpointSpec], isDynamic = true): Endpoint =
facet.ensureFacetSetup("add endpoint")
let
actor = facet.actor
dataspace = actor.dataspace
result = Endpoint(
id: dataspace.generateId.EndpointId,
facet: facet,
updateProc: updateScript)
dataspace.dataflow.addSubject(result)
let
dyn = if isDynamic: some result else: none Endpoint
initialSpec = dataspace.dataflow.withSubject(dyn) do () -> EndpointSpec:
updateScript(facet)
result.install(initialSpec)
facet.endpoints[result.id] = result
# dataspace.endpointHook(facet, result)
# Subclasses may override
proc addDataflow*(facet; prio: Priority; subjectProc: Script[void]): Endpoint =
facet.addEndpoint do (fa: Facet) -> EndpointSpec:
let subjectId = facet.actor.dataspace.dataflow.currentSubjectId
facet.scheduleScript(prio) do (fa: Facet):
if facet.isLive:
facet.actor.dataspace.dataflow.withSubject(subjectId):
subjectProc(facet)
# result is the default EndpointSpec
proc addDataflow*(facet; subjectProc: Script[void]): Endpoint =
addDataflow(facet, pNormal, subjectProc)
proc commitActions(dataspace; actor; pending: seq[Action]) =
dataspace.pendingTurns.add(Turn(actor: some actor, actions: pending))
proc runPendingTask(actor): bool =
for deque in actor.pendingTasks.mitems:
if deque.len > 0:
let task = deque.popFirst()
task()
actor.dataspace.refreshAssertions()
return true
proc runPendingTasks(actor) =
while actor.runPendingTask(): discard
actor.isRunnable = false
if actor.pendingActions.len > 0:
var pending = move actor.pendingActions
actor.dataspace.commitActions(actor, pending)
proc runPendingTasks(ds: Dataspace) =
var runnable = move ds.runnable
for actor in runnable:
runPendingTasks(actor)
proc performPendingActions(ds: Dataspace) =
var turns = move ds.pendingTurns
for turn in turns:
for action in turn.actions:
action.impl(action, ds, turn.actor)
runPendingTasks(ds)
proc runTasks(ds: Dataspace): bool =
ds.runPendingTasks()
ds.performPendingActions()
result = ds.runnable.len > 0 or ds.pendingTurns.len > 0
proc stop*(facet; continuation: Script[void]) =
facet.parent.map do (parent: Facet):
facet.actor.scheduleTask do ():
facet.terminate()
parent.scheduleScript do (parent: Facet):
continuation(parent)
# ^ TODO: is this the correct scope to use??
proc stop*(facet) =
facet.parent.map do (parent: Facet):
facet.actor.scheduleTask do ():
facet.terminate()
proc addStopHandler*(g: Ground; h: StopHandler) =
g.stopHandlers.add(h)
proc step(g: Ground) =
if g.dataspace.runTasks():
asyncdispatch.callSoon: step(g)
else:
for sh in g.stopHandlers:
sh(g.dataspace)
reset g.stopHandlers
complete(g.future)
proc bootModule*(bootProc: ActivationScript): Future[void] =
# TODO: better integration with the async dispatcher
let g = Ground(future: newFuture[void]"bootModule")
g.dataspace = newDataspace(g) do (rootFacet: Facet):
rootFacet.addStartScript do (rootFacet: Facet):
rootFacet.activate(bootProc)
addTimer(1, true) do (fd: AsyncFD) -> bool:
step(g)
true
return g.future
proc registerField*(facet: Facet; field: Field): int =
# TODO: should be hidden, but used by declareField.
for i in 0..facet.fields.high:
if facet.fields[i].name == field.name:
facet.fields[i] = field
return i
facet.fields.add(field)
return facet.fields.high
template declareField*(facet: Facet; F: untyped; T: typedesc; init: T): untyped =
## Declare getter and setter procs for field `F` of type `T` initalized with `init`.
# TODO: do more at compile-time, use a tuple rather than a sequence of Preserves.
let
`F FieldId` = facet.actor.dataspace.generateId.FieldId
`F FeildOff` = registerField(facet,
(nimIdentNormalize("`F`"), toPreserve(init), ))
facet.actor.dataspace.dataflow.defineObservableProperty(`F FieldId`)
proc `F`(fields: Fields): T =
facet.actor.dataspace.dataflow.recordObservation(`F FieldId`)
fromPreserve[T](result, facet.fields[`F FeildOff`].value)
proc `F =`(fields: var Fields; x: T) =
facet.actor.dataspace.dataflow.recordDamage(`F FieldId`)
facet.fields[`F FeildOff`].value = toPreserve[T](x)
proc `F =`(fields: var Fields; x: Preserve) =
assert(x.kind == facet.fields[`F FeildOff`].value.kind, "invalid Preserves item for field type")
facet.actor.dataspace.dataflow.recordDamage(`F FieldId`)
facet.fields[`F FeildOff`].value = x
#[
# Some early macros
proc send() =
discard
proc generateField(stmt: NimNode): NimNode =
stmt.expectLen 3
let l = stmt[1]
let r = stmt[2]
case r.kind
of nnkStmtList:
r.expectLen 1
result = newNimNode(nnkVarSection).add(newIdentDefs(l, r[0]))
else:
raiseAssert("unhandled field " & r.treeRepr)
proc generateStopOnTuple(output: var NimNode; stmt: NimNode): NimNode =
stmt.expectLen 3
let
cond = stmt[1]
body = stmt[2]
# symCond = genSym(ident="stopOnCond")
symStop = genSym(ident="stopOnCb")
# output.add(newProc(name = symCond, params = [ident"bool"], body = cond))
output.add(newProc(name = symStop, body = body))
newPar(newColonExpr(ident"cond", cond), newColonExpr(ident"body", symStop))
macro spawn*(label: string; body: untyped) =
## Spawn actor.
result = newNimNode(nnkStmtList)
let blockBody = newNimNode(nnkStmtList)
let actorSym = genSym(nskVar, label.strVal)
result.add(newNimNode(nnkVarSection).add(
newIdentDefs(actorSym, ident"Actor")))
var stopOnSeq = newNimNode(nnkBracket)
body.expectKind nnkStmtList
for stmt in body:
case stmt.kind
of nnkCommentStmt:
result.add(stmt)
of nnkCommand:
let cmd = stmt[0]
cmd.expectKind nnkIdent
if eqIdent(cmd, "field"):
result.add(generateField(stmt))
elif eqIdent(cmd, "stopOn"):
let stop = generateStopOnTuple(result, stmt)
stopOnSeq.add(stop)
else:
raiseAssert("unhandled spawn command " & cmd.repr)
else:
raiseAssert("unhandled statment " & stmt.treeRepr)
result.add(
newAssignment(newDotExpr(actorSym, ident"stops"), prefix(stopOnSeq, "@")))
#result.add(newBlockStmt(blockBody))
echo result.repr
# echo result.treeRepr
macro dumpStuff*(body: untyped): untyped =
echo body.treeRepr
]#

View File

@ -0,0 +1,11 @@
# SPDX-License-Identifier: ISC
import preserves
const
Discard* = RecordClass(label: symbol"discard", arity: 0)
Capture* = RecordClass(label: symbol"capture", arity: 1)
Observe* = RecordClass(label: symbol"observe", arity: 1)
Inbound* = RecordClass(label: symbol"inbound", arity: 1)
Outbound* = RecordClass(label: symbol"outbound", arity: 1)
Instance* = RecordClass(label: symbol"instance", arity: 1)

37
src/syndicate/bags.nim Normal file
View File

@ -0,0 +1,37 @@
# SPDX-License-Identifier: ISC
## An unordered association of items to counts.
## An item count may be negative, unlike CountTable.
import tables
type
ChangeDescription* = enum
cdPresentToAbsent,
cdAbsentToAbsent,
cdAbsentToPresent,
cdPresentToPresent
Bag*[T] = Table[T, int]
proc change(count: var int; delta: int; clamp: bool): ChangeDescription =
var
oldCount = count
newCount = oldCount + delta
if clamp:
newCount = max(0, newCount)
if newCount == 0:
result =
if oldCount == 0: cdAbsentToAbsent
else: cdPresentToAbsent
else:
result =
if oldCount == 0: cdAbsentToPresent
else: cdPresentToPresent
count = newCount
proc change*[T](bag: var Bag[T]; key: T; delta: int; clamp = false): ChangeDescription =
assert(delta != 0)
result = change(bag.mGetOrPut(key, 0), delta, clamp)
if result in {cdAbsentToAbsent, cdPresentToAbsent}:
bag.del(key)

View File

@ -0,0 +1,80 @@
# SPDX-License-Identifier: ISC
import preserves
import options, sets, tables
type
Set = HashSet
Graph*[SubjectId, ObjectId] = object
edgesForward: Table[ObjectId, Set[SubjectId]]
edgesReverse: Table[SubjectId, Set[ObjectId]]
damagedNodes: Set[ObjectId]
currentSubjectId*: Option[SubjectId]
proc withSubject*[Sid, Oid, T](g: var Graph[Sid, Oid]; sid: Option[Sid];
body: proc (): T): T =
var oldSid = move g.currentSubjectId
g.currentSubjectId = sid
try: result = body()
finally: g.currentSubjectId = move oldSid
proc withSubject*[Sid, Oid](g: var Graph[Sid, Oid]; sid: Option[Sid];
body: proc ()) =
var oldSid = move g.currentSubjectId
g.currentSubjectId = sid
try: body()
finally: g.currentSubjectId = move oldSid
proc recordObservation*[Sid, Oid](g: var Graph[Sid, Oid]; oid: Oid) =
if g.currentSubjectId.isSome:
let sid = g.currentSubjectId.get
if not g.edgesForward.hasKey(oid):
g.edgesForward[oid] = initHashSet[Sid]()
g.edgesForward[oid].incl(sid)
if not g.edgesReverse.hasKey(sid):
g.edgesReverse[sid] = initHashSet[Oid]()
g.edgesReverse[sid].incl(oid)
# TODO: double lookups here
proc recordDamage*[Sid, Oid](g: var Graph[Sid, Oid]; oid: Oid) =
g.damagedNodes.incl(oid)
proc forgetSubject*[Sid, Oid](g: var Graph[Sid, Oid]; sid: Sid) =
var subjectObjects: Set[Oid]
if g.edgesReverse.pop(sid, subjectObjects):
for oid in subjectObjects:
g.edgesForward.del(oid)
iterator observersOf[Sid, Oid](g: Graph[Sid, Oid]; oid: Oid): Sid =
if g.edgesForward.hasKey(oid):
for sid in g.edgesForward[oid]: yield sid
proc repairDamage*[Sid, Oid](g: var Graph[Sid, Oid]; repairNode: proc (sid: Sid) {.gcsafe.}) =
var repairedThisRound: Set[Oid]
while true:
var workSet = move g.damagedNodes
assert(g.damagedNodes.len == 0)
var alreadyDamaged = workSet * repairedThisRound
if alreadyDamaged.len > 0:
echo "Cyclic dependencies involving ", alreadyDamaged
workSet = workSet - repairedThisRound
repairedThisRound = repairedThisRound + workSet
if workSet.len == 0: break
for oid in workSet:
for sid in g.observersOf(oid):
g.forgetSubject(sid)
g.withSubject(some sid) do: repairNode(sid)
proc defineObservableProperty*[Sid, Oid](g: var Graph[Sid, Oid]; oid: Oid) =
assert(not g.edgesForward.hasKey(oid))
g.edgesForward[oid] = initHashSet[Sid]()
g.recordDamage(oid)
proc addSubject*[Sid, Oid](g: var Graph[Sid, Oid]; sid: Sid) =
assert(not g.edgesReverse.hasKey(sid))
g.edgesReverse[sid] = initHashSet[Oid]()

3
src/syndicate/events.nim Normal file
View File

@ -0,0 +1,3 @@
# SPDX-License-Identifier: ISC
type EventKind* = enum addedEvent, removedEvent, messageEvent

297
src/syndicate/skeletons.nim Normal file
View File

@ -0,0 +1,297 @@
# SPDX-License-Identifier: ISC
import ./assertions, ./bags, ./events
import preserves
import lists, options, sets, tables
type
Value = Preserve
NonEmptySkeleton*[Shape] = object
shape: Shape
members: seq[Skeleton[Shape]]
Skeleton*[Shape] = Option[NonEmptySkeleton[Shape]]
Path = seq[Natural]
proc projectPath(v: Value; path: Path): Value =
result = v
for index in path:
result = result[index]
proc projectPaths(v: Value; paths: seq[Path]): seq[Value] =
result.setLen(paths.len)
for i, path in paths: result[i] = projectPath(v, path)
type
Shape = string
HandlerCallback* = proc (event: EventKind; bindings: seq[Value]) {.gcsafe.}
Analysis* = object
skeleton: Skeleton[Shape]
constPaths: seq[Path]
constVals: seq[Value]
capturePaths: seq[Path]
callback*: Option[HandlerCallback]
proc `$`(a: Analysis): string =
result.add "\n\t skeleton: "
result.add $a.skeleton
result.add "\n\t constPaths: "
result.add $a.constPaths
result.add "\n\t constVals: "
result.add $a.constVals
result.add "\n\t capturePaths: "
result.add $a.capturePaths
proc analyzeAssertion*(a: Value): Analysis =
var path: Path
proc walk(analysis: var Analysis; a: Value): Skeleton[Shape] =
if Capture.isClassOf a:
analysis.capturePaths.add(path)
result = walk(analysis, a.fields[0])
elif Discard.isClassOf a:
discard
else:
if a.kind == pkRecord:
let class = classOf(a)
result = some NonEmptySkeleton[Shape](shape: $class)
path.add(0)
var i: int
for field in a.fields:
path[path.high] = i
result.get.members.add(walk(analysis, field))
inc(i)
discard path.pop
else:
analysis.constPaths.add(path)
analysis.constVals.add(a)
result.skeleton = walk(result, a)
type
Handler = ref object
cachedCaptures: Bag[seq[Value]]
callbacks: HashSet[HandlerCallback]
Leaf = ref object
cachedAssertions: HashSet[Value]
handlerMap: Table[seq[Path], Handler]
Continuation = ref object
cachedAssertions: HashSet[Value]
leafMap: Table[seq[Path], TableRef[seq[Value], Leaf]] # TODO: not TableRef?
Selector = tuple[popCount: int; index: int]
Node = ref object
edges: Table[Selector, Table[string, Node]]
continuation: Continuation
using
continuation: Continuation
leaf: Leaf
node: Node
proc `$`(leaf): string =
result.add "Leaf{ cached: "
result.add $leaf.cachedAssertions
result.add ", handler count: "
result.add $leaf.handlerMap.len
result.add " }"
proc `$`(continuation): string =
result.add "Continuation{ cached: "
result.add $continuation.cachedAssertions
result.add ", "
result.add $continuation.leafMap
result.add " }"
proc `$`(node): string =
result.add "Node{ "
result.add $node.continuation
result.add ", edges: "
result.add $node.edges
result.add "}"
proc isEmpty(leaf): bool =
leaf.cachedAssertions.len == 0 and leaf.handlerMap.len == 0
type
ContinuationProc = proc (c: Continuation; v: Value) {.gcsafe.}
LeafProc = proc (l: Leaf; v: Value) {.gcsafe.}
HandlerProc = proc (h: Handler; vs: seq[Value]) {.gcsafe.}
proc modify(node; operation: EventKind; outerValue: Value;
mCont: ContinuationProc; mLeaf: LeafProc; mHandler: HandlerProc) =
proc walkContinuation(continuation) {.gcsafe.}
proc walkNode(node; termStack: SinglyLinkedList[seq[Value]]) =
# TODO: use a seq for the stack?
walkContinuation(node.continuation)
for (selector, table) in node.edges.pairs:
var nextStack = termStack
for _ in 1..selector.popCount:
nextStack.head = nextStack.head.next
let nextValue = nextStack.head.value[selector.index]
if nextValue.isRecord:
let nextClass = classOf(nextValue)
let nextNode = table.getOrDefault($nextClass)
if not nextNode.isNil:
nextStack.prepend(nextValue.record)
walkNode(nextNode, nextStack)
proc walkContinuation(continuation: Continuation) =
mCont(continuation, outerValue)
for (constPaths, constValMap) in continuation.leafMap.pairs:
let constVals = projectPaths(outerValue, constPaths)
let leaf = constValMap.getOrDefault(constVals)
if leaf.isNil:
if operation == addedEvent:
constValMap[constVals] = Leaf()
else:
mLeaf(leaf, outerValue)
for (capturePaths, handler) in leaf.handlerMap.pairs:
mHandler(handler, projectPaths(outerValue, capturePaths))
if operation == removedEvent and leaf.isEmpty:
constValMap.del(constVals)
if constValMap.len == 0:
continuation.leafMap.del(constPaths)
var stack: SinglyLinkedList[seq[Value]]
stack.prepend(@[outerValue])
walkNode(node, stack)
proc extend*[Shape](node; skeleton: Skeleton[Shape]): Continuation =
var path: Path
proc walkNode(node: Node; popCount, index: int; skeleton: Skeleton[Shape]): tuple[popCount: int, node: Node] =
assert(not node.isNil)
if skeleton.isNone:
return (popCount, node)
else:
var
cls = skeleton.get.shape
table: Table[string, Node]
nextNode: Node
discard node.edges.pop((popCount, index), table)
if not table.pop(cls, nextNode):
nextNode = Node(continuation: Continuation())
for a in node.continuation.cachedAssertions:
if $classOf(projectPath(a, path)) == cls:
nextNode.continuation.cachedAssertions.incl(a)
block:
var
popCount = 0
index = 0
path.add(index)
for member in skeleton.get.members:
(popCount, nextNode) = walkNode(nextNode, popCount, index, member)
inc(index)
discard path.pop()
path.add(index)
discard path.pop()
result = (popCount.succ, nextNode)
table[cls] = nextNode
node.edges[(popCount, index)] = table
walkNode(node, 0, 0, skeleton).node.continuation
type
Index* = object
allAssertions: Bag[Value]
root: Node
proc initIndex*(): Index =
result.root = Node(continuation: Continuation())
using index: Index
proc `$`*(index): string =
result.add "Index("
result.add ")Index"
proc addHandler*(index; res: Analysis; callback: HandlerCallback) =
assert(not index.root.isNil)
let
constPaths = res.constPaths
constVals = res.constVals
capturePaths = res.capturePaths
continuation = index.root.extend(res.skeleton)
assert(not continuation.isNil)
var constValMap = continuation.leafMap.getOrDefault(constPaths)
if constValMap.isNil:
constValMap = newTable[seq[Value], Leaf]()
for a in continuation.cachedAssertions:
var leaf: Leaf
if not constValMap.pop(a.sequence, leaf):
new leaf
leaf.cachedAssertions.incl(a)
constValMap[a.sequence] = leaf
continuation.leafMap[constPaths] = constValMap
var leaf = constValMap.getOrDefault(constVals)
if leaf.isNil:
new leaf
constValMap[constVals] = leaf
var handler = leaf.handlerMap.getOrDefault(capturePaths)
if handler.isNil:
new handler
for a in leaf.cachedAssertions:
let a = projectPaths(a, capturePaths)
if handler.cachedCaptures.contains(a):
discard handler.cachedCaptures.change(a, +1)
leaf.handlerMap[capturePaths] = handler
handler.callbacks.incl(callback)
for captures, count in handler.cachedCaptures.pairs:
callback(addedEvent, captures)
proc removeHandler*(index; res: Analysis; callback: HandlerCallback) =
let continuation = index.root.extend(res.skeleton)
try:
let
constValMap = continuation.leafMap[res.constPaths]
leaf = constValMap[res.constVals]
handler = leaf.handlerMap[res.capturePaths]
handler.callbacks.excl(callback)
if handler.callbacks.len == 0:
leaf.handlerMap.del(res.capturePaths)
if leaf.isEmpty:
constValMap.del(res.constVals)
if constValMap.len == 0:
continuation.leafMap.del(res.constPaths)
except KeyError: discard
proc adjustAssertion*(index: var Index; outerValue: Value; delta: int): ChangeDescription =
result = index.allAssertions.change(outerValue, delta)
case result
of cdAbsentToPresent:
index.root.modify(
addedEvent,
outerValue,
(proc (c: Continuation; v: Value) = c.cachedAssertions.incl(v)),
(proc (l: Leaf; v: Value) = l.cachedAssertions.incl(v)),
(proc (h: Handler; vs: seq[Value]) =
if h.cachedCaptures.change(vs, +1) == cdAbsentToPresent:
for cb in h.callbacks: cb(addedEvent, vs)))
of cdPresentToAbsent:
index.root.modify(
removedEvent,
outerValue,
(proc (c: Continuation; v: Value) = c.cachedAssertions.excl(v)),
(proc (l: Leaf; v: Value) = l.cachedAssertions.excl(v)),
(proc (h: Handler; vs: seq[Value]) =
if h.cachedCaptures.change(vs, -1) == cdPresentToAbsent:
for cb in h.callbacks: cb(removedEvent, vs)))
else:
discard
proc continuationNoop(c: Continuation; v: Value) = discard
proc leafNoop(l: Leaf; v: Value) = discard
proc deliverMessage*(index; v: Value; leafCb: proc (l: Leaf; v: Value) {.gcsafe.}) =
proc handlerCb(h: Handler; vs: seq[Value]) =
for cb in h.callbacks: cb(messageEvent, vs)
index.root.modify(messageEvent, v, continuationNoop, leafCb, handlerCb)
proc deliverMessage*(index; v: Value) =
proc handlerCb(h: Handler; vs: seq[Value]) =
for cb in h.callbacks: cb(messageEvent, vs)
index.root.modify(messageEvent, v, continuationNoop, leafNoop, handlerCb)

12
syndicate.nimble Normal file
View File

@ -0,0 +1,12 @@
# Package
version = "0.0.0"
author = "Emery Hemingway"
description = "Syndicated actors for conversational concurrency"
license = "ISC"
srcDir = "src"
# Dependencies
requires "nim >= 1.4.8", "preserves >= 0.2.0"

1
tests/config.nims Normal file
View File

@ -0,0 +1 @@
switch("path", "$projectDir/../src")

View File

@ -0,0 +1,73 @@
# SPDX-License-Identifier: ISC
import syndicate, syndicate/assertions, syndicate/events, syndicate/skeletons
import preserves
import asyncdispatch, tables, options
const N = 100000
const
`?_` = init(Discard)
`?$` = init(Capture, `?_`)
BoxState = RecordClass(label: symbol"BoxState", arity: 1)
SetBox = RecordClass(label: symbol"SetBox", arity: 1)
proc boot(facet: Facet) =
facet.spawn("box") do (facet: Facet):
facet.declareField(value, int, 0)
discard facet.addEndpoint do (facet: Facet) -> EndpointSpec:
# echo "recomputing published BoxState ", facet.fields.value
let a = BoxState.init(facet.fields.value.toPreserve)
result.assertion = some a
discard facet.addDataflow do (facet: Facet):
# echo "box dataflow saw new value ", facet.fields.value
if facet.fields.value == N:
facet.stop do (facet: Facet):
echo "terminated box root facet"
discard facet.addEndpoint do (facet: Facet) -> EndpointSpec:
const a = SetBox.init(`?$`)
result.analysis = some analyzeAssertion(a)
proc cb(facet: Facet; evt: EventKind; vs: seq[Value]) =
if evt == messageEvent:
facet.scheduleScript do (facet: Facet):
facet.fields.value = vs[0]
# echo "box updated value ", vs[0]
result.analysis.get.callback = some (facet.wrap cb)
const o = Observe.init(SetBox.init(`?$`))
result.assertion = some o
facet.spawn("client") do (facet: Facet):
discard facet.addEndpoint do (facet: Facet) -> EndpointSpec:
const a = BoxState.init(`?$`)
result.analysis = some analyzeAssertion(a)
proc cb(facet: Facet; evt: EventKind; vs: seq[Value]) =
if evt == addedEvent:
facet.scheduleScript do (facet: Facet):
let v = SetBox.init(vs[0].int.succ.toPreserve)
# echo "client sending ", v
facet.send(v)
result.analysis.get.callback = some (facet.wrap cb)
const o = Observe.init(BoxState.init(`?$`))
result.assertion = some o
discard facet.addEndpoint do (facet: Facet) -> EndpointSpec:
const a = BoxState.init(`?_`)
result.analysis = some analyzeAssertion(a)
proc cb(facet: Facet; evt: EventKind; vs: seq[Value]) =
if evt == removedEvent:
facet.scheduleScript do (facet: Facet):
echo "box gone"
result.analysis.get.callback = some (facet.wrap cb)
const o = Observe.init(BoxState.init(`?_`))
result.assertion = some o
facet.actor.dataspace.ground.addStopHandler do (_: Dataspace):
echo "stopping box-and-client"
waitFor bootModule(boot)