553 lines
16 KiB
Nim
553 lines
16 KiB
Nim
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
|
# SPDX-License-Identifier: Unlicense
|
|
|
|
import std/[deques, hashes, options, tables, times]
|
|
import pkg/cps
|
|
import preserves
|
|
import ./protocols/[protocol, sturdy]
|
|
|
|
# const traceSyndicate {.booldefine.}: bool = true
|
|
const traceSyndicate* = true
|
|
|
|
when traceSyndicate:
|
|
import std/streams
|
|
from std/os import getEnv
|
|
import ./protocols/trace
|
|
|
|
export protocol.Handle
|
|
|
|
type
|
|
Cont* = ref object of Continuation
|
|
facet: Facet
|
|
|
|
PublishProc* = proc (e: Entity; v: Value; h: Handle) {.cps: Cont.}
|
|
RetractProc* = proc (e: Entity; h: Handle) {.cps: Cont.}
|
|
MessageProc* = proc (e: Entity; v: Value) {.cps: Cont.}
|
|
SyncProc* = proc (e: Entity; peer: Cap) {.cps: Cont.}
|
|
|
|
Handler* = proc() {.closure.}
|
|
|
|
Work = Deque[Cont]
|
|
HandlerDeque = seq[ContinuationProc[Continuation]]
|
|
|
|
FacetState = enum fFresh, fRunning, fEnded
|
|
|
|
Callback* = proc () {.closure.}
|
|
|
|
OutboundTable = Table[Handle, OutboundAssertion]
|
|
OutboundAssertion = ref object
|
|
handle: Handle
|
|
peer: Cap
|
|
established: bool
|
|
|
|
Facet* = ref object
|
|
## https://synit.org/book/glossary.html#facet
|
|
actor: Actor
|
|
parent: Facet
|
|
children: seq[Facet]
|
|
outbound: OutboundTable
|
|
stopHandlers: HandlerDeque
|
|
stopCallbacks: seq[Callback]
|
|
state: FacetState
|
|
id: FacetId
|
|
|
|
FacetProc* = proc (f: Facet) {.closure.}
|
|
## Type for callbacks to be called within a turn.
|
|
## The `Facet` parameter is the owning facet.
|
|
|
|
Turn {.byref.} = object
|
|
## https://synit.org/book/glossary.html#turn
|
|
facet: Facet
|
|
entity: Entity
|
|
work: Work
|
|
actions: seq[Cont]
|
|
event: Option[protocol.Event]
|
|
when traceSyndicate:
|
|
desc: TurnDescription
|
|
|
|
Entity* = ref object of RootObj
|
|
## https://synit.org/book/glossary.html#entity
|
|
oid*: sturdy.Oid # oid is how Entities are identified over the wire
|
|
publishImpl*: PublishProc
|
|
retractImpl*: RetractProc
|
|
messageImpl*: MessageProc
|
|
syncImpl*: SyncProc
|
|
|
|
Cap* {.final, preservesEmbedded.} = ref object of EmbeddedObj
|
|
relay*: Facet
|
|
target*: Entity
|
|
attenuation*: seq[sturdy.Caveat]
|
|
|
|
Actor* = ref object
|
|
## https://synit.org/book/glossary.html#actor
|
|
# TODO: run on a seperate thread.
|
|
# crashHandlers: HandlerDeque
|
|
root: Facet
|
|
handleAllocator: Handle
|
|
facetIdAllocator: int
|
|
id: ActorId
|
|
turn: Turn
|
|
when traceSyndicate:
|
|
traceStream: FileStream
|
|
stopped: bool
|
|
|
|
template turnWork*(prc: typed): untyped =
|
|
cps(Cont, prc)
|
|
|
|
proc activeFacet*(c: Cont): Facet {.cpsVoodoo.} =
|
|
## Return the active `Facet` within a `{.syndicate.}` context.
|
|
assert not c.facet.isNil
|
|
c.facet
|
|
|
|
using
|
|
actor: Actor
|
|
facet: Facet
|
|
entity: Entity
|
|
cap: Cap
|
|
turn: Turn
|
|
|
|
proc `$`*(facet): string = $facet.id
|
|
proc `$`*(cap): string = "#:…"
|
|
|
|
proc hash*(facet): Hash = facet.unsafeAddr.hash
|
|
proc hash*(cap): Hash = cap.unsafeAddr.hash
|
|
|
|
proc newFacet(actor: Actor; parent: Facet): Facet =
|
|
inc(actor.facetIdAllocator)
|
|
result = Facet(
|
|
actor: actor,
|
|
parent: parent,
|
|
id: actor.facetIdAllocator.toPreserves,
|
|
)
|
|
|
|
proc stopped*(facet): bool = facet.state != fRunning
|
|
|
|
proc newActor(name: string): Actor =
|
|
result = Actor(id: name.toPreserves)
|
|
result.root = newFacet(result, nil)
|
|
when traceSyndicate:
|
|
let path = getEnv("SYNDICATE_TRACE_FILE", "")
|
|
case path
|
|
of "": discard
|
|
of "-": result.traceStream = newFileStream(stderr)
|
|
else: result.traceStream = openFileStream(path, fmWrite)
|
|
|
|
when traceSyndicate:
|
|
proc trace(actor; act: ActorActivation) =
|
|
if not actor.traceStream.isNil:
|
|
var entry = TraceEntry(
|
|
timestamp: getTime().toUnixFloat(),
|
|
actor: actor.id,
|
|
item: act,
|
|
)
|
|
actor.traceStream.writeLine($entry.toPreserves)
|
|
|
|
proc traceTurn(actor) =
|
|
if not actor.traceStream.isNil:
|
|
actor.trace(ActorActivation(
|
|
orKind: ActorActivationKind.turn,
|
|
turn: actor.turn.desc,
|
|
))
|
|
|
|
proc traceTarget(facet): trace.Target =
|
|
Target(
|
|
actor: facet.actor.id,
|
|
facet: facet.id,
|
|
)
|
|
|
|
proc traceTarget(cap): trace.Target =
|
|
let facet = cap.relay
|
|
Target(
|
|
actor: facet.actor.id,
|
|
facet: facet.id,
|
|
oid: cap.target.oid.toPreserves,
|
|
)
|
|
|
|
proc traceEnqueue(actor; e: TargetedTurnEvent) =
|
|
actor.turn.desc.actions.add ActionDescription(
|
|
orKind: ActionDescriptionKind.enqueue,
|
|
enqueue: ActionDescriptionEnqueue(event: e),
|
|
)
|
|
|
|
proc traceDequeue(actor; e: TargetedTurnEvent) =
|
|
actor.turn.desc.actions.add ActionDescription(
|
|
orKind: ActionDescriptionKind.dequeue,
|
|
dequeue: ActionDescriptionDequeue(event: e),
|
|
)
|
|
|
|
proc pass*(a, b: Cont): Cont =
|
|
assert not a.facet.isNil
|
|
b.facet = a.facet
|
|
b
|
|
|
|
proc queueWork(facet; c: Cont) =
|
|
c.facet = facet
|
|
facet.actor.turn.work.addLast(c)
|
|
|
|
proc yieldWork(c: Cont): Cont {.cpsMagic.} =
|
|
## Suspend and enqueue the caller until later in the turn.
|
|
assert not c.facet.isNil
|
|
c.facet.queueWork(c)
|
|
nil
|
|
|
|
proc yieldToActions(c: Cont): Cont {.cpsMagic.} =
|
|
assert not c.facet.isNil
|
|
c.facet.actor.turn.actions.add(c)
|
|
nil
|
|
|
|
proc startExternalTurn(facet) =
|
|
let actor = facet.actor
|
|
assert actor.turn.work.len == 0
|
|
assert actor.turn.actions.len == 0
|
|
actor.turn.facet = facet
|
|
when traceSyndicate:
|
|
actor.turn.desc = TurnDescription(cause: TurnCause(orKind: TurnCauseKind.external))
|
|
|
|
proc terminate(actor; err: ref Exception) =
|
|
raise err
|
|
|
|
proc terminate(facet; err: ref Exception) =
|
|
terminate(facet.actor, err)
|
|
|
|
proc complete(c: Cont) =
|
|
var c = c
|
|
try:
|
|
while not c.isNil and not c.fn.isNil:
|
|
var y = c.fn
|
|
var x = y(c)
|
|
c = Cont(x)
|
|
except CatchableError as err:
|
|
if not c.dismissed:
|
|
writeStackFrames c
|
|
terminate(c.facet, err)
|
|
|
|
proc run(actor) =
|
|
assert not actor.stopped
|
|
var n = 0
|
|
while actor.turn.work.len > 0:
|
|
actor.turn.work.popFirst().complete()
|
|
inc n
|
|
echo n, " items completed from work queue"
|
|
when traceSyndicate:
|
|
actor.traceTurn()
|
|
var i: int
|
|
while i < actor.turn.actions.len:
|
|
complete(move actor.turn.actions[i])
|
|
inc i
|
|
echo i, " items completed from action queue"
|
|
turn.actions.setLen(0)
|
|
when traceSyndicate:
|
|
if actor.stopped:
|
|
trace(actor, ActorActivation(orkind: ActorActivationKind.stop))
|
|
|
|
proc start(actor; cont: Cont) =
|
|
when traceSyndicate:
|
|
var act = ActorActivation(orkind: ActorActivationKind.start)
|
|
trace(actor, act)
|
|
actor.root.state = fRunning
|
|
actor.root.startExternalTurn()
|
|
actor.root.queueWork(cont)
|
|
run(actor)
|
|
|
|
proc stop(actor)
|
|
|
|
proc collectPath(result: var seq[FacetId]; facet) =
|
|
if not facet.parent.isNil:
|
|
collectPath(result, facet.parent)
|
|
result.add(facet.id)
|
|
|
|
proc runNextStop(c: Cont; facet: Facet): Cont {.cpsMagic.} =
|
|
c.fn = facet.stopHandlers.pop()
|
|
result = c
|
|
|
|
proc runNextFacetStop() {.cps: Cont.} =
|
|
activeFacet().runNextStop()
|
|
|
|
proc stop(facet; reason: FacetStopReason) =
|
|
let actor = facet.actor
|
|
while facet.stopHandlers.len > 0:
|
|
var c = whelp runNextFacetStop()
|
|
c.facet = facet
|
|
complete(c)
|
|
while facet.stopCallbacks.len > 0:
|
|
var cb = facet.stopCallbacks.pop()
|
|
cb()
|
|
while facet.children.len > 0:
|
|
stop(facet.children.pop(), FacetStopReason.parentStopping)
|
|
when traceSyndicate:
|
|
var act = ActionDescription(orKind: ActionDescriptionKind.facetstop)
|
|
collectPath(act.facetstop.path, facet)
|
|
act.facetStop.reason = reason
|
|
actor.turn.desc.actions.add act
|
|
if facet.parent.isNil:
|
|
actor.root = nil
|
|
stop(actor)
|
|
|
|
proc stop(actor) =
|
|
if not actor.root.isNil:
|
|
stop(actor.root, FacetStopReason.actorStopping)
|
|
actor.stopped = true
|
|
|
|
proc bootActor(name: string, c: Cont) =
|
|
start(newActor(name), c)
|
|
|
|
# proc stopFacetAction(reason: FacetStopReason) {.syndicate.} =
|
|
# stop(c.facet, reason)
|
|
|
|
proc stopActorAction() {.cps: Cont.} =
|
|
activeFacet().actor.stop()
|
|
|
|
proc stopActor*(facet) =
|
|
let c = whelp stopActorAction()
|
|
c.facet = facet
|
|
facet.actor.turn.actions.add(c)
|
|
|
|
type
|
|
AssertionRef* = ref object
|
|
value*: Value
|
|
# if the Enity methods take a Value object then the generated
|
|
# C code has "redefinition of struct" problems when orc is enabled
|
|
|
|
proc newCap*(f: Facet; e: Entity): Cap =
|
|
Cap(relay: f, target: e)
|
|
|
|
proc nextHandle(facet: Facet): Handle =
|
|
inc(facet.actor.handleAllocator)
|
|
facet.actor.handleAllocator
|
|
|
|
proc actor(cap): Actor = cap.relay.actor
|
|
|
|
type Bindings = Table[Value, Value]
|
|
|
|
proc attenuate(r: Cap; a: seq[Caveat]): Cap =
|
|
if a.len == 0: result = r
|
|
else: result = Cap(
|
|
relay: r.relay,
|
|
target: r.target,
|
|
attenuation: a & r.attenuation)
|
|
|
|
proc match(bindings: var Bindings; p: Pattern; v: Value): bool =
|
|
case p.orKind
|
|
of PatternKind.Pdiscard: result = true
|
|
of PatternKind.Patom:
|
|
result = case p.patom
|
|
of PAtom.Boolean: v.isBoolean
|
|
of PAtom.Double: v.isDouble
|
|
of PAtom.Signedinteger: v.isInteger
|
|
of PAtom.String: v.isString
|
|
of PAtom.Bytestring: v.isByteString
|
|
of PAtom.Symbol: v.isSymbol
|
|
of PatternKind.Pembedded:
|
|
result = v.isEmbedded
|
|
of PatternKind.Pbind:
|
|
if match(bindings, p.pbind.pattern, v):
|
|
bindings[p.pbind.pattern.toPreserves] = v
|
|
result = true
|
|
of PatternKind.Pand:
|
|
for pp in p.pand.patterns:
|
|
result = match(bindings, pp, v)
|
|
if not result: break
|
|
of PatternKind.Pnot:
|
|
var b: Bindings
|
|
result = not match(b, p.pnot.pattern, v)
|
|
of PatternKind.Lit:
|
|
result = p.lit.value == v
|
|
of PatternKind.PCompound:
|
|
case p.pcompound.orKind
|
|
of PCompoundKind.rec:
|
|
if v.isRecord and
|
|
p.pcompound.rec.label == v.label and
|
|
p.pcompound.rec.fields.len == v.arity:
|
|
result = true
|
|
for i, pp in p.pcompound.rec.fields:
|
|
if not match(bindings, pp, v[i]):
|
|
result = false
|
|
break
|
|
of PCompoundKind.arr:
|
|
if v.isSequence and p.pcompound.arr.items.len == v.sequence.len:
|
|
result = true
|
|
for i, pp in p.pcompound.arr.items:
|
|
if not match(bindings, pp, v[i]):
|
|
result = false
|
|
break
|
|
of PCompoundKind.dict:
|
|
if v.isDictionary:
|
|
result = true
|
|
for key, pp in p.pcompound.dict.entries:
|
|
let vv = step(v, key)
|
|
if vv.isNone or not match(bindings, pp, get vv):
|
|
result = true
|
|
break
|
|
|
|
proc match(p: Pattern; v: Value): Option[Bindings] =
|
|
var b: Bindings
|
|
if match(b, p, v):
|
|
result = some b
|
|
|
|
proc instantiate(t: Template; bindings: Bindings): Value =
|
|
case t.orKind
|
|
of TemplateKind.Tattenuate:
|
|
let v = instantiate(t.tattenuate.template, bindings)
|
|
let cap = v.unembed(Cap)
|
|
if cap.isNone:
|
|
raise newException(ValueError, "Attempt to attenuate non-capability")
|
|
result = attenuate(get cap, t.tattenuate.attenuation).embed
|
|
of TemplateKind.TRef:
|
|
let n = $t.tref.binding.int
|
|
try: result = bindings[n.toPreserves]
|
|
except KeyError:
|
|
raise newException(ValueError, "unbound reference: " & n)
|
|
of TemplateKind.Lit:
|
|
result = t.lit.value
|
|
of TemplateKind.Tcompound:
|
|
case t.tcompound.orKind
|
|
of TCompoundKind.rec:
|
|
result = initRecord(t.tcompound.rec.label, t.tcompound.rec.fields.len)
|
|
for i, tt in t.tcompound.rec.fields:
|
|
result[i] = instantiate(tt, bindings)
|
|
of TCompoundKind.arr:
|
|
result = initSequence(t.tcompound.arr.items.len)
|
|
for i, tt in t.tcompound.arr.items:
|
|
result[i] = instantiate(tt, bindings)
|
|
of TCompoundKind.dict:
|
|
result = initDictionary()
|
|
for key, tt in t.tcompound.dict.entries:
|
|
result[key] = instantiate(tt, bindings)
|
|
|
|
proc rewrite(r: Rewrite; v: Value): Value =
|
|
let bindings = match(r.pattern, v)
|
|
if bindings.isSome:
|
|
result = instantiate(r.template, get bindings)
|
|
|
|
proc examineAlternatives(cav: Caveat; v: Value): Value =
|
|
case cav.orKind
|
|
of CaveatKind.Rewrite:
|
|
result = rewrite(cav.rewrite, v)
|
|
of CaveatKind.Alts:
|
|
for r in cav.alts.alternatives:
|
|
result = rewrite(r, v)
|
|
if not result.isFalse: break
|
|
of CaveatKind.Reject: discard
|
|
of CaveatKind.unknown: discard
|
|
|
|
proc runRewrites(v: Value; a: openarray[Caveat]): Value =
|
|
result = v
|
|
for stage in a:
|
|
result = examineAlternatives(stage, result)
|
|
if result.isFalse: break
|
|
|
|
proc publish(c: Cont; e: Entity; v: Value; h: Handle): Cont {.cpsMagic.} =
|
|
pass c, e.publishImpl.call(e, v, h)
|
|
|
|
proc retract(c: Cont; e: Entity; h: Handle): Cont {.cpsMagic.} =
|
|
pass c, e.retractImpl.call(e, h)
|
|
|
|
proc message(c: Cont; e: Entity; v: Value): Cont {.cpsMagic.} =
|
|
pass c, e.messageImpl.call(e, v)
|
|
|
|
proc sync(c: Cont; e: Entity; p: Cap): Cont {.cpsMagic.} =
|
|
pass c, e.syncImpl.call(e, p)
|
|
|
|
proc turnPublish(cap: Cap; val: Value; h: Handle) {.turnWork.} =
|
|
when traceSyndicate:
|
|
var traceEvent = TargetedTurnEvent(
|
|
target: cap.traceTarget,
|
|
detail: trace.TurnEvent(orKind: trace.TurnEventKind.assert)
|
|
)
|
|
traceEvent.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
|
|
traceEvent.detail.assert = TurnEventAssert(
|
|
assertion: AssertionDescription(orKind: AssertionDescriptionKind.value),
|
|
handle: h,
|
|
)
|
|
traceEvent.detail.assert.assertion.value.value = val
|
|
cap.actor.traceEnqueue(traceEvent)
|
|
cap.relay.outbound[h] = OutboundAssertion(handle: h, peer: cap)
|
|
yieldToActions()
|
|
when traceSyndicate:
|
|
cap.actor.traceDequeue(traceEvent)
|
|
cap.target.publish(val, h)
|
|
cap.relay.outbound[h].established = true
|
|
|
|
proc turnRetract(cap: Cap; h: Handle) {.turnWork.} =
|
|
when traceSyndicate:
|
|
var traceEvent = TargetedTurnEvent(
|
|
target: cap.traceTarget,
|
|
detail: trace.TurnEvent(orKind: trace.TurnEventKind.retract)
|
|
)
|
|
traceEvent.detail.retract.handle = h
|
|
cap.actor.traceEnqueue(traceEvent)
|
|
yieldToActions()
|
|
when traceSyndicate:
|
|
cap.actor.traceDequeue(traceEvent)
|
|
cap.target.retract(h)
|
|
|
|
proc turnMessage(cap: Cap; val: Value) {.turnWork.} =
|
|
var val = runRewrites(val, cap.attenuation)
|
|
when traceSyndicate:
|
|
var traceEvent = TargetedTurnEvent(
|
|
target: cap.traceTarget,
|
|
detail: trace.TurnEvent(orKind: trace.TurnEventKind.message)
|
|
)
|
|
traceEvent.detail.message.body.value.value = val
|
|
cap.actor.traceEnqueue(traceEvent)
|
|
yieldToActions()
|
|
when traceSyndicate:
|
|
cap.actor.traceDequeue(traceEvent)
|
|
cap.target.message(val)
|
|
|
|
proc turnSync(cap: Cap; peer: Cap) {.turnWork.} =
|
|
when traceSyndicate:
|
|
var traceEvent = TargetedTurnEvent(
|
|
target: cap.traceTarget,
|
|
detail: trace.TurnEvent(orKind: trace.TurnEventKind.sync)
|
|
)
|
|
traceEvent.detail.sync.peer = peer.traceTarget
|
|
cap.actor.traceEnqueue(traceEvent)
|
|
yieldToActions()
|
|
when traceSyndicate:
|
|
cap.actor.traceDequeue(traceEvent)
|
|
cap.target.sync(peer)
|
|
|
|
proc publish*(cap; val: Value): Handle =
|
|
var val = runRewrites(val, cap.attenuation)
|
|
# TODO: attenuation to nothing?
|
|
result = cap.relay.nextHandle()
|
|
cap.relay.queueWork(whelp turnPublish(cap, val, result))
|
|
|
|
proc publish*[T](cap; x: T): Handle =
|
|
publish(cap, x.toPreserves)
|
|
|
|
proc retract*(cap; h: Handle) =
|
|
cap.relay.queueWork(whelp turnRetract(cap, h))
|
|
|
|
proc message*(cap; val: Value) =
|
|
var val = runRewrites(val, cap.attenuation)
|
|
cap.relay.queueWork(whelp turnMessage(cap, val))
|
|
|
|
proc message*[T](cap; x: T) =
|
|
message(cap, x.toPreserves)
|
|
|
|
proc sync*(cap, peer: Cap) =
|
|
cap.relay.queueWork(whelp turnSync(cap, peer))
|
|
|
|
proc installStopHook(c: Cont, facet: Facet): Cont {.cpsMagic.} =
|
|
facet.stopHandlers.add(c.fn)
|
|
return c
|
|
|
|
proc addOnStopHandler(c: Cont; cb: Callback): Cont {.cpsMagic.} =
|
|
c.facet.stopCallbacks.add(cb)
|
|
result = c
|
|
|
|
proc onStop*(facet; cb: proc () {.closure.}) =
|
|
facet.stopCallbacks.add(cb)
|
|
|
|
proc bootActor*(name: string; bootProc: FacetProc): Actor =
|
|
result = newActor(name)
|
|
result.root.startExternalTurn()
|
|
bootProc(result.root)
|
|
|
|
proc runActor*(name: string; bootProc: FacetProc) =
|
|
let actor = bootActor(name, bootProc)
|
|
while not actor.stopped:
|
|
run(actor)
|