Simplify EndpointSpec type

This commit is contained in:
Emery Hemingway 2021-07-09 12:38:10 +02:00
parent 262a8d7452
commit 6a4eb032ad
4 changed files with 56 additions and 132 deletions

View File

@ -35,7 +35,7 @@ template stopIf*(cond, body: untyped): untyped =
## Stop the current facet if `cond` is true and
## invoke `body` after the facet has stopped.
mixin getCurrentFacet
discard getCurrentFacet().addDataflow do (facet: Facet):
getCurrentFacet().addDataflow do (facet: Facet):
if cond:
facet.stop do (facet: Facet):
proc getCurrentFacet(): Facet {.inject, used.} = facet
@ -109,13 +109,13 @@ proc onEvent(event: EventKind, pattern, handler: NimNode): NimNode =
handlerSym = handler[0]
result = quote do:
mixin getCurrentFacet
discard getCurrentFacet().addEndpoint do (facet: Facet) -> EndpointSpec:
getCurrentFacet().addEndpoint do (facet: Facet) -> EndpointSpec:
proc getCurrentFacet(): Facet {.inject, used.} = facet
`handler`
let a = `pattern`
result.assertion = some(Observe % a)
result.assertion = Observe % a
result.analysis = some(analyzeAssertion(a))
result.analysis.get.callback = wrap(facet, EventKind(`event`), `handlerSym`)
result.callback = wrap(facet, EventKind(`event`), `handlerSym`)
macro onAsserted*(pattern: Preserve; handler: untyped) =
onEvent(addedEvent, pattern, handler)
@ -140,9 +140,8 @@ template onStop*(body: untyped): untyped =
template assert*(a: Preserve): untyped =
mixin getCurrentFacet
let facet = getCurrentFacet()
discard facet.addEndpoint do (_: Facet) -> EndpointSpec:
result.assertion = some(a)
getCurrentFacet().addEndpoint do (_: Facet) -> EndpointSpec:
result.assertion = a
template field*(F: untyped; T: typedesc; initial: T): untyped =
## Declare a field. The identifier `F` shall be a value with

View File

@ -57,9 +57,10 @@ type
cleanupChanges: Bag
parentId: ActorId
EndpointSpec* = object
assertion*: Option[Value]
analysis*: Option[Analysis]
EndpointSpec* = tuple
callback: HandlerCallback
assertion: Value
analysis: Option[Analysis]
Endpoint = ref object
id: EndpointId
@ -116,35 +117,6 @@ using
actor: Actor
facet: Facet
proc `$`*(spec: EndpointSpec): string =
result.add "{assertion: "
if spec.assertion.isSome:
result.add $(spec.assertion.get)
else:
result.add "nil"
result.add ", analysis: "
if spec.analysis.isSome:
result.add $spec.analysis.get
else:
result.add "nil"
result.add " }"
proc `$`*(actor): string =
result.add "Actor("
result.add $actor.id
result.add ','
result.add actor.name
result.add ')'
proc `$`*(facet): string =
result.add "Facet("
result.add $facet.actor.id
result.add ','
result.add facet.actor.name
result.add ','
result.add $facet.id
result.add ')'
proc hash*(ep: Endpoint): Hash =
!$(hash(ep.id) !& hash(ep.facet.id))
@ -169,14 +141,14 @@ proc applyPatch(ds: Dataspace; actor: Option[Actor]; changes: Bag) =
var removals: seq[Pair]
for a, count in changes.pairs:
if count > 0:
# echo "applyPatch +", a
# debugEcho "applyPatch +", a
discard ds.index.adjustAssertion(a, count)
else:
removals.add((a, count))
actor.map do (ac: Actor):
discard ac.cleanupChanges.change(a, -count)
for (a, count) in removals:
# echo "applyPatch -", a
# debugEcho "applyPatch -", a
discard ds.index.adjustAssertion(a, count)
proc initPatch(): Action =
@ -193,24 +165,17 @@ proc pendingPatch(actor): var Action =
proc adjust(patch: var Action; v: Value; delta: int) =
discard patch.changes.change(v, delta)
proc subscribe(ds: Dataspace; handler: Analysis) =
assert(not handler.callback.isNil)
ds.index.addHandler(handler, handler.callback)
proc unsubscribe(ds: Dataspace; handler: Analysis) =
assert(not handler.callback.isNil)
ds.index.removeHandler(handler, handler.callback)
proc assert(actor; a: Value) = actor.pendingPatch.adjust(a, +1)
proc retract(actor; a: Value) = actor.pendingPatch.adjust(a, -1)
proc install(ep: Endpoint; spec: EndpointSpec) =
ep.spec = spec
ep.spec.assertion.map do (a: Value):
ep.facet.actor.assert(a)
if not ep.spec.assertion.isNil:
ep.facet.actor.assert(ep.spec.assertion)
ep.spec.analysis.map do (a: Analysis):
ep.facet.actor.dataspace.subscribe(a)
assert(not ep.spec.callback.isNil)
ep.facet.actor.dataspace.index.addHandler(a, ep.spec.callback)
proc isRunnable(actor): bool =
for tasks in actor.pendingTasks:
@ -230,10 +195,11 @@ proc abandonQueuedWork(actor) =
proc uninstall(ep: Endpoint; emitPatches: bool) =
if emitPatches:
ep.spec.assertion.map do (a: Value):
ep.facet.actor.retract(a)
if not ep.spec.assertion.isNil:
ep.facet.actor.retract(ep.spec.assertion)
ep.spec.analysis.map do (a: Analysis):
ep.facet.actor.dataspace.unsubscribe(a)
assert(not ep.spec.callback.isNil)
ep.facet.actor.dataspace.index.removeHandler(a, ep.spec.callback)
proc destroy(ep: Endpoint; emitPatches: bool) =
ep.facet.actor.dataspace.dataflow.forgetSubject(ep)
@ -448,35 +414,35 @@ proc newDataspace(ground: Ground; name: string; bootProc: ActivationScript): Dat
let turn = Turn(actions: @[initSpawnAction(name, bootProc, Value(kind: pkSet))])
Dataspace(ground: ground, index: initIndex(), pendingTurns: @[turn])
proc addEndpoint*(facet; updateScript: Script[EndpointSpec], isDynamic = true): Endpoint =
facet.ensureFacetSetup("add endpoint")
proc addEndpoint*(facet; updateScript: Script[EndpointSpec], isDynamic = true) =
facet.ensureFacetSetup("addEndpoint")
let
actor = facet.actor
dataspace = actor.dataspace
result = Endpoint(
ep = Endpoint(
id: dataspace.generateId.EndpointId,
facet: facet,
updateProc: updateScript)
dataspace.dataflow.addSubject(result)
dataspace.dataflow.addSubject(ep)
let
dyn = if isDynamic: some result else: none Endpoint
dyn = if isDynamic: some ep else: none Endpoint
initialSpec = dataspace.dataflow.withSubject(dyn) do () -> EndpointSpec:
updateScript(facet)
result.install(initialSpec)
facet.endpoints[result.id] = result
# dataspace.endpointHook(facet, result)
# Subclasses may override
assert:
(initialSpec.analysis.isNone and initialSpec.callback.isNil) or
(initialSpec.analysis.isSome and (not initialSpec.callback.isNil))
ep.install(initialSpec)
facet.endpoints[ep.id] = ep
proc addDataflow*(facet; prio: Priority; subjectProc: Script[void]): Endpoint =
proc addDataflow*(facet; prio: Priority; subjectProc: Script[void]) =
facet.addEndpoint do (fa: Facet) -> EndpointSpec:
let subjectId = facet.actor.dataspace.dataflow.currentSubjectId
facet.scheduleScript(prio) do (fa: Facet):
if facet.isLive:
facet.actor.dataspace.dataflow.withSubject(subjectId):
subjectProc(facet)
# result is the default EndpointSpec
proc addDataflow*(facet; subjectProc: Script[void]): Endpoint =
proc addDataflow*(facet; subjectProc: Script[void]) =
addDataflow(facet, pNormal, subjectProc)
proc commitActions(dataspace; actor; pending: seq[Action]) =

View File

@ -5,15 +5,25 @@ import preserves, preserves/records
import lists, options, sets, tables
type
Value = Preserve
NonEmptySkeleton*[Shape] = object
shape: Shape
members: seq[Skeleton[Shape]]
Skeleton*[Shape] = Option[NonEmptySkeleton[Shape]]
Shape = string
Value = Preserve
HandlerCallback* = proc (event: EventKind; bindings: seq[Value]) {.gcsafe.}
Path = seq[Natural]
Analysis* = object
skeleton: Skeleton[Shape]
constPaths: seq[Path]
constVals: seq[Value]
capturePaths: seq[Path]
proc projectPath(v: Value; path: Path): Value =
result = v
for index in path:
@ -23,28 +33,6 @@ proc projectPaths(v: Value; paths: seq[Path]): seq[Value] =
result.setLen(paths.len)
for i, path in paths: result[i] = projectPath(v, path)
type
Shape = string
HandlerCallback* = proc (event: EventKind; bindings: seq[Value]) {.gcsafe.}
Analysis* = object
skeleton: Skeleton[Shape]
constPaths: seq[Path]
constVals: seq[Value]
capturePaths: seq[Path]
callback*: HandlerCallback
proc `$`(a: Analysis): string =
result.add "\n\t skeleton: "
result.add $a.skeleton
result.add "\n\t constPaths: "
result.add $a.constPaths
result.add "\n\t constVals: "
result.add $a.constVals
result.add "\n\t capturePaths: "
result.add $a.capturePaths
proc analyzeAssertion*(a: Value): Analysis =
var path: Path
proc walk(analysis: var Analysis; a: Value): Skeleton[Shape] =
@ -95,27 +83,6 @@ using
leaf: Leaf
node: Node
proc `$`(leaf): string =
result.add "Leaf{ cached: "
result.add $leaf.cachedAssertions
result.add ", handler count: "
result.add $leaf.handlerMap.len
result.add " }"
proc `$`(continuation): string =
result.add "Continuation{ cached: "
result.add $continuation.cachedAssertions
result.add ", "
result.add $continuation.leafMap
result.add " }"
proc `$`(node): string =
result.add "Node{ "
result.add $node.continuation
result.add ", edges: "
result.add $node.edges
result.add "}"
proc isEmpty(leaf): bool =
leaf.cachedAssertions.len == 0 and leaf.handlerMap.len == 0
@ -209,10 +176,6 @@ proc initIndex*(): Index =
using index: Index
proc `$`*(index): string =
result.add "Index("
result.add ")Index"
proc addHandler*(index; res: Analysis; callback: HandlerCallback) =
assert(not index.root.isNil)
let

View File

@ -18,31 +18,29 @@ proc boot(facet: Facet) =
facet.spawn("box") do (facet: Facet):
facet.declareField(value, int, 0)
discard facet.addEndpoint do (facet: Facet) -> EndpointSpec:
facet.addEndpoint do (facet: Facet) -> EndpointSpec:
# echo "recomputing published BoxState ", facet.fields.value
let a = BoxState.init(value.getPreserve)
result.assertion = some a
result.assertion = BoxState.init(value.getPreserve)
discard facet.addDataflow do (facet: Facet):
facet.addDataflow do (facet: Facet):
# echo "box dataflow saw new value ", facet.fields.value
if value.get == N:
facet.stop do (facet: Facet):
echo "terminated box root facet"
discard facet.addEndpoint do (facet: Facet) -> EndpointSpec:
facet.addEndpoint do (facet: Facet) -> EndpointSpec:
const a = SetBox.init(`?$`)
result.analysis = some analyzeAssertion(a)
proc cb(facet: Facet; vs: seq[Value]) =
facet.scheduleScript do (facet: Facet):
value.set(vs[0])
# echo "box updated value ", vs[0]
result.analysis.get.callback = facet.wrap(messageEvent, cb)
const o = Observe.init(SetBox.init(`?$`))
result.assertion = some o
result.callback = facet.wrap(messageEvent, cb)
result.assertion = Observe.init(SetBox.init(`?$`))
facet.spawn("client") do (facet: Facet):
discard facet.addEndpoint do (facet: Facet) -> EndpointSpec:
facet.addEndpoint do (facet: Facet) -> EndpointSpec:
const a = BoxState.init(`?$`)
result.analysis = some analyzeAssertion(a)
proc cb(facet: Facet; vs: seq[Value]) =
@ -50,19 +48,17 @@ proc boot(facet: Facet) =
let v = SetBox.init(vs[0].int.succ.toPreserve)
# echo "client sending ", v
facet.send(v)
result.analysis.get.callback = facet.wrap(addedEvent, cb)
const o = Observe.init(BoxState.init(`?$`))
result.assertion = some o
result.callback = facet.wrap(addedEvent, cb)
result.assertion = Observe.init(BoxState.init(`?$`))
discard facet.addEndpoint do (facet: Facet) -> EndpointSpec:
facet.addEndpoint do (facet: Facet) -> EndpointSpec:
const a = BoxState.init(`?_`)
result.analysis = some analyzeAssertion(a)
proc cb(facet: Facet; vs: seq[Value]) =
facet.scheduleScript do (facet: Facet):
echo "box gone"
result.analysis.get.callback = facet.wrap(removedEvent, cb)
const o = Observe.init(BoxState.init(`?_`))
result.assertion = some o
result.callback = facet.wrap(removedEvent, cb)
result.assertion = Observe.init(BoxState.init(`?_`))
facet.actor.dataspace.ground.addStopHandler do (_: Dataspace):
echo "stopping box-and-client"