Proper dataspaces
This commit is contained in:
parent
d8f6d82956
commit
1912574ed8
|
@ -3,8 +3,8 @@
|
|||
|
||||
import std/macros
|
||||
import preserves
|
||||
import syndicate/[actors, dataspaces, patterns]
|
||||
export patterns
|
||||
import syndicate/[actors, dataspaces, durings, patterns]
|
||||
export dataspaces, patterns
|
||||
|
||||
from syndicate/protocols/protocol import Handle
|
||||
|
||||
|
|
|
@ -36,3 +36,6 @@ proc change*[T](bag: var Bag[T]; key: T; delta: int; clamp = false): ChangeDescr
|
|||
result = change(bag.mGetOrPut(key, 0), delta, clamp)
|
||||
if result in {cdAbsentToAbsent, cdPresentToAbsent}:
|
||||
bag.del(key)
|
||||
|
||||
iterator items*[T](bag: Bag[T]): (int, T) =
|
||||
for k, v in bag: yield(v, k)
|
||||
|
|
|
@ -1,104 +1,46 @@
|
|||
# SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway
|
||||
# SPDX-FileCopyrightText: ☭ 2022 Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[hashes, macros, tables]
|
||||
import std/[hashes, tables]
|
||||
import preserves
|
||||
import ./actors, ./bags, ./patterns, ./protocols/dataspace
|
||||
import ./actors, ./protocols/dataspace, ./skeletons
|
||||
|
||||
from ./protocols/protocol import Handle
|
||||
|
||||
template trace(args: varargs[untyped]): untyped = stderr.writeLine(args)
|
||||
|
||||
type
|
||||
Observe = dataspace.Observe[Ref]
|
||||
Turn = actors.Turn
|
||||
|
||||
#[
|
||||
DataspaceEntity = ref object of Entity
|
||||
assertions: Bag[Assertion]
|
||||
subscriptions: Table[Assertion, TableRef[Ref, TableRef[Assertion, Handle]]]
|
||||
handleMap: Table[Handle, Assertion] # breaks toPreserve(Observe[Ref]())
|
||||
Dataspace {.final.} = ref object of Entity
|
||||
index: Index
|
||||
handleMap: Table[Handle, Assertion]
|
||||
|
||||
method publish(ds: DataspaceEntity; turn: var Turn; rec: Assertion; h: Handle) =
|
||||
if rec.isRecord:
|
||||
ds.handleMap[h] = rec
|
||||
if ds.assertions.change(rec, +1) == cdAbsentToPresent:
|
||||
method publish(ds: Dataspace; turn: var Turn; v: Assertion; h: Handle) =
|
||||
if add(ds.index, turn, v):
|
||||
var obs: Observe
|
||||
if obs.fromPreserve v:
|
||||
ds.index.add(turn, obs.pattern, unembed obs.observer)
|
||||
ds.handleMap[h] = v
|
||||
|
||||
method retract(ds: Dataspace; turn: var Turn; h: Handle) =
|
||||
try:
|
||||
let v = ds.handleMap[h]
|
||||
if remove(ds.index, turn, v):
|
||||
ds.handleMap.del h
|
||||
var obs: Observe
|
||||
if fromPreserve(obs, rec):
|
||||
var seen = newTable[Assertion, Handle]()
|
||||
for prev, count in ds.assertions.pairs:
|
||||
if prev == rec.label:
|
||||
seen[prev] = publish(turn, obs.observer.unembed, prev)
|
||||
var patternSubs = ds.subscriptions.getOrDefault(rec.label)
|
||||
if patternSubs.isNil:
|
||||
patternSubs = newTable[Ref, TableRef[Value, Handle]]()
|
||||
ds.subscriptions[rec.label] = patternSubs
|
||||
patternSubs[obs.observer.unembed] = move seen
|
||||
for peer, seen in ds.subscriptions[rec.label]:
|
||||
if rec notin seen:
|
||||
seen[rec] = publish(turn, peer, rec)
|
||||
if obs.fromPreserve v:
|
||||
ds.index.remove(turn, obs.pattern, unembed obs.observer)
|
||||
except KeyError: discard
|
||||
|
||||
method retract(ds: DataspaceEntity; turn: var Turn; upstreamHandle: Handle) =
|
||||
let rec = ds.handleMap.getOrDefault(upstreamHandle)
|
||||
if rec.isRecord:
|
||||
ds.handleMap.del upstreamHandle
|
||||
if ds.assertions.change(rec, -1) == cdPresentToAbsent:
|
||||
for peer, seen in ds.subscriptions[rec.label]:
|
||||
var h: Handle
|
||||
if pop(seen, rec, h): retract(turn, h)
|
||||
preserveTo(rec, Observe).map do (obs: Observe):
|
||||
let peerMap = ds.subscriptions[rec.label]
|
||||
peerMap.del(obs.observer.unembed)
|
||||
if peerMap.len == 0:
|
||||
ds.subscriptions.del(rec.label)
|
||||
method message(ds: Dataspace; turn: var Turn; v: Assertion) =
|
||||
ds.index.deliverMessage(turn, v)
|
||||
|
||||
method message(ds: DataspaceEntity; turn: var Turn; msg: Assertion) =
|
||||
if msg.isRecord:
|
||||
for peer, seen in ds.subscriptions[msg.label].pairs:
|
||||
message(turn, peer, msg)
|
||||
]#
|
||||
type BootProc = proc (ds: Ref; turn: var Turn) {.gcsafe.}
|
||||
|
||||
type DuringProc* = proc (turn: var Turn; a: Assertion): TurnAction {.gcsafe.}
|
||||
|
||||
type
|
||||
DuringActionKind = enum null, dead, act
|
||||
DuringAction = object
|
||||
case kind: DuringActionKind
|
||||
of null, dead: discard
|
||||
of act:
|
||||
action: TurnAction
|
||||
|
||||
type DuringEntity = ref object of Entity
|
||||
cb: DuringProc
|
||||
assertionMap: Table[Handle, DuringAction]
|
||||
|
||||
proc duringPublish(e: Entity; turn: var Turn; a: Assertion; h: Handle) =
|
||||
var de = DuringEntity(e)
|
||||
let action = de.cb(turn, a)
|
||||
# assert(not action.isNil "should have put in a no-op action")
|
||||
let g = de.assertionMap.getOrDefault h
|
||||
case g.kind
|
||||
of null:
|
||||
de.assertionMap[h] = DuringAction(kind: act, action: action)
|
||||
of dead:
|
||||
de.assertionMap.del h
|
||||
freshen(turn, action)
|
||||
of act:
|
||||
raiseAssert("during: duplicate handle in publish: " & $h)
|
||||
|
||||
proc duringRetract(e: Entity; turn: var Turn; h: Handle) =
|
||||
var de = DuringEntity(e)
|
||||
let g = de.assertionMap.getOrDefault h
|
||||
case g.kind
|
||||
of null:
|
||||
de.assertionMap[h] = DuringAction(kind: dead)
|
||||
of dead:
|
||||
raiseAssert("during: duplicate handle in retract: " & $h)
|
||||
of act:
|
||||
de.assertionMap.del h
|
||||
g.action(turn)
|
||||
|
||||
proc during*(cb: DuringProc): DuringEntity =
|
||||
result = DuringEntity(cb: cb)
|
||||
result.setProcs(publish = duringPublish, retract = duringRetract)
|
||||
|
||||
proc observe*(turn: var Turn; ds: Ref; pat: Pattern; e: Entity): Handle =
|
||||
publish(turn, ds, Observe(pattern: pat, observer: embed newRef(turn, e)))
|
||||
proc bootDataspace*(name: string; bootProc: BootProc): Actor {.discardable.} =
|
||||
bootActor(name) do (turn: var Turn):
|
||||
discard turn.facet.preventInertCheck()
|
||||
let ds = newRef(turn, Dataspace(index: initIndex()))
|
||||
bootProc(ds, turn)
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
# SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[hashes, macros, tables]
|
||||
import preserves
|
||||
import ./actors, ./patterns, ./protocols/dataspace
|
||||
|
||||
from ./protocols/protocol import Handle
|
||||
|
||||
type
|
||||
Observe = dataspace.Observe[Ref]
|
||||
Turn = actors.Turn
|
||||
|
||||
type
|
||||
DuringProc* = proc (turn: var Turn; a: Assertion): TurnAction {.gcsafe.}
|
||||
DuringActionKind = enum null, dead, act
|
||||
DuringAction = object
|
||||
case kind: DuringActionKind
|
||||
of null, dead: discard
|
||||
of act:
|
||||
action: TurnAction
|
||||
DuringEntity {.final.}= ref object of Entity
|
||||
cb: DuringProc
|
||||
assertionMap: Table[Handle, DuringAction]
|
||||
|
||||
proc duringPublish(e: Entity; turn: var Turn; a: Assertion; h: Handle) =
|
||||
var de = DuringEntity(e)
|
||||
let action = de.cb(turn, a)
|
||||
# assert(not action.isNil "should have put in a no-op action")
|
||||
let g = de.assertionMap.getOrDefault h
|
||||
case g.kind
|
||||
of null:
|
||||
de.assertionMap[h] = DuringAction(kind: act, action: action)
|
||||
of dead:
|
||||
de.assertionMap.del h
|
||||
freshen(turn, action)
|
||||
of act:
|
||||
raiseAssert("during: duplicate handle in publish: " & $h)
|
||||
|
||||
proc duringRetract(e: Entity; turn: var Turn; h: Handle) =
|
||||
var de = DuringEntity(e)
|
||||
let g = de.assertionMap.getOrDefault h
|
||||
case g.kind
|
||||
of null:
|
||||
de.assertionMap[h] = DuringAction(kind: dead)
|
||||
of dead:
|
||||
raiseAssert("during: duplicate handle in retract: " & $h)
|
||||
of act:
|
||||
de.assertionMap.del h
|
||||
g.action(turn)
|
||||
|
||||
proc during*(cb: DuringProc): DuringEntity =
|
||||
result = DuringEntity(cb: cb)
|
||||
result.setProcs(publish = duringPublish, retract = duringRetract)
|
||||
|
||||
proc observe*(turn: var Turn; ds: Ref; pat: Pattern; e: Entity): Handle =
|
||||
publish(turn, ds, Observe(pattern: pat, observer: embed newRef(turn, e)))
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
import std/[asyncdispatch, options, tables]
|
||||
import preserves, preserves/parse
|
||||
import ./actors, ./dataspaces, ./membranes, ./protocols/[protocol, sturdy]
|
||||
import ./actors, ./durings, ./membranes, ./protocols/[protocol, sturdy]
|
||||
|
||||
when defined(traceSyndicate):
|
||||
template trace(args: varargs[untyped]): untyped = echo(args)
|
||||
|
|
|
@ -1,29 +1,15 @@
|
|||
# SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import ./assertions, ./bags, ./events
|
||||
import preserves, preserves/records
|
||||
import lists, options, sets, tables
|
||||
import std/[hashes, lists, options, sets, tables]
|
||||
import preserves
|
||||
import ./actors, ./bags, ./patterns
|
||||
|
||||
template trace(args: varargs[untyped]) = stderr.writeLine(args)
|
||||
|
||||
type
|
||||
NonEmptySkeleton*[Shape] = object
|
||||
shape: Shape
|
||||
members: seq[Skeleton[Shape]]
|
||||
Skeleton*[Shape] = Option[NonEmptySkeleton[Shape]]
|
||||
|
||||
Shape = string
|
||||
|
||||
Value = Preserve
|
||||
|
||||
HandlerCallback* = proc (event: EventKind; bindings: seq[Value]) {.gcsafe.}
|
||||
|
||||
Path = seq[Natural]
|
||||
|
||||
Analysis* = object
|
||||
skeleton: Skeleton[Shape]
|
||||
constPaths: seq[Path]
|
||||
constVals: seq[Value]
|
||||
capturePaths: seq[Path]
|
||||
Value = Preserve[Ref]
|
||||
Path = seq[Value]
|
||||
|
||||
proc projectPath(v: Value; path: Path): Value =
|
||||
result = v
|
||||
|
@ -33,50 +19,57 @@ proc projectPath(v: Value; path: Path): Value =
|
|||
proc projectPaths(v: Value; paths: seq[Path]): seq[Value] =
|
||||
result.setLen(paths.len)
|
||||
for i, path in paths: result[i] = projectPath(v, path)
|
||||
trace "projected ", v, " by paths ", paths, " to ", result
|
||||
|
||||
proc analyzeAssertion*(a: Value): Analysis =
|
||||
var path: Path
|
||||
proc walk(analysis: var Analysis; a: Value): Skeleton[Shape] =
|
||||
if a.preserveTo(Discard).isSome:
|
||||
discard
|
||||
elif a.preserveTo(Capture).isSome:
|
||||
analysis.capturePaths.add(path)
|
||||
result = walk(analysis, a.fields[0])
|
||||
else:
|
||||
if a.kind == pkRecord:
|
||||
let class = classOf(a)
|
||||
result = some NonEmptySkeleton[Shape](shape: $class)
|
||||
path.add(0)
|
||||
var i: int
|
||||
for field in a.fields:
|
||||
path[path.high] = i
|
||||
result.get.members.add(walk(analysis, field))
|
||||
inc(i)
|
||||
discard path.pop
|
||||
else:
|
||||
analysis.constPaths.add(path)
|
||||
analysis.constVals.add(a)
|
||||
result.skeleton = walk(result, a)
|
||||
type Class = distinct string
|
||||
|
||||
proc hash(cls: Class): Hash {.borrow.}
|
||||
proc `==`(x, y: Class): bool {.borrow.}
|
||||
|
||||
proc classOf*(v: Value): Class =
|
||||
if v.isRecord:
|
||||
result = Class $v.label & "/" & $v.arity
|
||||
elif v.isSequence:
|
||||
result = Class $v.len
|
||||
elif v.isDictionary:
|
||||
result = Class "{}"
|
||||
|
||||
proc classOf*(p: Pattern): Class =
|
||||
if p.orKind == PatternKind.DCompound:
|
||||
case p.dcompound.orKind
|
||||
of DCompoundKind.rec:
|
||||
result = Class $p.dcompound.rec.label & "/" & $p.dcompound.rec.fields.len
|
||||
of DCompoundKind.arr:
|
||||
result = Class $p.dcompound.arr.items.len
|
||||
of DCompoundKind.dict:
|
||||
result = Class "{}"
|
||||
|
||||
proc step(value, index: Value): Value =
|
||||
try: result = value[index]
|
||||
except KeyError, ValueError:
|
||||
trace "step failed, ", index, " not in ", value
|
||||
|
||||
type
|
||||
Handler = ref object
|
||||
cachedCaptures: Bag[seq[Value]]
|
||||
callbacks: HashSet[HandlerCallback]
|
||||
EventKind = enum addedEvent, removedEvent, messageEvent
|
||||
|
||||
AssertionCache = HashSet[Value]
|
||||
|
||||
ObserverGroup = ref object
|
||||
cachedCaptures: Bag[seq[Value]]
|
||||
observers: Table[Ref, TableRef[seq[Value], Handle]]
|
||||
|
||||
Leaf = ref object
|
||||
cachedAssertions: AssertionCache
|
||||
handlerMap: Table[seq[Path], Handler]
|
||||
observerGroups: Table[seq[Path], ObserverGroup]
|
||||
|
||||
Continuation = ref object
|
||||
cachedAssertions: AssertionCache
|
||||
leafMap: Table[seq[Path], TableRef[seq[Value], Leaf]] # TODO: not TableRef?
|
||||
leafMap: Table[seq[Path], TableRef[seq[Value], Leaf]]
|
||||
|
||||
Selector = tuple[popCount: int; index: int]
|
||||
Selector = tuple[popCount: int; index: Value]
|
||||
|
||||
Node = ref object
|
||||
edges: Table[Selector, TableRef[string, Node]]
|
||||
edges: Table[Selector, TableRef[Class, Node]]
|
||||
continuation: Continuation
|
||||
|
||||
using
|
||||
|
@ -85,87 +78,96 @@ using
|
|||
node: Node
|
||||
|
||||
proc isEmpty(leaf): bool =
|
||||
leaf.cachedAssertions.len == 0 and leaf.handlerMap.len == 0
|
||||
leaf.cachedAssertions.len == 0 and leaf.observerGroups.len == 0
|
||||
|
||||
type
|
||||
ContinuationProc = proc (c: Continuation; v: Value) {.gcsafe.}
|
||||
LeafProc = proc (l: Leaf; v: Value) {.gcsafe.}
|
||||
HandlerProc = proc (h: Handler; vs: seq[Value]) {.gcsafe.}
|
||||
ObserverProc = proc (turn: var Turn; h: ObserverGroup; vs: seq[Value]) {.gcsafe.}
|
||||
|
||||
proc modify(node; operation: EventKind; outerValue: Value;
|
||||
mCont: ContinuationProc; mLeaf: LeafProc; mHandler: HandlerProc) =
|
||||
proc modify(node; turn: var Turn; operation: EventKind; outerValue: Value;
|
||||
mCont: ContinuationProc; mLeaf: LeafProc; mObserverGroup: ObserverProc) =
|
||||
trace "modify node with outerValue ", outerValue
|
||||
|
||||
proc walkContinuation(continuation) {.gcsafe.}
|
||||
|
||||
proc walkNode(node; termStack: SinglyLinkedList[seq[Value]]) =
|
||||
# TODO: use a seq for the stack?
|
||||
walkContinuation(node.continuation)
|
||||
proc walkNode(turn: var Turn; node; termStack: SinglyLinkedList[Value]) =
|
||||
mCont(node.continuation, outerValue)
|
||||
if node.continuation.leafMap.len == 0:
|
||||
trace "node.continuation leafMap is empty"
|
||||
for (constPaths, constValMap) in node.continuation.leafMap.pairs:
|
||||
trace "got entry in node.continuation.leafMap for ", constPaths
|
||||
let constValues = projectPaths(outerValue, constPaths)
|
||||
var leaf = constValMap.getOrDefault(constValues)
|
||||
if leaf.isNil and operation == addedEvent:
|
||||
new leaf
|
||||
constValMap[constValues] = leaf
|
||||
if not leaf.isNil:
|
||||
mLeaf(leaf, outerValue)
|
||||
for (capturePaths, observerGroup) in leaf.observerGroups.pairs:
|
||||
mObserverGroup(turn, observerGroup, projectPaths(outerValue, capturePaths))
|
||||
if operation == removedEvent and leaf.isEmpty:
|
||||
constValMap.del(constValues)
|
||||
if constValues.len == 0:
|
||||
node.continuation.leafMap.del(constPaths)
|
||||
for (selector, table) in node.edges.pairs:
|
||||
var nextStack = termStack
|
||||
for _ in 1..selector.popCount:
|
||||
nextStack.head = nextStack.head.next
|
||||
let nextValue = nextStack.head.value[selector.index]
|
||||
if nextValue.isRecord:
|
||||
let nextClass = classOf(nextValue)
|
||||
let nextNode = table.getOrDefault($nextClass)
|
||||
trace "step ", nextStack.head.value, " with ", selector.index
|
||||
let
|
||||
nextValue = step(nextStack.head.value, selector.index)
|
||||
nextClass = classOf nextValue
|
||||
if nextClass != Class"":
|
||||
let nextNode = table.getOrDefault(nextClass)
|
||||
if not nextNode.isNil:
|
||||
nextStack.prepend(nextValue.record)
|
||||
walkNode(nextNode, nextStack)
|
||||
nextStack.prepend(nextValue)
|
||||
walkNode(turn, nextNode, nextStack)
|
||||
|
||||
proc walkContinuation(continuation: Continuation) =
|
||||
mCont(continuation, outerValue)
|
||||
for (constPaths, constValMap) in continuation.leafMap.pairs:
|
||||
let constVals = projectPaths(outerValue, constPaths)
|
||||
let leaf = constValMap.getOrDefault(constVals)
|
||||
if leaf.isNil:
|
||||
if operation == addedEvent:
|
||||
constValMap[constVals] = Leaf()
|
||||
else:
|
||||
mLeaf(leaf, outerValue)
|
||||
for (capturePaths, handler) in leaf.handlerMap.pairs:
|
||||
mHandler(handler, projectPaths(outerValue, capturePaths))
|
||||
if operation == removedEvent and leaf.isEmpty:
|
||||
constValMap.del(constVals)
|
||||
if constValMap.len == 0:
|
||||
continuation.leafMap.del(constPaths)
|
||||
var stack: SinglyLinkedList[seq[Value]]
|
||||
stack.prepend(@[outerValue])
|
||||
walkNode(node, stack)
|
||||
var stack: SinglyLinkedList[Value]
|
||||
stack.prepend(outerValue)
|
||||
walkNode(turn, node, stack)
|
||||
|
||||
proc extend[Shape](node; skeleton: Skeleton[Shape]): Continuation =
|
||||
proc extend(node: Node; pat: Pattern): Continuation =
|
||||
trace "extend node with ", pat
|
||||
var path: Path
|
||||
|
||||
proc walkNode(node; popCount, index: int; skeleton: Skeleton[Shape]): tuple[popCount: int, node: Node] =
|
||||
assert(not node.isNil)
|
||||
if skeleton.isNone:
|
||||
return (popCount, node)
|
||||
else:
|
||||
let selector: Selector = (popCount, index)
|
||||
var
|
||||
cls = skeleton.get.shape
|
||||
var
|
||||
table = node.edges.getOrDefault(selector)
|
||||
proc walkNode(node: Node; popCount: Natural; stepIndex: Value; pat: Pattern): tuple[popCount: Natural, nextNode: Node] =
|
||||
trace "walkNode step ", stepIndex, " of ", pat
|
||||
case pat.orKind
|
||||
of PatternKind.DDiscard, PatternKind.DLit: result = (popCount, node)
|
||||
of PatternKind.DBind: result = walkNode(node, popCount, stepIndex, pat.dbind.pattern)
|
||||
of PatternKind.DCompound:
|
||||
let selector = (popCount, stepIndex,)
|
||||
var table = node.edges.getOrDefault(selector)
|
||||
if table.isNil:
|
||||
table = newTable[string, Node]()
|
||||
trace "allocate new table for selector ", selector, " for ", pat
|
||||
table = newTable[Class, Node]()
|
||||
node.edges[selector] = table
|
||||
var nextNode = table.getOrDefault(cls)
|
||||
if nextNode.isNil:
|
||||
nextNode = Node(continuation: Continuation())
|
||||
table[cls] = nextNode
|
||||
else:
|
||||
trace "got a table for ", pat, " with selector ", selector
|
||||
let class = classOf pat
|
||||
result.nextNode = table.getOrDefault(class)
|
||||
if result.nextNode.isNil:
|
||||
trace "allocate result.nextNode for ", string class
|
||||
new result.nextNode
|
||||
table[class] = result.nextNode
|
||||
new result.nextNode.continuation
|
||||
for a in node.continuation.cachedAssertions:
|
||||
if $classOf(projectPath(a, path)) == cls:
|
||||
nextNode.continuation.cachedAssertions.incl(a)
|
||||
block:
|
||||
var popCount, index: int
|
||||
path.add(index)
|
||||
for member in skeleton.get.members:
|
||||
(popCount, nextNode) = walkNode(nextNode, result.popCount, index, member)
|
||||
inc(index)
|
||||
discard path.pop()
|
||||
path.add(index)
|
||||
if class == classOf projectPath(a, path):
|
||||
result.nextNode.continuation.cachedAssertions.incl a
|
||||
result.popCount = 0
|
||||
template walkKey(pat: Pattern; stepIndex: Value) =
|
||||
trace "walkKey ", pat, " with step ", stepIndex
|
||||
path.add(stepIndex)
|
||||
result = walkNode(result.nextNode, popCount, stepIndex, pat)
|
||||
discard path.pop()
|
||||
result = (popCount.succ, nextNode)
|
||||
walkNode(node, 0, 0, skeleton).node.continuation
|
||||
case pat.dcompound.orKind
|
||||
of DCompoundKind.rec:
|
||||
for k, e in pat.dcompound.rec.fields: walkKey(e, k.toPreserve(Ref))
|
||||
of DCompoundKind.arr:
|
||||
for k, e in pat.dcompound.arr.items: walkKey(e, k.toPreserve(Ref))
|
||||
of DCompoundKind.dict:
|
||||
for k, e in pat.dcompound.dict.entries: walkKey(e, k)
|
||||
result.popCount.inc
|
||||
walkNode(node, 0, toPreserve(0, Ref), pat).nextNode.continuation
|
||||
|
||||
type
|
||||
Index* = object
|
||||
|
@ -175,93 +177,103 @@ type
|
|||
proc initIndex*(): Index =
|
||||
Index(root: Node(continuation: Continuation()))
|
||||
|
||||
using index: Index
|
||||
|
||||
proc addHandler*(index; res: Analysis; callback: HandlerCallback) =
|
||||
assert(not index.root.isNil)
|
||||
proc add*(index: var Index; turn: var Turn; pattern: Pattern; observer: Ref) =
|
||||
trace "add pattern ", pattern, " for ", observer
|
||||
let
|
||||
constPaths = res.constPaths
|
||||
constVals = res.constVals
|
||||
capturePaths = res.capturePaths
|
||||
continuation = index.root.extend(res.skeleton)
|
||||
var constValMap = continuation.leafMap.getOrDefault(constPaths)
|
||||
analysis = analyse pattern
|
||||
continuation = index.root.extend pattern
|
||||
var constValMap = continuation.leafMap.getOrDefault(analysis.constPaths)
|
||||
if constValMap.isNil:
|
||||
constValMap = newTable[seq[Value], Leaf]()
|
||||
continuation.leafMap[constPaths] = constValMap
|
||||
trace "allocate constValMap in leafMap for ", analysis.constPaths
|
||||
new constValMap
|
||||
for a in continuation.cachedAssertions:
|
||||
let key = projectPaths(a, constPaths)
|
||||
let key = projectPaths(a, analysis.constPaths)
|
||||
var leaf = constValMap.getOrDefault(key)
|
||||
if leaf.isNil:
|
||||
new leaf
|
||||
constValMap[key] = leaf
|
||||
leaf.cachedAssertions.incl(a)
|
||||
var leaf = constValMap.getOrDefault(constVals)
|
||||
trace "update leafMap for ", analysis.constPaths
|
||||
continuation.leafMap[analysis.constPaths] = constValMap
|
||||
var leaf = constValMap.getOrDefault(analysis.constValues)
|
||||
if leaf.isNil:
|
||||
new leaf
|
||||
constValMap[constVals] = leaf
|
||||
var handler = leaf.handlerMap.getOrDefault(capturePaths)
|
||||
if handler.isNil:
|
||||
new handler
|
||||
leaf.handlerMap[capturePaths] = handler
|
||||
constValMap[analysis.constValues] = leaf
|
||||
trace "get observerGroup for ", analysis.capturePaths
|
||||
var observerGroup = leaf.observerGroups.getOrDefault(analysis.capturePaths)
|
||||
if observerGroup.isNil:
|
||||
trace "allocate observerGroup for ", analysis.capturePaths
|
||||
new observerGroup
|
||||
for a in leaf.cachedAssertions:
|
||||
let a = projectPaths(a, capturePaths)
|
||||
if handler.cachedCaptures.contains(a):
|
||||
discard handler.cachedCaptures.change(a, +1)
|
||||
handler.callbacks.incl(callback)
|
||||
for captures, count in handler.cachedCaptures.pairs:
|
||||
callback(addedEvent, captures)
|
||||
discard observerGroup.cachedCaptures.change(projectPaths(a, analysis.capturePaths), +1)
|
||||
leaf.observerGroups[analysis.capturePaths] = observerGroup
|
||||
var captureMap = newTable[seq[Value], Handle]()
|
||||
for (count, captures) in observerGroup.cachedCaptures:
|
||||
captureMap[captures] = publish(turn, observer, captures)
|
||||
observerGroup.observers[observer] = captureMap
|
||||
|
||||
proc removeHandler*(index; res: Analysis; callback: HandlerCallback) =
|
||||
let continuation = index.root.extend(res.skeleton)
|
||||
try:
|
||||
let
|
||||
constValMap = continuation.leafMap[res.constPaths]
|
||||
leaf = constValMap[res.constVals]
|
||||
handler = leaf.handlerMap[res.capturePaths]
|
||||
handler.callbacks.excl(callback)
|
||||
if handler.callbacks.len == 0:
|
||||
leaf.handlerMap.del(res.capturePaths)
|
||||
if leaf.isEmpty:
|
||||
constValMap.del(res.constVals)
|
||||
if constValMap.len == 0:
|
||||
continuation.leafMap.del(res.constPaths)
|
||||
except KeyError: discard
|
||||
proc remove*(index: var Index; turn: var Turn; pattern: Pattern; observer: Ref) =
|
||||
var
|
||||
analysis = analyse pattern
|
||||
continuation = index.root.extend pattern
|
||||
let constValMap = continuation.leafMap.getOrDefault(analysis.constPaths)
|
||||
if not constValMap.isNil:
|
||||
let leaf = constValMap.getOrDefault(analysis.constValues)
|
||||
if not leaf.isNil:
|
||||
let observerGroup = leaf.observerGroups.getOrDefault(analysis.capturePaths)
|
||||
if not observerGroup.isNil:
|
||||
let captureMap = observerGroup.observers.getOrDefault(observer)
|
||||
if not captureMap.isNil:
|
||||
for handle in captureMap.values: retract(observer.target, turn, handle)
|
||||
observerGroup.observers.del(observer)
|
||||
if observerGroup.observers.len == 0:
|
||||
leaf.observerGroups.del(analysis.capturePaths)
|
||||
if leaf.isEmpty:
|
||||
constValMap.del(analysis.constValues)
|
||||
if constValMap.len == 0:
|
||||
continuation.leafMap.del(analysis.constPaths)
|
||||
|
||||
proc adjustAssertion*(index: var Index; outerValue: Value; delta: int): ChangeDescription =
|
||||
result = index.allAssertions.change(outerValue, delta)
|
||||
case result
|
||||
proc adjustAssertion*(index: var Index; turn: var Turn; outerValue: Value; delta: int): bool =
|
||||
case index.allAssertions.change(outerValue, delta)
|
||||
of cdAbsentToPresent:
|
||||
result = true
|
||||
index.root.modify(
|
||||
turn,
|
||||
addedEvent,
|
||||
outerValue,
|
||||
(proc (c: Continuation; v: Value) = c.cachedAssertions.incl(v)),
|
||||
(proc (l: Leaf; v: Value) = l.cachedAssertions.incl(v)),
|
||||
(proc (h: Handler; vs: seq[Value]) =
|
||||
if h.cachedCaptures.change(vs, +1) == cdAbsentToPresent:
|
||||
#debugEcho " assertion of ", outerValue
|
||||
for cb in h.callbacks: cb(addedEvent, vs)))
|
||||
(proc (turn: var Turn; group: ObserverGroup; vs: seq[Value]) =
|
||||
if group.cachedCaptures.change(vs, +1) == cdAbsentToPresent:
|
||||
for (observer, captureMap) in group.observers.pairs:
|
||||
let a = vs.toPreserve(Ref)
|
||||
trace "publish to dataspace observer ", observer, " ", a
|
||||
captureMap[vs] = publish(turn, observer, a)))
|
||||
# TODO: this handle is coming from the facet?
|
||||
of cdPresentToAbsent:
|
||||
result = true
|
||||
index.root.modify(
|
||||
turn,
|
||||
removedEvent,
|
||||
outerValue,
|
||||
(proc (c: Continuation; v: Value) = c.cachedAssertions.excl(v)),
|
||||
(proc (l: Leaf; v: Value) = l.cachedAssertions.excl(v)),
|
||||
(proc (h: Handler; vs: seq[Value]) =
|
||||
if h.cachedCaptures.change(vs, -1) == cdPresentToAbsent:
|
||||
#debugEcho "retraction of ", outerValue
|
||||
for cb in h.callbacks: cb(removedEvent, vs)))
|
||||
else:
|
||||
discard
|
||||
(proc (turn: var Turn; group: ObserverGroup; vs: seq[Value]) =
|
||||
if group.cachedCaptures.change(vs, -1) == cdPresentToAbsent:
|
||||
for (observer, captureMap) in group.observers.pairs:
|
||||
retract(observer.target, turn, captureMap[vs])
|
||||
captureMap.del(vs)))
|
||||
else: discard
|
||||
|
||||
proc continuationNoop(c: Continuation; v: Value) = discard
|
||||
proc leafNoop(l: Leaf; v: Value) = discard
|
||||
|
||||
proc deliverMessage*(index; v: Value; leafCb: proc (l: Leaf; v: Value) {.gcsafe.}) =
|
||||
proc handlerCb(h: Handler; vs: seq[Value]) =
|
||||
for cb in h.callbacks: cb(messageEvent, vs)
|
||||
index.root.modify(messageEvent, v, continuationNoop, leafCb, handlerCb)
|
||||
proc add*(index: var Index; turn: var Turn; v: Assertion): bool =
|
||||
adjustAssertion(index, turn, v, +1)
|
||||
proc remove*(index: var Index; turn: var Turn; v: Assertion): bool =
|
||||
adjustAssertion(index, turn, v, -1)
|
||||
|
||||
proc deliverMessage*(index; v: Value) =
|
||||
proc handlerCb(h: Handler; vs: seq[Value]) =
|
||||
for cb in h.callbacks: cb(messageEvent, vs)
|
||||
index.root.modify(messageEvent, v, continuationNoop, leafNoop, handlerCb)
|
||||
proc deliverMessage*(index: Index; turn: var Turn; v: Value) =
|
||||
proc observersCb(turn: var Turn; group: ObserverGroup; vs: seq[Value]) =
|
||||
for observer in group.observers.keys: message(turn, observer, vs)
|
||||
index.root.modify(turn, messageEvent, v, continuationNoop, leafNoop, observersCb)
|
||||
|
|
Loading…
Reference in New Issue