Fix runaway shutdown loop

This commit is contained in:
Emery Hemingway 2024-03-14 13:14:51 +00:00
parent 9ca073d433
commit a3146f88a5
5 changed files with 177 additions and 131 deletions

View File

@ -4,7 +4,6 @@
## This module implements the `Syndicate DSL <https://syndicate-lang.org/doc/syndicate/>`_.
import std/[macros, tables, typetraits]
import pkg/sys/ioqueue
import preserves
export fromPreserves, toPreserves

View File

@ -63,7 +63,7 @@ type
Turn* = ref object
facet: Facet # active facet that may change during a turn
work: Deque[tuple[facet: Facet, act: TurnAction]]
effects: seq[Turn]
effects: Table[Actor, Turn]
when tracing:
desc: TurnDescription
@ -109,11 +109,33 @@ when tracing:
result.add f.id.toPreserves
f = f.parent
proc initEnqueue(turn: Turn; cap: Cap): ActionDescription =
result = ActionDescription(orKind: ActionDescriptionKind.enqueue)
result.enqueue.event.target.actor = turn.facet.actor.id.toPreserves
result.enqueue.event.target.facet = turn.facet.id.toPreserves
result.enqueue.event.target.oid = cap.target.oid.toPreserves
proc toDequeue(act: sink ActionDescription): ActionDescription =
result = ActionDescription(orKind: ActionDescriptionKind.dequeue)
result.dequeue.event = move act.enqueue.event
proc toTraceTarget(cap: Cap): trace.Target =
assert not cap.target.isNil
assert not cap.target.facet.isNil
result.actor = cap.target.facet.actor.id
result.facet = cap.target.facet.id
result.oid = cap.target.oid.toPreserves
method publish*(e: Entity; turn: var Turn; v: AssertionRef; h: Handle) {.base.} = discard
method retract*(e: Entity; turn: var Turn; h: Handle) {.base.} = discard
method message*(e: Entity; turn: var Turn; v: AssertionRef) {.base.} = discard
method sync*(e: Entity; turn: var Turn; peer: Cap) {.base.} = discard
converter toActor(f: Facet): Actor = f.actor
converter toActor(t: Turn): Actor = t.facet.actor
converter toFacet(a: Actor): Facet = a.root
converter toFacet(t: Turn): Facet = t.facet
using
actor: Actor
facet: Facet
@ -121,24 +143,27 @@ using
action: TurnAction
proc labels(f: Facet): string =
assert not f.isNil
assert not f.actor.isNil
result.add f.actor.name
proc catLabels(f: Facet; labels: var string) =
if not f.parent.isNil:
catLabels(f.parent, labels)
labels.add ':'
labels.add $f.id
result.add f.actor.name
catLabels(f, result)
proc `$`*(f: Facet): string =
"<Facet:" & f.labels & ">"
proc `$`*(actor: Actor): string =
"<Actor:" & actor.name & ">" # TODO: ambigous
when tracing:
proc `$`*(r: Cap): string =
"<Ref:" & r.relay.labels & ">"
proc `$`*(actor: Actor): string =
"<Actor:" & actor.name & ">" # TODO: ambigous
proc `$`*(t: Turn): string =
"<Turn:" & $t.desc.id & ">"
@ -150,8 +175,11 @@ proc attenuate*(r: Cap; a: Attenuation): Cap =
relay: r.relay,
attenuation: a & r.attenuation)
proc hash*(actor): Hash =
actor.unsafeAddr.hash
proc hash*(facet): Hash =
facet.id.hash
facet.unsafeAddr.hash
proc hash*(r: Cap): Hash = !$(r.relay.hash !& r.target.unsafeAddr.hash)
@ -196,17 +224,19 @@ proc run*(facet: Facet; action: TurnAction) = queueTurn(facet, action)
proc facet*(turn: Turn): Facet = turn.facet
proc queueEffect*(turn: var Turn; target: Facet; act: TurnAction) =
if target.actor == turn.facet.actor:
let fremd = target.actor
if fremd == turn.facet.actor:
turn.work.addLast((target, act,))
else:
var next = Turn(facet: target)
assert not target.isNil
next.work.addLast((target, act,))
when tracing:
next.desc.id = nextTurnId()
next.desc.cause = TurnCause(orKind: TurnCauseKind.turn)
next.desc.cause.turn.id = turn.desc.id
turn.effects.add next
var fremdTurn = turn.effects.getOrDefault(fremd)
if fremdTurn.isNil:
fremdTurn = Turn(facet: target)
turn.effects[fremd] = fremdTurn
when tracing:
fremdTurn.desc.id = nextTurnId()
fremdTurn.desc.cause = TurnCause(orKind: TurnCauseKind.turn)
fremdTurn.desc.cause.turn.id = turn.desc.id
fremdTurn.work.addLast((target, act,))
type Bindings = Table[Value, Value]
@ -325,19 +355,21 @@ proc publish(turn: var Turn; cap: Cap; v: Value; h: Handle) =
if not a.isFalse:
let e = OutboundAssertion(handle: h, peer: cap)
turn.facet.outbound[h] = e
when tracing:
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
act.enqueue.event.target.actor = turn.facet.actor.id.toPreserves
act.enqueue.event.target.facet = turn.facet.id.toPreserves
act.enqueue.event.target.oid = cap.target.oid.toPreserves
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
act.enqueue.event.detail.assert.assertion.value.value =
mapEmbeds(v) do (cap: Value) -> Value: discard
act.enqueue.event.detail.assert.handle = h
turn.desc.actions.add act
queueEffect(turn, cap.relay) do (turn: var Turn):
e.established = true
when tracing:
turn.desc.actions.add act.toDequeue
publish(cap.target, turn, AssertionRef(value: a), e.handle)
when tracing:
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
act.enqueue.event.target.actor = turn.facet.actor.id.toPreserves
act.enqueue.event.target.facet = turn.facet.id.toPreserves
act.enqueue.event.target.oid = cap.target.oid.toPreserves
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
act.enqueue.event.detail.assert.assertion.value.value =
mapEmbeds(v) do (cap: Value) -> Value: discard
act.enqueue.event.detail.assert.handle = h
turn.desc.actions.add act
proc publish*(turn: var Turn; r: Cap; a: Value): Handle {.discardable.} =
result = turn.facet.nextHandle()
@ -347,7 +379,14 @@ proc publish*[T](turn: var Turn; r: Cap; a: T): Handle {.discardable.} =
publish(turn, r, a.toPreserves)
proc retract(turn: var Turn; e: OutboundAssertion) =
when tracing:
var act = initEnqueue(turn, e.peer)
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.retract)
act.enqueue.event.detail.retract.handle = e.handle
turn.desc.actions.add act
queueEffect(turn, e.peer.relay) do (turn: var Turn):
when tracing:
turn.desc.actions.add act.toDequeue
if e.established:
e.established = false
e.peer.target.retract(turn, e.handle)
@ -360,18 +399,30 @@ proc retract*(turn: var Turn; h: Handle) =
proc message*(turn: var Turn; r: Cap; v: Value) =
var a = runRewrites(r.attenuation, v)
if not a.isFalse:
when tracing:
var act = initEnqueue(turn, r)
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.message)
act.enqueue.event.detail.message.body.value.value =
mapEmbeds(a) do (cap: Value) -> Value: discard
turn.desc.actions.add act
queueEffect(turn, r.relay) do (turn: var Turn):
when tracing:
turn.desc.actions.add act.toDequeue
r.target.message(turn, AssertionRef(value: a))
proc message*[T](turn: var Turn; r: Cap; v: T) =
message(turn, r, v.toPreserves)
proc sync(turn: var Turn; e: Entity; peer: Cap) =
e.sync(turn, peer)
proc sync*(turn: var Turn; r, peer: Cap) =
when tracing:
var act = initEnqueue(turn, peer)
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.sync)
act.enqueue.event.detail.sync.peer = peer.toTraceTarget
turn.desc.actions.add act
queueEffect(turn, r.relay) do (turn: var Turn):
sync(turn, r.target, peer)
when tracing:
turn.desc.actions.add act.toDequeue
r.target.sync(turn, peer)
proc replace*[T](turn: var Turn; cap: Cap; h: Handle; v: T): Handle =
result = publish(turn, cap, v)
@ -414,29 +465,18 @@ proc preventInertCheck*(turn: Turn) =
proc terminateActor(turn; reason: ref Exception)
proc terminate(facet; turn: var Turn; orderly: bool) =
proc terminateFacetOrderly(turn: var Turn) =
let facet = turn.facet
if facet.isAlive:
facet.isAlive = false
let parent = facet.parent
if not parent.isNil:
parent.children.excl facet
queueWork(turn, facet) do (turn: var Turn):
while facet.children.len > 0:
facet.children.pop.terminate(turn, orderly)
if orderly:
for act in facet.shutdownActions:
act(turn)
for a in facet.outbound.values: turn.retract(a)
if orderly:
if not parent.isNil:
if parent.isInert:
parent.terminate(turn, true)
else:
terminateActor(turn, nil)
when tracing:
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
act.facetstop.path = facet.path
turn.desc.actions.add act
var i = 0
while i < facet.shutdownActions.len:
facet.shutdownActions[i](turn)
inc i
setLen facet.shutdownActions, 0
for e in facet.outbound.values:
retract(turn, e)
clear facet.outbound
proc inertCheck(turn: var Turn) =
if (not turn.facet.parent.isNil and
@ -444,11 +484,21 @@ proc inertCheck(turn: var Turn) =
turn.facet.isInert:
when tracing:
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
act.facetstop.path = facet.path
act.facetstop.path = turn.facet.path
act.facetstop.reason = FacetStopReason.inert
turn.desc.actions.add act
stop(turn)
proc terminateFacet(turn: var Turn) =
let facet = turn.facet
for child in facet.children:
queueWork(turn, child, terminateFacetOrderly)
# terminate all children
facet.children.clear()
# detach all children
queueWork(turn, facet, terminateFacetOrderly)
# self-termination
proc stopIfInertAfter(action: TurnAction): TurnAction =
proc work(turn: var Turn) =
queueWork(turn, turn.facet, action)
@ -457,20 +507,6 @@ proc stopIfInertAfter(action: TurnAction): TurnAction =
proc newFacet(turn: var Turn): Facet = newFacet(turn.facet.actor, turn.facet)
#[
proc inFacet(turn: var Turn; bootProc: TurnAction): Facet =
result = newFacet(turn)
recallFacet turn:
turn.facet = result
when tracing:
var act = ActionDescription(orKind: ActionDescriptionKind.facetstart)
act.facetstart.path.add result.path
turn.desc.actions.add act
stopIfInertAfter(bootProc)(turn)
]#
# proc facet(turn: var Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc)
proc newActor(name: string; parent: Facet): Actor =
result = Actor(
name: name,
@ -566,75 +602,67 @@ proc terminateActor(turn; reason: ref Exception) =
return
proc finish(turn: var Turn) =
assert not actor.root.isNil, actor.name
actor.root.terminate(turn, reason.isNil)
terminateFacet(turn)
actor.root = nil
actor.exited = true
queueTurn(actor.root, finish)
proc terminate*(facet; e: ref Exception) =
proc terminateFacet*(facet; e: ref Exception) =
run(facet.actor.root) do (turn: var Turn):
terminateActor(turn, e)
proc stopNow(turn: var Turn) =
let caller = turn.facet
recallFacet turn:
while caller.children.len > 0:
var child = caller.children.pop()
if child.actor == caller.actor:
turn.facet = child
stopNow(turn)
else:
queueEffect(turn, child, stopNow)
caller.terminate(turn, true)
proc stop*(turn: var Turn, facet: Facet) =
queueEffect(turn, facet, stopNow)
queueEffect(turn, facet) do (turn: var Turn):
when tracing:
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
act.facetstop.path = facet.path
act.facetstop.reason = FacetStopReason.explicitAction
turn.desc.actions.add act
terminateFacet(turn)
proc stop*(turn: var Turn) =
when tracing:
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
act.facetstop.path = facet.path
act.facetstop.reason = FacetStopReason.explicitAction
turn.desc.actions.add act
stop(turn, turn.facet)
proc onStop*(facet: Facet; act: TurnAction) =
## Add a `proc (turn: var Turn)` action to `facet` to be called as it stops.
assert not facet.isNil
add(facet.shutdownActions, act)
proc onStop*(turn: var Turn; act: TurnAction) =
onStop(turn.facet, act)
proc isAlive(actor): bool =
not(actor.exited or actor.exiting)
proc stop*(actor: Actor) =
queueTurn(actor.root) do (turn: var Turn):
assert(not turn.facet.isNil)
when tracing:
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
act.facetstop.path = facet.path
act.facetstop.reason = FacetStopReason.actorStopping
turn.desc.actions.add act
stop(turn, turn.facet)
if actor.isAlive:
queueTurn(actor.root) do (turn: var Turn):
assert(not turn.facet.isNil)
when tracing:
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
act.facetstop.path = turn.facet.path
act.facetstop.reason = FacetStopReason.actorStopping
turn.desc.actions.add act
stop(turn, turn.facet)
proc stopActor*(facet: Facet) =
stop(facet.actor)
proc stopActor*(turn: var Turn) =
assert(not turn.facet.isNil)
assert(not turn.facet.actor.isNil)
assert(not turn.facet.actor.root.isNil)
stop(turn, turn.facet.actor.root)
proc freshen*(turn: var Turn, act: TurnAction) {.deprecated.} =
run(turn.facet, act)
proc newCap*(relay: Facet; e: Entity): Cap {.deprecated.} =
Cap(relay: relay, target: e)
proc newCap*(relay: Facet; entity: Entity): Cap =
## Create a new capability for `entity` via `relay`.
# An Entity has an owning facet and a Cap does as well?
if entity.facet.isNil: entity.facet = relay
Cap(relay: relay, target: entity)
proc newCap*(turn; e: Entity): Cap =
Cap(relay: turn.facet, target: e)
newCap(turn.facet, e)
proc newCap*(e: Entity; turn): Cap =
Cap(relay: turn.facet, target: e)
newCap(turn.facet, e)
type SyncContinuation {.final.} = ref object of Entity
action: TurnAction
@ -660,10 +688,11 @@ proc run(turn: var Turn) =
var act = ActorActivation(orKind: ActorActivationKind.turn)
act.turn = move turn.desc
trace(turn.facet.actor, act)
turn.facet = nil # invalidate the turn
# TODO: catch exceptions here
for eff in turn.effects.mitems:
for eff in turn.effects.mvalues:
assert not eff.facet.isNil
turnQueue.addLast(move eff)
turn.facet = nil # invalidate the turn
proc run* =
## Run actors to completion
@ -699,8 +728,9 @@ proc disarm*(g: var FacetGuard) =
dec g.facet.inertCheckPreventers
g.facet = nil
proc `=destroy`*(g: var FacetGuard) =
disarm(g)
proc `=destroy`*(g: FacetGuard) =
if not g.facet.isNil:
dec g.facet.inertCheckPreventers
proc `=copy`*(dst: var FacetGuard, src: FacetGuard) =
dst.facet = src.facet

View File

@ -161,9 +161,6 @@ proc lookupLocal(relay; oid: Oid): Cap =
if not sym.isNil:
result = sym.cap
proc isInert(r: Cap): bool =
r.target.isNil
proc rewriteCapIn(relay; facet; n: WireRef, imported: var seq[WireSymbol]): Cap =
case n.orKind
of WireRefKind.mine:
@ -319,7 +316,6 @@ when defined(posix):
stopActor(entity.facet)
else:
entity.relay.recv(buf[], 0..<n)
close(entity.stdin)
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Stdio) =
## Connect to an external dataspace over stdio.
@ -347,6 +343,8 @@ when defined(posix):
facet: turn.facet, relay: relay, stdin: newAsyncFile(FD fd))
onStop(entity.facet) do (turn: var Turn):
entity.alive = false
close(entity.stdin)
# Close stdin to remove it from the ioqueue
discard trampoline:
whelp loop(entity)
publish(turn, ds, TransportConnection(
@ -383,7 +381,9 @@ when defined(posix):
template bootSocketEntity() {.dirty.} =
proc setup(turn: var Turn) {.closure.} =
proc kill(turn: var Turn) =
entity.alive = false
if entity.alive:
entity.alive = false
close(entity.sock)
onStop(turn, kill)
publish(turn, ds, TransportConnection(
`addr`: ta.toPreserves,
@ -401,7 +401,7 @@ when defined(posix):
stopActor(entity.facet)
else:
entity.relay.recv(buf[], 0..<n)
close(entity.sock)
# the socket closes when the actor is stopped
proc boot(entity: TcpEntity; ta: transportAddress.Tcp; ds: Cap) {.asyncio.} =
entity.sock = connectTcpAsync(ta.host, Port ta.port)

View File

@ -1,7 +1,8 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[asyncdispatch, asyncfile, parseopt]
import std/[oserrors, parseopt, posix, strutils]
import pkg/sys/[files, handles, ioqueue]
import preserves, syndicate, syndicate/relays
type
@ -10,18 +11,35 @@ type
Says {.preservesRecord: "Says".} = object
who, what: string
proc readStdin(facet: Facet; ds: Cap; username: string) =
let file = openAsync("/dev/stdin")
onStop(facet) do (turn: var Turn): close(file)
close(stdin)
proc readLine() {.gcsafe.} =
let future = readLine(file)
addCallback(future, facet) do (turn: var Turn):
var msg = read(future)
if msg == "": quit()
message(turn, ds, Says(who: username, what: msg))
readLine()
readLine()
proc syncAndStop(facet: Facet; cap: Cap) =
## Stop the actor responsible for `facet` after
## synchronizing with `cap`.
run(facet) do (turn: var Turn):
sync(turn, cap, stopActor)
proc readStdin(facet: Facet; ds: Cap; username: string) {.asyncio.} =
let
fd = stdin.getOsFileHandle()
flags = fcntl(fd.cint, F_GETFL, 0)
if flags < 0:
raiseOSError(osLastError())
if fcntl(fd.cint, F_SETFL, flags or O_NONBLOCK) < 0:
raiseOSError(osLastError())
let
file = newAsyncFile(FD fd)
buf = new string
buf[].setLen(0x1000)
while true:
let n = read(file, buf)
if n < 1:
stderr.writeLine "test_chat calls stopsActor ", facet.actor
syncAndStop(facet, ds)
return
else:
var msg = buf[][0..<n].strip
proc send(turn: var Turn) =
message(turn, ds, Says(who: username, what: msg))
run(facet, send)
proc chat(turn: var Turn; ds: Cap; username: string) =
during(turn, ds, ?:Present) do (who: string):
@ -33,10 +51,10 @@ proc chat(turn: var Turn; ds: Cap; username: string) =
echo who, ": ", what
discard publish(turn, ds, Present(username: username))
readStdin(turn.facet, ds, username)
discard trampoline:
whelp readStdin(turn.facet, ds, username)
proc main =
let route = envRoute()
var username = ""
for kind, key, val in getopt():
@ -48,9 +66,8 @@ proc main =
if username == "":
stderr.writeLine "--user: unspecified"
else:
runActor("chat") do (turn: var Turn; root: Cap):
spawnRelays(turn, root)
resolve(turn, root, route) do (turn: var Turn; ds: Cap):
runActor("chat") do (turn: var Turn):
resolveEnvironment(turn) do (turn: var Turn; ds: Cap):
chat(turn, ds, username)
main()

View File

@ -7,7 +7,7 @@ import syndicate, syndicate/actors/timers
let actor = bootActor("timer-test") do (turn: var Turn):
let timers = newDataspace(turn)
spawnTimerActor(timers, turn)
spawnTimerActor(turn, timers)
onPublish(turn, timers, ?LaterThan(seconds: 1356100000)):
echo "now in 13th bʼakʼtun"