Compare commits
11 Commits
75cd3f32ca
...
b209548f5d
Author | SHA1 | Date |
---|---|---|
Emery Hemingway | b209548f5d | |
Emery Hemingway | a7c0c2027b | |
Emery Hemingway | 7b7577f8ff | |
Emery Hemingway | ac55c9fc75 | |
Emery Hemingway | 209d2327d2 | |
Emery Hemingway | ee0918492e | |
Emery Hemingway | 727d7a335f | |
Emery Hemingway | 70d68b17d0 | |
Emery Hemingway | aca4f4b822 | |
Emery Hemingway | 20c81fe225 | |
Emery Hemingway | 0e84fe2775 |
|
@ -60,11 +60,11 @@
|
|||
"packages": [
|
||||
"preserves"
|
||||
],
|
||||
"path": "/nix/store/6nnn5di5vip1vladlb7z56rbw18d1y7j-source",
|
||||
"rev": "2825bceecf33a15b9b7942db5331a32cbc39b281",
|
||||
"sha256": "145vf46fy3wc52j6vs509fm9bi5lx7c53gskbkpcfbkv82l86dgk",
|
||||
"path": "/nix/store/2hy124xgabz134dxj3wji7mp47fdwy3w-source",
|
||||
"rev": "9ae435a83c6d5028405538af5d24a023af625b6e",
|
||||
"sha256": "1k7ywcp1a53x2fpc6wc2b0qzb264dkifash0s1wcp66rw3lx15k2",
|
||||
"srcDir": "src",
|
||||
"url": "https://git.syndicate-lang.org/ehmry/preserves-nim/archive/2825bceecf33a15b9b7942db5331a32cbc39b281.tar.gz"
|
||||
"url": "https://git.syndicate-lang.org/ehmry/preserves-nim/archive/9ae435a83c6d5028405538af5d24a023af625b6e.tar.gz"
|
||||
},
|
||||
{
|
||||
"method": "fetchzip",
|
||||
|
|
|
@ -4,7 +4,6 @@
|
|||
## This module implements the `Syndicate DSL <https://syndicate-lang.org/doc/syndicate/>`_.
|
||||
|
||||
import std/[macros, tables, typetraits]
|
||||
import pkg/sys/ioqueue
|
||||
|
||||
import preserves
|
||||
export fromPreserves, toPreserves
|
||||
|
@ -174,7 +173,8 @@ macro during*(turn: untyped; ds: Cap; pattern: Pattern; publishBody, retractBody
|
|||
if `argCount` != 0 and `pattern`.analyse.capturePaths.len != `argCount`:
|
||||
raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments - " & $`pattern`)
|
||||
`callbackProc`
|
||||
discard observe(`turn`, `ds`, `pattern`, during(`callbackSym`))
|
||||
discard inFacet(`turn`) do (`turn`: var Turn):
|
||||
discard observe(`turn`, `ds`, `pattern`, during(`callbackSym`))
|
||||
|
||||
macro during*(turn: untyped; ds: Cap; pattern: Pattern; publishBody: untyped) =
|
||||
## Variant of `during` without a retract body.
|
||||
|
@ -186,9 +186,5 @@ macro during*(turn: untyped; ds: Cap; pattern: Pattern; publishBody: untyped) =
|
|||
if `argCount` != 0 and `pattern`.analyse.capturePaths.len != `argCount`:
|
||||
raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments - " & $`pattern`)
|
||||
`callbackProc`
|
||||
discard observe(`turn`, `ds`, `pattern`, during(`callbackSym`))
|
||||
|
||||
proc runActor*(name: string; bootProc: TurnAction) =
|
||||
## Boot an actor `Actor` and churn ioqueue once.
|
||||
discard bootActor(name, bootProc)
|
||||
actors.run()
|
||||
discard inFacet(`turn`) do (`turn`: var Turn):
|
||||
discard observe(`turn`, `ds`, `pattern`, during(`callbackSym`))
|
||||
|
|
|
@ -6,14 +6,13 @@ import pkg/cps
|
|||
import pkg/sys/ioqueue
|
||||
|
||||
import preserves
|
||||
import ../syndicate/protocols/[protocol, sturdy]
|
||||
import ../syndicate/protocols/[protocol, sturdy, trace]
|
||||
|
||||
const tracing = defined(traceSyndicate)
|
||||
|
||||
when tracing:
|
||||
import std/streams
|
||||
from std/os import getEnv
|
||||
import ./protocols/trace
|
||||
|
||||
export Handle
|
||||
|
||||
|
@ -64,7 +63,7 @@ type
|
|||
Turn* = ref object
|
||||
facet: Facet # active facet that may change during a turn
|
||||
work: Deque[tuple[facet: Facet, act: TurnAction]]
|
||||
effects: seq[Turn]
|
||||
effects: Table[Actor, Turn]
|
||||
when tracing:
|
||||
desc: TurnDescription
|
||||
|
||||
|
@ -110,39 +109,64 @@ when tracing:
|
|||
result.add f.id.toPreserves
|
||||
f = f.parent
|
||||
|
||||
proc initEnqueue(turn: Turn; cap: Cap): ActionDescription =
|
||||
result = ActionDescription(orKind: ActionDescriptionKind.enqueue)
|
||||
result.enqueue.event.target.actor = turn.facet.actor.id.toPreserves
|
||||
result.enqueue.event.target.facet = turn.facet.id.toPreserves
|
||||
result.enqueue.event.target.oid = cap.target.oid.toPreserves
|
||||
|
||||
proc toDequeue(act: sink ActionDescription): ActionDescription =
|
||||
result = ActionDescription(orKind: ActionDescriptionKind.dequeue)
|
||||
result.dequeue.event = move act.enqueue.event
|
||||
|
||||
proc toTraceTarget(cap: Cap): trace.Target =
|
||||
assert not cap.target.isNil
|
||||
assert not cap.target.facet.isNil
|
||||
result.actor = cap.target.facet.actor.id
|
||||
result.facet = cap.target.facet.id
|
||||
result.oid = cap.target.oid.toPreserves
|
||||
|
||||
method publish*(e: Entity; turn: var Turn; v: AssertionRef; h: Handle) {.base.} = discard
|
||||
method retract*(e: Entity; turn: var Turn; h: Handle) {.base.} = discard
|
||||
method message*(e: Entity; turn: var Turn; v: AssertionRef) {.base.} = discard
|
||||
method sync*(e: Entity; turn: var Turn; peer: Cap) {.base.} = discard
|
||||
|
||||
converter toActor(f: Facet): Actor = f.actor
|
||||
converter toActor(t: Turn): Actor = t.facet.actor
|
||||
converter toFacet(a: Actor): Facet = a.root
|
||||
converter toFacet(t: Turn): Facet = t.facet
|
||||
|
||||
using
|
||||
actor: Actor
|
||||
facet: Facet
|
||||
turn: Turn
|
||||
turn: var Turn
|
||||
action: TurnAction
|
||||
|
||||
proc labels(f: Facet): string =
|
||||
assert not f.isNil
|
||||
assert not f.actor.isNil
|
||||
result.add f.actor.name
|
||||
proc catLabels(f: Facet; labels: var string) =
|
||||
labels.add ':'
|
||||
if not f.parent.isNil:
|
||||
catLabels(f.parent, labels)
|
||||
labels.add ':'
|
||||
when tracing:
|
||||
labels.add $f.id
|
||||
result.add f.actor.name
|
||||
catLabels(f, result)
|
||||
|
||||
proc `$`*(f: Facet): string =
|
||||
"<Facet:" & f.labels & ">"
|
||||
|
||||
proc `$`*(r: Cap): string =
|
||||
"<Ref:" & r.relay.labels & ">"
|
||||
|
||||
proc `$`*(actor: Actor): string =
|
||||
"<Actor:" & actor.name & ">" # TODO: ambigous
|
||||
|
||||
proc `$`*(t: Turn): string =
|
||||
"<Turn:" & $t.desc.id & ">"
|
||||
when tracing:
|
||||
|
||||
proc `$`*(r: Cap): string =
|
||||
"<Ref:" & r.relay.labels & ">"
|
||||
|
||||
|
||||
proc `$`*(t: Turn): string =
|
||||
"<Turn:" & $t.desc.id & ">"
|
||||
|
||||
proc attenuate*(r: Cap; a: Attenuation): Cap =
|
||||
if a.len == 0: result = r
|
||||
|
@ -151,8 +175,11 @@ proc attenuate*(r: Cap; a: Attenuation): Cap =
|
|||
relay: r.relay,
|
||||
attenuation: a & r.attenuation)
|
||||
|
||||
proc hash*(actor): Hash =
|
||||
actor.unsafeAddr.hash
|
||||
|
||||
proc hash*(facet): Hash =
|
||||
facet.id.hash
|
||||
facet.unsafeAddr.hash
|
||||
|
||||
proc hash*(r: Cap): Hash = !$(r.relay.hash !& r.target.unsafeAddr.hash)
|
||||
|
||||
|
@ -162,6 +189,13 @@ proc nextHandle(facet: Facet): Handle =
|
|||
result = succ(facet.actor.handleAllocator[])
|
||||
facet.actor.handleAllocator[] = result
|
||||
|
||||
template recallFacet(turn: var Turn; body: untyped): untyped =
|
||||
let facet = turn.facet
|
||||
block:
|
||||
body
|
||||
assert facet.actor == turn.facet.actor
|
||||
turn.facet = facet
|
||||
|
||||
proc queueWork*(turn: var Turn; facet: Facet; act: TurnAction) =
|
||||
assert not facet.isNil
|
||||
turn.work.addLast((facet, act,))
|
||||
|
@ -190,17 +224,19 @@ proc run*(facet: Facet; action: TurnAction) = queueTurn(facet, action)
|
|||
proc facet*(turn: Turn): Facet = turn.facet
|
||||
|
||||
proc queueEffect*(turn: var Turn; target: Facet; act: TurnAction) =
|
||||
if target.actor == turn.facet.actor:
|
||||
let fremd = target.actor
|
||||
if fremd == turn.facet.actor:
|
||||
turn.work.addLast((target, act,))
|
||||
else:
|
||||
var next = Turn(facet: target)
|
||||
assert not target.isNil
|
||||
next.work.addLast((target, act,))
|
||||
when tracing:
|
||||
next.desc.id = nextTurnId()
|
||||
next.desc.cause = TurnCause(orKind: TurnCauseKind.turn)
|
||||
next.desc.cause.turn.id = turn.desc.id
|
||||
turn.effects.add next
|
||||
var fremdTurn = turn.effects.getOrDefault(fremd)
|
||||
if fremdTurn.isNil:
|
||||
fremdTurn = Turn(facet: target)
|
||||
turn.effects[fremd] = fremdTurn
|
||||
when tracing:
|
||||
fremdTurn.desc.id = nextTurnId()
|
||||
fremdTurn.desc.cause = TurnCause(orKind: TurnCauseKind.turn)
|
||||
fremdTurn.desc.cause.turn.id = turn.desc.id
|
||||
fremdTurn.work.addLast((target, act,))
|
||||
|
||||
type Bindings = Table[Value, Value]
|
||||
|
||||
|
@ -319,19 +355,21 @@ proc publish(turn: var Turn; cap: Cap; v: Value; h: Handle) =
|
|||
if not a.isFalse:
|
||||
let e = OutboundAssertion(handle: h, peer: cap)
|
||||
turn.facet.outbound[h] = e
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
|
||||
act.enqueue.event.target.actor = turn.facet.actor.id.toPreserves
|
||||
act.enqueue.event.target.facet = turn.facet.id.toPreserves
|
||||
act.enqueue.event.target.oid = cap.target.oid.toPreserves
|
||||
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
|
||||
act.enqueue.event.detail.assert.assertion.value.value =
|
||||
mapEmbeds(v) do (cap: Value) -> Value: discard
|
||||
act.enqueue.event.detail.assert.handle = h
|
||||
turn.desc.actions.add act
|
||||
queueEffect(turn, cap.relay) do (turn: var Turn):
|
||||
e.established = true
|
||||
when tracing:
|
||||
turn.desc.actions.add act.toDequeue
|
||||
publish(cap.target, turn, AssertionRef(value: a), e.handle)
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
|
||||
act.enqueue.event.target.actor = turn.facet.actor.id.toPreserves
|
||||
act.enqueue.event.target.facet = turn.facet.id.toPreserves
|
||||
act.enqueue.event.target.oid = cap.target.oid.toPreserves
|
||||
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
|
||||
act.enqueue.event.detail.assert.assertion.value.value =
|
||||
mapEmbeds(v) do (cap: Value) -> Value: discard
|
||||
act.enqueue.event.detail.assert.handle = h
|
||||
turn.desc.actions.add act
|
||||
|
||||
proc publish*(turn: var Turn; r: Cap; a: Value): Handle {.discardable.} =
|
||||
result = turn.facet.nextHandle()
|
||||
|
@ -341,7 +379,14 @@ proc publish*[T](turn: var Turn; r: Cap; a: T): Handle {.discardable.} =
|
|||
publish(turn, r, a.toPreserves)
|
||||
|
||||
proc retract(turn: var Turn; e: OutboundAssertion) =
|
||||
when tracing:
|
||||
var act = initEnqueue(turn, e.peer)
|
||||
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.retract)
|
||||
act.enqueue.event.detail.retract.handle = e.handle
|
||||
turn.desc.actions.add act
|
||||
queueEffect(turn, e.peer.relay) do (turn: var Turn):
|
||||
when tracing:
|
||||
turn.desc.actions.add act.toDequeue
|
||||
if e.established:
|
||||
e.established = false
|
||||
e.peer.target.retract(turn, e.handle)
|
||||
|
@ -354,18 +399,30 @@ proc retract*(turn: var Turn; h: Handle) =
|
|||
proc message*(turn: var Turn; r: Cap; v: Value) =
|
||||
var a = runRewrites(r.attenuation, v)
|
||||
if not a.isFalse:
|
||||
when tracing:
|
||||
var act = initEnqueue(turn, r)
|
||||
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.message)
|
||||
act.enqueue.event.detail.message.body.value.value =
|
||||
mapEmbeds(a) do (cap: Value) -> Value: discard
|
||||
turn.desc.actions.add act
|
||||
queueEffect(turn, r.relay) do (turn: var Turn):
|
||||
when tracing:
|
||||
turn.desc.actions.add act.toDequeue
|
||||
r.target.message(turn, AssertionRef(value: a))
|
||||
|
||||
proc message*[T](turn: var Turn; r: Cap; v: T) =
|
||||
message(turn, r, v.toPreserves)
|
||||
|
||||
proc sync(turn: var Turn; e: Entity; peer: Cap) =
|
||||
e.sync(turn, peer)
|
||||
|
||||
proc sync*(turn: var Turn; r, peer: Cap) =
|
||||
when tracing:
|
||||
var act = initEnqueue(turn, peer)
|
||||
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.sync)
|
||||
act.enqueue.event.detail.sync.peer = peer.toTraceTarget
|
||||
turn.desc.actions.add act
|
||||
queueEffect(turn, r.relay) do (turn: var Turn):
|
||||
sync(turn, r.target, peer)
|
||||
when tracing:
|
||||
turn.desc.actions.add act.toDequeue
|
||||
r.target.sync(turn, peer)
|
||||
|
||||
proc replace*[T](turn: var Turn; cap: Cap; h: Handle; v: T): Handle =
|
||||
result = publish(turn, cap, v)
|
||||
|
@ -401,68 +458,64 @@ proc isInert(facet): bool =
|
|||
noOutboundHandles = facet.outbound.len == 0
|
||||
isRootFacet = facet.parent.isNil
|
||||
noInertCheckPreventers = facet.inertCheckPreventers == 0
|
||||
noKids and (noOutboundHandles or isRootFacet) and noInertCheckPreventers
|
||||
result = noKids and (noOutboundHandles or isRootFacet) and noInertCheckPreventers
|
||||
|
||||
proc preventInertCheck*(facet): (proc()) {.discardable.} =
|
||||
var armed = true
|
||||
inc facet.inertCheckPreventers
|
||||
proc disarm() =
|
||||
if armed:
|
||||
armed = false
|
||||
dec facet.inertCheckPreventers
|
||||
result = disarm
|
||||
proc preventInertCheck*(turn: Turn) =
|
||||
inc turn.facet.inertCheckPreventers
|
||||
|
||||
proc terminate(actor; turn; reason: ref Exception)
|
||||
proc terminateActor(turn; reason: ref Exception)
|
||||
|
||||
proc terminate(facet; turn: var Turn; orderly: bool) =
|
||||
proc terminateFacetOrderly(turn: var Turn) =
|
||||
let facet = turn.facet
|
||||
if facet.isAlive:
|
||||
facet.isAlive = false
|
||||
let parent = facet.parent
|
||||
if not parent.isNil:
|
||||
parent.children.excl facet
|
||||
queueWork(turn, facet) do (turn: var Turn):
|
||||
while facet.children.len > 0:
|
||||
facet.children.pop.terminate(turn, orderly)
|
||||
if orderly:
|
||||
for act in facet.shutdownActions:
|
||||
act(turn)
|
||||
for a in facet.outbound.values: turn.retract(a)
|
||||
if orderly:
|
||||
if not parent.isNil:
|
||||
if parent.isInert:
|
||||
parent.terminate(turn, true)
|
||||
else:
|
||||
terminate(facet.actor, turn, nil)
|
||||
var i = 0
|
||||
while i < facet.shutdownActions.len:
|
||||
facet.shutdownActions[i](turn)
|
||||
inc i
|
||||
setLen facet.shutdownActions, 0
|
||||
for e in facet.outbound.values:
|
||||
retract(turn, e)
|
||||
clear facet.outbound
|
||||
|
||||
proc inertCheck(turn: var Turn) =
|
||||
if (not turn.facet.parent.isNil and
|
||||
(not turn.facet.parent.isAlive)) or
|
||||
turn.facet.isInert:
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
|
||||
act.facetstop.path = facet.path
|
||||
act.facetstop.path = turn.facet.path
|
||||
act.facetstop.reason = FacetStopReason.inert
|
||||
turn.desc.actions.add act
|
||||
stop(turn)
|
||||
|
||||
proc terminateFacet(turn: var Turn) =
|
||||
let facet = turn.facet
|
||||
for child in facet.children:
|
||||
queueWork(turn, child, terminateFacetOrderly)
|
||||
# terminate all children
|
||||
facet.children.clear()
|
||||
# detach all children
|
||||
queueWork(turn, facet, terminateFacetOrderly)
|
||||
# self-termination
|
||||
|
||||
proc stopIfInertAfter(action: TurnAction): TurnAction =
|
||||
# TODO: verify this
|
||||
proc wrapper(turn: var Turn) =
|
||||
action(turn)
|
||||
queueEffect(turn, turn.facet) do (turn: var Turn):
|
||||
if (not turn.facet.parent.isNil and
|
||||
(not turn.facet.parent.isAlive)) or
|
||||
turn.facet.isInert:
|
||||
stop(turn)
|
||||
wrapper
|
||||
proc work(turn: var Turn) =
|
||||
queueWork(turn, turn.facet, action)
|
||||
queueEffect(turn, turn.facet, inertCheck)
|
||||
work
|
||||
|
||||
proc newFacet(turn: var Turn): Facet = newFacet(turn.facet.actor, turn.facet)
|
||||
|
||||
proc inFacet*(turn: var Turn; bootProc: TurnAction): Facet =
|
||||
result = newFacet(turn)
|
||||
let facet = turn.facet
|
||||
turn.facet = result
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetstart)
|
||||
act.facetstart.path.add result.path
|
||||
turn.desc.actions.add act
|
||||
stopIfInertAfter(bootProc)(turn)
|
||||
turn.facet = facet
|
||||
|
||||
proc facet(turn: var Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc)
|
||||
recallFacet turn:
|
||||
turn.facet = result
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetstart)
|
||||
act.facetstart.path.add result.path
|
||||
turn.desc.actions.add act
|
||||
stopIfInertAfter(bootProc)(turn)
|
||||
|
||||
proc newActor(name: string; parent: Facet): Actor =
|
||||
result = Actor(
|
||||
|
@ -539,7 +592,8 @@ proc newInertCap*(): Cap =
|
|||
|
||||
proc atExit*(actor; action) = actor.exitHooks.add action
|
||||
|
||||
proc terminate(actor; turn; reason: ref Exception) =
|
||||
proc terminateActor(turn; reason: ref Exception) =
|
||||
let actor = turn.actor
|
||||
if not actor.exiting:
|
||||
actor.exiting = true
|
||||
actor.exitReason = reason
|
||||
|
@ -549,75 +603,76 @@ proc terminate(actor; turn; reason: ref Exception) =
|
|||
act.stop.status = ExitStatus(orKind: ExitStatusKind.Error)
|
||||
act.stop.status.error.message = reason.msg
|
||||
trace(actor, act)
|
||||
for hook in actor.exitHooks: hook(turn)
|
||||
while actor.exitHooks.len > 0:
|
||||
var hook = actor.exitHooks.pop()
|
||||
try: hook(turn)
|
||||
except CatchableError as err:
|
||||
if reason.isNil:
|
||||
terminateActor(turn, err)
|
||||
return
|
||||
proc finish(turn: var Turn) =
|
||||
assert not actor.root.isNil, actor.name
|
||||
actor.root.terminate(turn, reason.isNil)
|
||||
terminateFacet(turn)
|
||||
actor.root = nil
|
||||
actor.exited = true
|
||||
queueTurn(actor.root, finish)
|
||||
|
||||
proc terminate*(facet; e: ref Exception) =
|
||||
proc terminateFacet*(facet; e: ref Exception) =
|
||||
run(facet.actor.root) do (turn: var Turn):
|
||||
facet.actor.terminate(turn, e)
|
||||
|
||||
template recallFacet(turn: var Turn; body: untyped): untyped =
|
||||
let facet = turn.facet
|
||||
block:
|
||||
body
|
||||
assert facet.actor == turn.facet.actor, "turn of " & $facet.actor & " ended at " & $turn.facet.actor
|
||||
turn.facet = facet
|
||||
|
||||
proc stopNow(turn: var Turn) =
|
||||
let caller = turn.facet
|
||||
recallFacet turn:
|
||||
while caller.children.len > 0:
|
||||
var child = caller.children.pop()
|
||||
if child.actor == caller.actor:
|
||||
turn.facet = child
|
||||
stopNow(turn)
|
||||
else:
|
||||
queueEffect(turn, child, stopNow)
|
||||
caller.terminate(turn, true)
|
||||
terminateActor(turn, e)
|
||||
|
||||
proc stop*(turn: var Turn, facet: Facet) =
|
||||
queueEffect(turn, facet, stopNow)
|
||||
queueEffect(turn, facet) do (turn: var Turn):
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
|
||||
act.facetstop.path = facet.path
|
||||
act.facetstop.reason = FacetStopReason.explicitAction
|
||||
turn.desc.actions.add act
|
||||
terminateFacet(turn)
|
||||
|
||||
proc stop*(turn: var Turn) =
|
||||
stop(turn, turn.facet)
|
||||
|
||||
proc onStop*(facet: Facet; act: TurnAction) =
|
||||
## Add a `proc (turn: var Turn)` action to `facet` to be called as it stops.
|
||||
assert not facet.isNil
|
||||
add(facet.shutdownActions, act)
|
||||
|
||||
proc onStop*(turn: var Turn; act: TurnAction) =
|
||||
onStop(turn.facet, act)
|
||||
|
||||
proc isAlive(actor): bool =
|
||||
not(actor.exited or actor.exiting)
|
||||
|
||||
proc stop*(actor: Actor) =
|
||||
queueTurn(actor.root) do (turn: var Turn):
|
||||
assert(not turn.facet.isNil)
|
||||
stop(turn)
|
||||
if actor.isAlive:
|
||||
queueTurn(actor.root) do (turn: var Turn):
|
||||
assert(not turn.facet.isNil)
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
|
||||
act.facetstop.path = turn.facet.path
|
||||
act.facetstop.reason = FacetStopReason.actorStopping
|
||||
turn.desc.actions.add act
|
||||
stop(turn, turn.facet)
|
||||
|
||||
proc stopActor*(facet: Facet) =
|
||||
stop(facet.actor)
|
||||
|
||||
proc stopActor*(turn: var Turn) =
|
||||
assert(not turn.facet.isNil)
|
||||
assert(not turn.facet.actor.isNil)
|
||||
assert(not turn.facet.actor.root.isNil)
|
||||
stop(turn, turn.facet.actor.root)
|
||||
|
||||
proc freshen*(turn: var Turn, act: TurnAction) {.deprecated.} =
|
||||
run(turn.facet, act)
|
||||
|
||||
proc newCap*(relay: Facet; e: Entity): Cap {.deprecated.} =
|
||||
Cap(relay: relay, target: e)
|
||||
proc newCap*(relay: Facet; entity: Entity): Cap =
|
||||
## Create a new capability for `entity` via `relay`.
|
||||
# An Entity has an owning facet and a Cap does as well?
|
||||
if entity.facet.isNil: entity.facet = relay
|
||||
Cap(relay: relay, target: entity)
|
||||
|
||||
proc newCap*(turn; e: Entity): Cap =
|
||||
Cap(relay: turn.facet, target: e)
|
||||
|
||||
newCap(turn.facet, e)
|
||||
proc newCap*(e: Entity; turn): Cap =
|
||||
Cap(relay: turn.facet, target: e)
|
||||
newCap(turn.facet, e)
|
||||
|
||||
type SyncContinuation {.final.} = ref object of Entity
|
||||
action: TurnAction
|
||||
|
@ -643,21 +698,62 @@ proc run(turn: var Turn) =
|
|||
var act = ActorActivation(orKind: ActorActivationKind.turn)
|
||||
act.turn = move turn.desc
|
||||
trace(turn.facet.actor, act)
|
||||
turn.facet = nil # invalidate the turn
|
||||
# TODO: catch exceptions here
|
||||
for eff in turn.effects.mitems:
|
||||
for eff in turn.effects.mvalues:
|
||||
assert not eff.facet.isNil
|
||||
turnQueue.addLast(move eff)
|
||||
turn.facet = nil # invalidate the turn
|
||||
|
||||
proc runPendingTurns* =
|
||||
while turnQueue.len > 0:
|
||||
var turn = turnQueue.popFirst()
|
||||
# TODO: check if actor is still valid
|
||||
try: run(turn)
|
||||
except CatchableError as err:
|
||||
stderr.writeLine("actor ", turn.actor.name, " threw an error during a turn")
|
||||
terminateActor(turn, err)
|
||||
|
||||
proc run* =
|
||||
## Run actors to completion
|
||||
var ready: seq[Continuation]
|
||||
while true:
|
||||
while turnQueue.len > 0:
|
||||
var turn = turnQueue.popFirst()
|
||||
# TODO: check if actor is still valid
|
||||
run turn
|
||||
runPendingTurns()
|
||||
ioqueue.poll(ready)
|
||||
if ready.len == 0: break
|
||||
while ready.len > 0:
|
||||
discard trampoline:
|
||||
ready.pop()
|
||||
try:
|
||||
discard trampoline:
|
||||
ready.pop()
|
||||
except CatchableError as err:
|
||||
stderr.writeLine "ioqueue continuation threw an error"
|
||||
raise err
|
||||
|
||||
proc runActor*(name: string; bootProc: TurnAction) =
|
||||
## Boot an actor `Actor` and churn ioqueue.
|
||||
let actor = bootActor(name, bootProc)
|
||||
if not actor.exitReason.isNil:
|
||||
raise actor.exitReason
|
||||
actors.run()
|
||||
if not actor.exitReason.isNil:
|
||||
raise actor.exitReason
|
||||
|
||||
type FacetGuard* = object
|
||||
facet: Facet
|
||||
|
||||
proc initGuard*(f: Facet): FacetGuard =
|
||||
result.facet = f
|
||||
inc result.facet.inertCheckPreventers
|
||||
|
||||
proc disarm*(g: var FacetGuard) =
|
||||
if not g.facet.isNil:
|
||||
assert g.facet.inertCheckPreventers > 0
|
||||
dec g.facet.inertCheckPreventers
|
||||
g.facet = nil
|
||||
|
||||
proc `=destroy`*(g: FacetGuard) =
|
||||
if not g.facet.isNil:
|
||||
dec g.facet.inertCheckPreventers
|
||||
|
||||
proc `=copy`*(dst: var FacetGuard, src: FacetGuard) =
|
||||
dst.facet = src.facet
|
||||
inc dst.facet.inertCheckPreventers
|
||||
|
|
|
@ -42,7 +42,7 @@ type DeprecatedBootProc = proc (ds: Cap; turn: var Turn) {.closure.}
|
|||
|
||||
proc bootDataspace*(name: string; bootProc: BootProc): Actor =
|
||||
bootActor(name) do (turn: var Turn):
|
||||
discard turn.facet.preventInertCheck()
|
||||
turn.preventInertCheck()
|
||||
bootProc(turn, newDataspace(turn))
|
||||
|
||||
proc bootDataspace*(name: string; bootProc: DeprecatedBootProc): Actor {.deprecated.} =
|
||||
|
|
|
@ -161,9 +161,6 @@ proc lookupLocal(relay; oid: Oid): Cap =
|
|||
if not sym.isNil:
|
||||
result = sym.cap
|
||||
|
||||
proc isInert(r: Cap): bool =
|
||||
r.target.isNil
|
||||
|
||||
proc rewriteCapIn(relay; facet; n: WireRef, imported: var seq[WireSymbol]): Cap =
|
||||
case n.orKind
|
||||
of WireRefKind.mine:
|
||||
|
@ -262,12 +259,12 @@ type
|
|||
|
||||
proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) =
|
||||
spawnLink(name, turn) do (turn: var Turn):
|
||||
turn.preventInertCheck()
|
||||
let relay = Relay(
|
||||
facet: turn.facet,
|
||||
packetWriter: opts.packetWriter,
|
||||
wireBuf: newBufferedDecoder(0),
|
||||
)
|
||||
discard relay.facet.preventInertCheck()
|
||||
if not opts.initialCap.isNil:
|
||||
var exported: seq[WireSymbol]
|
||||
discard rewriteCapOut(relay, opts.initialCap, exported)
|
||||
|
@ -319,10 +316,10 @@ when defined(posix):
|
|||
stopActor(entity.facet)
|
||||
else:
|
||||
entity.relay.recv(buf[], 0..<n)
|
||||
close(entity.stdin)
|
||||
|
||||
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Stdio) =
|
||||
## Connect to an external dataspace over stdio.
|
||||
let localDataspace = newDataspace(turn)
|
||||
proc stdoutWriter(turn: var Turn; buf: seq[byte]) =
|
||||
## Blocking write to stdout.
|
||||
let n = writeBytes(stdout, buf, 0, buf.len)
|
||||
|
@ -331,7 +328,7 @@ when defined(posix):
|
|||
stopActor(turn)
|
||||
var opts = RelayActorOptions(
|
||||
packetWriter: stdoutWriter,
|
||||
initialCap: ds,
|
||||
initialCap: localDataspace,
|
||||
initialOid: 0.Oid.some,
|
||||
)
|
||||
spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay):
|
||||
|
@ -346,12 +343,14 @@ when defined(posix):
|
|||
facet: turn.facet, relay: relay, stdin: newAsyncFile(FD fd))
|
||||
onStop(entity.facet) do (turn: var Turn):
|
||||
entity.alive = false
|
||||
close(entity.stdin)
|
||||
# Close stdin to remove it from the ioqueue
|
||||
discard trampoline:
|
||||
whelp loop(entity)
|
||||
publish(turn, ds, TransportConnection(
|
||||
`addr`: ta.toPreserves,
|
||||
control: newCap(entity, turn),
|
||||
resolved: relay.peer.accepted,
|
||||
resolved: localDataspace.accepted,
|
||||
))
|
||||
|
||||
proc connectStdio*(turn: var Turn; ds: Cap) =
|
||||
|
@ -382,7 +381,9 @@ when defined(posix):
|
|||
template bootSocketEntity() {.dirty.} =
|
||||
proc setup(turn: var Turn) {.closure.} =
|
||||
proc kill(turn: var Turn) =
|
||||
entity.alive = false
|
||||
if entity.alive:
|
||||
entity.alive = false
|
||||
close(entity.sock)
|
||||
onStop(turn, kill)
|
||||
publish(turn, ds, TransportConnection(
|
||||
`addr`: ta.toPreserves,
|
||||
|
@ -400,7 +401,7 @@ when defined(posix):
|
|||
stopActor(entity.facet)
|
||||
else:
|
||||
entity.relay.recv(buf[], 0..<n)
|
||||
close(entity.sock)
|
||||
# the socket closes when the actor is stopped
|
||||
|
||||
proc boot(entity: TcpEntity; ta: transportAddress.Tcp; ds: Cap) {.asyncio.} =
|
||||
entity.sock = connectTcpAsync(ta.host, Port ta.port)
|
||||
|
@ -560,11 +561,16 @@ proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
|
|||
proc resolveEnvironment*(turn: var Turn; bootProc: BootProc) =
|
||||
## Resolve a capability from the calling environment
|
||||
## and call `bootProc`. See envRoute_.
|
||||
var resolved = false
|
||||
let
|
||||
ds = newDataspace(turn)
|
||||
pat = ResolvePath ?: {0: ?envRoute(), 3: ?:ResolvedAccepted}
|
||||
during(turn, ds, pat) do (dst: Cap):
|
||||
bootProc(turn, dst)
|
||||
if not resolved:
|
||||
resolved = true
|
||||
bootProc(turn, dst)
|
||||
do:
|
||||
resolved = false
|
||||
spawnRelays(turn, ds)
|
||||
|
||||
# TODO: define a runActor that comes preloaded with relaying
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
# Package
|
||||
|
||||
version = "20240308"
|
||||
version = "20240315"
|
||||
author = "Emery Hemingway"
|
||||
description = "Syndicated actors for conversational concurrency"
|
||||
license = "Unlicense"
|
||||
|
@ -9,4 +9,4 @@ srcDir = "src"
|
|||
|
||||
# Dependencies
|
||||
|
||||
requires "https://github.com/ehmry/hashlib.git >= 20231130", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240208", "https://github.com/ehmry/nim-sys.git#4ef3b624db86e331ba334e705c1aa235d55b05e1", "https://github.com/nim-works/cps"
|
||||
requires "https://github.com/ehmry/hashlib.git >= 20231130", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240312", "https://github.com/ehmry/nim-sys.git#4ef3b624db86e331ba334e705c1aa235d55b05e1", "https://github.com/nim-works/cps"
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[asyncdispatch, asyncfile, parseopt]
|
||||
import std/[oserrors, parseopt, posix, strutils]
|
||||
import pkg/sys/[files, handles, ioqueue]
|
||||
import preserves, syndicate, syndicate/relays
|
||||
|
||||
type
|
||||
|
@ -10,33 +11,51 @@ type
|
|||
Says {.preservesRecord: "Says".} = object
|
||||
who, what: string
|
||||
|
||||
proc readStdin(facet: Facet; ds: Cap; username: string) =
|
||||
let file = openAsync("/dev/stdin")
|
||||
onStop(facet) do (turn: var Turn): close(file)
|
||||
close(stdin)
|
||||
proc readLine() {.gcsafe.} =
|
||||
let future = readLine(file)
|
||||
addCallback(future, facet) do (turn: var Turn):
|
||||
var msg = read(future)
|
||||
if msg == "": quit()
|
||||
message(turn, ds, Says(who: username, what: msg))
|
||||
readLine()
|
||||
readLine()
|
||||
proc syncAndStop(facet: Facet; cap: Cap) =
|
||||
## Stop the actor responsible for `facet` after
|
||||
## synchronizing with `cap`.
|
||||
run(facet) do (turn: var Turn):
|
||||
sync(turn, cap, stopActor)
|
||||
|
||||
proc readStdin(facet: Facet; ds: Cap; username: string) {.asyncio.} =
|
||||
let
|
||||
fd = stdin.getOsFileHandle()
|
||||
flags = fcntl(fd.cint, F_GETFL, 0)
|
||||
if flags < 0:
|
||||
raiseOSError(osLastError())
|
||||
if fcntl(fd.cint, F_SETFL, flags or O_NONBLOCK) < 0:
|
||||
raiseOSError(osLastError())
|
||||
let
|
||||
file = newAsyncFile(FD fd)
|
||||
buf = new string
|
||||
buf[].setLen(0x1000)
|
||||
while true:
|
||||
let n = read(file, buf)
|
||||
if n < 1:
|
||||
stderr.writeLine "test_chat calls stopsActor ", facet.actor
|
||||
syncAndStop(facet, ds)
|
||||
return
|
||||
else:
|
||||
var msg = buf[][0..<n].strip
|
||||
proc send(turn: var Turn) =
|
||||
message(turn, ds, Says(who: username, what: msg))
|
||||
run(facet, send)
|
||||
|
||||
proc chat(turn: var Turn; ds: Cap; username: string) =
|
||||
during(turn, ds, ?:Present) do (who: string):
|
||||
echo who, " joined"
|
||||
do:
|
||||
echo who, " left"
|
||||
during(turn, ds, ?:Present) do (who: string):
|
||||
echo who, " joined"
|
||||
do:
|
||||
echo who, " left"
|
||||
|
||||
onMessage(turn, ds, ?:Says) do (who: string, what: string):
|
||||
echo who, ": ", what
|
||||
onMessage(turn, ds, ?:Says) do (who: string, what: string):
|
||||
echo who, ": ", what
|
||||
|
||||
discard publish(turn, ds, Present(username: username))
|
||||
readStdin(turn.facet, ds, username)
|
||||
discard publish(turn, ds, Present(username: username))
|
||||
|
||||
discard trampoline:
|
||||
whelp readStdin(turn.facet, ds, username)
|
||||
|
||||
proc main =
|
||||
let route = envRoute()
|
||||
var username = ""
|
||||
|
||||
for kind, key, val in getopt():
|
||||
|
@ -48,9 +67,8 @@ proc main =
|
|||
if username == "":
|
||||
stderr.writeLine "--user: unspecified"
|
||||
else:
|
||||
runActor("chat") do (turn: var Turn; root: Cap):
|
||||
spawnRelays(turn, root)
|
||||
resolve(turn, root, route) do (turn: var Turn; ds: Cap):
|
||||
runActor("chat") do (turn: var Turn):
|
||||
resolveEnvironment(turn) do (turn: var Turn; ds: Cap):
|
||||
chat(turn, ds, username)
|
||||
|
||||
main()
|
||||
|
|
|
@ -7,7 +7,7 @@ import syndicate, syndicate/actors/timers
|
|||
|
||||
let actor = bootActor("timer-test") do (turn: var Turn):
|
||||
let timers = newDataspace(turn)
|
||||
spawnTimerActor(timers, turn)
|
||||
spawnTimerActor(turn, timers)
|
||||
|
||||
onPublish(turn, timers, ?LaterThan(seconds: 1356100000)):
|
||||
echo "now in 13th bʼakʼtun"
|
||||
|
|
Loading…
Reference in New Issue