syndicate-nim/src/syndicate/dataspaces.nim

105 lines
3.5 KiB
Nim
Raw Normal View History

2021-09-01 11:44:28 +00:00
# SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway
# SPDX-License-Identifier: Unlicense
2021-10-29 16:26:33 +00:00
import std/[hashes, macros, tables]
2021-07-08 09:50:13 +00:00
import preserves
import ./actors, ./bags, ./patterns, ./protocols/dataspace
from ./protocols/protocol import Handle
type
2021-09-24 19:25:47 +00:00
Observe = dataspace.Observe[Ref]
Turn = actors.Turn
#[
2021-09-24 19:25:47 +00:00
DataspaceEntity = ref object of Entity
assertions: Bag[Assertion]
subscriptions: Table[Assertion, TableRef[Ref, TableRef[Assertion, Handle]]]
handleMap: Table[Handle, Assertion] # breaks toPreserve(Observe[Ref]())
method publish(ds: DataspaceEntity; turn: var Turn; rec: Assertion; h: Handle) =
if rec.isRecord:
ds.handleMap[h] = rec
if ds.assertions.change(rec, +1) == cdAbsentToPresent:
var obs: Observe
if fromPreserve(obs, rec):
var seen = newTable[Assertion, Handle]()
for prev, count in ds.assertions.pairs:
if prev == rec.label:
seen[prev] = publish(turn, obs.observer.unembed, prev)
var patternSubs = ds.subscriptions.getOrDefault(rec.label)
if patternSubs.isNil:
patternSubs = newTable[Ref, TableRef[Value, Handle]]()
ds.subscriptions[rec.label] = patternSubs
patternSubs[obs.observer.unembed] = move seen
for peer, seen in ds.subscriptions[rec.label]:
if rec notin seen:
seen[rec] = publish(turn, peer, rec)
method retract(ds: DataspaceEntity; turn: var Turn; upstreamHandle: Handle) =
let rec = ds.handleMap.getOrDefault(upstreamHandle)
if rec.isRecord:
ds.handleMap.del upstreamHandle
if ds.assertions.change(rec, -1) == cdPresentToAbsent:
for peer, seen in ds.subscriptions[rec.label]:
var h: Handle
if pop(seen, rec, h): retract(turn, h)
preserveTo(rec, Observe).map do (obs: Observe):
let peerMap = ds.subscriptions[rec.label]
peerMap.del(obs.observer.unembed)
if peerMap.len == 0:
ds.subscriptions.del(rec.label)
method message(ds: DataspaceEntity; turn: var Turn; msg: Assertion) =
if msg.isRecord:
for peer, seen in ds.subscriptions[msg.label].pairs:
message(turn, peer, msg)
]#
2021-09-24 19:25:47 +00:00
type DuringProc* = proc (turn: var Turn; a: Assertion): TurnAction {.gcsafe.}
2021-09-24 19:25:47 +00:00
type
DuringActionKind = enum null, dead, act
DuringAction = object
case kind: DuringActionKind
of null, dead: discard
of act:
action: TurnAction
type DuringEntity = ref object of Entity
cb: DuringProc
2021-10-28 16:57:09 +00:00
assertionMap: Table[Handle, DuringAction]
2021-09-24 19:25:47 +00:00
2021-10-28 16:57:09 +00:00
proc duringPublish(e: Entity; turn: var Turn; a: Assertion; h: Handle) =
var de = DuringEntity(e)
2021-09-24 19:25:47 +00:00
let action = de.cb(turn, a)
# assert(not action.isNil "should have put in a no-op action")
let g = de.assertionMap.getOrDefault h
case g.kind
of null:
de.assertionMap[h] = DuringAction(kind: act, action: action)
of dead:
de.assertionMap.del h
freshen(turn, action)
of act:
raiseAssert("during: duplicate handle in publish: " & $h)
2021-10-28 16:57:09 +00:00
proc duringRetract(e: Entity; turn: var Turn; h: Handle) =
var de = DuringEntity(e)
2021-09-24 19:25:47 +00:00
let g = de.assertionMap.getOrDefault h
case g.kind
of null:
de.assertionMap[h] = DuringAction(kind: dead)
of dead:
raiseAssert("during: duplicate handle in retract: " & $h)
of act:
de.assertionMap.del h
g.action(turn)
proc during*(cb: DuringProc): DuringEntity =
result = DuringEntity(cb: cb)
2021-10-28 16:57:09 +00:00
result.setProcs(publish = duringPublish, retract = duringRetract)
2021-09-24 19:25:47 +00:00
proc observe*(turn: var Turn; ds: Ref; pat: Pattern; e: Entity): Handle =
publish(turn, ds, Observe(pattern: pat, observer: embed newRef(turn, e)))