syndicate-nim/src/sam/actors.nim

678 lines
19 KiB
Nim

# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
# Use procedures to call into SAM,
# use continuations to call within SAM.
import std/[deques, hashes, options, sets, tables, times]
import pkg/cps
import preserves
import ./protocols/[protocol, sturdy]
# const traceSyndicate {.booldefine.}: bool = true
const traceSyndicate* = true
when traceSyndicate:
import std/streams
from std/os import getEnv
import ./protocols/trace
export protocol.Handle
type
Cont* = ref object of Continuation
turn: Turn
PublishProc* = proc (e: Entity; v: Value; h: Handle) {.cps: Cont.}
RetractProc* = proc (e: Entity; h: Handle) {.cps: Cont.}
MessageProc* = proc (e: Entity; v: Value) {.cps: Cont.}
SyncProc* = proc (e: Entity; peer: Cap) {.cps: Cont.}
Handler* = proc() {.closure.}
HandlerDeque = seq[ContinuationProc[Continuation]]
FacetState = enum fFresh, fRunning, fEnded
Callback* = proc () {.closure.}
OutboundTable = Table[Handle, OutboundAssertion]
OutboundAssertion = ref object
handle: Handle
peer: Cap
established: bool
Facet* = ref object
## https://synit.org/book/glossary.html#facet
actor: Actor
parent: Facet
children: seq[Facet]
outbound: OutboundTable
stopHandlers: HandlerDeque
stopCallbacks: seq[Callback]
state: FacetState
id: FacetId
FacetProc* = proc (f: Facet) {.closure.}
## Type for callbacks to be called within a turn.
## The `Facet` parameter is the owning facet.
Turn = ref object
## https://synit.org/book/glossary.html#turn
actor: Actor
work: Dequeue[Cont]
actions: seq[Cont]
event: Option[protocol.Event]
rollback: bool
when traceSyndicate:
desc: TurnDescription
Entity* = ref object of RootObj
## https://synit.org/book/glossary.html#entity
publishImpl: PublishProc
retractImpl: RetractProc
messageImpl: MessageProc
syncImpl: SyncProc
facet*: Facet
# This implementation associates Entities to
# Facets, which is not to be taken as a SAM
# axiom.
oid*: sturdy.Oid # oid is how Entities are identified over the wire
Cap* {.final, preservesEmbedded.} = ref object of EmbeddedObj
target*: Entity
attenuation*: seq[sturdy.Caveat]
Actor* = ref object
## https://synit.org/book/glossary.html#actor
# TODO: run on a seperate thread.
# crashHandlers: HandlerDeque
root: Facet
handleAllocator: Handle
facetIdAllocator: int
id: ActorId
when traceSyndicate:
traceStream: FileStream
stopped: bool
var turnQueue {.threadvar, used.}: Deque[Turn]
proc queueTurn(turn: sink Turn) =
turnQueue.addLast(turn)
proc turnsPending*(): bool =
turnQueue.len > 0
template turnWork*(prc: typed): untyped =
## Pragma to mark work that executes in a `Turn` context.
cps(Cont, prc)
proc activeTurn(c: Cont): Turn {.cpsVoodoo.} =
## Return the active `Turn` within a turn context.
assert not c.turn.isNil
c.turn
proc activeFacet(c: Cont): Facet {.cpsVoodoo.} =
## Return the active `Facet` within a turn context.
assert not c.turn.isNil
assert not c.turn.facet.isNil
c.turn.facet
using
actor: Actor
facet: Facet
entity: Entity
cap: Cap
turn: Turn
proc `$`*(facet): string = $facet.id
proc `$`*(cap): string = "#:" & $cast[uint](cap.unsafeAddr)
proc hash*(x: Actor|Facet|Cap): Hash = x.unsafeAddr.hash
proc relay*(cap): Facet =
assert not cap.target.facet.isNil
cap.target.facet
proc collectPath(result: var seq[FacetId]; facet) =
if not facet.parent.isNil:
collectPath(result, facet.parent)
result.add(facet.id)
proc stopped*(facet): bool = facet.state != fRunning
when traceSyndicate:
proc traceFlush(actor) =
if not actor.traceStream.isNil:
actor.traceStream.flush()
proc trace(actor; act: ActorActivation) =
if not actor.traceStream.isNil:
var entry = TraceEntry(
timestamp: getTime().toUnixFloat(),
actor: actor.id,
item: act,
)
actor.traceStream.writeLine($entry.toPreserves)
proc traceTurn(actor) =
if not actor.traceStream.isNil:
actor.trace(ActorActivation(
orKind: ActorActivationKind.turn,
turn: actor.turn.desc,
))
actor.traceFlush()
reset actor.turn.desc
proc traceTarget(facet): trace.Target =
Target(
actor: facet.actor.id,
facet: facet.id,
)
proc traceTarget(cap): trace.Target =
let facet = cap.relay
Target(
actor: facet.actor.id,
facet: facet.id,
oid: cap.target.oid.toPreserves,
)
proc traceEnqueue(actor; e: TargetedTurnEvent) =
actor.turn.desc.actions.add ActionDescription(
orKind: ActionDescriptionKind.enqueue,
enqueue: ActionDescriptionEnqueue(event: e),
)
proc traceDequeue(actor; e: TargetedTurnEvent) =
actor.turn.desc.actions.add ActionDescription(
orKind: ActionDescriptionKind.dequeue,
dequeue: ActionDescriptionDequeue(event: e),
)
proc pass*(a, b: Cont): Cont =
assert not a.facet.isNil
b.facet = a.facet
b
proc queueWork*(facet; c: Cont) =
c.facet = facet
facet.actor.turn.work.addLast(c)
proc queueAction*(facet; c: Cont) =
c.facet = facet
facet.actor.turn.actions.add(c)
proc yieldWork(c: Cont): Cont {.cpsMagic.} =
## Suspend and enqueue the caller until later in the turn.
assert not c.facet.isNil
c.facet.queueWork(c)
nil
proc yieldToActions(c: Cont): Cont {.cpsMagic.} =
assert not c.facet.isNil
c.facet.actor.turn.actions.add(c)
nil
proc terminate(actor; err: ref Exception) =
when traceSyndicate:
actor.traceTurn()
raise err
proc terminate(facet; err: ref Exception) =
terminate(facet.actor, err)
proc complete(c: Cont) =
var c = c
try:
while not c.isNil and not c.fn.isNil:
var y = c.fn
var x = y(c)
c = Cont(x)
except CatchableError as err:
c.facet.actor.turn.rollback = true
if not c.dismissed:
writeStackFrames c
terminate(c.facet, err)
proc run*(actor): bool =
if actor.stopped: return
try:
result = actor.turn.work.len > 0
while actor.turn.work.len > 0:
actor.turn.work.popFirst().complete()
var i: int
while i < actor.turn.actions.len:
complete(move actor.turn.actions[i])
inc i
actor.turn.actions.setLen(0)
when traceSyndicate:
actor.traceTurn()
if actor.stopped:
trace(actor, ActorActivation(orkind: ActorActivationKind.stop))
for child in actor.children:
result = result and run(child)
except Exception as err:
actor.terminate(err)
proc start(actor; cont: Cont) =
when traceSyndicate:
var act = ActorActivation(orkind: ActorActivationKind.start)
trace(actor, act)
actor.root.state = fRunning
actor.root.startExternalTurn()
actor.root.queueWork(cont)
proc stop*(actor)
proc runNextStop(c: Cont; facet: Facet): Cont {.cpsMagic.} =
c.fn = facet.stopHandlers.pop()
result = c
proc runNextFacetStop() {.cps: Cont.} =
activeFacet().runNextStop()
proc stop(facet; reason: FacetStopReason) =
let actor = facet.actor
while facet.stopHandlers.len > 0:
var c = whelp runNextFacetStop()
c.facet = facet
complete(c)
while facet.stopCallbacks.len > 0:
var cb = facet.stopCallbacks.pop()
cb()
while facet.children.len > 0:
stop(facet.children.pop(), FacetStopReason.parentStopping)
when traceSyndicate:
var act = ActionDescription(orKind: ActionDescriptionKind.facetstop)
collectPath(act.facetstop.path, facet)
act.facetStop.reason = reason
actor.turn.desc.actions.add act
if facet.parent.isNil:
actor.root = nil
stop(actor)
proc stop*(actor) =
if not actor.root.isNil:
stop(actor.root, FacetStopReason.actorStopping)
actor.stopped = true
proc stopped*(actor): bool =
actor.`stopped`
# proc stopFacetAction(reason: FacetStopReason) {.syndicate.} =
# stop(c.facet, reason)
proc stopActorAction() {.cps: Cont.} =
yieldToActions()
activeFacet().actor.stop()
proc stopActor*(facet) =
let c = whelp stopActorAction()
c.facet = facet
facet.actor.turn.actions.add(c)
proc stopActor*(cap) =
## Stop an `Actor` from a `Cap`.
stopActor(cap.relay)
type
AssertionRef* = ref object
value*: Value
# if the Enity methods take a Value object then the generated
# C code has "redefinition of struct" problems when orc is enabled
proc newCap*(facet; entity): Cap =
doAssert entity.facet.isNil
entity.facet = facet
Cap(target: entity)
proc nextHandle(facet: Facet): Handle =
inc(facet.actor.handleAllocator)
facet.actor.handleAllocator
proc actor(cap): Actor = cap.relay.actor
type Bindings = Table[Value, Value]
proc attenuate(cap; att: seq[Caveat]): Cap =
if att.len == 0: cap
else: Cap(
target: cap.target,
attenuation: att & cap.attenuation)
proc match(bindings: var Bindings; p: Pattern; v: Value): bool =
case p.orKind
of PatternKind.Pdiscard: result = true
of PatternKind.Patom:
result = case p.patom
of PAtom.Boolean: v.isBoolean
of PAtom.Double: v.isDouble
of PAtom.Signedinteger: v.isInteger
of PAtom.String: v.isString
of PAtom.Bytestring: v.isByteString
of PAtom.Symbol: v.isSymbol
of PatternKind.Pembedded:
result = v.isEmbedded
of PatternKind.Pbind:
if match(bindings, p.pbind.pattern, v):
bindings[p.pbind.pattern.toPreserves] = v
result = true
of PatternKind.Pand:
for pp in p.pand.patterns:
result = match(bindings, pp, v)
if not result: break
of PatternKind.Pnot:
var b: Bindings
result = not match(b, p.pnot.pattern, v)
of PatternKind.Lit:
result = p.lit.value == v
of PatternKind.PCompound:
case p.pcompound.orKind
of PCompoundKind.rec:
if v.isRecord and
p.pcompound.rec.label == v.label and
p.pcompound.rec.fields.len == v.arity:
result = true
for i, pp in p.pcompound.rec.fields:
if not match(bindings, pp, v[i]):
result = false
break
of PCompoundKind.arr:
if v.isSequence and p.pcompound.arr.items.len == v.sequence.len:
result = true
for i, pp in p.pcompound.arr.items:
if not match(bindings, pp, v[i]):
result = false
break
of PCompoundKind.dict:
if v.isDictionary:
result = true
for key, pp in p.pcompound.dict.entries:
let vv = step(v, key)
if vv.isNone or not match(bindings, pp, get vv):
result = true
break
proc match(p: Pattern; v: Value): Option[Bindings] =
var b: Bindings
if match(b, p, v):
result = some b
proc instantiate(t: Template; bindings: Bindings): Value =
case t.orKind
of TemplateKind.Tattenuate:
let v = instantiate(t.tattenuate.template, bindings)
let cap = v.unembed(Cap)
if cap.isNone:
raise newException(ValueError, "Attempt to attenuate non-capability")
result = attenuate(get cap, t.tattenuate.attenuation).embed
of TemplateKind.TRef:
let n = $t.tref.binding.int
try: result = bindings[n.toPreserves]
except KeyError:
raise newException(ValueError, "unbound reference: " & n)
of TemplateKind.Lit:
result = t.lit.value
of TemplateKind.Tcompound:
case t.tcompound.orKind
of TCompoundKind.rec:
result = initRecord(t.tcompound.rec.label, t.tcompound.rec.fields.len)
for i, tt in t.tcompound.rec.fields:
result[i] = instantiate(tt, bindings)
of TCompoundKind.arr:
result = initSequence(t.tcompound.arr.items.len)
for i, tt in t.tcompound.arr.items:
result[i] = instantiate(tt, bindings)
of TCompoundKind.dict:
result = initDictionary()
for key, tt in t.tcompound.dict.entries:
result[key] = instantiate(tt, bindings)
proc rewrite(r: Rewrite; v: Value): Value =
let bindings = match(r.pattern, v)
if bindings.isSome:
result = instantiate(r.template, get bindings)
proc examineAlternatives(cav: Caveat; v: Value): Value =
case cav.orKind
of CaveatKind.Rewrite:
result = rewrite(cav.rewrite, v)
of CaveatKind.Alts:
for r in cav.alts.alternatives:
result = rewrite(r, v)
if not result.isFalse: break
of CaveatKind.Reject: discard
of CaveatKind.unknown: discard
proc runRewrites(v: Value; a: openarray[Caveat]): Value =
result = v
for stage in a:
result = examineAlternatives(stage, result)
if result.isFalse: break
proc setActions*(entity;
publish = PublishProc();
retract = RetractProc();
message = MessageProc();
sync = SyncProc();
): Entity {.discardable} =
## Set the action handlers for an `Entity`.
result = entity
result.publishImpl = publish
result.retractImpl = retract
result.messageImpl = message
result.syncImpl = sync
proc publish(c: Cont; e: Entity; v: Value; h: Handle): Cont {.cpsMagic.} =
if not e.publishImpl.fn.isNil:
result = pass(c, e.publishImpl.call(e, v, h))
proc retract(c: Cont; e: Entity; h: Handle): Cont {.cpsMagic.} =
if not e.retractImpl.fn.isNil:
result = pass(c, e.retractImpl.call(e, h))
proc message(c: Cont; e: Entity; v: Value): Cont {.cpsMagic.} =
if not e.messageImpl.fn.isNil:
result = pass(c, e.messageImpl.call(e, v))
proc sync(c: Cont; e: Entity; p: Cap): Cont {.cpsMagic.} =
if not e.syncImpl.fn.isNil:
result = pass(c, e.syncImpl.call(e, p))
proc turnPublish(cap: Cap; val: Value; h: Handle) {.turnWork.} =
when traceSyndicate:
var traceEvent = TargetedTurnEvent(
target: cap.traceTarget,
detail: trace.TurnEvent(orKind: trace.TurnEventKind.assert)
)
traceEvent.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
traceEvent.detail.assert = TurnEventAssert(
assertion: AssertionDescription(orKind: AssertionDescriptionKind.value),
handle: h,
)
traceEvent.detail.assert.assertion.value.value = val
cap.actor.traceEnqueue(traceEvent)
cap.relay.outbound[h] = OutboundAssertion(handle: h, peer: cap)
yieldToActions()
if activeTurn().rollback:
cap.relay.outbound.del(h)
else:
when traceSyndicate:
cap.actor.traceDequeue(traceEvent)
cap.target.publish(val, h)
cap.relay.outbound[h].established = true
proc turnRetract(cap: Cap; h: Handle) {.turnWork.} =
when traceSyndicate:
var traceEvent = TargetedTurnEvent(
target: cap.traceTarget,
detail: trace.TurnEvent(orKind: trace.TurnEventKind.retract)
)
traceEvent.detail.retract.handle = h
cap.actor.traceEnqueue(traceEvent)
yieldToActions()
if not activeTurn().rollback:
when traceSyndicate:
cap.actor.traceDequeue(traceEvent)
var e: OutboundAssertion
if cap.relay.outbound.pop(h, e):
cap.target.retract(h)
proc turnMessage(cap: Cap; val: Value) {.turnWork.} =
var val = runRewrites(val, cap.attenuation)
when traceSyndicate:
var traceEvent = TargetedTurnEvent(
target: cap.traceTarget,
detail: trace.TurnEvent(orKind: trace.TurnEventKind.message)
)
traceEvent.detail.message.body.value.value = val
cap.actor.traceEnqueue(traceEvent)
yieldToActions()
if not activeTurn().rollback:
when traceSyndicate:
cap.actor.traceDequeue(traceEvent)
cap.target.message(val)
proc turnSync(cap: Cap; peer: Cap) {.turnWork.} =
when traceSyndicate:
var traceEvent = TargetedTurnEvent(
target: cap.traceTarget,
detail: trace.TurnEvent(orKind: trace.TurnEventKind.sync)
)
traceEvent.detail.sync.peer = peer.traceTarget
cap.actor.traceEnqueue(traceEvent)
yieldToActions()
if not activeTurn().rollback:
when traceSyndicate:
cap.actor.traceDequeue(traceEvent)
cap.target.sync(peer)
proc publish*(cap; val: Value): Handle =
## Publish a `Value` to a `Cap` returning `Handle`
## for later retraction.
# The publish action on an Entity always goes through
# here first.
var val = runRewrites(val, cap.attenuation)
# TODO: attenuation to nothing?
result = cap.relay.nextHandle()
cap.relay.queueWork(whelp turnPublish(cap, val, result))
proc publish*[T](cap; x: T): Handle =
## Publish Preserves-convertable value to a `Cap`
## returning `Handle` for later retraction.
publish(cap, x.toPreserves)
proc retract*(cap; h: Handle) =
## Retract a `Handle` from a `Cap`.
cap.relay.queueWork(whelp turnRetract(cap, h))
proc message*(cap; val: Value) =
var val = runRewrites(val, cap.attenuation)
cap.relay.queueWork(whelp turnMessage(cap, val))
proc message*[T](cap; x: T) =
message(cap, x.toPreserves)
proc sync*(cap, peer: Cap) =
cap.relay.queueWork(whelp turnSync(cap, peer))
proc installStopHook(c: Cont, facet: Facet): Cont {.cpsMagic.} =
facet.stopHandlers.add(c.fn)
return c
proc addOnStopHandler(c: Cont; cb: Callback): Cont {.cpsMagic.} =
c.facet.stopCallbacks.add(cb)
result = c
proc onStop*(facet; cb: Callback) =
facet.stopCallbacks.add(cb)
proc facetCall(prc: FacetProc; f: Facet) {.cps: Cont.} =
prc(f)
proc facetCall(prc: FacetProc) {.cps: Cont.} =
prc(activeFacet())
proc workCall(cb: Callback) {.cps: Cont.} =
cb()
proc god(facet): Actor =
## Return the parent of all actors associated with `facet`.
var facet = facet
while not facet.parent.isNil:
facet = facet.parent
facet.actor
proc newExternalTurn(facet; desc: Value) =
result = Turn(facet: facet)
let actor = facet.actor
when traceSyndicate:
result.desc.cause = TurnCause(orKind: TurnCauseKind.external))
result.desc.cause.external description = desc
proc runExternalTurn*(facet: Facet; cb: proc ()) =
echo "startExternalTurn"
startExternalTurn(facet)
facet.queueWork(whelp workCall(cb))
while run(facet.actor): discard
echo "runExternalTurn finished"
assert facet.actor.turn.work.len == 0
assert facet.actor.turn.actions.len == 0
proc newFacet(turn: Turn; actor; parent: Facet): Facet =
inc(actor.facetIdAllocator)
result = Facet(
actor: actor,
parent: parent,
id: actor.facetIdAllocator.toPreserves,
)
if not parent.isNil:
parent.children.add result
when traceSyndicate:
var act = ActionDescription(orKind: ActionDescriptionKind.facetstart)
collectPath(act.facetstart.path, result)
actor.turn.desc.actions.add act
proc newActor(parent: Facet; name: string): Actor =
result = Actor(id: name.toPreserves)
result.root = newFacet(result, parent)
when traceSyndicate:
if parent.isNil:
let path = getEnv("SYNDICATE_TRACE_FILE", "")
case path
of "": discard
of "-": result.traceStream = newFileStream(stderr)
else: result.traceStream = openFileStream(path, fmWrite)
else:
result.traceStream = parent.actor.traceStream
proc spawnActor*(parent: Facet; name: string; bootProc: FacetProc): Actor {.discardable.} =
## Spawn a new `Actor` at `parent`.
result = newActor(parent, name)
var turn = newExternalTurn(result.root, "bootActor".toPreserves)
turn.queueWork(whelp facetCall(bootProc))
queueTurn(turn)
proc bootActor*(name: string; bootProc: FacetProc): Actor =
## Boot a new `Actor`.
result = newActor(nil, name)
var turn = newExternalTurn(result.root, "bootActor".toPreserves)
turn.queueWork(whelp facetCall(bootProc))
queueTurn(turn)
proc run(turn: sink Turn) =
try:
while turn.work.len > 0:
discard trampoline turn.work.pop()
except CatchableError as err:
terminate turn.actor(err)
return
var i: int
while i < turn.actions.len:
discard trampoline turn.actions[i]
proc runTurn*() =
## Run one turn.
turnQueue.pop().run()