From 98580c2bb61fd241a839c66b648e6f86383bbae4 Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Fri, 1 Mar 2024 09:33:05 +0000 Subject: [PATCH] Failed continuafication --- src/sam/actors.nim | 162 ++++++++++++++++++++++++-------------- src/sam/actors/timers.nim | 11 ++- syndicate.nimble | 2 +- tests/test_timers.nim | 6 ++ 4 files changed, 113 insertions(+), 68 deletions(-) diff --git a/src/sam/actors.nim b/src/sam/actors.nim index 2223909..90cc53e 100644 --- a/src/sam/actors.nim +++ b/src/sam/actors.nim @@ -21,7 +21,7 @@ export protocol.Handle type Cont* = ref object of Continuation - facet: Facet + turn: Turn PublishProc* = proc (e: Entity; v: Value; h: Handle) {.cps: Cont.} RetractProc* = proc (e: Entity; h: Handle) {.cps: Cont.} @@ -30,7 +30,6 @@ type Handler* = proc() {.closure.} - Work = Deque[Cont] HandlerDeque = seq[ContinuationProc[Continuation]] FacetState = enum fFresh, fRunning, fEnded @@ -58,10 +57,10 @@ type ## Type for callbacks to be called within a turn. ## The `Facet` parameter is the owning facet. - Turn {.byref.} = object + Turn = ref object ## https://synit.org/book/glossary.html#turn - facet: Facet - work: Work + actor: Actor + work: Dequeue[Cont] actions: seq[Cont] event: Option[protocol.Event] rollback: bool @@ -89,29 +88,35 @@ type # TODO: run on a seperate thread. # crashHandlers: HandlerDeque root: Facet - children: HashSet[Actor] - # linked subordinate actors, this collection - # is here for running actor turns handleAllocator: Handle facetIdAllocator: int id: ActorId - turn: Turn when traceSyndicate: traceStream: FileStream stopped: bool -template turnWork*(prc: typed): untyped = - cps(Cont, prc) +var turnQueue {.threadvar, used.}: Deque[Turn] -proc activeFacet(c: Cont): Facet {.cpsVoodoo.} = - ## Return the active `Facet` within a turn context. - assert not c.facet.isNil - c.facet +proc queueTurn(turn: sink Turn) = + turnQueue.addLast(turn) + +proc turnsPending*(): bool = + turnQueue.len > 0 + +template turnWork*(prc: typed): untyped = + ## Pragma to mark work that executes in a `Turn` context. + cps(Cont, prc) proc activeTurn(c: Cont): Turn {.cpsVoodoo.} = ## Return the active `Turn` within a turn context. - assert not c.facet.isNil - c.facet.actor.turn + assert not c.turn.isNil + c.turn + +proc activeFacet(c: Cont): Facet {.cpsVoodoo.} = + ## Return the active `Facet` within a turn context. + assert not c.turn.isNil + assert not c.turn.facet.isNil + c.turn.facet using actor: Actor @@ -134,37 +139,8 @@ proc collectPath(result: var seq[FacetId]; facet) = collectPath(result, facet.parent) result.add(facet.id) -proc newFacet(actor; parent: Facet): Facet = - inc(actor.facetIdAllocator) - result = Facet( - actor: actor, - parent: parent, - id: actor.facetIdAllocator.toPreserves, - ) - if not parent.isNil: - parent.children.add result - when traceSyndicate: - var act = ActionDescription(orKind: ActionDescriptionKind.facetstart) - collectPath(act.facetstart.path, result) - actor.turn.desc.actions.add act - proc stopped*(facet): bool = facet.state != fRunning -proc newActor(parent: Facet; name: string): Actor = - result = Actor(id: name.toPreserves) - result.root = newFacet(result, parent) - if not parent.isNil: - parent.actor.children.incl result - when traceSyndicate: - if parent.isNil: - let path = getEnv("SYNDICATE_TRACE_FILE", "") - case path - of "": discard - of "-": result.traceStream = newFileStream(stderr) - else: result.traceStream = openFileStream(path, fmWrite) - else: - result.traceStream = parent.actor.traceStream - when traceSyndicate: proc traceFlush(actor) = if not actor.traceStream.isNil: @@ -238,14 +214,6 @@ proc yieldToActions(c: Cont): Cont {.cpsMagic.} = c.facet.actor.turn.actions.add(c) nil -proc startExternalTurn(facet) = - let actor = facet.actor - assert actor.turn.work.len == 0 - assert actor.turn.actions.len == 0 - actor.turn.facet = facet - when traceSyndicate: - actor.turn.desc = TurnDescription(cause: TurnCause(orKind: TurnCauseKind.external)) - proc terminate(actor; err: ref Exception) = when traceSyndicate: actor.traceTurn() @@ -617,7 +585,7 @@ proc addOnStopHandler(c: Cont; cb: Callback): Cont {.cpsMagic.} = c.facet.stopCallbacks.add(cb) result = c -proc onStop*(facet; cb: proc () {.closure.}) = +proc onStop*(facet; cb: Callback) = facet.stopCallbacks.add(cb) proc facetCall(prc: FacetProc; f: Facet) {.cps: Cont.} = @@ -626,12 +594,84 @@ proc facetCall(prc: FacetProc; f: Facet) {.cps: Cont.} = proc facetCall(prc: FacetProc) {.cps: Cont.} = prc(activeFacet()) -proc bootActor*(name: string; bootProc: FacetProc): Actor = - result = newActor(nil, name) - result.root.startExternalTurn() - result.root.queueWork(whelp facetCall(bootProc)) +proc workCall(cb: Callback) {.cps: Cont.} = + cb() + +proc god(facet): Actor = + ## Return the parent of all actors associated with `facet`. + var facet = facet + while not facet.parent.isNil: + facet = facet.parent + facet.actor + +proc newExternalTurn(facet; desc: Value) = + result = Turn(facet: facet) + let actor = facet.actor + when traceSyndicate: + result.desc.cause = TurnCause(orKind: TurnCauseKind.external)) + result.desc.cause.external description = desc + +proc runExternalTurn*(facet: Facet; cb: proc ()) = + echo "startExternalTurn" + startExternalTurn(facet) + facet.queueWork(whelp workCall(cb)) + while run(facet.actor): discard + echo "runExternalTurn finished" + assert facet.actor.turn.work.len == 0 + assert facet.actor.turn.actions.len == 0 + +proc newFacet(turn: Turn; actor; parent: Facet): Facet = + inc(actor.facetIdAllocator) + result = Facet( + actor: actor, + parent: parent, + id: actor.facetIdAllocator.toPreserves, + ) + if not parent.isNil: + parent.children.add result + when traceSyndicate: + var act = ActionDescription(orKind: ActionDescriptionKind.facetstart) + collectPath(act.facetstart.path, result) + actor.turn.desc.actions.add act + +proc newActor(parent: Facet; name: string): Actor = + result = Actor(id: name.toPreserves) + result.root = newFacet(result, parent) + when traceSyndicate: + if parent.isNil: + let path = getEnv("SYNDICATE_TRACE_FILE", "") + case path + of "": discard + of "-": result.traceStream = newFileStream(stderr) + else: result.traceStream = openFileStream(path, fmWrite) + else: + result.traceStream = parent.actor.traceStream proc spawnActor*(parent: Facet; name: string; bootProc: FacetProc): Actor {.discardable.} = + ## Spawn a new `Actor` at `parent`. result = newActor(parent, name) - result.root.startExternalTurn() - parent.queueWork(whelp facetCall(bootProc, result.root)) + var turn = newExternalTurn(result.root, "bootActor".toPreserves) + turn.queueWork(whelp facetCall(bootProc)) + queueTurn(turn) + +proc bootActor*(name: string; bootProc: FacetProc): Actor = + ## Boot a new `Actor`. + result = newActor(nil, name) + var turn = newExternalTurn(result.root, "bootActor".toPreserves) + turn.queueWork(whelp facetCall(bootProc)) + queueTurn(turn) + +proc run(turn: sink Turn) = + try: + while turn.work.len > 0: + discard trampoline turn.work.pop() + except CatchableError as err: + terminate turn.actor(err) + return + var i: int + while i < turn.actions.len: + discard trampoline turn.actions[i] + +proc runTurn*() = + ## Run one turn. + turnQueue.pop().run() diff --git a/src/sam/actors/timers.nim b/src/sam/actors/timers.nim index 257b85f..feffad4 100644 --- a/src/sam/actors/timers.nim +++ b/src/sam/actors/timers.nim @@ -81,12 +81,12 @@ when defined(linux): raiseOSError(osLastError(), "failed to set timeout") driver.timers.incl(fd) while clock_realtime().toFloat() < deadline: - echo "waiting on timer descriptor" wait(FD fd, Read) - echo "wait returned on timer descriptor" if deadline in driver.deadlines: - {.warning: "TODO: need to start a turn at driver.facet".} - discard publish(driver.target, LaterThan(seconds: deadline)) + proc pub() = + echo "publishing later-than" + discard publish(driver.target, LaterThan(seconds: deadline)) + runExternalTurn(driver.facet, pub) discard close(fd) driver.timers.excl(fd) @@ -97,6 +97,7 @@ when defined(linux): let driver = spawnTimerDriver(facet, ds) let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()}) during(ds, pat) do (deadline: float): + echo "timer actor sees observation of ", LaterThan(seconds: deadline) if change(driver.deadlines, deadline, +1) == cdAbsentToPresent: discard trampoline(whelp await(driver, deadline)) do: @@ -110,5 +111,3 @@ proc after*(ds: Cap; dur: Duration; cb: proc () {.closure.}) = pat = ?LaterThan(seconds: later) onPublish(ds, pat): cb() - -# TODO: periodic timer diff --git a/syndicate.nimble b/syndicate.nimble index 2246e1d..5be1d19 100644 --- a/syndicate.nimble +++ b/syndicate.nimble @@ -1,6 +1,6 @@ # Package -version = "20240229" +version = "20240301" author = "Emery Hemingway" description = "Syndicated actors for conversational concurrency" license = "Unlicense" diff --git a/tests/test_timers.nim b/tests/test_timers.nim index d630675..4a5d17d 100644 --- a/tests/test_timers.nim +++ b/tests/test_timers.nim @@ -11,6 +11,9 @@ let actor = bootActor("timer-test") do (facet: Facet): let timers = facet.newDataspace() spawnTimerActor(timers) + onPublish(timers, ?LaterThan(seconds: 1356091200)): + echo "now in 13th bʼakʼtun" + timers.after(initDuration(seconds = 3)) do (): echo "third timer expired" stopActor(facet) @@ -22,5 +25,8 @@ let actor = bootActor("timer-test") do (facet: Facet): echo "second timer expired" while not actor.stopped: + echo "running actor" if not run(actor): + echo "run(actor) did not progress" ioqueue.run() + echo "ioqueue.run finished"