Hide Entity action callbacks

This commit is contained in:
Emery Hemingway 2024-02-24 16:51:41 +00:00
parent 9eae178723
commit 9f59bb1e94
6 changed files with 175 additions and 95 deletions

View File

@ -1,6 +1,9 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
# Use procedures to call into SAM,
# use continuations to call within SAM.
import std/[deques, hashes, options, tables, times]
import pkg/cps
import preserves
@ -58,7 +61,6 @@ type
Turn {.byref.} = object
## https://synit.org/book/glossary.html#turn
facet: Facet
entity: Entity
work: Work
actions: seq[Cont]
event: Option[protocol.Event]
@ -67,14 +69,17 @@ type
Entity* = ref object of RootObj
## https://synit.org/book/glossary.html#entity
publishImpl: PublishProc
retractImpl: RetractProc
messageImpl: MessageProc
syncImpl: SyncProc
facet*: Facet
# This implementation associates Entities to
# Facets, which is not to be taken as a SAM
# axiom.
oid*: sturdy.Oid # oid is how Entities are identified over the wire
publishImpl*: PublishProc
retractImpl*: RetractProc
messageImpl*: MessageProc
syncImpl*: SyncProc
Cap* {.final, preservesEmbedded.} = ref object of EmbeddedObj
relay*: Facet
target*: Entity
attenuation*: seq[sturdy.Caveat]
@ -94,7 +99,7 @@ type
template turnWork*(prc: typed): untyped =
cps(Cont, prc)
proc activeFacet*(c: Cont): Facet {.cpsVoodoo.} =
proc activeFacet(c: Cont): Facet {.cpsVoodoo.} =
## Return the active `Facet` within a `{.syndicate.}` context.
assert not c.facet.isNil
c.facet
@ -112,27 +117,50 @@ proc `$`*(cap): string = "#:…"
proc hash*(facet): Hash = facet.unsafeAddr.hash
proc hash*(cap): Hash = cap.unsafeAddr.hash
proc newFacet(actor: Actor; parent: Facet): Facet =
proc relay*(cap): Facet =
assert not cap.target.facet.isNil
cap.target.facet
proc collectPath(result: var seq[FacetId]; facet) =
if not facet.parent.isNil:
collectPath(result, facet.parent)
result.add(facet.id)
proc newFacet(actor; parent: Facet): Facet =
inc(actor.facetIdAllocator)
result = Facet(
actor: actor,
parent: parent,
id: actor.facetIdAllocator.toPreserves,
)
if not parent.isNil:
parent.children.add result
when traceSyndicate:
var act = ActionDescription(orKind: ActionDescriptionKind.facetstart)
collectPath(act.facetstart.path, result)
actor.turn.desc.actions.add act
echo actor.turn.desc
proc stopped*(facet): bool = facet.state != fRunning
proc newActor(name: string): Actor =
proc newActor(parent: Facet; name: string): Actor =
result = Actor(id: name.toPreserves)
result.root = newFacet(result, nil)
result.root = newFacet(result, parent)
when traceSyndicate:
let path = getEnv("SYNDICATE_TRACE_FILE", "")
case path
of "": discard
of "-": result.traceStream = newFileStream(stderr)
else: result.traceStream = openFileStream(path, fmWrite)
if parent.isNil:
let path = getEnv("SYNDICATE_TRACE_FILE", "")
case path
of "": discard
of "-": result.traceStream = newFileStream(stderr)
else: result.traceStream = openFileStream(path, fmWrite)
else:
result.traceStream = parent.actor.traceStream
when traceSyndicate:
proc traceFlush(actor) =
if not actor.traceStream.isNil:
actor.traceStream.flush()
proc trace(actor; act: ActorActivation) =
if not actor.traceStream.isNil:
var entry = TraceEntry(
@ -148,6 +176,8 @@ when traceSyndicate:
orKind: ActorActivationKind.turn,
turn: actor.turn.desc,
))
actor.traceFlush()
reset actor.turn.desc
proc traceTarget(facet): trace.Target =
Target(
@ -204,6 +234,8 @@ proc startExternalTurn(facet) =
actor.turn.desc = TurnDescription(cause: TurnCause(orKind: TurnCauseKind.external))
proc terminate(actor; err: ref Exception) =
when traceSyndicate:
actor.traceTurn()
raise err
proc terminate(facet; err: ref Exception) =
@ -221,24 +253,37 @@ proc complete(c: Cont) =
writeStackFrames c
terminate(c.facet, err)
proc run(actor) =
assert not actor.stopped
var n = 0
while actor.turn.work.len > 0:
actor.turn.work.popFirst().complete()
inc n
echo n, " items completed from work queue"
when traceSyndicate:
actor.traceTurn()
var i: int
while i < actor.turn.actions.len:
complete(move actor.turn.actions[i])
inc i
echo i, " items completed from action queue"
turn.actions.setLen(0)
when traceSyndicate:
if actor.stopped:
trace(actor, ActorActivation(orkind: ActorActivationKind.stop))
proc runTurn*(actor): bool =
if actor.stopped: return
try:
result = actor.turn.work.len > 0
while actor.turn.work.len > 0:
actor.turn.work.popFirst().complete()
var i: int
echo actor.turn.actions.len, " items in pending action queue"
while i < actor.turn.actions.len:
complete(move actor.turn.actions[i])
inc i
echo i, " items completed from action queue"
actor.turn.actions.setLen(0)
when traceSyndicate:
actor.traceTurn()
if actor.stopped:
trace(actor, ActorActivation(orkind: ActorActivationKind.stop))
except Exception as err:
actor.terminate(err)
proc runChildren(facet): bool =
var i = 0
for child in facet.children:
inc(i)
if child.actor != facet.actor:
result = result or
runTurn(child.actor) or
runChildren(child)
proc runActors*(actor): bool =
runTurn(actor) or runChildren(actor.root)
proc start(actor; cont: Cont) =
when traceSyndicate:
@ -247,14 +292,8 @@ proc start(actor; cont: Cont) =
actor.root.state = fRunning
actor.root.startExternalTurn()
actor.root.queueWork(cont)
run(actor)
proc stop(actor)
proc collectPath(result: var seq[FacetId]; facet) =
if not facet.parent.isNil:
collectPath(result, facet.parent)
result.add(facet.id)
proc stop*(actor)
proc runNextStop(c: Cont; facet: Facet): Cont {.cpsMagic.} =
c.fn = facet.stopHandlers.pop()
@ -283,18 +322,17 @@ proc stop(facet; reason: FacetStopReason) =
actor.root = nil
stop(actor)
proc stop(actor) =
proc stop*(actor) =
if not actor.root.isNil:
stop(actor.root, FacetStopReason.actorStopping)
actor.stopped = true
proc bootActor(name: string, c: Cont) =
start(newActor(name), c)
# proc stopFacetAction(reason: FacetStopReason) {.syndicate.} =
# stop(c.facet, reason)
proc stopActorAction() {.cps: Cont.} =
yieldToActions()
activeFacet().actor.stop()
proc stopActor*(facet) =
@ -302,14 +340,20 @@ proc stopActor*(facet) =
c.facet = facet
facet.actor.turn.actions.add(c)
proc stopActor*(cap) =
## Stop an `Actor` from a `Cap`.
stopActor(cap.relay)
type
AssertionRef* = ref object
value*: Value
# if the Enity methods take a Value object then the generated
# C code has "redefinition of struct" problems when orc is enabled
proc newCap*(f: Facet; e: Entity): Cap =
Cap(relay: f, target: e)
proc newCap*(facet; entity): Cap =
doAssert entity.facet.isNil
entity.facet = facet
Cap(target: entity)
proc nextHandle(facet: Facet): Handle =
inc(facet.actor.handleAllocator)
@ -319,12 +363,11 @@ proc actor(cap): Actor = cap.relay.actor
type Bindings = Table[Value, Value]
proc attenuate(r: Cap; a: seq[Caveat]): Cap =
if a.len == 0: result = r
else: result = Cap(
relay: r.relay,
target: r.target,
attenuation: a & r.attenuation)
proc attenuate(cap; att: seq[Caveat]): Cap =
if att.len == 0: cap
else: Cap(
target: cap.target,
attenuation: att & cap.attenuation)
proc match(bindings: var Bindings; p: Pattern; v: Value): bool =
case p.orKind
@ -436,17 +479,34 @@ proc runRewrites(v: Value; a: openarray[Caveat]): Value =
result = examineAlternatives(stage, result)
if result.isFalse: break
proc setActions*(entity;
publish = PublishProc();
retract = RetractProc();
message = MessageProc();
sync = SyncProc();
): Entity {.discardable} =
## Set the action handlers for an `Entity`.
result = entity
result.publishImpl = publish
result.retractImpl = retract
result.messageImpl = message
result.syncImpl = sync
proc publish(c: Cont; e: Entity; v: Value; h: Handle): Cont {.cpsMagic.} =
pass c, e.publishImpl.call(e, v, h)
if not e.publishImpl.fn.isNil:
result = pass(c, e.publishImpl.call(e, v, h))
proc retract(c: Cont; e: Entity; h: Handle): Cont {.cpsMagic.} =
pass c, e.retractImpl.call(e, h)
if not e.retractImpl.fn.isNil:
result = pass(c, e.retractImpl.call(e, h))
proc message(c: Cont; e: Entity; v: Value): Cont {.cpsMagic.} =
pass c, e.messageImpl.call(e, v)
if not e.messageImpl.fn.isNil:
result = pass(c, e.messageImpl.call(e, v))
proc sync(c: Cont; e: Entity; p: Cap): Cont {.cpsMagic.} =
pass c, e.syncImpl.call(e, p)
if not e.syncImpl.fn.isNil:
result = pass(c, e.syncImpl.call(e, p))
proc turnPublish(cap: Cap; val: Value; h: Handle) {.turnWork.} =
when traceSyndicate:
@ -509,15 +569,22 @@ proc turnSync(cap: Cap; peer: Cap) {.turnWork.} =
cap.target.sync(peer)
proc publish*(cap; val: Value): Handle =
## Publish a `Value` to a `Cap` returning `Handle`
## for later retraction.
# The publish action on an Entity always goes through
# here first.
var val = runRewrites(val, cap.attenuation)
# TODO: attenuation to nothing?
result = cap.relay.nextHandle()
cap.relay.queueWork(whelp turnPublish(cap, val, result))
proc publish*[T](cap; x: T): Handle =
## Publish Preserves-convertable value to a `Cap`
## returning `Handle` for later retraction.
publish(cap, x.toPreserves)
proc retract*(cap; h: Handle) =
## Retract a `Handle` from a `Cap`.
cap.relay.queueWork(whelp turnRetract(cap, h))
proc message*(cap; val: Value) =
@ -541,12 +608,15 @@ proc addOnStopHandler(c: Cont; cb: Callback): Cont {.cpsMagic.} =
proc onStop*(facet; cb: proc () {.closure.}) =
facet.stopCallbacks.add(cb)
proc bootActor*(name: string; bootProc: FacetProc): Actor =
result = newActor(name)
result.root.startExternalTurn()
bootProc(result.root)
proc callLater(prc: FacetProc) {.cps: Cont.} =
prc activeFacet()
proc runActor*(name: string; bootProc: FacetProc) =
let actor = bootActor(name, bootProc)
while not actor.stopped:
run(actor)
proc bootActor*(name: string; bootProc: FacetProc): Actor =
result = newActor(nil, name)
result.root.startExternalTurn()
result.root.queueWork(whelp callLater(bootProc))
proc spawnActor*(parent: Facet; name: string; bootProc: FacetProc): Actor {.discardable.} =
result = newActor(parent, name)
result.root.startExternalTurn()
result.root.queueWork(whelp callLater(bootProc))

View File

@ -37,12 +37,16 @@ proc dsMessage(e: Entity; v: Value) {.cps: Cont.} =
proc newDataspace*(f: Facet): Cap =
var ds = Dataspace(
publishImpl: whelp dsPublish,
retractImpl: whelp dsRetract,
messageImpl: whelp dsMessage,
index: initIndex(),
).setActions(
publish = whelp dsPublish,
retract = whelp dsRetract,
message = whelp dsMessage,
)
newCap(f, ds)
proc observe*(cap: Cap; pat: Pattern; peer: Cap): Handle =
publish(cap, Observe(pattern: pat, observer: peer))
proc observe*(cap: Cap; pat: Pattern; e: Entity): Handle =
publish(cap, Observe(pattern: pat, observer: newCap(cap.relay, e)))
observe(cap, pat, newCap(cap.relay, e))

View File

@ -3,10 +3,11 @@
import std/[hashes, tables]
import preserves
import pkg/cps
import ./[actors, patterns]
type
DuringProc* = proc (a: Value; h: Handle): FacetProc {.gcsafe.}
DuringProc* = proc (f: Facet; a: Value; h: Handle): FacetProc {.closure.}
DuringActionKind = enum null, dead, act
DuringAction = object
case kind: DuringActionKind
@ -19,11 +20,11 @@ type
proc duringPublish(e: Entity; v: Value; h: Handle) {.cps: Cont.} =
var de = DuringEntity(e)
let handler = de.handler(de.facet, a.value, h)
let g = de.assertionMap.getOrDefault h
case g.kind
of null, dead:
de.assertionMap[h] = DuringAction(kind: act, action: handler)
var cb = de.publishProc(e.facet, v, h)
de.assertionMap[h] = DuringAction(kind: act, retractProc: cb)
of act:
raiseAssert("during: duplicate handle in publish: " & $h)
@ -37,7 +38,12 @@ proc duringRetract(e: Entity; h: Handle) {.cps: Cont.} =
raiseAssert("during: duplicate handle in retract: " & $h)
of act:
de.assertionMap.del h
if not g.action.isNil:
g.action(de.facet)
if not g.retractProc.isNil:
g.retractProc(de.facet)
proc during*(cb: DuringProc): DuringEntity = DuringEntity(cb: cb)
proc during*(cb: DuringProc): Entity =
## TODO: this doesn't follow Nim idom well.
DuringEntity(publishProc: cb).setActions(
publish = whelp duringPublish,
retract = whelp duringRetract,
)

View File

@ -10,8 +10,7 @@ import pkg/cps
import preserves
export preserves
import ./[actors, dataspaces, patterns]
# durings
import ./[actors, dataspaces, durings, patterns]
import ./protocols/dataspace
export actors, dataspace, dataspaces, patterns
@ -100,7 +99,7 @@ proc wrapPublishHandler(handler: NimNode): NimNode =
handlerSym = genSym(nskProc, "publish")
bindingsSym = ident"bindings"
quote do:
proc `handlerSym`(turn: Turn; `bindingsSym`: Value; `handleSym`: Handle) =
proc `handlerSym`(`bindingsSym`: Value; `handleSym`: Handle) =
`varSection`
if fromPreserves(`valuesSym`, bindings):
`publishBody`
@ -126,23 +125,21 @@ proc wrapDuringHandler(entryBody, exitBody: NimNode): NimNode =
duringSym = genSym(nskProc, "during")
if exitBody.isNil:
quote do:
proc `duringSym`(turn: Turn; `bindingsSym`: Value; `handleSym`: Handle): TurnAction =
proc `duringSym`(f: Facet; `bindingsSym`: Value; `handleSym`: Handle): FacetProc =
`varSection`
if fromPreserves(`valuesSym`, `bindingsSym`):
`publishBody`
else:
quote do:
proc `duringSym`(turn: Turn; `bindingsSym`: Value; `handleSym`: Handle): TurnAction =
proc `duringSym`(f: Facet; `bindingsSym`: Value; `handleSym`: Handle): FacetProc =
`varSection`
if fromPreserves(`valuesSym`, `bindingsSym`):
`publishBody`
proc action(turn: Turn) =
result = proc (facet: Facet) =
`exitBody`
result = action
#[
macro onPublish*(ds: Cap; pattern: Pattern; handler: untyped) =
## Call `handler` when an assertion matching `pattern` is published at `ds`.
macro onPublish*(cap: Cap; pattern: Pattern; handler: untyped) =
## Call `handler` when an assertion matching `pattern` is published at `cap`.
let
argCount = argumentCount(handler)
handlerProc = wrapPublishHandler(handler)
@ -151,8 +148,12 @@ macro onPublish*(ds: Cap; pattern: Pattern; handler: untyped) =
if `argCount` != 0 and `pattern`.analyse.capturePaths.len != `argCount`:
raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments - " & $`pattern`)
`handlerProc`
discard observe(activeTurn(), `ds`, `pattern`, ClosureEntity(publishImpl: `handlerSym`))
]#
var ce = ClosureEntity(
publishCb: `handlerSym`,
).setActions(
publish = whelp publishCont,
)
discard observe(`cap`, `pattern`, ce)
macro onMessage*(cap: Cap; pattern: Pattern; handler: untyped) =
## Call `handler` when an message matching `pattern` is broadcasted at `cap`.
@ -167,11 +168,11 @@ macro onMessage*(cap: Cap; pattern: Pattern; handler: untyped) =
discard observe(`cap`, `pattern`, ClosureEntity(
messageImpl: whelp messageCont,
messageCb: `handlerSym`,
name: "ClosureEntity",
))
#[
macro during*(ds: Cap; pattern: Pattern; publishBody, retractBody: untyped) =
## Call `publishBody` when an assertion matching `pattern` is published to `ds` and
macro during*(cap: Cap; pattern: Pattern; publishBody, retractBody: untyped) =
## Call `publishBody` when an assertion matching `pattern` is published to `cap` and
## call `retractBody` on retraction. Assertions that match `pattern` but are not
## convertable to the arguments of `publishBody` are silently discarded.
##
@ -186,9 +187,9 @@ macro during*(ds: Cap; pattern: Pattern; publishBody, retractBody: untyped) =
if `argCount` != 0 and `pattern`.analyse.capturePaths.len != `argCount`:
raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments - " & $`pattern`)
`callbackProc`
discard observe(activeTurn(), `ds`, `pattern`, during(`callbackSym`))
discard observe(`cap`, `pattern`, during(`callbackSym`))
macro during*(ds: Cap; pattern: Pattern; publishBody: untyped) =
macro during*(cap: Cap; pattern: Pattern; publishBody: untyped) =
## Variant of `during` without a retract body.
let
`argCount` = argumentCount(publishBody)
@ -198,8 +199,7 @@ macro during*(ds: Cap; pattern: Pattern; publishBody: untyped) =
if `argCount` != 0 and `pattern`.analyse.capturePaths.len != `argCount`:
raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments - " & $`pattern`)
`callbackProc`
discard observe(activeTurn(), `ds`, `pattern`, during(`callbackSym`))
]#
discard observe(`cap`, `pattern`, during(`callbackSym`))
proc wrapHandler(body: NimNode; ident: string): NimNode =
var sym = genSym(nskProc, ident)

View File

@ -1,6 +1,6 @@
# Package
version = "20240219"
version = "20240224"
author = "Emery Hemingway"
description = "Syndicated actors for conversational concurrency"
license = "Unlicense"

View File

@ -1,7 +1,7 @@
include_rules
NIM_FLAGS += --define:traceSyndicate
: foreach *.prs |> !preserves_schema_nim |> | {schema}
: foreach t*.nim | ../../preserves-nim/<tests> {schema} $(SYNDICATE_PROTOCOL) |> !nim_run |> | ../<test>
: foreach t*.nim | ../../preserves-nim/<tests> {schema} $(SYNDICATE_PROTOCOL) |> !nim |> | ../<test> {test}
: test_timers.run |> SYNDICATE_TRACE_FILE=%o ./%f |> ./%B.trace.bin {bintrace}
: foreach {test} |> SYNDICATE_TRACE_FILE=%o ./%f |> ./%B.trace.bin {bintrace}
: foreach {bintrace} |> preserves-tool convert <%f >%o |> %B