Goto trick
This commit is contained in:
parent
b83ef93484
commit
da93f9eb1f
|
@ -16,11 +16,11 @@
|
|||
"packages": [
|
||||
"cps"
|
||||
],
|
||||
"path": "/nix/store/l7z889g1bcsmz9xpixpfg07ckpqxvhjc-source",
|
||||
"rev": "44ad3dcb1b7d1aab7043586fdce5717e8f6e79d9",
|
||||
"sha256": "14c6b3y714yl6rqp3y9r9qhaf44bw2ys21hpjqlkwgbh6gzp461w",
|
||||
"path": "/nix/store/6z2yi8vcc2jc4j813dbfsbklx8dpybcn-source",
|
||||
"rev": "d870c8ba485bf7f81e3941d52afffe74fc926215",
|
||||
"sha256": "0kspz3ansl0ibf2rb2hjnjg076v96kbwr196vjxc76yk4bjp4zfa",
|
||||
"srcDir": "",
|
||||
"url": "https://github.com/nim-works/cps/archive/44ad3dcb1b7d1aab7043586fdce5717e8f6e79d9.tar.gz"
|
||||
"url": "https://github.com/nim-works/cps/archive/d870c8ba485bf7f81e3941d52afffe74fc926215.tar.gz"
|
||||
},
|
||||
{
|
||||
"method": "fetchzip",
|
||||
|
|
|
@ -148,7 +148,9 @@ macro onPublish*(ds: Cap; pattern: Pattern; handler: untyped) =
|
|||
raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments - " & $`pattern`)
|
||||
`handlerProc`
|
||||
discard observe(activeTurn(), `ds`, `pattern`, ClosureEntity(publishImpl: `handlerSym`))
|
||||
]#
|
||||
|
||||
#[
|
||||
macro onMessage*(ds: Cap; pattern: Pattern; handler: untyped) =
|
||||
## Call `handler` when an message matching `pattern` is broadcasted at `ds`.
|
||||
let
|
||||
|
@ -160,7 +162,9 @@ macro onMessage*(ds: Cap; pattern: Pattern; handler: untyped) =
|
|||
raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments - " & $`pattern`)
|
||||
`handlerProc`
|
||||
discard observe(activeTurn(), `ds`, `pattern`, ClosureEntity(messageImpl: `handlerSym`))
|
||||
]#
|
||||
|
||||
#[
|
||||
macro during*(ds: Cap; pattern: Pattern; publishBody, retractBody: untyped) =
|
||||
## Call `publishBody` when an assertion matching `pattern` is published to `ds` and
|
||||
## call `retractBody` on retraction. Assertions that match `pattern` but are not
|
||||
|
@ -211,13 +215,15 @@ proc runActor*(name: string; bootProc: DeprecatedBootProc) {.deprecated.} =
|
|||
proc wrapHandler(body: NimNode; ident: string): NimNode =
|
||||
var sym = genSym(nskProc, ident)
|
||||
quote do:
|
||||
proc `sym`() =
|
||||
proc `sym`(cont: Cont): Cont {.cpsMagic.} =
|
||||
`body`
|
||||
nil
|
||||
|
||||
macro onStop*(body: untyped) =
|
||||
let
|
||||
handler = wrapHandler(body, "onStop")
|
||||
handlerSym = handler[0]
|
||||
quote do:
|
||||
`handler`
|
||||
addStopHandler(activeActor(), `handlerSym`)
|
||||
block:
|
||||
let facet = activeFacet()
|
||||
facet.installStopHook()
|
||||
if facet.stopped():
|
||||
`body`
|
||||
return
|
||||
|
|
|
@ -19,19 +19,22 @@ export protocol.Handle
|
|||
type
|
||||
Cont* = ref object of Continuation
|
||||
turn: Turn
|
||||
facet: Facet
|
||||
|
||||
Handler* = proc() {.closure.}
|
||||
|
||||
Work = Deque[Cont]
|
||||
HandlerDeque = seq[ContinuationProc[Continuation]]
|
||||
|
||||
FacetState = enum fIdle, fRunning, fStopped
|
||||
FacetState = enum fFresh, fRunning, fEnded
|
||||
|
||||
Facet* = ref object
|
||||
## https://synit.org/book/glossary.html#facet
|
||||
actor: Actor
|
||||
parent: Facet
|
||||
stopHandlers: Deque[Handler]
|
||||
# state: FacetState
|
||||
children: seq[Facet]
|
||||
stopHandlers: HandlerDeque
|
||||
state: FacetState
|
||||
when traceSyndicate:
|
||||
id: FacetId
|
||||
|
||||
|
@ -58,7 +61,7 @@ type
|
|||
|
||||
Actor* = ref object
|
||||
## https://synit.org/book/glossary.html#actor
|
||||
# crashHandlers: Deque[Handler]
|
||||
# crashHandlers: HandlerDeque
|
||||
root: Facet
|
||||
handleAllocator: Handle
|
||||
facetIdAllocator: int
|
||||
|
@ -67,6 +70,24 @@ type
|
|||
traceStream: FileStream
|
||||
stopped: bool
|
||||
|
||||
template syndicate*(prc: typed): untyped =
|
||||
cps(Cont, prc)
|
||||
|
||||
proc activeTurn*(c: Cont): Turn {.cpsVoodoo.} =
|
||||
## Return the active `Turn` within a `{.syndicate.}` context.
|
||||
assert not c.turn.isNil
|
||||
c.turn
|
||||
|
||||
proc activeFacet*(c: Cont): Facet {.cpsVoodoo.} =
|
||||
## Return the active `Facet` within a `{.syndicate.}` context.
|
||||
assert not c.facet.isNil
|
||||
c.facet
|
||||
|
||||
proc activeActor*(c: Cont): Actor {.cpsVoodoo.} =
|
||||
## Return the active `Actor` within a `{.syndicate.}` context.
|
||||
assert not c.turn.isNil
|
||||
c.facet.actor
|
||||
|
||||
using
|
||||
actor: Actor
|
||||
facet: Facet
|
||||
|
@ -85,6 +106,8 @@ proc newFacet(actor: Actor; parent: Facet): Facet =
|
|||
id: actor.facetIdAllocator.toPreserves,
|
||||
)
|
||||
|
||||
proc stopped*(facet): bool = facet.state != fRunning
|
||||
|
||||
proc newActor(name: string): Actor =
|
||||
result = Actor(id: name.toPreserves)
|
||||
result.root = newFacet(result, nil)
|
||||
|
@ -133,52 +156,48 @@ proc newExternalTurn(facet): Turn =
|
|||
result.desc = TurnDescription(cause: TurnCause(orKind: TurnCauseKind.external))
|
||||
|
||||
proc pass*(a, b: Cont): Cont =
|
||||
b.turn = move a.turn
|
||||
b.turn = a.turn
|
||||
if b.facet.isNil:
|
||||
b.facet = a.facet
|
||||
# TODO: whelp a new continuation at facet boundaries?
|
||||
b
|
||||
|
||||
proc queue(t: Turn; c: Cont) =
|
||||
c.turn = t
|
||||
c.facet = t.facet
|
||||
t.work.addLast(c)
|
||||
|
||||
proc queue(c: Cont): Cont {.cpsMagic.} =
|
||||
queue(c.turn, c)
|
||||
nil
|
||||
|
||||
proc run(facet; work: var Work) =
|
||||
while work.len > 0:
|
||||
var c = work.popFirst()
|
||||
let t = c.turn
|
||||
try:
|
||||
while not c.isNil and not c.fn.isNil:
|
||||
c.turn = t
|
||||
var y = c.fn
|
||||
var x = y(c)
|
||||
c = Cont(x)
|
||||
except CatchableError as err:
|
||||
if not c.dismissed:
|
||||
writeStackFrames c
|
||||
# terminate(facet, err)
|
||||
proc complete(turn; c: Cont) =
|
||||
var c = c
|
||||
try:
|
||||
while not c.isNil and not c.fn.isNil:
|
||||
c.turn = turn
|
||||
var y = c.fn
|
||||
var x = y(c)
|
||||
c = Cont(x)
|
||||
except CatchableError as err:
|
||||
if not c.dismissed:
|
||||
writeStackFrames c
|
||||
# terminate(c.facet, err)
|
||||
|
||||
proc run(turn) =
|
||||
let
|
||||
facet = turn.facet
|
||||
actor = facet.actor
|
||||
let actor = turn.facet.actor
|
||||
assert not actor.stopped
|
||||
run(facet, turn.work)
|
||||
while turn.work.len > 0:
|
||||
complete(turn, turn.work.popFirst())
|
||||
when traceSyndicate:
|
||||
actor.trace(turn)
|
||||
if actor.stopped:
|
||||
trace(actor, ActorActivation(orkind: ActorActivationKind.stop))
|
||||
|
||||
proc run(handlers: var Deque[Handler]) =
|
||||
while handlers.len > 0:
|
||||
var h = handlers.popLast()
|
||||
h()
|
||||
|
||||
proc start(actor; cont: Cont) =
|
||||
when traceSyndicate:
|
||||
var act = ActorActivation(orkind: ActorActivationKind.start)
|
||||
trace(actor, act)
|
||||
actor.root.state = fRunning
|
||||
let turn = actor.root.newExternalTurn()
|
||||
turn.queue(cont)
|
||||
run(turn)
|
||||
|
@ -190,8 +209,20 @@ proc collectPath(result: var seq[FacetId]; facet) =
|
|||
collectPath(result, facet.parent)
|
||||
result.add(facet.id)
|
||||
|
||||
proc runNextStop(c: Cont; facet: Facet): Cont {.cpsMagic.} =
|
||||
c.fn = facet.stopHandlers.pop()
|
||||
result = c
|
||||
|
||||
proc runNextFacetStop() {.syndicate.} =
|
||||
activeFacet().runNextStop()
|
||||
|
||||
proc stop(turn; facet; reason: FacetStopReason) =
|
||||
run(facet.stopHandlers)
|
||||
while facet.stopHandlers.len > 0:
|
||||
var c = whelp runNextFacetStop()
|
||||
c.facet = facet
|
||||
complete(turn, c)
|
||||
while facet.children.len > 0:
|
||||
stop(turn, facet.children.pop(), FacetStopReason.parentStopping)
|
||||
when traceSyndicate:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetstop)
|
||||
collectPath(act.facetstop.path, facet)
|
||||
|
@ -209,24 +240,6 @@ proc stop(turn; actor) =
|
|||
proc bootActor*(name: string, c: Cont) =
|
||||
start(newActor(name), c)
|
||||
|
||||
template syndicate*(prc: typed): untyped =
|
||||
cps(Cont, prc)
|
||||
|
||||
proc activeTurn*(c: Cont): Turn {.cpsVoodoo.} =
|
||||
## Return the active `Turn` within a `{.syndicate.}` context.
|
||||
assert not c.turn.isNil
|
||||
c.turn
|
||||
|
||||
proc activeFacet*(c: Cont): Facet {.cpsVoodoo.} =
|
||||
## Return the active `Facet` within a `{.syndicate.}` context.
|
||||
assert not c.turn.isNil
|
||||
c.turn.facet
|
||||
|
||||
proc activeActor*(c: Cont): Actor {.cpsVoodoo.} =
|
||||
## Return the active `Actor` within a `{.syndicate.}` context.
|
||||
assert not c.turn.isNil
|
||||
c.turn.facet.actor
|
||||
|
||||
proc stopActor(c: Cont; a: Actor): Cont {.cpsMagic.} =
|
||||
stop(c.turn, a)
|
||||
nil
|
||||
|
@ -319,5 +332,6 @@ proc message*(cap: Cap; val: Value) {.syndicate.} =
|
|||
proc sync*(cap: Cap) {.syndicate.} =
|
||||
activeTurn().sync(cap)
|
||||
|
||||
proc addStopHandler*(actor: Actor; h: Handler) =
|
||||
actor.root.stopHandlers.addLast(h)
|
||||
proc installStopHook*(c: Cont, facet: Facet): Cont {.cpsMagic.} =
|
||||
facet.stopHandlers.add(c.fn)
|
||||
return c
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
# SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway
|
||||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[hashes, tables]
|
||||
|
@ -46,3 +46,19 @@ proc during*(cb: DuringProc): DuringEntity = DuringEntity(cb: cb)
|
|||
|
||||
proc observe*(turn: Turn; ds: Cap; pat: Pattern; e: Entity): Handle =
|
||||
publish(turn, ds, Observe(pattern: pat, observer: newCap(turn, e)).toPreserves)
|
||||
|
||||
proc assertHandle(turn: Turn): Handle =
|
||||
doAssert(
|
||||
turn.event.isSome and turn.event.get.orKind == EventKind.Assert,
|
||||
"operation not valid during this turn")
|
||||
turn.event.get.assert.handle
|
||||
|
||||
proc awaitRetraction(cont: Cont): Cont {.cpsMagic.} =
|
||||
{.error: "this cannot work".}
|
||||
let h = cont.turn.assertHandle
|
||||
while true:
|
||||
let turn = cont.turn
|
||||
if turn.retracts(h):
|
||||
return c
|
||||
else:
|
||||
turn.facet.actor.qeueue(c)
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
# Package
|
||||
|
||||
version = "20240210"
|
||||
version = "20240216"
|
||||
author = "Emery Hemingway"
|
||||
description = "Syndicated actors for conversational concurrency"
|
||||
license = "Unlicense"
|
||||
|
|
|
@ -12,6 +12,10 @@ proc now: float64 = getTime().toUnixFloat()
|
|||
proc main() {.syndicate.} =
|
||||
let ds = newDataspace()
|
||||
let h = publish(ds, "hello world!".toPreserves)
|
||||
|
||||
#onMessage(ds, grab()) do (v: Value):
|
||||
# stderr.writeLine "observed message ", v
|
||||
|
||||
message(ds, "hello world!".toPreserves)
|
||||
retract(h)
|
||||
sync(ds)
|
||||
|
@ -23,12 +27,9 @@ proc main() {.syndicate.} =
|
|||
stopActor()
|
||||
echo "actor stopped but still executing?"
|
||||
|
||||
#onMessage(ds, grab()) do (v: Value):
|
||||
# stderr.writeLine "observed message ", v
|
||||
|
||||
#[
|
||||
block:
|
||||
spawnTimers(ds)
|
||||
# spawnTimers(ds)
|
||||
onPublish(ds, grab(LaterThan(seconds: now()+1.0))) do:
|
||||
stderr.writeLine "slept one second once"
|
||||
onPublish(ds, grab(LaterThan(seconds: now()+1.0))) do:
|
||||
|
@ -38,5 +39,4 @@ proc main() {.syndicate.} =
|
|||
stopActor()
|
||||
]#
|
||||
|
||||
|
||||
bootActor("main", whelp main())
|
||||
|
|
Loading…
Reference in New Issue