From a3146f88a5f8d650b45934385ed60e85ab6b1795 Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Thu, 14 Mar 2024 13:14:51 +0000 Subject: [PATCH] Fix runaway shutdown loop --- src/syndicate.nim | 1 - src/syndicate/actors.nim | 240 ++++++++++++++++++++++----------------- src/syndicate/relays.nim | 12 +- tests/test_chat.nim | 53 ++++++--- tests/test_timers.nim | 2 +- 5 files changed, 177 insertions(+), 131 deletions(-) diff --git a/src/syndicate.nim b/src/syndicate.nim index e2e4852..b382835 100644 --- a/src/syndicate.nim +++ b/src/syndicate.nim @@ -4,7 +4,6 @@ ## This module implements the `Syndicate DSL `_. import std/[macros, tables, typetraits] -import pkg/sys/ioqueue import preserves export fromPreserves, toPreserves diff --git a/src/syndicate/actors.nim b/src/syndicate/actors.nim index c3ff04f..5918cbe 100644 --- a/src/syndicate/actors.nim +++ b/src/syndicate/actors.nim @@ -63,7 +63,7 @@ type Turn* = ref object facet: Facet # active facet that may change during a turn work: Deque[tuple[facet: Facet, act: TurnAction]] - effects: seq[Turn] + effects: Table[Actor, Turn] when tracing: desc: TurnDescription @@ -109,11 +109,33 @@ when tracing: result.add f.id.toPreserves f = f.parent + proc initEnqueue(turn: Turn; cap: Cap): ActionDescription = + result = ActionDescription(orKind: ActionDescriptionKind.enqueue) + result.enqueue.event.target.actor = turn.facet.actor.id.toPreserves + result.enqueue.event.target.facet = turn.facet.id.toPreserves + result.enqueue.event.target.oid = cap.target.oid.toPreserves + + proc toDequeue(act: sink ActionDescription): ActionDescription = + result = ActionDescription(orKind: ActionDescriptionKind.dequeue) + result.dequeue.event = move act.enqueue.event + + proc toTraceTarget(cap: Cap): trace.Target = + assert not cap.target.isNil + assert not cap.target.facet.isNil + result.actor = cap.target.facet.actor.id + result.facet = cap.target.facet.id + result.oid = cap.target.oid.toPreserves + method publish*(e: Entity; turn: var Turn; v: AssertionRef; h: Handle) {.base.} = discard method retract*(e: Entity; turn: var Turn; h: Handle) {.base.} = discard method message*(e: Entity; turn: var Turn; v: AssertionRef) {.base.} = discard method sync*(e: Entity; turn: var Turn; peer: Cap) {.base.} = discard +converter toActor(f: Facet): Actor = f.actor +converter toActor(t: Turn): Actor = t.facet.actor +converter toFacet(a: Actor): Facet = a.root +converter toFacet(t: Turn): Facet = t.facet + using actor: Actor facet: Facet @@ -121,24 +143,27 @@ using action: TurnAction proc labels(f: Facet): string = + assert not f.isNil + assert not f.actor.isNil + result.add f.actor.name proc catLabels(f: Facet; labels: var string) = if not f.parent.isNil: catLabels(f.parent, labels) labels.add ':' labels.add $f.id - result.add f.actor.name catLabels(f, result) proc `$`*(f: Facet): string = "" +proc `$`*(actor: Actor): string = + "" # TODO: ambigous + when tracing: proc `$`*(r: Cap): string = "" - proc `$`*(actor: Actor): string = - "" # TODO: ambigous proc `$`*(t: Turn): string = "" @@ -150,8 +175,11 @@ proc attenuate*(r: Cap; a: Attenuation): Cap = relay: r.relay, attenuation: a & r.attenuation) +proc hash*(actor): Hash = + actor.unsafeAddr.hash + proc hash*(facet): Hash = - facet.id.hash + facet.unsafeAddr.hash proc hash*(r: Cap): Hash = !$(r.relay.hash !& r.target.unsafeAddr.hash) @@ -196,17 +224,19 @@ proc run*(facet: Facet; action: TurnAction) = queueTurn(facet, action) proc facet*(turn: Turn): Facet = turn.facet proc queueEffect*(turn: var Turn; target: Facet; act: TurnAction) = - if target.actor == turn.facet.actor: + let fremd = target.actor + if fremd == turn.facet.actor: turn.work.addLast((target, act,)) else: - var next = Turn(facet: target) - assert not target.isNil - next.work.addLast((target, act,)) - when tracing: - next.desc.id = nextTurnId() - next.desc.cause = TurnCause(orKind: TurnCauseKind.turn) - next.desc.cause.turn.id = turn.desc.id - turn.effects.add next + var fremdTurn = turn.effects.getOrDefault(fremd) + if fremdTurn.isNil: + fremdTurn = Turn(facet: target) + turn.effects[fremd] = fremdTurn + when tracing: + fremdTurn.desc.id = nextTurnId() + fremdTurn.desc.cause = TurnCause(orKind: TurnCauseKind.turn) + fremdTurn.desc.cause.turn.id = turn.desc.id + fremdTurn.work.addLast((target, act,)) type Bindings = Table[Value, Value] @@ -325,19 +355,21 @@ proc publish(turn: var Turn; cap: Cap; v: Value; h: Handle) = if not a.isFalse: let e = OutboundAssertion(handle: h, peer: cap) turn.facet.outbound[h] = e + when tracing: + var act = ActionDescription(orKind: ActionDescriptionKind.enqueue) + act.enqueue.event.target.actor = turn.facet.actor.id.toPreserves + act.enqueue.event.target.facet = turn.facet.id.toPreserves + act.enqueue.event.target.oid = cap.target.oid.toPreserves + act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert) + act.enqueue.event.detail.assert.assertion.value.value = + mapEmbeds(v) do (cap: Value) -> Value: discard + act.enqueue.event.detail.assert.handle = h + turn.desc.actions.add act queueEffect(turn, cap.relay) do (turn: var Turn): e.established = true + when tracing: + turn.desc.actions.add act.toDequeue publish(cap.target, turn, AssertionRef(value: a), e.handle) - when tracing: - var act = ActionDescription(orKind: ActionDescriptionKind.enqueue) - act.enqueue.event.target.actor = turn.facet.actor.id.toPreserves - act.enqueue.event.target.facet = turn.facet.id.toPreserves - act.enqueue.event.target.oid = cap.target.oid.toPreserves - act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert) - act.enqueue.event.detail.assert.assertion.value.value = - mapEmbeds(v) do (cap: Value) -> Value: discard - act.enqueue.event.detail.assert.handle = h - turn.desc.actions.add act proc publish*(turn: var Turn; r: Cap; a: Value): Handle {.discardable.} = result = turn.facet.nextHandle() @@ -347,7 +379,14 @@ proc publish*[T](turn: var Turn; r: Cap; a: T): Handle {.discardable.} = publish(turn, r, a.toPreserves) proc retract(turn: var Turn; e: OutboundAssertion) = + when tracing: + var act = initEnqueue(turn, e.peer) + act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.retract) + act.enqueue.event.detail.retract.handle = e.handle + turn.desc.actions.add act queueEffect(turn, e.peer.relay) do (turn: var Turn): + when tracing: + turn.desc.actions.add act.toDequeue if e.established: e.established = false e.peer.target.retract(turn, e.handle) @@ -360,18 +399,30 @@ proc retract*(turn: var Turn; h: Handle) = proc message*(turn: var Turn; r: Cap; v: Value) = var a = runRewrites(r.attenuation, v) if not a.isFalse: + when tracing: + var act = initEnqueue(turn, r) + act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.message) + act.enqueue.event.detail.message.body.value.value = + mapEmbeds(a) do (cap: Value) -> Value: discard + turn.desc.actions.add act queueEffect(turn, r.relay) do (turn: var Turn): + when tracing: + turn.desc.actions.add act.toDequeue r.target.message(turn, AssertionRef(value: a)) proc message*[T](turn: var Turn; r: Cap; v: T) = message(turn, r, v.toPreserves) -proc sync(turn: var Turn; e: Entity; peer: Cap) = - e.sync(turn, peer) - proc sync*(turn: var Turn; r, peer: Cap) = + when tracing: + var act = initEnqueue(turn, peer) + act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.sync) + act.enqueue.event.detail.sync.peer = peer.toTraceTarget + turn.desc.actions.add act queueEffect(turn, r.relay) do (turn: var Turn): - sync(turn, r.target, peer) + when tracing: + turn.desc.actions.add act.toDequeue + r.target.sync(turn, peer) proc replace*[T](turn: var Turn; cap: Cap; h: Handle; v: T): Handle = result = publish(turn, cap, v) @@ -414,29 +465,18 @@ proc preventInertCheck*(turn: Turn) = proc terminateActor(turn; reason: ref Exception) -proc terminate(facet; turn: var Turn; orderly: bool) = +proc terminateFacetOrderly(turn: var Turn) = + let facet = turn.facet if facet.isAlive: facet.isAlive = false - let parent = facet.parent - if not parent.isNil: - parent.children.excl facet - queueWork(turn, facet) do (turn: var Turn): - while facet.children.len > 0: - facet.children.pop.terminate(turn, orderly) - if orderly: - for act in facet.shutdownActions: - act(turn) - for a in facet.outbound.values: turn.retract(a) - if orderly: - if not parent.isNil: - if parent.isInert: - parent.terminate(turn, true) - else: - terminateActor(turn, nil) - when tracing: - var act = ActionDescription(orKind: ActionDescriptionKind.facetStop) - act.facetstop.path = facet.path - turn.desc.actions.add act + var i = 0 + while i < facet.shutdownActions.len: + facet.shutdownActions[i](turn) + inc i + setLen facet.shutdownActions, 0 + for e in facet.outbound.values: + retract(turn, e) + clear facet.outbound proc inertCheck(turn: var Turn) = if (not turn.facet.parent.isNil and @@ -444,11 +484,21 @@ proc inertCheck(turn: var Turn) = turn.facet.isInert: when tracing: var act = ActionDescription(orKind: ActionDescriptionKind.facetStop) - act.facetstop.path = facet.path + act.facetstop.path = turn.facet.path act.facetstop.reason = FacetStopReason.inert turn.desc.actions.add act stop(turn) +proc terminateFacet(turn: var Turn) = + let facet = turn.facet + for child in facet.children: + queueWork(turn, child, terminateFacetOrderly) + # terminate all children + facet.children.clear() + # detach all children + queueWork(turn, facet, terminateFacetOrderly) + # self-termination + proc stopIfInertAfter(action: TurnAction): TurnAction = proc work(turn: var Turn) = queueWork(turn, turn.facet, action) @@ -457,20 +507,6 @@ proc stopIfInertAfter(action: TurnAction): TurnAction = proc newFacet(turn: var Turn): Facet = newFacet(turn.facet.actor, turn.facet) -#[ -proc inFacet(turn: var Turn; bootProc: TurnAction): Facet = - result = newFacet(turn) - recallFacet turn: - turn.facet = result - when tracing: - var act = ActionDescription(orKind: ActionDescriptionKind.facetstart) - act.facetstart.path.add result.path - turn.desc.actions.add act - stopIfInertAfter(bootProc)(turn) -]# - -# proc facet(turn: var Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc) - proc newActor(name: string; parent: Facet): Actor = result = Actor( name: name, @@ -566,75 +602,67 @@ proc terminateActor(turn; reason: ref Exception) = return proc finish(turn: var Turn) = assert not actor.root.isNil, actor.name - actor.root.terminate(turn, reason.isNil) + terminateFacet(turn) + actor.root = nil actor.exited = true queueTurn(actor.root, finish) -proc terminate*(facet; e: ref Exception) = +proc terminateFacet*(facet; e: ref Exception) = run(facet.actor.root) do (turn: var Turn): terminateActor(turn, e) -proc stopNow(turn: var Turn) = - let caller = turn.facet - recallFacet turn: - while caller.children.len > 0: - var child = caller.children.pop() - if child.actor == caller.actor: - turn.facet = child - stopNow(turn) - else: - queueEffect(turn, child, stopNow) - caller.terminate(turn, true) - proc stop*(turn: var Turn, facet: Facet) = - queueEffect(turn, facet, stopNow) + queueEffect(turn, facet) do (turn: var Turn): + when tracing: + var act = ActionDescription(orKind: ActionDescriptionKind.facetStop) + act.facetstop.path = facet.path + act.facetstop.reason = FacetStopReason.explicitAction + turn.desc.actions.add act + terminateFacet(turn) proc stop*(turn: var Turn) = - when tracing: - var act = ActionDescription(orKind: ActionDescriptionKind.facetStop) - act.facetstop.path = facet.path - act.facetstop.reason = FacetStopReason.explicitAction - turn.desc.actions.add act stop(turn, turn.facet) proc onStop*(facet: Facet; act: TurnAction) = ## Add a `proc (turn: var Turn)` action to `facet` to be called as it stops. - assert not facet.isNil add(facet.shutdownActions, act) proc onStop*(turn: var Turn; act: TurnAction) = onStop(turn.facet, act) +proc isAlive(actor): bool = + not(actor.exited or actor.exiting) + proc stop*(actor: Actor) = - queueTurn(actor.root) do (turn: var Turn): - assert(not turn.facet.isNil) - when tracing: - var act = ActionDescription(orKind: ActionDescriptionKind.facetStop) - act.facetstop.path = facet.path - act.facetstop.reason = FacetStopReason.actorStopping - turn.desc.actions.add act - stop(turn, turn.facet) + if actor.isAlive: + queueTurn(actor.root) do (turn: var Turn): + assert(not turn.facet.isNil) + when tracing: + var act = ActionDescription(orKind: ActionDescriptionKind.facetStop) + act.facetstop.path = turn.facet.path + act.facetstop.reason = FacetStopReason.actorStopping + turn.desc.actions.add act + stop(turn, turn.facet) proc stopActor*(facet: Facet) = stop(facet.actor) proc stopActor*(turn: var Turn) = - assert(not turn.facet.isNil) - assert(not turn.facet.actor.isNil) - assert(not turn.facet.actor.root.isNil) stop(turn, turn.facet.actor.root) proc freshen*(turn: var Turn, act: TurnAction) {.deprecated.} = run(turn.facet, act) -proc newCap*(relay: Facet; e: Entity): Cap {.deprecated.} = - Cap(relay: relay, target: e) +proc newCap*(relay: Facet; entity: Entity): Cap = + ## Create a new capability for `entity` via `relay`. + # An Entity has an owning facet and a Cap does as well? + if entity.facet.isNil: entity.facet = relay + Cap(relay: relay, target: entity) proc newCap*(turn; e: Entity): Cap = - Cap(relay: turn.facet, target: e) - + newCap(turn.facet, e) proc newCap*(e: Entity; turn): Cap = - Cap(relay: turn.facet, target: e) + newCap(turn.facet, e) type SyncContinuation {.final.} = ref object of Entity action: TurnAction @@ -660,10 +688,11 @@ proc run(turn: var Turn) = var act = ActorActivation(orKind: ActorActivationKind.turn) act.turn = move turn.desc trace(turn.facet.actor, act) - turn.facet = nil # invalidate the turn # TODO: catch exceptions here - for eff in turn.effects.mitems: + for eff in turn.effects.mvalues: + assert not eff.facet.isNil turnQueue.addLast(move eff) + turn.facet = nil # invalidate the turn proc run* = ## Run actors to completion @@ -699,8 +728,9 @@ proc disarm*(g: var FacetGuard) = dec g.facet.inertCheckPreventers g.facet = nil -proc `=destroy`*(g: var FacetGuard) = - disarm(g) +proc `=destroy`*(g: FacetGuard) = + if not g.facet.isNil: + dec g.facet.inertCheckPreventers proc `=copy`*(dst: var FacetGuard, src: FacetGuard) = dst.facet = src.facet diff --git a/src/syndicate/relays.nim b/src/syndicate/relays.nim index dc612a2..89ac3a4 100644 --- a/src/syndicate/relays.nim +++ b/src/syndicate/relays.nim @@ -161,9 +161,6 @@ proc lookupLocal(relay; oid: Oid): Cap = if not sym.isNil: result = sym.cap -proc isInert(r: Cap): bool = - r.target.isNil - proc rewriteCapIn(relay; facet; n: WireRef, imported: var seq[WireSymbol]): Cap = case n.orKind of WireRefKind.mine: @@ -319,7 +316,6 @@ when defined(posix): stopActor(entity.facet) else: entity.relay.recv(buf[], 0..