From 1912574ed8e9c98e86fb244f4d81e3391d2ac25e Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Thu, 10 Mar 2022 17:30:07 -0600 Subject: [PATCH] Proper dataspaces --- src/syndicate.nim | 4 +- src/syndicate/bags.nim | 3 + src/syndicate/dataspaces.nim | 120 +++--------- src/syndicate/durings.nim | 57 ++++++ src/syndicate/relays.nim | 2 +- src/syndicate/skeletons.nim | 360 ++++++++++++++++++----------------- 6 files changed, 280 insertions(+), 266 deletions(-) create mode 100644 src/syndicate/durings.nim diff --git a/src/syndicate.nim b/src/syndicate.nim index e56fad4..81d7d99 100644 --- a/src/syndicate.nim +++ b/src/syndicate.nim @@ -3,8 +3,8 @@ import std/macros import preserves -import syndicate/[actors, dataspaces, patterns] -export patterns +import syndicate/[actors, dataspaces, durings, patterns] +export dataspaces, patterns from syndicate/protocols/protocol import Handle diff --git a/src/syndicate/bags.nim b/src/syndicate/bags.nim index b7545f1..5639f03 100644 --- a/src/syndicate/bags.nim +++ b/src/syndicate/bags.nim @@ -36,3 +36,6 @@ proc change*[T](bag: var Bag[T]; key: T; delta: int; clamp = false): ChangeDescr result = change(bag.mGetOrPut(key, 0), delta, clamp) if result in {cdAbsentToAbsent, cdPresentToAbsent}: bag.del(key) + +iterator items*[T](bag: Bag[T]): (int, T) = + for k, v in bag: yield(v, k) diff --git a/src/syndicate/dataspaces.nim b/src/syndicate/dataspaces.nim index 13dda26..45de645 100644 --- a/src/syndicate/dataspaces.nim +++ b/src/syndicate/dataspaces.nim @@ -1,104 +1,46 @@ -# SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway +# SPDX-FileCopyrightText: ☭ 2022 Emery Hemingway # SPDX-License-Identifier: Unlicense -import std/[hashes, macros, tables] +import std/[hashes, tables] import preserves -import ./actors, ./bags, ./patterns, ./protocols/dataspace +import ./actors, ./protocols/dataspace, ./skeletons from ./protocols/protocol import Handle +template trace(args: varargs[untyped]): untyped = stderr.writeLine(args) + type Observe = dataspace.Observe[Ref] Turn = actors.Turn -#[ - DataspaceEntity = ref object of Entity - assertions: Bag[Assertion] - subscriptions: Table[Assertion, TableRef[Ref, TableRef[Assertion, Handle]]] - handleMap: Table[Handle, Assertion] # breaks toPreserve(Observe[Ref]()) + Dataspace {.final.} = ref object of Entity + index: Index + handleMap: Table[Handle, Assertion] -method publish(ds: DataspaceEntity; turn: var Turn; rec: Assertion; h: Handle) = - if rec.isRecord: - ds.handleMap[h] = rec - if ds.assertions.change(rec, +1) == cdAbsentToPresent: +method publish(ds: Dataspace; turn: var Turn; v: Assertion; h: Handle) = + if add(ds.index, turn, v): + var obs: Observe + if obs.fromPreserve v: + ds.index.add(turn, obs.pattern, unembed obs.observer) + ds.handleMap[h] = v + +method retract(ds: Dataspace; turn: var Turn; h: Handle) = + try: + let v = ds.handleMap[h] + if remove(ds.index, turn, v): + ds.handleMap.del h var obs: Observe - if fromPreserve(obs, rec): - var seen = newTable[Assertion, Handle]() - for prev, count in ds.assertions.pairs: - if prev == rec.label: - seen[prev] = publish(turn, obs.observer.unembed, prev) - var patternSubs = ds.subscriptions.getOrDefault(rec.label) - if patternSubs.isNil: - patternSubs = newTable[Ref, TableRef[Value, Handle]]() - ds.subscriptions[rec.label] = patternSubs - patternSubs[obs.observer.unembed] = move seen - for peer, seen in ds.subscriptions[rec.label]: - if rec notin seen: - seen[rec] = publish(turn, peer, rec) + if obs.fromPreserve v: + ds.index.remove(turn, obs.pattern, unembed obs.observer) + except KeyError: discard -method retract(ds: DataspaceEntity; turn: var Turn; upstreamHandle: Handle) = - let rec = ds.handleMap.getOrDefault(upstreamHandle) - if rec.isRecord: - ds.handleMap.del upstreamHandle - if ds.assertions.change(rec, -1) == cdPresentToAbsent: - for peer, seen in ds.subscriptions[rec.label]: - var h: Handle - if pop(seen, rec, h): retract(turn, h) - preserveTo(rec, Observe).map do (obs: Observe): - let peerMap = ds.subscriptions[rec.label] - peerMap.del(obs.observer.unembed) - if peerMap.len == 0: - ds.subscriptions.del(rec.label) +method message(ds: Dataspace; turn: var Turn; v: Assertion) = + ds.index.deliverMessage(turn, v) -method message(ds: DataspaceEntity; turn: var Turn; msg: Assertion) = - if msg.isRecord: - for peer, seen in ds.subscriptions[msg.label].pairs: - message(turn, peer, msg) -]# +type BootProc = proc (ds: Ref; turn: var Turn) {.gcsafe.} -type DuringProc* = proc (turn: var Turn; a: Assertion): TurnAction {.gcsafe.} - -type - DuringActionKind = enum null, dead, act - DuringAction = object - case kind: DuringActionKind - of null, dead: discard - of act: - action: TurnAction - -type DuringEntity = ref object of Entity - cb: DuringProc - assertionMap: Table[Handle, DuringAction] - -proc duringPublish(e: Entity; turn: var Turn; a: Assertion; h: Handle) = - var de = DuringEntity(e) - let action = de.cb(turn, a) - # assert(not action.isNil "should have put in a no-op action") - let g = de.assertionMap.getOrDefault h - case g.kind - of null: - de.assertionMap[h] = DuringAction(kind: act, action: action) - of dead: - de.assertionMap.del h - freshen(turn, action) - of act: - raiseAssert("during: duplicate handle in publish: " & $h) - -proc duringRetract(e: Entity; turn: var Turn; h: Handle) = - var de = DuringEntity(e) - let g = de.assertionMap.getOrDefault h - case g.kind - of null: - de.assertionMap[h] = DuringAction(kind: dead) - of dead: - raiseAssert("during: duplicate handle in retract: " & $h) - of act: - de.assertionMap.del h - g.action(turn) - -proc during*(cb: DuringProc): DuringEntity = - result = DuringEntity(cb: cb) - result.setProcs(publish = duringPublish, retract = duringRetract) - -proc observe*(turn: var Turn; ds: Ref; pat: Pattern; e: Entity): Handle = - publish(turn, ds, Observe(pattern: pat, observer: embed newRef(turn, e))) +proc bootDataspace*(name: string; bootProc: BootProc): Actor {.discardable.} = + bootActor(name) do (turn: var Turn): + discard turn.facet.preventInertCheck() + let ds = newRef(turn, Dataspace(index: initIndex())) + bootProc(ds, turn) diff --git a/src/syndicate/durings.nim b/src/syndicate/durings.nim new file mode 100644 index 0000000..fef26bc --- /dev/null +++ b/src/syndicate/durings.nim @@ -0,0 +1,57 @@ +# SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway +# SPDX-License-Identifier: Unlicense + +import std/[hashes, macros, tables] +import preserves +import ./actors, ./patterns, ./protocols/dataspace + +from ./protocols/protocol import Handle + +type + Observe = dataspace.Observe[Ref] + Turn = actors.Turn + +type + DuringProc* = proc (turn: var Turn; a: Assertion): TurnAction {.gcsafe.} + DuringActionKind = enum null, dead, act + DuringAction = object + case kind: DuringActionKind + of null, dead: discard + of act: + action: TurnAction + DuringEntity {.final.}= ref object of Entity + cb: DuringProc + assertionMap: Table[Handle, DuringAction] + +proc duringPublish(e: Entity; turn: var Turn; a: Assertion; h: Handle) = + var de = DuringEntity(e) + let action = de.cb(turn, a) + # assert(not action.isNil "should have put in a no-op action") + let g = de.assertionMap.getOrDefault h + case g.kind + of null: + de.assertionMap[h] = DuringAction(kind: act, action: action) + of dead: + de.assertionMap.del h + freshen(turn, action) + of act: + raiseAssert("during: duplicate handle in publish: " & $h) + +proc duringRetract(e: Entity; turn: var Turn; h: Handle) = + var de = DuringEntity(e) + let g = de.assertionMap.getOrDefault h + case g.kind + of null: + de.assertionMap[h] = DuringAction(kind: dead) + of dead: + raiseAssert("during: duplicate handle in retract: " & $h) + of act: + de.assertionMap.del h + g.action(turn) + +proc during*(cb: DuringProc): DuringEntity = + result = DuringEntity(cb: cb) + result.setProcs(publish = duringPublish, retract = duringRetract) + +proc observe*(turn: var Turn; ds: Ref; pat: Pattern; e: Entity): Handle = + publish(turn, ds, Observe(pattern: pat, observer: embed newRef(turn, e))) diff --git a/src/syndicate/relays.nim b/src/syndicate/relays.nim index b257bdf..2b2ef8e 100644 --- a/src/syndicate/relays.nim +++ b/src/syndicate/relays.nim @@ -3,7 +3,7 @@ import std/[asyncdispatch, options, tables] import preserves, preserves/parse -import ./actors, ./dataspaces, ./membranes, ./protocols/[protocol, sturdy] +import ./actors, ./durings, ./membranes, ./protocols/[protocol, sturdy] when defined(traceSyndicate): template trace(args: varargs[untyped]): untyped = echo(args) diff --git a/src/syndicate/skeletons.nim b/src/syndicate/skeletons.nim index 70852d1..a5cf9f3 100644 --- a/src/syndicate/skeletons.nim +++ b/src/syndicate/skeletons.nim @@ -1,29 +1,15 @@ # SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway # SPDX-License-Identifier: Unlicense -import ./assertions, ./bags, ./events -import preserves, preserves/records -import lists, options, sets, tables +import std/[hashes, lists, options, sets, tables] +import preserves +import ./actors, ./bags, ./patterns + +template trace(args: varargs[untyped]) = stderr.writeLine(args) type - NonEmptySkeleton*[Shape] = object - shape: Shape - members: seq[Skeleton[Shape]] - Skeleton*[Shape] = Option[NonEmptySkeleton[Shape]] - - Shape = string - - Value = Preserve - - HandlerCallback* = proc (event: EventKind; bindings: seq[Value]) {.gcsafe.} - - Path = seq[Natural] - - Analysis* = object - skeleton: Skeleton[Shape] - constPaths: seq[Path] - constVals: seq[Value] - capturePaths: seq[Path] + Value = Preserve[Ref] + Path = seq[Value] proc projectPath(v: Value; path: Path): Value = result = v @@ -33,50 +19,57 @@ proc projectPath(v: Value; path: Path): Value = proc projectPaths(v: Value; paths: seq[Path]): seq[Value] = result.setLen(paths.len) for i, path in paths: result[i] = projectPath(v, path) + trace "projected ", v, " by paths ", paths, " to ", result -proc analyzeAssertion*(a: Value): Analysis = - var path: Path - proc walk(analysis: var Analysis; a: Value): Skeleton[Shape] = - if a.preserveTo(Discard).isSome: - discard - elif a.preserveTo(Capture).isSome: - analysis.capturePaths.add(path) - result = walk(analysis, a.fields[0]) - else: - if a.kind == pkRecord: - let class = classOf(a) - result = some NonEmptySkeleton[Shape](shape: $class) - path.add(0) - var i: int - for field in a.fields: - path[path.high] = i - result.get.members.add(walk(analysis, field)) - inc(i) - discard path.pop - else: - analysis.constPaths.add(path) - analysis.constVals.add(a) - result.skeleton = walk(result, a) +type Class = distinct string + +proc hash(cls: Class): Hash {.borrow.} +proc `==`(x, y: Class): bool {.borrow.} + +proc classOf*(v: Value): Class = + if v.isRecord: + result = Class $v.label & "/" & $v.arity + elif v.isSequence: + result = Class $v.len + elif v.isDictionary: + result = Class "{}" + +proc classOf*(p: Pattern): Class = + if p.orKind == PatternKind.DCompound: + case p.dcompound.orKind + of DCompoundKind.rec: + result = Class $p.dcompound.rec.label & "/" & $p.dcompound.rec.fields.len + of DCompoundKind.arr: + result = Class $p.dcompound.arr.items.len + of DCompoundKind.dict: + result = Class "{}" + +proc step(value, index: Value): Value = + try: result = value[index] + except KeyError, ValueError: + trace "step failed, ", index, " not in ", value type - Handler = ref object - cachedCaptures: Bag[seq[Value]] - callbacks: HashSet[HandlerCallback] + EventKind = enum addedEvent, removedEvent, messageEvent AssertionCache = HashSet[Value] + ObserverGroup = ref object + cachedCaptures: Bag[seq[Value]] + observers: Table[Ref, TableRef[seq[Value], Handle]] + Leaf = ref object cachedAssertions: AssertionCache - handlerMap: Table[seq[Path], Handler] + observerGroups: Table[seq[Path], ObserverGroup] Continuation = ref object cachedAssertions: AssertionCache - leafMap: Table[seq[Path], TableRef[seq[Value], Leaf]] # TODO: not TableRef? + leafMap: Table[seq[Path], TableRef[seq[Value], Leaf]] - Selector = tuple[popCount: int; index: int] + Selector = tuple[popCount: int; index: Value] Node = ref object - edges: Table[Selector, TableRef[string, Node]] + edges: Table[Selector, TableRef[Class, Node]] continuation: Continuation using @@ -85,87 +78,96 @@ using node: Node proc isEmpty(leaf): bool = - leaf.cachedAssertions.len == 0 and leaf.handlerMap.len == 0 + leaf.cachedAssertions.len == 0 and leaf.observerGroups.len == 0 type ContinuationProc = proc (c: Continuation; v: Value) {.gcsafe.} LeafProc = proc (l: Leaf; v: Value) {.gcsafe.} - HandlerProc = proc (h: Handler; vs: seq[Value]) {.gcsafe.} + ObserverProc = proc (turn: var Turn; h: ObserverGroup; vs: seq[Value]) {.gcsafe.} -proc modify(node; operation: EventKind; outerValue: Value; - mCont: ContinuationProc; mLeaf: LeafProc; mHandler: HandlerProc) = +proc modify(node; turn: var Turn; operation: EventKind; outerValue: Value; + mCont: ContinuationProc; mLeaf: LeafProc; mObserverGroup: ObserverProc) = + trace "modify node with outerValue ", outerValue - proc walkContinuation(continuation) {.gcsafe.} - - proc walkNode(node; termStack: SinglyLinkedList[seq[Value]]) = - # TODO: use a seq for the stack? - walkContinuation(node.continuation) + proc walkNode(turn: var Turn; node; termStack: SinglyLinkedList[Value]) = + mCont(node.continuation, outerValue) + if node.continuation.leafMap.len == 0: + trace "node.continuation leafMap is empty" + for (constPaths, constValMap) in node.continuation.leafMap.pairs: + trace "got entry in node.continuation.leafMap for ", constPaths + let constValues = projectPaths(outerValue, constPaths) + var leaf = constValMap.getOrDefault(constValues) + if leaf.isNil and operation == addedEvent: + new leaf + constValMap[constValues] = leaf + if not leaf.isNil: + mLeaf(leaf, outerValue) + for (capturePaths, observerGroup) in leaf.observerGroups.pairs: + mObserverGroup(turn, observerGroup, projectPaths(outerValue, capturePaths)) + if operation == removedEvent and leaf.isEmpty: + constValMap.del(constValues) + if constValues.len == 0: + node.continuation.leafMap.del(constPaths) for (selector, table) in node.edges.pairs: var nextStack = termStack for _ in 1..selector.popCount: nextStack.head = nextStack.head.next - let nextValue = nextStack.head.value[selector.index] - if nextValue.isRecord: - let nextClass = classOf(nextValue) - let nextNode = table.getOrDefault($nextClass) + trace "step ", nextStack.head.value, " with ", selector.index + let + nextValue = step(nextStack.head.value, selector.index) + nextClass = classOf nextValue + if nextClass != Class"": + let nextNode = table.getOrDefault(nextClass) if not nextNode.isNil: - nextStack.prepend(nextValue.record) - walkNode(nextNode, nextStack) + nextStack.prepend(nextValue) + walkNode(turn, nextNode, nextStack) - proc walkContinuation(continuation: Continuation) = - mCont(continuation, outerValue) - for (constPaths, constValMap) in continuation.leafMap.pairs: - let constVals = projectPaths(outerValue, constPaths) - let leaf = constValMap.getOrDefault(constVals) - if leaf.isNil: - if operation == addedEvent: - constValMap[constVals] = Leaf() - else: - mLeaf(leaf, outerValue) - for (capturePaths, handler) in leaf.handlerMap.pairs: - mHandler(handler, projectPaths(outerValue, capturePaths)) - if operation == removedEvent and leaf.isEmpty: - constValMap.del(constVals) - if constValMap.len == 0: - continuation.leafMap.del(constPaths) - var stack: SinglyLinkedList[seq[Value]] - stack.prepend(@[outerValue]) - walkNode(node, stack) + var stack: SinglyLinkedList[Value] + stack.prepend(outerValue) + walkNode(turn, node, stack) -proc extend[Shape](node; skeleton: Skeleton[Shape]): Continuation = +proc extend(node: Node; pat: Pattern): Continuation = + trace "extend node with ", pat var path: Path - - proc walkNode(node; popCount, index: int; skeleton: Skeleton[Shape]): tuple[popCount: int, node: Node] = - assert(not node.isNil) - if skeleton.isNone: - return (popCount, node) - else: - let selector: Selector = (popCount, index) - var - cls = skeleton.get.shape - var - table = node.edges.getOrDefault(selector) + proc walkNode(node: Node; popCount: Natural; stepIndex: Value; pat: Pattern): tuple[popCount: Natural, nextNode: Node] = + trace "walkNode step ", stepIndex, " of ", pat + case pat.orKind + of PatternKind.DDiscard, PatternKind.DLit: result = (popCount, node) + of PatternKind.DBind: result = walkNode(node, popCount, stepIndex, pat.dbind.pattern) + of PatternKind.DCompound: + let selector = (popCount, stepIndex,) + var table = node.edges.getOrDefault(selector) if table.isNil: - table = newTable[string, Node]() + trace "allocate new table for selector ", selector, " for ", pat + table = newTable[Class, Node]() node.edges[selector] = table - var nextNode = table.getOrDefault(cls) - if nextNode.isNil: - nextNode = Node(continuation: Continuation()) - table[cls] = nextNode + else: + trace "got a table for ", pat, " with selector ", selector + let class = classOf pat + result.nextNode = table.getOrDefault(class) + if result.nextNode.isNil: + trace "allocate result.nextNode for ", string class + new result.nextNode + table[class] = result.nextNode + new result.nextNode.continuation for a in node.continuation.cachedAssertions: - if $classOf(projectPath(a, path)) == cls: - nextNode.continuation.cachedAssertions.incl(a) - block: - var popCount, index: int - path.add(index) - for member in skeleton.get.members: - (popCount, nextNode) = walkNode(nextNode, result.popCount, index, member) - inc(index) - discard path.pop() - path.add(index) + if class == classOf projectPath(a, path): + result.nextNode.continuation.cachedAssertions.incl a + result.popCount = 0 + template walkKey(pat: Pattern; stepIndex: Value) = + trace "walkKey ", pat, " with step ", stepIndex + path.add(stepIndex) + result = walkNode(result.nextNode, popCount, stepIndex, pat) discard path.pop() - result = (popCount.succ, nextNode) - walkNode(node, 0, 0, skeleton).node.continuation + case pat.dcompound.orKind + of DCompoundKind.rec: + for k, e in pat.dcompound.rec.fields: walkKey(e, k.toPreserve(Ref)) + of DCompoundKind.arr: + for k, e in pat.dcompound.arr.items: walkKey(e, k.toPreserve(Ref)) + of DCompoundKind.dict: + for k, e in pat.dcompound.dict.entries: walkKey(e, k) + result.popCount.inc + walkNode(node, 0, toPreserve(0, Ref), pat).nextNode.continuation type Index* = object @@ -175,93 +177,103 @@ type proc initIndex*(): Index = Index(root: Node(continuation: Continuation())) -using index: Index - -proc addHandler*(index; res: Analysis; callback: HandlerCallback) = - assert(not index.root.isNil) +proc add*(index: var Index; turn: var Turn; pattern: Pattern; observer: Ref) = + trace "add pattern ", pattern, " for ", observer let - constPaths = res.constPaths - constVals = res.constVals - capturePaths = res.capturePaths - continuation = index.root.extend(res.skeleton) - var constValMap = continuation.leafMap.getOrDefault(constPaths) + analysis = analyse pattern + continuation = index.root.extend pattern + var constValMap = continuation.leafMap.getOrDefault(analysis.constPaths) if constValMap.isNil: - constValMap = newTable[seq[Value], Leaf]() - continuation.leafMap[constPaths] = constValMap + trace "allocate constValMap in leafMap for ", analysis.constPaths + new constValMap for a in continuation.cachedAssertions: - let key = projectPaths(a, constPaths) + let key = projectPaths(a, analysis.constPaths) var leaf = constValMap.getOrDefault(key) if leaf.isNil: new leaf constValMap[key] = leaf leaf.cachedAssertions.incl(a) - var leaf = constValMap.getOrDefault(constVals) + trace "update leafMap for ", analysis.constPaths + continuation.leafMap[analysis.constPaths] = constValMap + var leaf = constValMap.getOrDefault(analysis.constValues) if leaf.isNil: new leaf - constValMap[constVals] = leaf - var handler = leaf.handlerMap.getOrDefault(capturePaths) - if handler.isNil: - new handler - leaf.handlerMap[capturePaths] = handler + constValMap[analysis.constValues] = leaf + trace "get observerGroup for ", analysis.capturePaths + var observerGroup = leaf.observerGroups.getOrDefault(analysis.capturePaths) + if observerGroup.isNil: + trace "allocate observerGroup for ", analysis.capturePaths + new observerGroup for a in leaf.cachedAssertions: - let a = projectPaths(a, capturePaths) - if handler.cachedCaptures.contains(a): - discard handler.cachedCaptures.change(a, +1) - handler.callbacks.incl(callback) - for captures, count in handler.cachedCaptures.pairs: - callback(addedEvent, captures) + discard observerGroup.cachedCaptures.change(projectPaths(a, analysis.capturePaths), +1) + leaf.observerGroups[analysis.capturePaths] = observerGroup + var captureMap = newTable[seq[Value], Handle]() + for (count, captures) in observerGroup.cachedCaptures: + captureMap[captures] = publish(turn, observer, captures) + observerGroup.observers[observer] = captureMap -proc removeHandler*(index; res: Analysis; callback: HandlerCallback) = - let continuation = index.root.extend(res.skeleton) - try: - let - constValMap = continuation.leafMap[res.constPaths] - leaf = constValMap[res.constVals] - handler = leaf.handlerMap[res.capturePaths] - handler.callbacks.excl(callback) - if handler.callbacks.len == 0: - leaf.handlerMap.del(res.capturePaths) - if leaf.isEmpty: - constValMap.del(res.constVals) - if constValMap.len == 0: - continuation.leafMap.del(res.constPaths) - except KeyError: discard +proc remove*(index: var Index; turn: var Turn; pattern: Pattern; observer: Ref) = + var + analysis = analyse pattern + continuation = index.root.extend pattern + let constValMap = continuation.leafMap.getOrDefault(analysis.constPaths) + if not constValMap.isNil: + let leaf = constValMap.getOrDefault(analysis.constValues) + if not leaf.isNil: + let observerGroup = leaf.observerGroups.getOrDefault(analysis.capturePaths) + if not observerGroup.isNil: + let captureMap = observerGroup.observers.getOrDefault(observer) + if not captureMap.isNil: + for handle in captureMap.values: retract(observer.target, turn, handle) + observerGroup.observers.del(observer) + if observerGroup.observers.len == 0: + leaf.observerGroups.del(analysis.capturePaths) + if leaf.isEmpty: + constValMap.del(analysis.constValues) + if constValMap.len == 0: + continuation.leafMap.del(analysis.constPaths) -proc adjustAssertion*(index: var Index; outerValue: Value; delta: int): ChangeDescription = - result = index.allAssertions.change(outerValue, delta) - case result +proc adjustAssertion*(index: var Index; turn: var Turn; outerValue: Value; delta: int): bool = + case index.allAssertions.change(outerValue, delta) of cdAbsentToPresent: + result = true index.root.modify( + turn, addedEvent, outerValue, (proc (c: Continuation; v: Value) = c.cachedAssertions.incl(v)), (proc (l: Leaf; v: Value) = l.cachedAssertions.incl(v)), - (proc (h: Handler; vs: seq[Value]) = - if h.cachedCaptures.change(vs, +1) == cdAbsentToPresent: - #debugEcho " assertion of ", outerValue - for cb in h.callbacks: cb(addedEvent, vs))) + (proc (turn: var Turn; group: ObserverGroup; vs: seq[Value]) = + if group.cachedCaptures.change(vs, +1) == cdAbsentToPresent: + for (observer, captureMap) in group.observers.pairs: + let a = vs.toPreserve(Ref) + trace "publish to dataspace observer ", observer, " ", a + captureMap[vs] = publish(turn, observer, a))) + # TODO: this handle is coming from the facet? of cdPresentToAbsent: + result = true index.root.modify( + turn, removedEvent, outerValue, (proc (c: Continuation; v: Value) = c.cachedAssertions.excl(v)), (proc (l: Leaf; v: Value) = l.cachedAssertions.excl(v)), - (proc (h: Handler; vs: seq[Value]) = - if h.cachedCaptures.change(vs, -1) == cdPresentToAbsent: - #debugEcho "retraction of ", outerValue - for cb in h.callbacks: cb(removedEvent, vs))) - else: - discard + (proc (turn: var Turn; group: ObserverGroup; vs: seq[Value]) = + if group.cachedCaptures.change(vs, -1) == cdPresentToAbsent: + for (observer, captureMap) in group.observers.pairs: + retract(observer.target, turn, captureMap[vs]) + captureMap.del(vs))) + else: discard proc continuationNoop(c: Continuation; v: Value) = discard proc leafNoop(l: Leaf; v: Value) = discard -proc deliverMessage*(index; v: Value; leafCb: proc (l: Leaf; v: Value) {.gcsafe.}) = - proc handlerCb(h: Handler; vs: seq[Value]) = - for cb in h.callbacks: cb(messageEvent, vs) - index.root.modify(messageEvent, v, continuationNoop, leafCb, handlerCb) +proc add*(index: var Index; turn: var Turn; v: Assertion): bool = + adjustAssertion(index, turn, v, +1) +proc remove*(index: var Index; turn: var Turn; v: Assertion): bool = + adjustAssertion(index, turn, v, -1) -proc deliverMessage*(index; v: Value) = - proc handlerCb(h: Handler; vs: seq[Value]) = - for cb in h.callbacks: cb(messageEvent, vs) - index.root.modify(messageEvent, v, continuationNoop, leafNoop, handlerCb) +proc deliverMessage*(index: Index; turn: var Turn; v: Value) = + proc observersCb(turn: var Turn; group: ObserverGroup; vs: seq[Value]) = + for observer in group.observers.keys: message(turn, observer, vs) + index.root.modify(turn, messageEvent, v, continuationNoop, leafNoop, observersCb)