actors: better facet stop tracing
This commit is contained in:
parent
1592fac3b1
commit
82f2e8ee98
|
@ -122,18 +122,17 @@ using
|
|||
|
||||
proc labels(f: Facet): string =
|
||||
proc catLabels(f: Facet; labels: var string) =
|
||||
labels.add ':'
|
||||
if not f.parent.isNil:
|
||||
catLabels(f.parent, labels)
|
||||
labels.add ':'
|
||||
when tracing:
|
||||
labels.add $f.id
|
||||
result.add f.actor.name
|
||||
catLabels(f, result)
|
||||
|
||||
proc `$`*(f: Facet): string =
|
||||
"<Facet:" & f.labels & ">"
|
||||
|
||||
when tracing:
|
||||
proc `$`*(f: Facet): string =
|
||||
"<Facet:" & f.labels & ">"
|
||||
|
||||
proc `$`*(r: Cap): string =
|
||||
"<Ref:" & r.relay.labels & ">"
|
||||
|
@ -162,6 +161,13 @@ proc nextHandle(facet: Facet): Handle =
|
|||
result = succ(facet.actor.handleAllocator[])
|
||||
facet.actor.handleAllocator[] = result
|
||||
|
||||
template recallFacet(turn: var Turn; body: untyped): untyped =
|
||||
let facet = turn.facet
|
||||
block:
|
||||
body
|
||||
assert facet.actor == turn.facet.actor
|
||||
turn.facet = facet
|
||||
|
||||
proc queueWork*(turn: var Turn; facet: Facet; act: TurnAction) =
|
||||
assert not facet.isNil
|
||||
turn.work.addLast((facet, act,))
|
||||
|
@ -401,18 +407,12 @@ proc isInert(facet): bool =
|
|||
noOutboundHandles = facet.outbound.len == 0
|
||||
isRootFacet = facet.parent.isNil
|
||||
noInertCheckPreventers = facet.inertCheckPreventers == 0
|
||||
noKids and (noOutboundHandles or isRootFacet) and noInertCheckPreventers
|
||||
result = noKids and (noOutboundHandles or isRootFacet) and noInertCheckPreventers
|
||||
|
||||
proc preventInertCheck*(facet): (proc()) {.discardable.} =
|
||||
var armed = true
|
||||
inc facet.inertCheckPreventers
|
||||
proc disarm() =
|
||||
if armed:
|
||||
armed = false
|
||||
dec facet.inertCheckPreventers
|
||||
result = disarm
|
||||
proc preventInertCheck*(turn: Turn) =
|
||||
inc turn.facet.inertCheckPreventers
|
||||
|
||||
proc terminate(actor; turn; reason: ref Exception)
|
||||
proc terminateActor(turn; reason: ref Exception)
|
||||
|
||||
proc terminate(facet; turn: var Turn; orderly: bool) =
|
||||
if facet.isAlive:
|
||||
|
@ -432,37 +432,44 @@ proc terminate(facet; turn: var Turn; orderly: bool) =
|
|||
if parent.isInert:
|
||||
parent.terminate(turn, true)
|
||||
else:
|
||||
terminate(facet.actor, turn, nil)
|
||||
terminateActor(turn, nil)
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
|
||||
act.facetstop.path = facet.path
|
||||
turn.desc.actions.add act
|
||||
|
||||
proc inertCheck(turn: var Turn) =
|
||||
if (not turn.facet.parent.isNil and
|
||||
(not turn.facet.parent.isAlive)) or
|
||||
turn.facet.isInert:
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
|
||||
act.facetstop.path = facet.path
|
||||
act.facetstop.reason = FacetStopReason.inert
|
||||
turn.desc.actions.add act
|
||||
stop(turn)
|
||||
|
||||
proc stopIfInertAfter(action: TurnAction): TurnAction =
|
||||
# TODO: verify this
|
||||
proc wrapper(turn: var Turn) =
|
||||
action(turn)
|
||||
queueEffect(turn, turn.facet) do (turn: var Turn):
|
||||
if (not turn.facet.parent.isNil and
|
||||
(not turn.facet.parent.isAlive)) or
|
||||
turn.facet.isInert:
|
||||
stop(turn)
|
||||
wrapper
|
||||
proc work(turn: var Turn) =
|
||||
queueWork(turn, turn.facet, action)
|
||||
queueEffect(turn, turn.facet, inertCheck)
|
||||
work
|
||||
|
||||
proc newFacet(turn: var Turn): Facet = newFacet(turn.facet.actor, turn.facet)
|
||||
|
||||
proc inFacet*(turn: var Turn; bootProc: TurnAction): Facet =
|
||||
#[
|
||||
proc inFacet(turn: var Turn; bootProc: TurnAction): Facet =
|
||||
result = newFacet(turn)
|
||||
let facet = turn.facet
|
||||
turn.facet = result
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetstart)
|
||||
act.facetstart.path.add result.path
|
||||
turn.desc.actions.add act
|
||||
stopIfInertAfter(bootProc)(turn)
|
||||
turn.facet = facet
|
||||
recallFacet turn:
|
||||
turn.facet = result
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetstart)
|
||||
act.facetstart.path.add result.path
|
||||
turn.desc.actions.add act
|
||||
stopIfInertAfter(bootProc)(turn)
|
||||
]#
|
||||
|
||||
proc facet(turn: var Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc)
|
||||
# proc facet(turn: var Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc)
|
||||
|
||||
proc newActor(name: string; parent: Facet): Actor =
|
||||
result = Actor(
|
||||
|
@ -539,7 +546,8 @@ proc newInertCap*(): Cap =
|
|||
|
||||
proc atExit*(actor; action) = actor.exitHooks.add action
|
||||
|
||||
proc terminate(actor; turn; reason: ref Exception) =
|
||||
proc terminateActor(turn; reason: ref Exception) =
|
||||
let actor = turn.actor
|
||||
if not actor.exiting:
|
||||
actor.exiting = true
|
||||
actor.exitReason = reason
|
||||
|
@ -549,7 +557,13 @@ proc terminate(actor; turn; reason: ref Exception) =
|
|||
act.stop.status = ExitStatus(orKind: ExitStatusKind.Error)
|
||||
act.stop.status.error.message = reason.msg
|
||||
trace(actor, act)
|
||||
for hook in actor.exitHooks: hook(turn)
|
||||
while actor.exitHooks.len > 0:
|
||||
var hook = actor.exitHooks.pop()
|
||||
try: hook(turn)
|
||||
except CatchableError as err:
|
||||
if reason.isNil:
|
||||
terminateActor(turn, err)
|
||||
return
|
||||
proc finish(turn: var Turn) =
|
||||
assert not actor.root.isNil, actor.name
|
||||
actor.root.terminate(turn, reason.isNil)
|
||||
|
@ -558,14 +572,7 @@ proc terminate(actor; turn; reason: ref Exception) =
|
|||
|
||||
proc terminate*(facet; e: ref Exception) =
|
||||
run(facet.actor.root) do (turn: var Turn):
|
||||
facet.actor.terminate(turn, e)
|
||||
|
||||
template recallFacet(turn: var Turn; body: untyped): untyped =
|
||||
let facet = turn.facet
|
||||
block:
|
||||
body
|
||||
assert facet.actor == turn.facet.actor
|
||||
turn.facet = facet
|
||||
terminateActor(turn, e)
|
||||
|
||||
proc stopNow(turn: var Turn) =
|
||||
let caller = turn.facet
|
||||
|
@ -583,6 +590,11 @@ proc stop*(turn: var Turn, facet: Facet) =
|
|||
queueEffect(turn, facet, stopNow)
|
||||
|
||||
proc stop*(turn: var Turn) =
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
|
||||
act.facetstop.path = facet.path
|
||||
act.facetstop.reason = FacetStopReason.explicitAction
|
||||
turn.desc.actions.add act
|
||||
stop(turn, turn.facet)
|
||||
|
||||
proc onStop*(facet: Facet; act: TurnAction) =
|
||||
|
@ -596,7 +608,12 @@ proc onStop*(turn: var Turn; act: TurnAction) =
|
|||
proc stop*(actor: Actor) =
|
||||
queueTurn(actor.root) do (turn: var Turn):
|
||||
assert(not turn.facet.isNil)
|
||||
stop(turn)
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
|
||||
act.facetstop.path = facet.path
|
||||
act.facetstop.reason = FacetStopReason.actorStopping
|
||||
turn.desc.actions.add act
|
||||
stop(turn, turn.facet)
|
||||
|
||||
proc stopActor*(facet: Facet) =
|
||||
stop(facet.actor)
|
||||
|
@ -655,9 +672,16 @@ proc run* =
|
|||
while turnQueue.len > 0:
|
||||
var turn = turnQueue.popFirst()
|
||||
# TODO: check if actor is still valid
|
||||
run turn
|
||||
try: run(turn)
|
||||
except CatchableError as err:
|
||||
stderr.writeLine("actor ", turn.actor.name, " threw an error during a turn")
|
||||
terminateActor(turn, err)
|
||||
ioqueue.poll(ready)
|
||||
if ready.len == 0: break
|
||||
while ready.len > 0:
|
||||
discard trampoline:
|
||||
ready.pop()
|
||||
try:
|
||||
discard trampoline:
|
||||
ready.pop()
|
||||
except CatchableError as err:
|
||||
stderr.writeLine "ioqueue continuation threw an error"
|
||||
raise err
|
||||
|
|
|
@ -42,7 +42,7 @@ type DeprecatedBootProc = proc (ds: Cap; turn: var Turn) {.closure.}
|
|||
|
||||
proc bootDataspace*(name: string; bootProc: BootProc): Actor =
|
||||
bootActor(name) do (turn: var Turn):
|
||||
discard turn.facet.preventInertCheck()
|
||||
turn.preventInertCheck()
|
||||
bootProc(turn, newDataspace(turn))
|
||||
|
||||
proc bootDataspace*(name: string; bootProc: DeprecatedBootProc): Actor {.deprecated.} =
|
||||
|
|
|
@ -262,12 +262,12 @@ type
|
|||
|
||||
proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) =
|
||||
spawnLink(name, turn) do (turn: var Turn):
|
||||
turn.preventInertCheck()
|
||||
let relay = Relay(
|
||||
facet: turn.facet,
|
||||
packetWriter: opts.packetWriter,
|
||||
wireBuf: newBufferedDecoder(0),
|
||||
)
|
||||
discard relay.facet.preventInertCheck()
|
||||
if not opts.initialCap.isNil:
|
||||
var exported: seq[WireSymbol]
|
||||
discard rewriteCapOut(relay, opts.initialCap, exported)
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
# Package
|
||||
|
||||
version = "20240311"
|
||||
version = "20240312"
|
||||
author = "Emery Hemingway"
|
||||
description = "Syndicated actors for conversational concurrency"
|
||||
license = "Unlicense"
|
||||
|
|
Loading…
Reference in New Issue