actors: better facet stop tracing
This commit is contained in:
parent
aca4f4b822
commit
70d68b17d0
|
@ -122,18 +122,17 @@ using
|
||||||
|
|
||||||
proc labels(f: Facet): string =
|
proc labels(f: Facet): string =
|
||||||
proc catLabels(f: Facet; labels: var string) =
|
proc catLabels(f: Facet; labels: var string) =
|
||||||
labels.add ':'
|
|
||||||
if not f.parent.isNil:
|
if not f.parent.isNil:
|
||||||
catLabels(f.parent, labels)
|
catLabels(f.parent, labels)
|
||||||
labels.add ':'
|
labels.add ':'
|
||||||
when tracing:
|
|
||||||
labels.add $f.id
|
labels.add $f.id
|
||||||
result.add f.actor.name
|
result.add f.actor.name
|
||||||
catLabels(f, result)
|
catLabels(f, result)
|
||||||
|
|
||||||
|
proc `$`*(f: Facet): string =
|
||||||
|
"<Facet:" & f.labels & ">"
|
||||||
|
|
||||||
when tracing:
|
when tracing:
|
||||||
proc `$`*(f: Facet): string =
|
|
||||||
"<Facet:" & f.labels & ">"
|
|
||||||
|
|
||||||
proc `$`*(r: Cap): string =
|
proc `$`*(r: Cap): string =
|
||||||
"<Ref:" & r.relay.labels & ">"
|
"<Ref:" & r.relay.labels & ">"
|
||||||
|
@ -162,6 +161,13 @@ proc nextHandle(facet: Facet): Handle =
|
||||||
result = succ(facet.actor.handleAllocator[])
|
result = succ(facet.actor.handleAllocator[])
|
||||||
facet.actor.handleAllocator[] = result
|
facet.actor.handleAllocator[] = result
|
||||||
|
|
||||||
|
template recallFacet(turn: var Turn; body: untyped): untyped =
|
||||||
|
let facet = turn.facet
|
||||||
|
block:
|
||||||
|
body
|
||||||
|
assert facet.actor == turn.facet.actor
|
||||||
|
turn.facet = facet
|
||||||
|
|
||||||
proc queueWork*(turn: var Turn; facet: Facet; act: TurnAction) =
|
proc queueWork*(turn: var Turn; facet: Facet; act: TurnAction) =
|
||||||
assert not facet.isNil
|
assert not facet.isNil
|
||||||
turn.work.addLast((facet, act,))
|
turn.work.addLast((facet, act,))
|
||||||
|
@ -401,18 +407,12 @@ proc isInert(facet): bool =
|
||||||
noOutboundHandles = facet.outbound.len == 0
|
noOutboundHandles = facet.outbound.len == 0
|
||||||
isRootFacet = facet.parent.isNil
|
isRootFacet = facet.parent.isNil
|
||||||
noInertCheckPreventers = facet.inertCheckPreventers == 0
|
noInertCheckPreventers = facet.inertCheckPreventers == 0
|
||||||
noKids and (noOutboundHandles or isRootFacet) and noInertCheckPreventers
|
result = noKids and (noOutboundHandles or isRootFacet) and noInertCheckPreventers
|
||||||
|
|
||||||
proc preventInertCheck*(facet): (proc()) {.discardable.} =
|
proc preventInertCheck*(turn: Turn) =
|
||||||
var armed = true
|
inc turn.facet.inertCheckPreventers
|
||||||
inc facet.inertCheckPreventers
|
|
||||||
proc disarm() =
|
|
||||||
if armed:
|
|
||||||
armed = false
|
|
||||||
dec facet.inertCheckPreventers
|
|
||||||
result = disarm
|
|
||||||
|
|
||||||
proc terminate(actor; turn; reason: ref Exception)
|
proc terminateActor(turn; reason: ref Exception)
|
||||||
|
|
||||||
proc terminate(facet; turn: var Turn; orderly: bool) =
|
proc terminate(facet; turn: var Turn; orderly: bool) =
|
||||||
if facet.isAlive:
|
if facet.isAlive:
|
||||||
|
@ -432,37 +432,44 @@ proc terminate(facet; turn: var Turn; orderly: bool) =
|
||||||
if parent.isInert:
|
if parent.isInert:
|
||||||
parent.terminate(turn, true)
|
parent.terminate(turn, true)
|
||||||
else:
|
else:
|
||||||
terminate(facet.actor, turn, nil)
|
terminateActor(turn, nil)
|
||||||
when tracing:
|
when tracing:
|
||||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
|
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
|
||||||
act.facetstop.path = facet.path
|
act.facetstop.path = facet.path
|
||||||
turn.desc.actions.add act
|
turn.desc.actions.add act
|
||||||
|
|
||||||
|
proc inertCheck(turn: var Turn) =
|
||||||
|
if (not turn.facet.parent.isNil and
|
||||||
|
(not turn.facet.parent.isAlive)) or
|
||||||
|
turn.facet.isInert:
|
||||||
|
when tracing:
|
||||||
|
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
|
||||||
|
act.facetstop.path = facet.path
|
||||||
|
act.facetstop.reason = FacetStopReason.inert
|
||||||
|
turn.desc.actions.add act
|
||||||
|
stop(turn)
|
||||||
|
|
||||||
proc stopIfInertAfter(action: TurnAction): TurnAction =
|
proc stopIfInertAfter(action: TurnAction): TurnAction =
|
||||||
# TODO: verify this
|
proc work(turn: var Turn) =
|
||||||
proc wrapper(turn: var Turn) =
|
queueWork(turn, turn.facet, action)
|
||||||
action(turn)
|
queueEffect(turn, turn.facet, inertCheck)
|
||||||
queueEffect(turn, turn.facet) do (turn: var Turn):
|
work
|
||||||
if (not turn.facet.parent.isNil and
|
|
||||||
(not turn.facet.parent.isAlive)) or
|
|
||||||
turn.facet.isInert:
|
|
||||||
stop(turn)
|
|
||||||
wrapper
|
|
||||||
|
|
||||||
proc newFacet(turn: var Turn): Facet = newFacet(turn.facet.actor, turn.facet)
|
proc newFacet(turn: var Turn): Facet = newFacet(turn.facet.actor, turn.facet)
|
||||||
|
|
||||||
proc inFacet*(turn: var Turn; bootProc: TurnAction): Facet =
|
#[
|
||||||
|
proc inFacet(turn: var Turn; bootProc: TurnAction): Facet =
|
||||||
result = newFacet(turn)
|
result = newFacet(turn)
|
||||||
let facet = turn.facet
|
recallFacet turn:
|
||||||
turn.facet = result
|
turn.facet = result
|
||||||
when tracing:
|
when tracing:
|
||||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetstart)
|
var act = ActionDescription(orKind: ActionDescriptionKind.facetstart)
|
||||||
act.facetstart.path.add result.path
|
act.facetstart.path.add result.path
|
||||||
turn.desc.actions.add act
|
turn.desc.actions.add act
|
||||||
stopIfInertAfter(bootProc)(turn)
|
stopIfInertAfter(bootProc)(turn)
|
||||||
turn.facet = facet
|
]#
|
||||||
|
|
||||||
proc facet(turn: var Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc)
|
# proc facet(turn: var Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc)
|
||||||
|
|
||||||
proc newActor(name: string; parent: Facet): Actor =
|
proc newActor(name: string; parent: Facet): Actor =
|
||||||
result = Actor(
|
result = Actor(
|
||||||
|
@ -539,7 +546,8 @@ proc newInertCap*(): Cap =
|
||||||
|
|
||||||
proc atExit*(actor; action) = actor.exitHooks.add action
|
proc atExit*(actor; action) = actor.exitHooks.add action
|
||||||
|
|
||||||
proc terminate(actor; turn; reason: ref Exception) =
|
proc terminateActor(turn; reason: ref Exception) =
|
||||||
|
let actor = turn.actor
|
||||||
if not actor.exiting:
|
if not actor.exiting:
|
||||||
actor.exiting = true
|
actor.exiting = true
|
||||||
actor.exitReason = reason
|
actor.exitReason = reason
|
||||||
|
@ -549,7 +557,13 @@ proc terminate(actor; turn; reason: ref Exception) =
|
||||||
act.stop.status = ExitStatus(orKind: ExitStatusKind.Error)
|
act.stop.status = ExitStatus(orKind: ExitStatusKind.Error)
|
||||||
act.stop.status.error.message = reason.msg
|
act.stop.status.error.message = reason.msg
|
||||||
trace(actor, act)
|
trace(actor, act)
|
||||||
for hook in actor.exitHooks: hook(turn)
|
while actor.exitHooks.len > 0:
|
||||||
|
var hook = actor.exitHooks.pop()
|
||||||
|
try: hook(turn)
|
||||||
|
except CatchableError as err:
|
||||||
|
if reason.isNil:
|
||||||
|
terminateActor(turn, err)
|
||||||
|
return
|
||||||
proc finish(turn: var Turn) =
|
proc finish(turn: var Turn) =
|
||||||
assert not actor.root.isNil, actor.name
|
assert not actor.root.isNil, actor.name
|
||||||
actor.root.terminate(turn, reason.isNil)
|
actor.root.terminate(turn, reason.isNil)
|
||||||
|
@ -558,14 +572,7 @@ proc terminate(actor; turn; reason: ref Exception) =
|
||||||
|
|
||||||
proc terminate*(facet; e: ref Exception) =
|
proc terminate*(facet; e: ref Exception) =
|
||||||
run(facet.actor.root) do (turn: var Turn):
|
run(facet.actor.root) do (turn: var Turn):
|
||||||
facet.actor.terminate(turn, e)
|
terminateActor(turn, e)
|
||||||
|
|
||||||
template recallFacet(turn: var Turn; body: untyped): untyped =
|
|
||||||
let facet = turn.facet
|
|
||||||
block:
|
|
||||||
body
|
|
||||||
assert facet.actor == turn.facet.actor
|
|
||||||
turn.facet = facet
|
|
||||||
|
|
||||||
proc stopNow(turn: var Turn) =
|
proc stopNow(turn: var Turn) =
|
||||||
let caller = turn.facet
|
let caller = turn.facet
|
||||||
|
@ -583,6 +590,11 @@ proc stop*(turn: var Turn, facet: Facet) =
|
||||||
queueEffect(turn, facet, stopNow)
|
queueEffect(turn, facet, stopNow)
|
||||||
|
|
||||||
proc stop*(turn: var Turn) =
|
proc stop*(turn: var Turn) =
|
||||||
|
when tracing:
|
||||||
|
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
|
||||||
|
act.facetstop.path = facet.path
|
||||||
|
act.facetstop.reason = FacetStopReason.explicitAction
|
||||||
|
turn.desc.actions.add act
|
||||||
stop(turn, turn.facet)
|
stop(turn, turn.facet)
|
||||||
|
|
||||||
proc onStop*(facet: Facet; act: TurnAction) =
|
proc onStop*(facet: Facet; act: TurnAction) =
|
||||||
|
@ -596,7 +608,12 @@ proc onStop*(turn: var Turn; act: TurnAction) =
|
||||||
proc stop*(actor: Actor) =
|
proc stop*(actor: Actor) =
|
||||||
queueTurn(actor.root) do (turn: var Turn):
|
queueTurn(actor.root) do (turn: var Turn):
|
||||||
assert(not turn.facet.isNil)
|
assert(not turn.facet.isNil)
|
||||||
stop(turn)
|
when tracing:
|
||||||
|
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
|
||||||
|
act.facetstop.path = facet.path
|
||||||
|
act.facetstop.reason = FacetStopReason.actorStopping
|
||||||
|
turn.desc.actions.add act
|
||||||
|
stop(turn, turn.facet)
|
||||||
|
|
||||||
proc stopActor*(facet: Facet) =
|
proc stopActor*(facet: Facet) =
|
||||||
stop(facet.actor)
|
stop(facet.actor)
|
||||||
|
@ -655,9 +672,16 @@ proc run* =
|
||||||
while turnQueue.len > 0:
|
while turnQueue.len > 0:
|
||||||
var turn = turnQueue.popFirst()
|
var turn = turnQueue.popFirst()
|
||||||
# TODO: check if actor is still valid
|
# TODO: check if actor is still valid
|
||||||
run turn
|
try: run(turn)
|
||||||
|
except CatchableError as err:
|
||||||
|
stderr.writeLine("actor ", turn.actor.name, " threw an error during a turn")
|
||||||
|
terminateActor(turn, err)
|
||||||
ioqueue.poll(ready)
|
ioqueue.poll(ready)
|
||||||
if ready.len == 0: break
|
if ready.len == 0: break
|
||||||
while ready.len > 0:
|
while ready.len > 0:
|
||||||
discard trampoline:
|
try:
|
||||||
ready.pop()
|
discard trampoline:
|
||||||
|
ready.pop()
|
||||||
|
except CatchableError as err:
|
||||||
|
stderr.writeLine "ioqueue continuation threw an error"
|
||||||
|
raise err
|
||||||
|
|
|
@ -42,7 +42,7 @@ type DeprecatedBootProc = proc (ds: Cap; turn: var Turn) {.closure.}
|
||||||
|
|
||||||
proc bootDataspace*(name: string; bootProc: BootProc): Actor =
|
proc bootDataspace*(name: string; bootProc: BootProc): Actor =
|
||||||
bootActor(name) do (turn: var Turn):
|
bootActor(name) do (turn: var Turn):
|
||||||
discard turn.facet.preventInertCheck()
|
turn.preventInertCheck()
|
||||||
bootProc(turn, newDataspace(turn))
|
bootProc(turn, newDataspace(turn))
|
||||||
|
|
||||||
proc bootDataspace*(name: string; bootProc: DeprecatedBootProc): Actor {.deprecated.} =
|
proc bootDataspace*(name: string; bootProc: DeprecatedBootProc): Actor {.deprecated.} =
|
||||||
|
|
|
@ -262,12 +262,12 @@ type
|
||||||
|
|
||||||
proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) =
|
proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) =
|
||||||
spawnLink(name, turn) do (turn: var Turn):
|
spawnLink(name, turn) do (turn: var Turn):
|
||||||
|
turn.preventInertCheck()
|
||||||
let relay = Relay(
|
let relay = Relay(
|
||||||
facet: turn.facet,
|
facet: turn.facet,
|
||||||
packetWriter: opts.packetWriter,
|
packetWriter: opts.packetWriter,
|
||||||
wireBuf: newBufferedDecoder(0),
|
wireBuf: newBufferedDecoder(0),
|
||||||
)
|
)
|
||||||
discard relay.facet.preventInertCheck()
|
|
||||||
if not opts.initialCap.isNil:
|
if not opts.initialCap.isNil:
|
||||||
var exported: seq[WireSymbol]
|
var exported: seq[WireSymbol]
|
||||||
discard rewriteCapOut(relay, opts.initialCap, exported)
|
discard rewriteCapOut(relay, opts.initialCap, exported)
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
# Package
|
# Package
|
||||||
|
|
||||||
version = "20240311"
|
version = "20240312"
|
||||||
author = "Emery Hemingway"
|
author = "Emery Hemingway"
|
||||||
description = "Syndicated actors for conversational concurrency"
|
description = "Syndicated actors for conversational concurrency"
|
||||||
license = "Unlicense"
|
license = "Unlicense"
|
||||||
|
|
Loading…
Reference in New Issue