syndicate-nim/src/syndicate/dataflow.nim

82 lines
2.6 KiB
Nim

# SPDX-FileCopyrightText: 2021 ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import preserves
import options, sets, tables
type
Set = HashSet
Graph*[SubjectId, ObjectId] = object
edgesForward: Table[ObjectId, Set[SubjectId]]
edgesReverse: Table[SubjectId, Set[ObjectId]]
damagedNodes: Set[ObjectId]
currentSubjectId*: Option[SubjectId]
proc withSubject*[Sid, Oid, T](g: var Graph[Sid, Oid]; sid: Option[Sid];
body: proc (): T): T =
var oldSid = move g.currentSubjectId
g.currentSubjectId = sid
try: result = body()
finally: g.currentSubjectId = move oldSid
proc withSubject*[Sid, Oid](g: var Graph[Sid, Oid]; sid: Option[Sid];
body: proc ()) =
var oldSid = move g.currentSubjectId
g.currentSubjectId = sid
try: body()
finally: g.currentSubjectId = move oldSid
proc recordObservation*[Sid, Oid](g: var Graph[Sid, Oid]; oid: Oid) =
if g.currentSubjectId.isSome:
let sid = g.currentSubjectId.get
if not g.edgesForward.hasKey(oid):
g.edgesForward[oid] = initHashSet[Sid]()
g.edgesForward[oid].incl(sid)
if not g.edgesReverse.hasKey(sid):
g.edgesReverse[sid] = initHashSet[Oid]()
g.edgesReverse[sid].incl(oid)
# TODO: double lookups here
proc recordDamage*[Sid, Oid](g: var Graph[Sid, Oid]; oid: Oid) =
g.damagedNodes.incl(oid)
proc forgetSubject*[Sid, Oid](g: var Graph[Sid, Oid]; sid: Sid) =
var subjectObjects: Set[Oid]
if g.edgesReverse.pop(sid, subjectObjects):
for oid in subjectObjects:
g.edgesForward.del(oid)
iterator observersOf[Sid, Oid](g: Graph[Sid, Oid]; oid: Oid): Sid =
if g.edgesForward.hasKey(oid):
for sid in g.edgesForward[oid]: yield sid
proc repairDamage*[Sid, Oid](g: var Graph[Sid, Oid]; repairNode: proc (sid: Sid) {.gcsafe.}) =
var repairedThisRound: Set[Oid]
while true:
var workSet = move g.damagedNodes
assert(g.damagedNodes.len == 0)
var alreadyDamaged = workSet * repairedThisRound
if alreadyDamaged.len > 0:
echo "Cyclic dependencies involving ", alreadyDamaged
workSet = workSet - repairedThisRound
repairedThisRound = repairedThisRound + workSet
if workSet.len == 0: break
for oid in workSet:
for sid in g.observersOf(oid):
g.forgetSubject(sid)
g.withSubject(some sid) do: repairNode(sid)
proc defineObservableProperty*[Sid, Oid](g: var Graph[Sid, Oid]; oid: Oid) =
assert(not g.edgesForward.hasKey(oid))
g.edgesForward[oid] = initHashSet[Sid]()
g.recordDamage(oid)
proc addSubject*[Sid, Oid](g: var Graph[Sid, Oid]; sid: Sid) =
assert(not g.edgesReverse.hasKey(sid))
g.edgesReverse[sid] = initHashSet[Oid]()