actors: do not execute actions when turn fails

This commit is contained in:
Emery Hemingway 2024-02-29 09:47:09 +00:00
parent 0a7d129991
commit 18609d01ce
2 changed files with 52 additions and 37 deletions

View File

@ -4,7 +4,7 @@
# Use procedures to call into SAM,
# use continuations to call within SAM.
import std/[deques, hashes, options, tables, times]
import std/[deques, hashes, options, sets, tables, times]
import pkg/cps
import preserves
import ./protocols/[protocol, sturdy]
@ -64,6 +64,7 @@ type
work: Work
actions: seq[Cont]
event: Option[protocol.Event]
rollback: bool
when traceSyndicate:
desc: TurnDescription
@ -88,6 +89,9 @@ type
# TODO: run on a seperate thread.
# crashHandlers: HandlerDeque
root: Facet
children: HashSet[Actor]
# linked subordinate actors, this collection
# is here for running actor turns
handleAllocator: Handle
facetIdAllocator: int
id: ActorId
@ -100,10 +104,15 @@ template turnWork*(prc: typed): untyped =
cps(Cont, prc)
proc activeFacet(c: Cont): Facet {.cpsVoodoo.} =
## Return the active `Facet` within a `{.syndicate.}` context.
## Return the active `Facet` within a turn context.
assert not c.facet.isNil
c.facet
proc activeTurn(c: Cont): Turn {.cpsVoodoo.} =
## Return the active `Turn` within a turn context.
assert not c.facet.isNil
c.facet.actor.turn
using
actor: Actor
facet: Facet
@ -112,10 +121,9 @@ using
turn: Turn
proc `$`*(facet): string = $facet.id
proc `$`*(cap): string = "#:"
proc `$`*(cap): string = "#:" & $cast[uint](cap.unsafeAddr)
proc hash*(facet): Hash = facet.unsafeAddr.hash
proc hash*(cap): Hash = cap.unsafeAddr.hash
proc hash*(x: Actor|Facet|Cap): Hash = x.unsafeAddr.hash
proc relay*(cap): Facet =
assert not cap.target.facet.isNil
@ -139,13 +147,14 @@ proc newFacet(actor; parent: Facet): Facet =
var act = ActionDescription(orKind: ActionDescriptionKind.facetstart)
collectPath(act.facetstart.path, result)
actor.turn.desc.actions.add act
echo actor.turn.desc
proc stopped*(facet): bool = facet.state != fRunning
proc newActor(parent: Facet; name: string): Actor =
result = Actor(id: name.toPreserves)
result.root = newFacet(result, parent)
if not parent.isNil:
parent.actor.children.incl result
when traceSyndicate:
if parent.isNil:
let path = getEnv("SYNDICATE_TRACE_FILE", "")
@ -210,10 +219,14 @@ proc pass*(a, b: Cont): Cont =
b.facet = a.facet
b
proc queueWork(facet; c: Cont) =
proc queueWork*(facet; c: Cont) =
c.facet = facet
facet.actor.turn.work.addLast(c)
proc queueAction*(facet; c: Cont) =
c.facet = facet
facet.actor.turn.actions.add(c)
proc yieldWork(c: Cont): Cont {.cpsMagic.} =
## Suspend and enqueue the caller until later in the turn.
assert not c.facet.isNil
@ -249,42 +262,31 @@ proc complete(c: Cont) =
var x = y(c)
c = Cont(x)
except CatchableError as err:
c.facet.actor.turn.rollback = true
if not c.dismissed:
writeStackFrames c
terminate(c.facet, err)
proc runTurn*(actor): bool =
proc run*(actor): bool =
if actor.stopped: return
try:
result = actor.turn.work.len > 0
while actor.turn.work.len > 0:
actor.turn.work.popFirst().complete()
var i: int
echo actor.turn.actions.len, " items in pending action queue"
while i < actor.turn.actions.len:
complete(move actor.turn.actions[i])
inc i
echo i, " items completed from action queue"
actor.turn.actions.setLen(0)
when traceSyndicate:
actor.traceTurn()
if actor.stopped:
trace(actor, ActorActivation(orkind: ActorActivationKind.stop))
for child in actor.children:
result = result and run(child)
except Exception as err:
actor.terminate(err)
proc runChildren(facet): bool =
var i = 0
for child in facet.children:
inc(i)
if child.actor != facet.actor:
result = result or
runTurn(child.actor) or
runChildren(child)
proc runActors*(actor): bool =
runTurn(actor) or runChildren(actor.root)
proc start(actor; cont: Cont) =
when traceSyndicate:
var act = ActorActivation(orkind: ActorActivationKind.start)
@ -327,6 +329,8 @@ proc stop*(actor) =
stop(actor.root, FacetStopReason.actorStopping)
actor.stopped = true
proc stopped*(actor): bool =
actor.`stopped`
# proc stopFacetAction(reason: FacetStopReason) {.syndicate.} =
# stop(c.facet, reason)
@ -523,10 +527,13 @@ proc turnPublish(cap: Cap; val: Value; h: Handle) {.turnWork.} =
cap.actor.traceEnqueue(traceEvent)
cap.relay.outbound[h] = OutboundAssertion(handle: h, peer: cap)
yieldToActions()
when traceSyndicate:
cap.actor.traceDequeue(traceEvent)
cap.target.publish(val, h)
cap.relay.outbound[h].established = true
if activeTurn().rollback:
cap.relay.outbound.del(h)
else:
when traceSyndicate:
cap.actor.traceDequeue(traceEvent)
cap.target.publish(val, h)
cap.relay.outbound[h].established = true
proc turnRetract(cap: Cap; h: Handle) {.turnWork.} =
when traceSyndicate:
@ -537,9 +544,12 @@ proc turnRetract(cap: Cap; h: Handle) {.turnWork.} =
traceEvent.detail.retract.handle = h
cap.actor.traceEnqueue(traceEvent)
yieldToActions()
when traceSyndicate:
cap.actor.traceDequeue(traceEvent)
cap.target.retract(h)
if not activeTurn().rollback:
when traceSyndicate:
cap.actor.traceDequeue(traceEvent)
var e: OutboundAssertion
if cap.relay.outbound.pop(h, e):
cap.target.retract(h)
proc turnMessage(cap: Cap; val: Value) {.turnWork.} =
var val = runRewrites(val, cap.attenuation)
@ -551,9 +561,10 @@ proc turnMessage(cap: Cap; val: Value) {.turnWork.} =
traceEvent.detail.message.body.value.value = val
cap.actor.traceEnqueue(traceEvent)
yieldToActions()
when traceSyndicate:
cap.actor.traceDequeue(traceEvent)
cap.target.message(val)
if not activeTurn().rollback:
when traceSyndicate:
cap.actor.traceDequeue(traceEvent)
cap.target.message(val)
proc turnSync(cap: Cap; peer: Cap) {.turnWork.} =
when traceSyndicate:
@ -564,9 +575,10 @@ proc turnSync(cap: Cap; peer: Cap) {.turnWork.} =
traceEvent.detail.sync.peer = peer.traceTarget
cap.actor.traceEnqueue(traceEvent)
yieldToActions()
when traceSyndicate:
cap.actor.traceDequeue(traceEvent)
cap.target.sync(peer)
if not activeTurn().rollback:
when traceSyndicate:
cap.actor.traceDequeue(traceEvent)
cap.target.sync(peer)
proc publish*(cap; val: Value): Handle =
## Publish a `Value` to a `Cap` returning `Handle`
@ -608,6 +620,9 @@ proc addOnStopHandler(c: Cont; cb: Callback): Cont {.cpsMagic.} =
proc onStop*(facet; cb: proc () {.closure.}) =
facet.stopCallbacks.add(cb)
proc facetCall(prc: FacetProc; f: Facet) {.cps: Cont.} =
prc(f)
proc facetCall(prc: FacetProc) {.cps: Cont.} =
prc(activeFacet())
@ -619,4 +634,4 @@ proc bootActor*(name: string; bootProc: FacetProc): Actor =
proc spawnActor*(parent: Facet; name: string; bootProc: FacetProc): Actor {.discardable.} =
result = newActor(parent, name)
result.root.startExternalTurn()
result.root.queueWork(whelp facetCall(bootProc))
parent.queueWork(whelp facetCall(bootProc, result.root))

View File

@ -1,6 +1,6 @@
# Package
version = "20240228"
version = "20240229"
author = "Emery Hemingway"
description = "Syndicated actors for conversational concurrency"
license = "Unlicense"