diff --git a/src/syndicate/dataspaces.nim b/src/syndicate/dataspaces.nim index d0d2862..d64737b 100644 --- a/src/syndicate/dataspaces.nim +++ b/src/syndicate/dataspaces.nim @@ -56,7 +56,6 @@ type adhocAssertions: Bag cleanupChanges: Bag parentId: ActorId - isRunnable: bool EndpointSpec* = object assertion*: Option[Value] @@ -163,18 +162,6 @@ proc newActor(ds: Dataspace; name: string; initialAssertions: Value; parentId: A discard result.adhocAssertions.change(v, 1) ds.actors[result.id] = result -proc newFacet(actor; parent: ParentFacet): Facet = - result = Facet( - id: actor.dataspace.generateId.FacetId, - actor: actor, - parent: parent, - isLive: true, - inScript: true) - if parent.isSome: - parent.get.children[result.id] = result - else: - actor.rootFacet = some result - proc applyPatch(ds: Dataspace; actor: Option[Actor]; changes: Bag) = type Pair = tuple[val: Value; count: int] var removals: seq[Pair] @@ -205,10 +192,12 @@ proc adjust(patch: var Action; v: Value; delta: int) = discard patch.changes.change(v, delta) proc subscribe(ds: Dataspace; handler: Analysis) = - ds.index.addHandler(handler, handler.callback.get) + assert(not handler.callback.isNil) + ds.index.addHandler(handler, handler.callback) proc unsubscribe(ds: Dataspace; handler: Analysis) = - ds.index.removeHandler(handler, handler.callback.get) + assert(not handler.callback.isNil) + ds.index.removeHandler(handler, handler.callback) proc assert(actor; a: Value) = actor.pendingPatch.adjust(a, +1) @@ -221,9 +210,12 @@ proc install(ep: Endpoint; spec: EndpointSpec) = ep.spec.analysis.map do (a: Analysis): ep.facet.actor.dataspace.subscribe(a) +proc isRunnable(actor): bool = + for tasks in actor.pendingTasks: + if tasks.len > 0: return true + proc scheduleTask(actor; prio: Priority; task: Task[void]) = if not actor.isRunnable: - actor.isRunnable = true actor.dataspace.runnable.add(actor) actor.pendingTasks[prio].addLast(task) @@ -231,7 +223,7 @@ proc scheduleTask(actor; task: Task[void]) = scheduleTask(actor, pNormal, task) proc abandonQueuedWork(actor) = - actor.pendingActions = @[] + reset actor.pendingActions for q in actor.pendingTasks.mitems: clear(q) proc uninstall(ep: Endpoint; emitPatches: bool) = @@ -411,8 +403,9 @@ proc addActor(ds: Dataspace; name: string; bootProc: Script[void]; initialAssert ac.adhocRetract(a) proc send*(facet; body: Value) = + ## Send a message into the dataspace. facet.ensureNonFacetSetup("send") - proc impl(action: Action; ds: Dataspace; actor: Option[Actor]) = + proc impl(_: Action; ds: Dataspace; actor: Option[Actor]) = ds.deliverMessage(body, actor) facet.enqueueScriptAction(Action(impl: impl, kind: messageAction)) @@ -444,12 +437,12 @@ proc initActivationAction(script: ActivationScript; name: string): Action = ds.addActor(name, boot, Preserve(kind: pkSet), actor) Action(impl: impl, kind: activationAction) -proc activate(facet; script: ActivationScript; name = "") = +proc activate(facet; name: string; script: ActivationScript) = facet.ensureNonFacetSetup "`activate`" facet.enqueueScriptAction(initActivationAction(script, name)) -proc newDataspace(ground: Ground; bootProc: ActivationScript): Dataspace = - let turn = Turn(actions: @[initSpawnAction("", bootProc, Value(kind: pkSet))]) +proc newDataspace(ground: Ground; name: string; bootProc: ActivationScript): Dataspace = + let turn = Turn(actions: @[initSpawnAction(name, bootProc, Value(kind: pkSet))]) Dataspace(ground: ground, index: initIndex(), pendingTurns: @[turn]) proc addEndpoint*(facet; updateScript: Script[EndpointSpec], isDynamic = true): Endpoint = @@ -496,7 +489,6 @@ proc runPendingTask(actor): bool = proc runPendingTasks(actor) = while actor.runPendingTask(): discard - actor.isRunnable = false if actor.pendingActions.len > 0: var pending = move actor.pendingActions actor.dataspace.commitActions(actor, pending) @@ -543,12 +535,12 @@ proc step(g: Ground) = reset g.stopHandlers complete(g.future) -proc bootModule*(bootProc: ActivationScript): Future[void] = +proc bootModule*(name: string; bootProc: ActivationScript): Future[void] = # TODO: better integration with the async dispatcher let g = Ground(future: newFuture[void]"bootModule") - g.dataspace = newDataspace(g) do (rootFacet: Facet): + g.dataspace = newDataspace(g, name) do (rootFacet: Facet): rootFacet.addStartScript do (rootFacet: Facet): - rootFacet.activate(bootProc) + rootFacet.activate(name, bootProc) addTimer(1, true) do (fd: AsyncFD) -> bool: step(g) true @@ -562,15 +554,15 @@ template declareField*(facet: Facet; F: untyped; T: typedesc; initial: T): untyp facet.actor.dataspace.dataflow.defineObservableProperty(`F`.id) facet.fields.add(toPreserve(initial)) let fieldOff = facet.fields.high - proc set(f: DistinctField; x: T) = + proc set(f: DistinctField; x: T) {.used.} = facet.actor.dataspace.dataflow.recordDamage(f.id) facet.fields[fieldOff] = toPreserve[T](x) - proc set(f: DistinctField; x: Preserve) = + proc set(f: DistinctField; x: Preserve) {.used.} = facet.actor.dataspace.dataflow.recordDamage(f.id) facet.fields[fieldOff] = x - proc get(f: DistinctField): T = + proc get(f: DistinctField): T {.used.} = facet.actor.dataspace.dataflow.recordObservation(f.id) fromPreserve[T](result, facet.fields[fieldOff]) - proc getPreserve(f: DistinctField): Preserve = + proc getPreserve(f: DistinctField): Preserve {.used.} = facet.actor.dataspace.dataflow.recordObservation(f.id) facet.fields[fieldOff] diff --git a/src/syndicate/skeletons.nim b/src/syndicate/skeletons.nim index 137d768..e92a105 100644 --- a/src/syndicate/skeletons.nim +++ b/src/syndicate/skeletons.nim @@ -33,7 +33,7 @@ type constPaths: seq[Path] constVals: seq[Value] capturePaths: seq[Path] - callback*: Option[HandlerCallback] + callback*: HandlerCallback proc `$`(a: Analysis): string = result.add "\n\t skeleton: " diff --git a/tests/test_box_and_client.nim b/tests/test_box_and_client.nim index 5c20259..f4b70c1 100644 --- a/tests/test_box_and_client.nim +++ b/tests/test_box_and_client.nim @@ -38,7 +38,7 @@ proc boot(facet: Facet) = facet.scheduleScript do (facet: Facet): facet.fields.value = vs[0] # echo "box updated value ", vs[0] - result.analysis.get.callback = some (facet.wrap cb) + result.analysis.get.callback = facet.wrap cb const o = Observe.init(SetBox.init(`?$`)) result.assertion = some o @@ -53,7 +53,7 @@ proc boot(facet: Facet) = let v = SetBox.init(vs[0].int.succ.toPreserve) # echo "client sending ", v facet.send(v) - result.analysis.get.callback = some (facet.wrap cb) + result.analysis.get.callback = facet.wrap cb const o = Observe.init(BoxState.init(`?$`)) result.assertion = some o @@ -64,11 +64,11 @@ proc boot(facet: Facet) = if evt == removedEvent: facet.scheduleScript do (facet: Facet): echo "box gone" - result.analysis.get.callback = some (facet.wrap cb) + result.analysis.get.callback = facet.wrap cb const o = Observe.init(BoxState.init(`?_`)) result.assertion = some o facet.actor.dataspace.ground.addStopHandler do (_: Dataspace): echo "stopping box-and-client" -waitFor bootModule(boot) +waitFor bootModule("box-and-client", boot)