Simplifications at dataspaces and skeletons
This commit is contained in:
parent
0e493d0696
commit
f745e8b53f
|
@ -56,7 +56,6 @@ type
|
|||
adhocAssertions: Bag
|
||||
cleanupChanges: Bag
|
||||
parentId: ActorId
|
||||
isRunnable: bool
|
||||
|
||||
EndpointSpec* = object
|
||||
assertion*: Option[Value]
|
||||
|
@ -163,18 +162,6 @@ proc newActor(ds: Dataspace; name: string; initialAssertions: Value; parentId: A
|
|||
discard result.adhocAssertions.change(v, 1)
|
||||
ds.actors[result.id] = result
|
||||
|
||||
proc newFacet(actor; parent: ParentFacet): Facet =
|
||||
result = Facet(
|
||||
id: actor.dataspace.generateId.FacetId,
|
||||
actor: actor,
|
||||
parent: parent,
|
||||
isLive: true,
|
||||
inScript: true)
|
||||
if parent.isSome:
|
||||
parent.get.children[result.id] = result
|
||||
else:
|
||||
actor.rootFacet = some result
|
||||
|
||||
proc applyPatch(ds: Dataspace; actor: Option[Actor]; changes: Bag) =
|
||||
type Pair = tuple[val: Value; count: int]
|
||||
var removals: seq[Pair]
|
||||
|
@ -205,10 +192,12 @@ proc adjust(patch: var Action; v: Value; delta: int) =
|
|||
discard patch.changes.change(v, delta)
|
||||
|
||||
proc subscribe(ds: Dataspace; handler: Analysis) =
|
||||
ds.index.addHandler(handler, handler.callback.get)
|
||||
assert(not handler.callback.isNil)
|
||||
ds.index.addHandler(handler, handler.callback)
|
||||
|
||||
proc unsubscribe(ds: Dataspace; handler: Analysis) =
|
||||
ds.index.removeHandler(handler, handler.callback.get)
|
||||
assert(not handler.callback.isNil)
|
||||
ds.index.removeHandler(handler, handler.callback)
|
||||
|
||||
proc assert(actor; a: Value) = actor.pendingPatch.adjust(a, +1)
|
||||
|
||||
|
@ -221,9 +210,12 @@ proc install(ep: Endpoint; spec: EndpointSpec) =
|
|||
ep.spec.analysis.map do (a: Analysis):
|
||||
ep.facet.actor.dataspace.subscribe(a)
|
||||
|
||||
proc isRunnable(actor): bool =
|
||||
for tasks in actor.pendingTasks:
|
||||
if tasks.len > 0: return true
|
||||
|
||||
proc scheduleTask(actor; prio: Priority; task: Task[void]) =
|
||||
if not actor.isRunnable:
|
||||
actor.isRunnable = true
|
||||
actor.dataspace.runnable.add(actor)
|
||||
actor.pendingTasks[prio].addLast(task)
|
||||
|
||||
|
@ -231,7 +223,7 @@ proc scheduleTask(actor; task: Task[void]) =
|
|||
scheduleTask(actor, pNormal, task)
|
||||
|
||||
proc abandonQueuedWork(actor) =
|
||||
actor.pendingActions = @[]
|
||||
reset actor.pendingActions
|
||||
for q in actor.pendingTasks.mitems: clear(q)
|
||||
|
||||
proc uninstall(ep: Endpoint; emitPatches: bool) =
|
||||
|
@ -411,8 +403,9 @@ proc addActor(ds: Dataspace; name: string; bootProc: Script[void]; initialAssert
|
|||
ac.adhocRetract(a)
|
||||
|
||||
proc send*(facet; body: Value) =
|
||||
## Send a message into the dataspace.
|
||||
facet.ensureNonFacetSetup("send")
|
||||
proc impl(action: Action; ds: Dataspace; actor: Option[Actor]) =
|
||||
proc impl(_: Action; ds: Dataspace; actor: Option[Actor]) =
|
||||
ds.deliverMessage(body, actor)
|
||||
facet.enqueueScriptAction(Action(impl: impl, kind: messageAction))
|
||||
|
||||
|
@ -444,12 +437,12 @@ proc initActivationAction(script: ActivationScript; name: string): Action =
|
|||
ds.addActor(name, boot, Preserve(kind: pkSet), actor)
|
||||
Action(impl: impl, kind: activationAction)
|
||||
|
||||
proc activate(facet; script: ActivationScript; name = "") =
|
||||
proc activate(facet; name: string; script: ActivationScript) =
|
||||
facet.ensureNonFacetSetup "`activate`"
|
||||
facet.enqueueScriptAction(initActivationAction(script, name))
|
||||
|
||||
proc newDataspace(ground: Ground; bootProc: ActivationScript): Dataspace =
|
||||
let turn = Turn(actions: @[initSpawnAction("", bootProc, Value(kind: pkSet))])
|
||||
proc newDataspace(ground: Ground; name: string; bootProc: ActivationScript): Dataspace =
|
||||
let turn = Turn(actions: @[initSpawnAction(name, bootProc, Value(kind: pkSet))])
|
||||
Dataspace(ground: ground, index: initIndex(), pendingTurns: @[turn])
|
||||
|
||||
proc addEndpoint*(facet; updateScript: Script[EndpointSpec], isDynamic = true): Endpoint =
|
||||
|
@ -496,7 +489,6 @@ proc runPendingTask(actor): bool =
|
|||
|
||||
proc runPendingTasks(actor) =
|
||||
while actor.runPendingTask(): discard
|
||||
actor.isRunnable = false
|
||||
if actor.pendingActions.len > 0:
|
||||
var pending = move actor.pendingActions
|
||||
actor.dataspace.commitActions(actor, pending)
|
||||
|
@ -543,12 +535,12 @@ proc step(g: Ground) =
|
|||
reset g.stopHandlers
|
||||
complete(g.future)
|
||||
|
||||
proc bootModule*(bootProc: ActivationScript): Future[void] =
|
||||
proc bootModule*(name: string; bootProc: ActivationScript): Future[void] =
|
||||
# TODO: better integration with the async dispatcher
|
||||
let g = Ground(future: newFuture[void]"bootModule")
|
||||
g.dataspace = newDataspace(g) do (rootFacet: Facet):
|
||||
g.dataspace = newDataspace(g, name) do (rootFacet: Facet):
|
||||
rootFacet.addStartScript do (rootFacet: Facet):
|
||||
rootFacet.activate(bootProc)
|
||||
rootFacet.activate(name, bootProc)
|
||||
addTimer(1, true) do (fd: AsyncFD) -> bool:
|
||||
step(g)
|
||||
true
|
||||
|
@ -562,15 +554,15 @@ template declareField*(facet: Facet; F: untyped; T: typedesc; initial: T): untyp
|
|||
facet.actor.dataspace.dataflow.defineObservableProperty(`F`.id)
|
||||
facet.fields.add(toPreserve(initial))
|
||||
let fieldOff = facet.fields.high
|
||||
proc set(f: DistinctField; x: T) =
|
||||
proc set(f: DistinctField; x: T) {.used.} =
|
||||
facet.actor.dataspace.dataflow.recordDamage(f.id)
|
||||
facet.fields[fieldOff] = toPreserve[T](x)
|
||||
proc set(f: DistinctField; x: Preserve) =
|
||||
proc set(f: DistinctField; x: Preserve) {.used.} =
|
||||
facet.actor.dataspace.dataflow.recordDamage(f.id)
|
||||
facet.fields[fieldOff] = x
|
||||
proc get(f: DistinctField): T =
|
||||
proc get(f: DistinctField): T {.used.} =
|
||||
facet.actor.dataspace.dataflow.recordObservation(f.id)
|
||||
fromPreserve[T](result, facet.fields[fieldOff])
|
||||
proc getPreserve(f: DistinctField): Preserve =
|
||||
proc getPreserve(f: DistinctField): Preserve {.used.} =
|
||||
facet.actor.dataspace.dataflow.recordObservation(f.id)
|
||||
facet.fields[fieldOff]
|
||||
|
|
|
@ -33,7 +33,7 @@ type
|
|||
constPaths: seq[Path]
|
||||
constVals: seq[Value]
|
||||
capturePaths: seq[Path]
|
||||
callback*: Option[HandlerCallback]
|
||||
callback*: HandlerCallback
|
||||
|
||||
proc `$`(a: Analysis): string =
|
||||
result.add "\n\t skeleton: "
|
||||
|
|
|
@ -38,7 +38,7 @@ proc boot(facet: Facet) =
|
|||
facet.scheduleScript do (facet: Facet):
|
||||
facet.fields.value = vs[0]
|
||||
# echo "box updated value ", vs[0]
|
||||
result.analysis.get.callback = some (facet.wrap cb)
|
||||
result.analysis.get.callback = facet.wrap cb
|
||||
const o = Observe.init(SetBox.init(`?$`))
|
||||
result.assertion = some o
|
||||
|
||||
|
@ -53,7 +53,7 @@ proc boot(facet: Facet) =
|
|||
let v = SetBox.init(vs[0].int.succ.toPreserve)
|
||||
# echo "client sending ", v
|
||||
facet.send(v)
|
||||
result.analysis.get.callback = some (facet.wrap cb)
|
||||
result.analysis.get.callback = facet.wrap cb
|
||||
const o = Observe.init(BoxState.init(`?$`))
|
||||
result.assertion = some o
|
||||
|
||||
|
@ -64,11 +64,11 @@ proc boot(facet: Facet) =
|
|||
if evt == removedEvent:
|
||||
facet.scheduleScript do (facet: Facet):
|
||||
echo "box gone"
|
||||
result.analysis.get.callback = some (facet.wrap cb)
|
||||
result.analysis.get.callback = facet.wrap cb
|
||||
const o = Observe.init(BoxState.init(`?_`))
|
||||
result.assertion = some o
|
||||
|
||||
facet.actor.dataspace.ground.addStopHandler do (_: Dataspace):
|
||||
echo "stopping box-and-client"
|
||||
|
||||
waitFor bootModule(boot)
|
||||
waitFor bootModule("box-and-client", boot)
|
||||
|
|
Loading…
Reference in New Issue