syndicate-nim/src/syndicate/actors.nim

348 lines
9.5 KiB
Nim

# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[deques, hashes, options, times]
import pkg/cps
import preserves
import ../syndicate/protocols/[protocol, sturdy]
# const traceSyndicate {.booldefine.}: bool = true
# Tracing is currently hard-wired on; the commented-out line above is the
# `booldefine` form of the same constant.
const traceSyndicate* = true
when traceSyndicate:
import std/streams
from std/os import getEnv
import ./protocols/trace
export protocol.Handle
type
  Cont* = ref object of Continuation
    ## Continuation type of `{.syndicate.}` procedures; it records the
    ## turn and the facet it is running under.
    turn: Turn
    facet: Facet

  # A closure with no parameters and no result.
  Handler* = proc() {.closure.}

  # Continuations waiting to be run within a turn.
  Work = Deque[Cont]

  # Stop handlers of a facet, kept as raw continuation procedures.
  # Despite the name this is a `seq`; entries are taken off the end.
  HandlerDeque = seq[ContinuationProc[Continuation]]

  FacetState = enum fFresh, fRunning, fEnded

  # A plain (non-closure) procedure with no parameters and no result.
  Callback* = proc () {.nimcall.}

  Facet* = ref object
    ## https://synit.org/book/glossary.html#facet
    actor: Actor
    parent: Facet
    children: seq[Facet]
    stopHandlers: HandlerDeque
    stopCallbacks: seq[Callback]
    state: FacetState
    when traceSyndicate:
      id: FacetId

  Turn* = ref object
    ## https://synit.org/book/glossary.html#turn
    facet: Facet
    entity: Entity
    event: Option[protocol.Event]
    work: Work
    when traceSyndicate:
      desc: TurnDescription

  Entity* = ref object of RootObj
    ## https://synit.org/book/glossary.html#entity
    facet*: Facet
    oid*: sturdy.Oid # oid is how Entities are identified over the wire
    when traceSyndicate:
      id: FacetId

  Cap* {.final, preservesEmbedded.} = ref object of EmbeddedObj
    ## Pairs a target entity with the facet it is relayed through,
    ## plus a list of attenuating caveats.
    relay*: Facet
    target*: Entity
    attenuation*: seq[sturdy.Caveat]

  Actor* = ref object
    ## https://synit.org/book/glossary.html#actor
    # crashHandlers: HandlerDeque
    root: Facet
    handleAllocator: Handle   # last handle issued by `nextHandle`
    facetIdAllocator: int     # counter bumped for every new facet
    id: ActorId
    when traceSyndicate:
      traceStream: FileStream # nil when no trace output is configured
    stopped: bool
template syndicate*(prc: typed): untyped =
  ## Pragma-style template: applies the `cps` transformation to `prc`
  ## with `Cont` as the continuation type.
  cps(Cont, prc)
proc activeTurn*(c: Cont): Turn {.cpsVoodoo.} =
  ## Return the active `Turn` within a `{.syndicate.}` context.
  ## Asserts that the continuation has a turn attached.
  let current = c.turn
  assert not isNil(current)
  current
proc activeFacet*(c: Cont): Facet {.cpsVoodoo.} =
  ## Return the active `Facet` within a `{.syndicate.}` context.
  ## Asserts that the continuation has a facet attached.
  let current = c.facet
  assert not isNil(current)
  current
proc activeActor*(c: Cont): Actor {.cpsVoodoo.} =
  ## Return the active `Actor` within a `{.syndicate.}` context.
  ## The actor is the owner of the continuation's current facet.
  # The actor is reached through `c.facet`, so the facet (not the turn)
  # is what has to be checked before it is dereferenced.
  assert not c.facet.isNil
  c.facet.actor
# Default parameter types: a routine parameter that is spelled with one
# of these names and has no explicit type receives the type listed here.
using
  actor: Actor
  facet: Facet
  entity: Entity
  cap: Cap
  turn: Turn
proc hash*(facet): Hash =
  ## Hash a `Facet` by the identity of the object it refers to.
  # `unsafeAddr` applied to the parameter would produce the address of
  # the parameter slot holding the reference, which says nothing about
  # which object is referenced; hash the reference value itself instead.
  hash(cast[pointer](facet))
proc hash*(cap): Hash =
  ## Hash a `Cap` by the identity of the object it refers to.
  # `unsafeAddr` applied to the parameter would produce the address of
  # the parameter slot holding the reference, which says nothing about
  # which object is referenced; hash the reference value itself instead.
  hash(cast[pointer](cap))
proc newFacet(actor: Actor; parent: Facet): Facet =
  ## Create a facet owned by `actor` with the given `parent` (which may
  ## be nil, as it is for an actor's root facet).
  ##
  ## The actor's facet counter is advanced on every call. The new facet
  ## is not added to `parent.children` here.
  inc(actor.facetIdAllocator)
  result = Facet(actor: actor, parent: parent)
  when traceSyndicate:
    # `Facet.id` is only declared when tracing is compiled in, so it
    # must only be assigned under the same condition.
    result.id = actor.facetIdAllocator.toPreserves
proc stopped*(facet): bool =
  ## True for every facet state other than `fRunning`.
  facet.state in {fFresh, fEnded}
proc newActor(name: string): Actor =
  ## Create an actor identified by `name`, together with its root facet.
  ##
  ## With tracing compiled in, the `SYNDICATE_TRACE_FILE` environment
  ## variable selects the trace output: unset or empty leaves tracing
  ## off, `-` writes to stderr, anything else is opened as a file for
  ## writing.
  new result
  result.id = toPreserves(name)
  result.root = newFacet(result, nil)
  when traceSyndicate:
    let tracePath = getEnv("SYNDICATE_TRACE_FILE", "")
    if tracePath == "-":
      result.traceStream = newFileStream(stderr)
    elif tracePath != "":
      result.traceStream = openFileStream(tracePath, fmWrite)
when traceSyndicate:
  proc trace(actor; act: ActorActivation) =
    ## Write `act` to the actor's trace stream as one timestamped line.
    ## Does nothing when the actor has no trace stream.
    if actor.traceStream.isNil:
      return
    let entry = TraceEntry(
      timestamp: toUnixFloat(getTime()),
      actor: actor.id,
      item: act,
    )
    writeLine(actor.traceStream, $toPreserves(entry))

  proc trace(actor; turn) =
    ## Write the description accumulated in `turn` to the trace stream.
    if actor.traceStream.isNil:
      return
    let activation = ActorActivation(
      orKind: ActorActivationKind.turn,
      turn: turn.desc,
    )
    actor.trace(activation)

  proc traceTarget(cap): trace.Target =
    ## Trace-level address of the entity behind `cap`: the relay facet's
    ## actor, the relay facet itself, and the target's oid.
    let relay = cap.relay
    result = Target(
      actor: relay.actor.id,
      facet: relay.id,
      oid: toPreserves(cap.target.oid),
    )

  proc traceTarget(turn): trace.Target =
    ## Trace-level address of the facet `turn` runs in; no oid is set.
    let current = turn.facet
    result = Target(
      actor: current.actor.id,
      facet: current.id,
    )
proc newExternalTurn(facet): Turn =
  ## Create a turn running in `facet`; when tracing is compiled in its
  ## cause is recorded as `external`.
  new result
  result.facet = facet
  when traceSyndicate:
    result.desc = TurnDescription(
      cause: TurnCause(orKind: TurnCauseKind.external))
proc pass*(a, b: Cont): Cont =
  ## Carry context from continuation `a` over to `b` and return `b`.
  ## The turn is always copied; the facet only when `b` has none yet.
  # NOTE(review): presumably the hook CPS calls when control moves
  # between continuations — confirm against the cps documentation.
  result = b
  result.turn = a.turn
  if isNil(result.facet):
    result.facet = a.facet
  # TODO: whelp a new continuation at facet boundaries?
proc queue(t: Turn; c: Cont) =
  ## Bind `c` to the facet of `t` and append it to the pending work
  ## of `t`.
  c.facet = t.facet
  addLast(t.work, c)
proc queue(c: Cont): Cont {.cpsMagic.} =
  ## Put the running continuation at the back of its own turn's work
  ## queue and return nil instead of a continuation to resume.
  c.turn.queue(c)
  result = nil
proc complete(turn; c: Cont) =
  ## Run `c` step by step inside `turn` until it yields nil or has no
  ## further step to run.
  ##
  ## A `CatchableError` raised by a step ends the loop; stack frames
  ## are written for the continuation unless it was dismissed, and the
  ## error is not re-raised.
  var current = c
  try:
    while not (current.isNil or current.fn.isNil):
      current.turn = turn
      let step = current.fn
      current = Cont(step(current))
  except CatchableError as err:
    if not current.dismissed:
      writeStackFrames current
    # terminate(c.facet, err)
proc run(turn) =
  ## Drain the work queue of `turn`, completing each continuation in
  ## FIFO order. With tracing compiled in, the turn is traced afterwards,
  ## followed by a `stop` activation if the actor stopped meanwhile.
  let actor = turn.facet.actor
  assert(not actor.stopped)
  while turn.work.len != 0:
    let next = turn.work.popFirst()
    complete(turn, next)
  when traceSyndicate:
    actor.trace(turn)
    if actor.stopped:
      let stopping = ActorActivation(orKind: ActorActivationKind.stop)
      trace(actor, stopping)
proc start(actor; cont: Cont) =
  ## Mark the actor's root facet as running and run `cont` in a fresh
  ## external turn on that facet. With tracing compiled in, a `start`
  ## activation is traced first.
  when traceSyndicate:
    trace(actor, ActorActivation(orKind: ActorActivationKind.start))
  actor.root.state = fRunning
  let turn = newExternalTurn(actor.root)
  queue(turn, cont)
  run(turn)
# Forward declaration: stopping a facet and stopping an actor call each other.
proc stop(turn; actor)
proc collectPath(result: var seq[FacetId]; facet) =
  ## Append the ids of `facet` and all its ancestors to `result`,
  ## ordered from the outermost ancestor down to `facet` itself.
  let parent = facet.parent
  if not isNil(parent):
    # Ancestors go first so that the path reads root-to-leaf.
    result.collectPath(parent)
  add(result, facet.id)
proc runNextStop(c: Cont; facet: Facet): Cont {.cpsMagic.} =
  ## Take the most recently installed stop handler of `facet` and make
  ## it the next step of the running continuation, which is returned so
  ## that it keeps running.
  let handler = pop(facet.stopHandlers)
  c.fn = handler
  return c
proc runNextFacetStop() {.syndicate.} =
  ## Run the next pending stop handler of the active facet.
  runNextStop(activeFacet())
proc stop(turn; facet; reason: FacetStopReason) =
  ## Stop `facet` within `turn`.
  ##
  ## In order: every stop handler is run to completion (last installed
  ## first), every stop callback is called (last added first), every
  ## child facet is stopped with reason `parentStopping` (last child
  ## first), and the facet is marked as ended. With tracing compiled in,
  ## a `facetStop` action carrying the facet path and `reason` is added
  ## to the turn description. Stopping a facet without a parent also
  ## clears the actor's root and stops the actor.
  while facet.stopHandlers.len > 0:
    # Each iteration consumes one handler via `runNextStop`.
    var c = whelp runNextFacetStop()
    c.facet = facet
    complete(turn, c)
  while facet.stopCallbacks.len > 0:
    var cb = facet.stopCallbacks.pop()
    cb()
  while facet.children.len > 0:
    stop(turn, facet.children.pop(), FacetStopReason.parentStopping)
  # Record the state transition; without it a facet that was running
  # would still be reported as not stopped by `stopped`.
  facet.state = fEnded
  when traceSyndicate:
    var act = ActionDescription(orKind: ActionDescriptionKind.facetstop)
    collectPath(act.facetStop.path, facet)
    act.facetStop.reason = reason
    turn.desc.actions.add act
  if facet.parent.isNil:
    # Clearing the root first keeps `stop(turn, actor)` from coming
    # back into this facet.
    facet.actor.root = nil
    stop(turn, facet.actor)
proc stop(turn; actor) =
  ## Stop `actor`: stop its root facet if it still has one (reason
  ## `actorStopping`), then flag the actor as stopped.
  let root = actor.root
  if not isNil(root):
    stop(turn, root, FacetStopReason.actorStopping)
  actor.stopped = true
proc bootActor*(name: string, c: Cont) =
  ## Create an actor called `name` and start it with `c` as the first
  ## continuation to run.
  let actor = newActor(name)
  actor.start(c)
proc stopActor(c: Cont; a: Actor): Cont {.cpsMagic.} =
  ## Stop actor `a` within the turn of the running continuation and
  ## return nil instead of a continuation to resume.
  c.turn.stop(a)
  result = nil
proc stopFacet(c: Cont; f: Facet): Cont {.cpsMagic.} =
  ## Stop facet `f` (reason `explicitAction`) within the turn of the
  ## running continuation and return nil instead of a continuation to
  ## resume.
  c.turn.stop(f, FacetStopReason.explicitAction)
  result = nil
proc stopFacet*() {.syndicate.} =
  ## Stop the active facet with reason `explicitAction`.
  # `queue()` first re-queues this continuation on the active turn.
  queue()
  activeTurn().stop(activeFacet(), FacetStopReason.explicitAction)
proc stopActor*() {.syndicate.} =
  ## Stop the active actor.
  # `queue()` first re-queues this continuation on the active turn.
  queue()
  activeTurn().stop(activeActor())
type
  AssertionRef* = ref object
    ## Reference wrapper around a `Value`, used as the argument type of
    ## the `Entity` methods.
    value*: Value
  # If the Entity methods take a Value object then the generated
  # C code has "redefinition of struct" problems when orc is enabled.
method publish*(e: Entity; turn: Turn; v: AssertionRef;
                h: Handle) {.base.} =
  ## Base method for an assertion `v` published to `e` under handle
  ## `h`; this default does nothing.
  discard
method retract*(e: Entity; turn: Turn; h: Handle) {.base.} =
  ## Base method for the retraction of handle `h` at `e`; this default
  ## does nothing.
  discard
method message*(e: Entity; turn: Turn; v: AssertionRef) {.base.} =
  ## Base method for a message `v` delivered to `e`; this default does
  ## nothing.
  discard
method sync*(e: Entity; turn: Turn; peer: Cap) {.base.} =
  ## Base method for a sync request involving `peer` at `e`; this
  ## default does nothing.
  discard
proc newCap*(f: Facet; e: Entity): Cap =
  ## Create a capability for entity `e` relayed through facet `f`,
  ## with no attenuation.
  new result
  result.relay = f
  result.target = e
proc nextHandle(facet: Facet): Handle =
  ## Advance the handle counter of the facet's actor and return the new
  ## value.
  let owner = facet.actor
  inc owner.handleAllocator
  result = owner.handleAllocator
proc publish*(turn: Turn; cap: Cap; val: Value): Handle =
  ## Allocate a fresh handle for asserting `val` at `cap` and return it.
  ## With tracing compiled in, an `enqueue` action describing the
  ## assertion (target, value and handle) is added to the turn
  ## description.
  result = nextHandle(turn.facet)
  when traceSyndicate:
    var detail = trace.TurnEvent(orKind: trace.TurnEventKind.assert)
    detail.assert = TurnEventAssert(
      assertion: AssertionDescription(
        orKind: AssertionDescriptionKind.value),
      handle: result,
    )
    detail.assert.assertion.value.value = val
    var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
    act.enqueue.event = TargetedTurnEvent(
      target: cap.traceTarget,
      detail: detail,
    )
    turn.desc.actions.add act
proc retract*(turn; h: Handle) =
  ## With tracing compiled in, add an `enqueue` action to the turn
  ## description recording the retraction of handle `h`; the trace
  ## target is the turn's own facet.
  when traceSyndicate:
    var detail = trace.TurnEvent(orKind: trace.TurnEventKind.retract)
    detail.retract = TurnEventRetract(handle: h)
    var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
    act.enqueue.event = TargetedTurnEvent(
      target: turn.traceTarget,
      detail: detail,
    )
    turn.desc.actions.add act
proc message*(turn; cap; val: Value) =
  ## With tracing compiled in, add an `enqueue` action to the turn
  ## description recording that message `val` is sent to `cap`.
  when traceSyndicate:
    var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
    act.enqueue.event = TargetedTurnEvent(
      # The message is addressed to `cap`, so the trace target is the
      # capability (as in `publish`), not the facet the turn runs in.
      target: cap.traceTarget,
      detail: trace.TurnEvent(orKind: trace.TurnEventKind.message)
    )
    act.enqueue.event.detail.message.body.value.value = val
    turn.desc.actions.add act
proc sync*(turn; peer: Cap) =
  ## With tracing compiled in, add an `enqueue` action to the turn
  ## description recording a sync event whose peer is `peer`; the trace
  ## target is the turn's own facet.
  when traceSyndicate:
    var detail = trace.TurnEvent(orKind: trace.TurnEventKind.sync)
    detail.sync.peer = peer.traceTarget
    var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
    act.enqueue.event = TargetedTurnEvent(
      target: turn.traceTarget,
      detail: detail,
    )
    turn.desc.actions.add act
proc publish*(cap: Cap; val: Value): Handle {.syndicate.} =
  ## Assert `val` at `cap` in the active turn and return the handle
  ## allocated for the assertion.
  activeTurn().publish(cap, val)
proc retract*(h: Handle) {.syndicate.} =
  ## Retract handle `h` in the active turn.
  retract(activeTurn(), h)
proc message*(cap: Cap; val: Value) {.syndicate.} =
  ## Send message `val` to `cap` in the active turn.
  message(activeTurn(), cap, val)
proc sync*(cap: Cap) {.syndicate.} =
  ## Issue a sync with `cap` as the peer in the active turn.
  sync(activeTurn(), cap)
proc installStopHook*(c: Cont, facet: Facet): Cont {.cpsMagic.} =
  ## Register the running continuation's next step (`c.fn`) as a stop
  ## handler of `facet`, then keep running the continuation.
  add(facet.stopHandlers, c.fn)
  result = c
proc addOnStopHandler*(c: Cont; cb: Callback): Cont {.cpsMagic.} =
  ## Register `cb` to be called when the running continuation's facet
  ## is stopped, then keep running the continuation.
  add(c.facet.stopCallbacks, cb)
  return c