syndicate-nim/src/syndicate/actors.nim

324 lines
8.7 KiB
Nim

# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[deques, hashes, options, times]
import pkg/cps
import preserves
import ../syndicate/protocols/[protocol, sturdy]
# const traceSyndicate {.booldefine.}: bool = true
# Compile-time switch for trace support. It is currently hard-wired to `true`;
# the commented-out line above is the variant that could be overridden with a
# `-d:` define.
const traceSyndicate* = true
when traceSyndicate:
  # Modules that are only needed to produce trace output.
  import std/streams
  from std/os import getEnv
  import ./protocols/trace
export protocol.Handle
type
  Cont* = ref object of Continuation
    ## Continuation type used by `{.syndicate.}` procedures.
    turn: Turn # the turn this continuation runs in; set by `queue` and `run`
  Handler* = proc() {.closure.}
    ## Argument-less callback; used in this module for facet stop handlers.
  Work = Deque[Cont]
    ## Queue of continuations pending within a turn.
  FacetState = enum fIdle, fRunning, fStopped
    ## Only referenced by the commented-out `Facet.state` field below.
  Facet* = ref object
    ## https://synit.org/book/glossary.html#facet
    actor: Actor # actor that owns this facet
    parent: Facet # nil for an actor's root facet
    stopHandlers: Deque[Handler] # run last-added-first when the facet stops
    # state: FacetState
    when traceSyndicate:
      id: FacetId # taken from the owning actor's `facetIdAllocator`
  Turn* = ref object
    ## https://synit.org/book/glossary.html#turn
    facet: Facet # facet the turn executes in
    entity: Entity # not read or written by the procedures visible in this file
    event: Option[protocol.Event] # not read or written by the procedures visible in this file
    work: Work # continuations still to be run by this turn
    when traceSyndicate:
      desc: TurnDescription # trace record that accumulates the turn's actions
  Entity* = ref object of RootObj
    ## https://synit.org/book/glossary.html#entity
    facet*: Facet
    oid*: sturdy.Oid # oid is how Entities are identified over the wire
    when traceSyndicate:
      id: FacetId # NOTE(review): typed as FacetId although it lives on Entity - confirm
  Cap* {.final, preservesEmbedded.} = ref object of EmbeddedObj
    ## Reference to a `target` entity, reached through the `relay` facet.
    relay*: Facet
    target*: Entity
    attenuation*: seq[sturdy.Caveat]
  Actor* = ref object
    ## https://synit.org/book/glossary.html#actor
    # crashHandlers: Deque[Handler]
    root: Facet # root facet; set to nil once that facet has been stopped
    handleAllocator: Handle # last handle handed out by `nextHandle`
    facetIdAllocator: int # counter incremented for every facet created
    id: ActorId # built from the actor's name in `newActor`
    when traceSyndicate:
      traceStream: FileStream # nil when no trace output was requested
    stopped: bool # set by `stop(turn, actor)`
# Default parameter types: a parameter with one of these names and no explicit
# type annotation in the signatures below gets the type listed here.
using
  actor: Actor
  facet: Facet
  entity: Entity
  cap: Cap
  turn: Turn
proc hash*(facet): Hash =
  ## Identity hash of a `Facet`: hashes the address of the referenced object.
  ##
  ## The earlier `facet.unsafeAddr.hash` hashed the address of the *parameter*
  ## holding the reference rather than the object it refers to, so the result
  ## did not depend on which facet was passed.
  cast[pointer](facet).hash
proc hash*(cap): Hash =
  ## Identity hash of a `Cap`: hashes the address of the referenced object.
  ##
  ## The earlier `cap.unsafeAddr.hash` hashed the address of the *parameter*
  ## holding the reference rather than the object it refers to, so the result
  ## did not depend on which capability was passed.
  cast[pointer](cap).hash
proc newFacet(actor: Actor; parent: Facet): Facet =
  ## Create a facet owned by `actor` with the given `parent`
  ## (`nil` for a root facet).
  ##
  ## The actor's facet counter is always advanced; the resulting number is
  ## stored as the facet's trace id.
  inc(actor.facetIdAllocator)
  result = Facet(actor: actor, parent: parent)
  when traceSyndicate:
    # `Facet.id` is only declared when tracing is compiled in, so it must not
    # be named unconditionally in the object constructor.
    result.id = actor.facetIdAllocator.toPreserves
proc newActor(name: string): Actor =
  ## Create an actor identified by `name`, together with its root facet.
  ##
  ## When tracing is compiled in, the `SYNDICATE_TRACE_FILE` environment
  ## variable selects the trace sink: unset or empty means no tracing, `-`
  ## means stderr, anything else is opened as a file for writing.
  result = Actor(id: name.toPreserves)
  result.root = newFacet(result, nil)
  when traceSyndicate:
    let tracePath = getEnv("SYNDICATE_TRACE_FILE", "")
    if tracePath == "-":
      result.traceStream = newFileStream(stderr)
    elif tracePath != "":
      result.traceStream = openFileStream(tracePath, fmWrite)
when traceSyndicate:
  proc trace(actor; act: ActorActivation) =
    ## Write one timestamped trace entry for `act` to the actor's trace
    ## stream. Does nothing when the actor has no trace stream.
    if actor.traceStream.isNil:
      return
    let entry = TraceEntry(
      timestamp: getTime().toUnixFloat(),
      actor: actor.id,
      item: act,
    )
    actor.traceStream.writeLine($entry.toPreserves)

  proc trace(actor; turn) =
    ## Write the accumulated description of `turn` as a trace entry.
    if actor.traceStream.isNil:
      return
    let activation = ActorActivation(
      orKind: ActorActivationKind.turn,
      turn: turn.desc,
    )
    actor.trace(activation)

  proc traceTarget(cap): trace.Target =
    ## Trace target naming the capability's relay facet, that facet's actor
    ## and the oid of the target entity.
    let relay = cap.relay
    result = Target(
      actor: relay.actor.id,
      facet: relay.id,
      oid: cap.target.oid.toPreserves,
    )

  proc traceTarget(turn): trace.Target =
    ## Trace target naming the turn's facet and its actor; `oid` is left at
    ## its default value.
    let current = turn.facet
    result = Target(
      actor: current.actor.id,
      facet: current.id,
    )
proc newExternalTurn(facet): Turn =
  ## Create a turn for `facet`; when tracing is compiled in its cause is
  ## recorded as `external`.
  new result
  result.facet = facet
  when traceSyndicate:
    let cause = TurnCause(orKind: TurnCauseKind.external)
    result.desc = TurnDescription(cause: cause)
proc pass*(a, b: Cont): Cont =
  ## Move the turn held by continuation `a` into continuation `b` and
  ## return `b`.
  result = b
  result.turn = move a.turn
proc queue(t: Turn; c: Cont) =
  ## Bind continuation `c` to turn `t` and append it to the turn's work.
  c.turn = t
  addLast(t.work, c)
proc queue(c: Cont): Cont {.cpsMagic.} =
  ## Append the continuation to the work of its own turn and return `nil`,
  ## so no continuation is handed back to the caller.
  queue(c.turn, c)
  return nil
proc run(facet; work: var Work) =
  ## Drain `work`: take each queued continuation and keep invoking it until
  ## it yields `nil` or a continuation without a function.
  ##
  ## A `CatchableError` raised by a continuation ends only that continuation;
  ## its stack frames are written unless it was dismissed, and the remaining
  ## work is still processed. `facet` is not used at present (the call to
  ## `terminate` is commented out).
  while work.len > 0:
    var current = work.popFirst()
    let owner = current.turn
    try:
      while not (current.isNil or current.fn.isNil):
        # Every continuation in the chain runs under the turn that the
        # queued continuation belonged to.
        current.turn = owner
        let step = current.fn
        current = Cont(step(current))
    except CatchableError as err:
      if not current.dismissed:
        writeStackFrames current
      # terminate(facet, err)
proc run(turn) =
  ## Execute all work queued on `turn`, then write the turn to the trace.
  ## If the actor was stopped while the turn ran, a `stop` activation is
  ## traced as well. Asserts that the actor is not already stopped.
  let actor = turn.facet.actor
  assert not actor.stopped
  run(turn.facet, turn.work)
  when traceSyndicate:
    actor.trace(turn)
    if actor.stopped:
      actor.trace(ActorActivation(orKind: ActorActivationKind.stop))
proc run(handlers: var Deque[Handler]) =
  ## Invoke and remove every handler, most recently added first, until the
  ## deque is empty.
  while handlers.len > 0:
    let handler = handlers.popLast()
    handler()
proc start(actor; cont: Cont) =
  ## Trace the actor's `start` activation, then run `cont` in a fresh
  ## external turn on the actor's root facet.
  when traceSyndicate:
    actor.trace(ActorActivation(orKind: ActorActivationKind.start))
  let turn = newExternalTurn(actor.root)
  queue(turn, cont)
  turn.run()
# Forward declaration: stopping a root facet calls back into stopping its
# actor, which is defined further down.
proc stop(turn; actor)
proc collectPath(result: var seq[FacetId]; facet) =
  ## Append the ids of `facet`'s ancestors, outermost first, followed by the
  ## id of `facet` itself.
  let ancestor = facet.parent
  if not ancestor.isNil:
    collectPath(result, ancestor)
  result.add facet.id
proc stop(turn; facet; reason: FacetStopReason) =
  ## Stop `facet`: run its stop handlers, record a `facetstop` action with
  ## the facet's path and `reason` in the turn's trace description, and, if
  ## this is a root facet (no parent), clear the actor's root and stop the
  ## actor too.
  facet.stopHandlers.run()
  when traceSyndicate:
    var action = ActionDescription(orKind: ActionDescriptionKind.facetstop)
    action.facetstop.reason = reason
    collectPath(action.facetstop.path, facet)
    turn.desc.actions.add action
  let owner = facet.actor
  if facet.parent.isNil:
    # Clearing the root first keeps `stop(turn, actor)` from stopping this
    # facet a second time.
    owner.root = nil
    stop(turn, owner)
proc stop(turn; actor) =
  ## Stop `actor`: stop its root facet if it still has one (with reason
  ## `actorStopping`), then mark the actor as stopped.
  let root = actor.root
  if not root.isNil:
    stop(turn, root, FacetStopReason.actorStopping)
  actor.stopped = true
proc bootActor*(name: string, c: Cont) =
  ## Create a new actor called `name` and start it with continuation `c`.
  let actor = newActor(name)
  actor.start(c)
template syndicate*(prc: typed): untyped =
  ## Apply the `cps` transformation to `prc` with `Cont` as the continuation
  ## type. Used as the `{.syndicate.}` pragma on procedures in this module.
  cps(Cont, prc)
proc activeTurn*(c: Cont): Turn {.cpsVoodoo.} =
  ## The `Turn` currently attached to the running continuation of a
  ## `{.syndicate.}` procedure. Asserts that a turn is attached.
  assert not c.turn.isNil
  result = c.turn
proc activeFacet*(c: Cont): Facet {.cpsVoodoo.} =
  ## The `Facet` of the turn attached to the running continuation of a
  ## `{.syndicate.}` procedure. Asserts that a turn is attached.
  assert not c.turn.isNil
  result = c.turn.facet
proc activeActor*(c: Cont): Actor {.cpsVoodoo.} =
  ## The `Actor` owning the facet of the turn attached to the running
  ## continuation of a `{.syndicate.}` procedure. Asserts that a turn is
  ## attached.
  assert not c.turn.isNil
  result = c.turn.facet.actor
proc stopActor(c: Cont; a: Actor): Cont {.cpsMagic.} =
  ## Stop actor `a` within the continuation's turn; returns `nil`, so no
  ## continuation is handed back to the caller.
  c.turn.stop(a)
  return nil
proc stopFacet(c: Cont; f: Facet): Cont {.cpsMagic.} =
  ## Stop facet `f` within the continuation's turn with reason
  ## `explicitAction`; returns `nil`, so no continuation is handed back to
  ## the caller.
  c.turn.stop(f, FacetStopReason.explicitAction)
  return nil
proc stopFacet*() {.syndicate.} =
  ## Stop the active facet with reason `explicitAction`.
  # `queue()` appends this continuation to the end of the turn's work, so the
  # stop below presumably runs only after work queued earlier in the turn.
  queue()
  stop(activeTurn(), activeFacet(), FacetStopReason.explicitAction)
proc stopActor*() {.syndicate.} =
  ## Stop the active actor.
  # `queue()` appends this continuation to the end of the turn's work, so the
  # stop below presumably runs only after work queued earlier in the turn.
  queue()
  stop(activeTurn(), activeActor())
type
  AssertionRef* = ref object
    ## Reference wrapper around a `Value`, used as the assertion argument of
    ## the `Entity` methods.
    value*: Value
# If the Entity methods take a Value object then the generated
# C code has "redefinition of struct" problems when orc is enabled.
# Base `publish` method for entities: receives assertion `v` under handle `h`
# during `turn`. The default implementation does nothing.
method publish*(e: Entity; turn: Turn; v: AssertionRef; h: Handle) {.base.} = discard
# Base `retract` method for entities: receives handle `h` during `turn`.
# The default implementation does nothing.
method retract*(e: Entity; turn: Turn; h: Handle) {.base.} = discard
# Base `message` method for entities: receives message body `v` during `turn`.
# The default implementation does nothing.
method message*(e: Entity; turn: Turn; v: AssertionRef) {.base.} = discard
# Base `sync` method for entities: receives capability `peer` during `turn`.
# The default implementation does nothing.
method sync*(e: Entity; turn: Turn; peer: Cap) {.base.} = discard
proc newCap*(f: Facet; e: Entity): Cap =
  ## Create an unattenuated capability for entity `e`, relayed through
  ## facet `f`.
  new result
  result.relay = f
  result.target = e
proc nextHandle(facet: Facet): Handle =
  ## Advance the handle counter of the facet's actor and return the new
  ## value.
  let owner = facet.actor
  inc owner.handleAllocator
  result = owner.handleAllocator
proc publish*(turn: Turn; cap: Cap; val: Value): Handle =
  ## Allocate a fresh handle for asserting `val` at `cap` and return it.
  ##
  ## When tracing is compiled in, an `enqueue` action describing the
  ## assertion (target, value and handle) is appended to the turn's trace
  ## description.
  ##
  ## NOTE(review): nothing here calls `cap.target.publish`; only the handle
  ## allocation and the trace record happen in this procedure - confirm
  ## whether delivery is implemented elsewhere.
  result = turn.facet.nextHandle()
  when traceSyndicate:
    var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
    # `detail` is constructed once with the `assert` variant here; the
    # second, identical assignment that used to follow was redundant.
    act.enqueue.event = TargetedTurnEvent(
      target: cap.traceTarget,
      detail: trace.TurnEvent(orKind: trace.TurnEventKind.assert)
    )
    act.enqueue.event.detail.assert = TurnEventAssert(
      assertion: AssertionDescription(orKind: AssertionDescriptionKind.value),
      handle: result,
    )
    act.enqueue.event.detail.assert.assertion.value.value = val
    turn.desc.actions.add act
proc retract*(turn; h: Handle) =
  ## Record the retraction of handle `h`: when tracing is compiled in, an
  ## `enqueue` action targeting the turn's own facet is appended to the
  ## turn's trace description.
  when traceSyndicate:
    var event = TargetedTurnEvent(
      target: turn.traceTarget,
      detail: trace.TurnEvent(orKind: trace.TurnEventKind.retract)
    )
    event.detail.retract = TurnEventRetract(handle: h)
    var action = ActionDescription(orKind: ActionDescriptionKind.enqueue)
    action.enqueue.event = event
    turn.desc.actions.add action
proc message*(turn; cap; val: Value) =
  ## Record sending message `val` to `cap`: when tracing is compiled in, an
  ## `enqueue` action is appended to the turn's trace description.
  ##
  ## The trace target is now derived from `cap`, as `publish` does.
  ## Previously `cap` was ignored and the turn's own facet was recorded as
  ## the recipient.
  ##
  ## NOTE(review): nothing here calls `cap.target.message`; only the trace
  ## record happens in this procedure - confirm whether delivery is
  ## implemented elsewhere.
  when traceSyndicate:
    var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
    act.enqueue.event = TargetedTurnEvent(
      target: cap.traceTarget,
      detail: trace.TurnEvent(orKind: trace.TurnEventKind.message)
    )
    # Relies on `body` defaulting to its `value` variant - TODO confirm
    # against the trace schema.
    act.enqueue.event.detail.message.body.value.value = val
    turn.desc.actions.add act
proc sync*(turn; peer: Cap) =
  ## Record a sync with `peer`: when tracing is compiled in, an `enqueue`
  ## action targeting the turn's own facet and naming `peer` as the sync
  ## peer is appended to the turn's trace description.
  when traceSyndicate:
    var event = TargetedTurnEvent(
      target: turn.traceTarget,
      detail: trace.TurnEvent(orKind: trace.TurnEventKind.sync)
    )
    event.detail.sync.peer = peer.traceTarget
    var action = ActionDescription(orKind: ActionDescriptionKind.enqueue)
    action.enqueue.event = event
    turn.desc.actions.add action
proc publish*(cap: Cap; val: Value): Handle {.syndicate.} =
  ## Publish `val` to `cap` within the active turn and return the handle.
  activeTurn().publish(cap, val)
proc retract*(h: Handle) {.syndicate.} =
  ## Retract handle `h` within the active turn.
  retract(activeTurn(), h)
proc message*(cap: Cap; val: Value) {.syndicate.} =
  ## Send message `val` to `cap` within the active turn.
  message(activeTurn(), cap, val)
proc sync*(cap: Cap) {.syndicate.} =
  ## Sync with `cap` within the active turn.
  sync(activeTurn(), cap)
proc addStopHandler*(actor: Actor; h: Handler) =
  ## Register `h` on the actor's root facet, to be run when that facet is
  ## stopped.
  addLast(actor.root.stopHandlers, h)