diff --git a/src/syndicate/actors.nim b/src/syndicate/actors.nim index 7be19a2..3c81d78 100644 --- a/src/syndicate/actors.nim +++ b/src/syndicate/actors.nim @@ -5,6 +5,13 @@ import std/[asyncfutures, deques, hashes, monotimes, options, sets, tables, time import preserves import ../syndicate/protocols/[protocol, sturdy] +const tracing = defined(traceSyndicate) + +when tracing: + import std/streams + from std/os import getEnv + import ./protocols/trace + export Handle template generateIdType(typ: untyped) = @@ -25,6 +32,11 @@ type Attenuation = seq[Caveat] Rewrite = sturdy.Rewrite[Ref] + AssertionRef* = ref object + value*: Preserve[Ref] + # if the Enity methods take a Preserve[Ref] object then the generated + # C code has "redefinition of struct" problems when orc is enabled + Entity* = ref object of RootObj oid*: Oid # oid is how Entities are identified over the wire @@ -42,41 +54,59 @@ type Actor* = ref object future: Future[void] name: string - id: ActorId handleAllocator: ref Handle # a fresh actor gets a new ref Handle and # all actors spawned from it get the same ref. root: Facet exitReason: ref Exception exitHooks: seq[TurnAction] + id: ActorId exiting: bool + when tracing: + turnIdAllocator: ref TurnId + traceStream: FileStream TurnAction* = proc (t: var Turn) {.gcsafe.} Queues = TableRef[Facet, seq[TurnAction]] Turn* = object # an object that should remain on the stack - id: TurnId facet: Facet queues: Queues # a ref object that can outlive Turn - - ParentFacet = Option[Facet] + when defined(traceSyndicate): + desc: TurnDescription[void] Facet* = ref FacetObj FacetObj = object - id: FacetId actor*: Actor - parent: ParentFacet + parent: Facet children: HashSet[Facet] outbound: OutboundTable shutdownActions: seq[TurnAction] inertCheckPreventers: int + id: FacetId isAlive: bool -type AssertionRef* = ref object - value*: Preserve[Ref] - # if the Enity methods take a Preserve[Ref] object then the generated - # C code has "redefinition of struct" problems when orc is enabled +when tracing: + + proc nextTurnId(facet: Facet): TurnId = + result = succ(facet.actor.turnIdAllocator[]) + facet.actor.turnIdAllocator[] = result + + proc trace(actor: Actor; act: ActorActivation) = + if not actor.traceStream.isNil: + var entry = TraceEntry[void]( + timestamp: getTime().toUnixFloat(), + actor: initRecord("named", actor.name.toPreserve(void)), + item: act) + actor.traceStream.writeText entry.toPreserve(void) + actor.traceStream.writeLine() + + proc path(facet: Facet): seq[trace.FacetId[void]] = + var f = facet + while not f.isNil: + result.add f.id.toPreserve + f = f.parent method publish*(e: Entity; turn: var Turn; v: AssertionRef; h: Handle) {.base, gcsafe.} = discard method retract*(e: Entity; turn: var Turn; h: Handle) {.base, gcsafe.} = discard @@ -92,10 +122,11 @@ using proc labels(f: Facet): string = proc catLabels(f: Facet; labels: var string) = labels.add ':' - if f.parent.isSome: - catLabels(f.parent.get, labels) + if not f.parent.isNil: + catLabels(f.parent, labels) labels.add ':' - labels.add $f.id + when tracing: + labels.add $f.id result.add f.actor.name catLabels(f, result) @@ -253,6 +284,18 @@ proc publish(turn: var Turn; r: Ref; v: Assertion; h: Handle) = enqueue(turn, r.relay) do (turn: var Turn): e.established = true publish(r.target, turn, AssertionRef(value: a), e.handle) + when tracing: + var act = ActionDescription[void](orKind: ActionDescriptionKind.enqueue) + act.enqueue.event.target.actor = turn.facet.actor.id.toPreserve + act.enqueue.event.target.facet = turn.facet.id.toPreserve + act.enqueue.event.target.oid = r.target.oid.toPreserve + act.enqueue.event.detail = trace.TurnEvent[void](orKind: TurnEventKind.assert) + act.enqueue.event.detail.assert.assertion.value.value = + contract(v) do (r: Ref) -> Preserve[void]: + discard + act.enqueue.event.detail.assert.handle = h + turn.desc.actions.add act + proc publish*(turn: var Turn; r: Ref; a: Assertion): Handle = result = turn.facet.nextHandle() @@ -305,22 +348,22 @@ proc stop*(turn: var Turn) {.gcsafe.} proc run*(facet; action: TurnAction; zombieTurn = false) {.gcsafe.} -proc newFacet(actor; parent: ParentFacet; initialAssertions: OutboundTable): Facet = +proc newFacet(actor; parent: Facet; initialAssertions: OutboundTable): Facet = result = Facet( id: getMonoTime().ticks.FacetId, actor: actor, parent: parent, outbound: initialAssertions, isAlive: true) - if parent.isSome: parent.get.children.incl result + if not parent.isNil: parent.children.incl result -proc newFacet(actor; parent: ParentFacet): Facet = +proc newFacet(actor; parent: Facet): Facet = var initialAssertions: OutboundTable newFacet(actor, parent, initialAssertions) proc isInert(facet): bool = result = facet.children.len == 0 and - (facet.outbound.len == 0 or facet.parent.isNone) and + (facet.outbound.len == 0 or facet.parent.isNil) and facet.inertCheckPreventers == 0 proc preventInertCheck*(facet): (proc() {.gcsafe.}) {.discardable.} = @@ -344,8 +387,8 @@ proc terminate(facet; turn: var Turn; orderly: bool) {.gcsafe.} = if facet.isAlive: facet.isAlive = false let parent = facet.parent - if parent.isSome: - parent.get.children.excl facet + if not parent.isNil: + parent.children.excl facet block: var turn = Turn(facet: facet, queues: turn.queues) while facet.children.len > 0: @@ -355,24 +398,32 @@ proc terminate(facet; turn: var Turn; orderly: bool) {.gcsafe.} = act(turn) for a in facet.outbound.values: turn.retract(a) if orderly: - if parent.isSome: - if parent.get.isInert: - parent.get.terminate(turn, true) + if not parent.isNil: + if parent.isInert: + parent.terminate(turn, true) else: terminate(facet.actor, turn, nil) + when tracing: + var act = ActionDescription[void](orKind: ActionDescriptionKind.facetStop) + act.facetstop.path = facet.path + turn.desc.actions.add act proc stopIfInertAfter(action: TurnAction): TurnAction = proc wrapper(turn: var Turn) = action(turn) enqueue(turn, turn.facet) do (turn: var Turn): - if (turn.facet.parent.isSome and - (not turn.facet.parent.get.isAlive)) or + if (not turn.facet.parent.isNil and + (not turn.facet.parent.isAlive)) or turn.facet.isInert: stop(turn) wrapper proc inFacet*(turn: var Turn; bootProc: TurnAction): Facet = - result = newFacet(turn.facet.actor, some turn.facet) + result = newFacet(turn.facet.actor, turn.facet) + when tracing: + var act = ActionDescription[void](orKind: ActionDescriptionKind.facetstart) + act.facetstart.path.add result.path + turn.desc.actions.add act inFacet(turn, result, stopIfInertAfter(bootProc)) proc facet*(turn: var Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc) @@ -385,16 +436,28 @@ proc newActor(name: string): Actor = name: name, id: ActorId(seed)) new result.handleAllocator - result.root = newFacet(result, none Facet) + result.root = newFacet(result, nil) result.future = newFuture[void]($result) + when tracing: + var act = ActorActivation[void](orKind: ActorActivationKind.start) + act.start.actorName = Name[void](orKind: NameKind.named) + act.start.actorName.named.name = name.toPreserve(void) + trace(result, act) proc run(actor; bootProc: TurnAction; initialAssertions: OutboundTable) = - run(newFacet(actor, some actor.root, initialAssertions), stopIfInertAfter(bootProc)) + run(newFacet(actor, actor.root, initialAssertions), stopIfInertAfter(bootProc)) proc bootActor*(name: string; bootProc: TurnAction): Actor = var initialAssertions: OutboundTable result = newActor(name) new result.handleAllocator + when tracing: + new result.turnIdAllocator + let path = getEnv("SYNDICATE_TRACE_FILE", "/tmp/" & name & ".trace.pr") + case path + of "": stderr.writeLine "$SYNDICATE_TRACE_FILE unset, not tracing actor ", name + of "-": result.traceStream = newFileStream(stderr) + else: result.traceStream = openFileStream(path, fmWrite) run(result, bootProc, initialAssertions) proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()) = @@ -404,6 +467,12 @@ proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertion discard turn.facet.outbound.pop(key, newOutbound[key]) let actor = newActor(name) actor.handleAllocator = turn.facet.actor.handleAllocator + when tracing: + actor.turnIdAllocator = turn.facet.actor.turnIdAllocator + actor.traceStream = turn.facet.actor.traceStream + var act = ActionDescription[void](orKind: ActionDescriptionKind.spawn) + act.spawn.id = actor.id.toPreserve + turn.desc.actions.add act run(actor, bootProc, newOutBound) proc newInertRef*(): Ref = @@ -414,6 +483,12 @@ proc atExit*(actor; action) = actor.exitHooks.add action proc terminate(actor; turn; reason: ref Exception) = if not actor.exiting: + when tracing: + var act = ActorActivation[void](orKind: ActorActivationKind.stop) + if not reason.isNil: + act.stop.status = ExitStatus(orKind: ExitStatusKind.Error) + act.stop.status.error.message = reason.msg + trace(actor, act) actor.exiting = true actor.exitReason = reason for hook in actor.exitHooks: hook(turn) @@ -444,17 +519,18 @@ template tryFacet(facet; body: untyped) = except CatchableError as err: terminate(facet, err) proc run*(facet; action: TurnAction; zombieTurn = false) = - if not zombieTurn: - if not facet.actor.exitReason.isNil: return - if not facet.isAlive: return - # TODO: not Nim idiom - tryFacet(facet): - var queues = newTable[Facet, seq[TurnAction]]() - block: - var turn = Turn(facet: facet, queues: queues) - action(turn) - for facet, queue in queues: - for action in queue: run(facet, action) + if zombieTurn or (facet.actor.exitReason.isNil and facet.isAlive): + tryFacet(facet): + var queues = newTable[Facet, seq[TurnAction]]() + block: + var turn = Turn(facet: facet, queues: queues) + action(turn) + when tracing: + turn.desc.id = facet.nextTurnId.toPreserve + facet.actor.trace ActorActivation[void]( + orKind: ActorActivationKind.turn, turn: turn.desc) + for facet, queue in queues: + for action in queue: run(facet, action) proc run*(`ref`: Ref; action: TurnAction) = ## Convenience proc to run a `TurnAction` in the scope of a `Ref`. @@ -465,7 +541,14 @@ proc addCallback*(fut: FutureBase; facet: Facet; act: TurnAction) = ## within the context of `facet`. addCallback(fut) do (): if fut.failed: terminate(facet, fut.error) - else: run(facet, act) + else: + when tracing: + run(facet) do (turn: var Turn): + turn.desc.cause = TurnCause[void](orKind: TurnCauseKind.external) + turn.desc.cause.external.description = "Future".toPreserve + act(turn) + else: + run(facet, act) proc addCallback*(fut: FutureBase; turn: var Turn; act: TurnAction) = ## Add a callback to a `Future` that will be called at a later `Turn` @@ -482,11 +565,17 @@ proc addCallback*[T](fut: Future[T]; turn: var Turn; act: proc (t: var Turn, x: if fut.failed: terminate(facet.facet, fut.error) else: run(facet) do (turn: var Turn): + when tracing: + turn.desc.cause = TurnCause[void](orKind: TurnCauseKind.external) + turn.desc.cause.external.description = "Future".toPreserve act(turn, read fut) proc stop*(turn: var Turn, facet: Facet) = - enqueue(turn, facet.parent.get) do (turn: var Turn): - facet.terminate(turn, true) + if facet.parent.isNil: + facet.terminate(turn, true) + else: + enqueue(turn, facet.parent) do (turn: var Turn): + facet.terminate(turn, true) proc stop*(turn: var Turn) = stop(turn, turn.facet) @@ -511,6 +600,6 @@ proc newRef*(turn; e: Entity): Ref = Ref(relay: turn.facet, target: e) proc sync*(turn, refer: Ref, cb: proc(t: Turn) {.gcsafe.}) = - discard # TODO + raiseAssert "not implemented" proc future*(actor): Future[void] = actor.future diff --git a/syndicate.nimble b/syndicate.nimble index c33cd8a..f11f4d2 100644 --- a/syndicate.nimble +++ b/syndicate.nimble @@ -1,6 +1,6 @@ # Package -version = "20230712" +version = "20230713" author = "Emery Hemingway" description = "Syndicated actors for conversational concurrency" license = "Unlicense"