Compare commits
5 Commits
15d2e8bfb4
...
75cd3f32ca
Author | SHA1 | Date |
---|---|---|
Emery Hemingway | 75cd3f32ca | |
Emery Hemingway | 7eb9537df1 | |
Emery Hemingway | 20d7420b81 | |
Emery Hemingway | dc4bf22675 | |
Emery Hemingway | e217a9378d |
|
@ -54,7 +54,7 @@ The Syndicate DSL can be entered using `runActor` which calls a Nim body with a
|
|||
### Publish
|
||||
|
||||
``` nim
|
||||
runActor("main") do (dataspace: Ref; turn: var Turn):
|
||||
runActor("main") do (dataspace: Ref; turn: Turn):
|
||||
let presenceHandle = publish(turn, dataspace, Present(username: "Judy"))
|
||||
# publish <Present "Judy"> to the dataspace
|
||||
# the assertion can be later retracted by handle
|
||||
|
@ -67,7 +67,7 @@ runActor("main") do (dataspace: Ref; turn: var Turn):
|
|||
We can react to assertions and messages within dataspaces using [patterns](https://synit.org/book/glossary.html#dataspace-pattern). Patterns are constructed using a Nim type and the `?` operator. Again a Nim type is used rather than a raw Preserves for schema consistency.
|
||||
|
||||
``` nim
|
||||
runActor("main") do (dataspace: Ref; turn: var Turn):
|
||||
runActor("main") do (dataspace: Ref; turn: Turn):
|
||||
during(turn, dataspace, ?Present) do (who: string):
|
||||
# This body is active when the ?Present pattern is matched.
|
||||
# The Present type contains two atomic values that can be matched
|
||||
|
|
|
@ -0,0 +1,677 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
# Use procedures to call into SAM,
|
||||
# use continuations to call within SAM.
|
||||
|
||||
import std/[deques, hashes, options, sets, tables, times]
|
||||
import pkg/cps
|
||||
import preserves
|
||||
import ./protocols/[protocol, sturdy]
|
||||
|
||||
# const traceSyndicate {.booldefine.}: bool = true
|
||||
const traceSyndicate* = true
|
||||
|
||||
when traceSyndicate:
|
||||
import std/streams
|
||||
from std/os import getEnv
|
||||
import ./protocols/trace
|
||||
|
||||
export protocol.Handle
|
||||
|
||||
type
|
||||
Cont* = ref object of Continuation
|
||||
turn: Turn
|
||||
|
||||
PublishProc* = proc (e: Entity; v: Value; h: Handle) {.cps: Cont.}
|
||||
RetractProc* = proc (e: Entity; h: Handle) {.cps: Cont.}
|
||||
MessageProc* = proc (e: Entity; v: Value) {.cps: Cont.}
|
||||
SyncProc* = proc (e: Entity; peer: Cap) {.cps: Cont.}
|
||||
|
||||
Handler* = proc() {.closure.}
|
||||
|
||||
HandlerDeque = seq[ContinuationProc[Continuation]]
|
||||
|
||||
FacetState = enum fFresh, fRunning, fEnded
|
||||
|
||||
Callback* = proc () {.closure.}
|
||||
|
||||
OutboundTable = Table[Handle, OutboundAssertion]
|
||||
OutboundAssertion = ref object
|
||||
handle: Handle
|
||||
peer: Cap
|
||||
established: bool
|
||||
|
||||
Facet* = ref object
|
||||
## https://synit.org/book/glossary.html#facet
|
||||
actor: Actor
|
||||
parent: Facet
|
||||
children: seq[Facet]
|
||||
outbound: OutboundTable
|
||||
stopHandlers: HandlerDeque
|
||||
stopCallbacks: seq[Callback]
|
||||
state: FacetState
|
||||
id: FacetId
|
||||
|
||||
FacetProc* = proc (f: Facet) {.closure.}
|
||||
## Type for callbacks to be called within a turn.
|
||||
## The `Facet` parameter is the owning facet.
|
||||
|
||||
Turn = ref object
|
||||
## https://synit.org/book/glossary.html#turn
|
||||
actor: Actor
|
||||
work: Dequeue[Cont]
|
||||
actions: seq[Cont]
|
||||
event: Option[protocol.Event]
|
||||
rollback: bool
|
||||
when traceSyndicate:
|
||||
desc: TurnDescription
|
||||
|
||||
Entity* = ref object of RootObj
|
||||
## https://synit.org/book/glossary.html#entity
|
||||
publishImpl: PublishProc
|
||||
retractImpl: RetractProc
|
||||
messageImpl: MessageProc
|
||||
syncImpl: SyncProc
|
||||
facet*: Facet
|
||||
# This implementation associates Entities to
|
||||
# Facets, which is not to be taken as a SAM
|
||||
# axiom.
|
||||
oid*: sturdy.Oid # oid is how Entities are identified over the wire
|
||||
|
||||
Cap* {.final, preservesEmbedded.} = ref object of EmbeddedObj
|
||||
target*: Entity
|
||||
attenuation*: seq[sturdy.Caveat]
|
||||
|
||||
Actor* = ref object
|
||||
## https://synit.org/book/glossary.html#actor
|
||||
# TODO: run on a seperate thread.
|
||||
# crashHandlers: HandlerDeque
|
||||
root: Facet
|
||||
handleAllocator: Handle
|
||||
facetIdAllocator: int
|
||||
id: ActorId
|
||||
when traceSyndicate:
|
||||
traceStream: FileStream
|
||||
stopped: bool
|
||||
|
||||
var turnQueue {.threadvar, used.}: Deque[Turn]
|
||||
|
||||
proc queueTurn(turn: sink Turn) =
|
||||
turnQueue.addLast(turn)
|
||||
|
||||
proc turnsPending*(): bool =
|
||||
turnQueue.len > 0
|
||||
|
||||
template turnWork*(prc: typed): untyped =
|
||||
## Pragma to mark work that executes in a `Turn` context.
|
||||
cps(Cont, prc)
|
||||
|
||||
proc activeTurn(c: Cont): Turn {.cpsVoodoo.} =
|
||||
## Return the active `Turn` within a turn context.
|
||||
assert not c.turn.isNil
|
||||
c.turn
|
||||
|
||||
proc activeFacet(c: Cont): Facet {.cpsVoodoo.} =
|
||||
## Return the active `Facet` within a turn context.
|
||||
assert not c.turn.isNil
|
||||
assert not c.turn.facet.isNil
|
||||
c.turn.facet
|
||||
|
||||
using
|
||||
actor: Actor
|
||||
facet: Facet
|
||||
entity: Entity
|
||||
cap: Cap
|
||||
turn: Turn
|
||||
|
||||
proc `$`*(facet): string = $facet.id
|
||||
proc `$`*(cap): string = "#:" & $cast[uint](cap.unsafeAddr)
|
||||
|
||||
proc hash*(x: Actor|Facet|Cap): Hash = x.unsafeAddr.hash
|
||||
|
||||
proc relay*(cap): Facet =
|
||||
assert not cap.target.facet.isNil
|
||||
cap.target.facet
|
||||
|
||||
proc collectPath(result: var seq[FacetId]; facet) =
|
||||
if not facet.parent.isNil:
|
||||
collectPath(result, facet.parent)
|
||||
result.add(facet.id)
|
||||
|
||||
proc stopped*(facet): bool = facet.state != fRunning
|
||||
|
||||
when traceSyndicate:
|
||||
proc traceFlush(actor) =
|
||||
if not actor.traceStream.isNil:
|
||||
actor.traceStream.flush()
|
||||
|
||||
proc trace(actor; act: ActorActivation) =
|
||||
if not actor.traceStream.isNil:
|
||||
var entry = TraceEntry(
|
||||
timestamp: getTime().toUnixFloat(),
|
||||
actor: actor.id,
|
||||
item: act,
|
||||
)
|
||||
actor.traceStream.writeLine($entry.toPreserves)
|
||||
|
||||
proc traceTurn(actor) =
|
||||
if not actor.traceStream.isNil:
|
||||
actor.trace(ActorActivation(
|
||||
orKind: ActorActivationKind.turn,
|
||||
turn: actor.turn.desc,
|
||||
))
|
||||
actor.traceFlush()
|
||||
reset actor.turn.desc
|
||||
|
||||
proc traceTarget(facet): trace.Target =
|
||||
Target(
|
||||
actor: facet.actor.id,
|
||||
facet: facet.id,
|
||||
)
|
||||
|
||||
proc traceTarget(cap): trace.Target =
|
||||
let facet = cap.relay
|
||||
Target(
|
||||
actor: facet.actor.id,
|
||||
facet: facet.id,
|
||||
oid: cap.target.oid.toPreserves,
|
||||
)
|
||||
|
||||
proc traceEnqueue(actor; e: TargetedTurnEvent) =
|
||||
actor.turn.desc.actions.add ActionDescription(
|
||||
orKind: ActionDescriptionKind.enqueue,
|
||||
enqueue: ActionDescriptionEnqueue(event: e),
|
||||
)
|
||||
|
||||
proc traceDequeue(actor; e: TargetedTurnEvent) =
|
||||
actor.turn.desc.actions.add ActionDescription(
|
||||
orKind: ActionDescriptionKind.dequeue,
|
||||
dequeue: ActionDescriptionDequeue(event: e),
|
||||
)
|
||||
|
||||
proc pass*(a, b: Cont): Cont =
|
||||
assert not a.facet.isNil
|
||||
b.facet = a.facet
|
||||
b
|
||||
|
||||
proc queueWork*(facet; c: Cont) =
|
||||
c.facet = facet
|
||||
facet.actor.turn.work.addLast(c)
|
||||
|
||||
proc queueAction*(facet; c: Cont) =
|
||||
c.facet = facet
|
||||
facet.actor.turn.actions.add(c)
|
||||
|
||||
proc yieldWork(c: Cont): Cont {.cpsMagic.} =
|
||||
## Suspend and enqueue the caller until later in the turn.
|
||||
assert not c.facet.isNil
|
||||
c.facet.queueWork(c)
|
||||
nil
|
||||
|
||||
proc yieldToActions(c: Cont): Cont {.cpsMagic.} =
|
||||
assert not c.facet.isNil
|
||||
c.facet.actor.turn.actions.add(c)
|
||||
nil
|
||||
|
||||
proc terminate(actor; err: ref Exception) =
|
||||
when traceSyndicate:
|
||||
actor.traceTurn()
|
||||
raise err
|
||||
|
||||
proc terminate(facet; err: ref Exception) =
|
||||
terminate(facet.actor, err)
|
||||
|
||||
proc complete(c: Cont) =
|
||||
var c = c
|
||||
try:
|
||||
while not c.isNil and not c.fn.isNil:
|
||||
var y = c.fn
|
||||
var x = y(c)
|
||||
c = Cont(x)
|
||||
except CatchableError as err:
|
||||
c.facet.actor.turn.rollback = true
|
||||
if not c.dismissed:
|
||||
writeStackFrames c
|
||||
terminate(c.facet, err)
|
||||
|
||||
proc run*(actor): bool =
|
||||
if actor.stopped: return
|
||||
try:
|
||||
result = actor.turn.work.len > 0
|
||||
while actor.turn.work.len > 0:
|
||||
actor.turn.work.popFirst().complete()
|
||||
var i: int
|
||||
while i < actor.turn.actions.len:
|
||||
complete(move actor.turn.actions[i])
|
||||
inc i
|
||||
actor.turn.actions.setLen(0)
|
||||
when traceSyndicate:
|
||||
actor.traceTurn()
|
||||
if actor.stopped:
|
||||
trace(actor, ActorActivation(orkind: ActorActivationKind.stop))
|
||||
for child in actor.children:
|
||||
result = result and run(child)
|
||||
except Exception as err:
|
||||
actor.terminate(err)
|
||||
|
||||
proc start(actor; cont: Cont) =
|
||||
when traceSyndicate:
|
||||
var act = ActorActivation(orkind: ActorActivationKind.start)
|
||||
trace(actor, act)
|
||||
actor.root.state = fRunning
|
||||
actor.root.startExternalTurn()
|
||||
actor.root.queueWork(cont)
|
||||
|
||||
proc stop*(actor)
|
||||
|
||||
proc runNextStop(c: Cont; facet: Facet): Cont {.cpsMagic.} =
|
||||
c.fn = facet.stopHandlers.pop()
|
||||
result = c
|
||||
|
||||
proc runNextFacetStop() {.cps: Cont.} =
|
||||
activeFacet().runNextStop()
|
||||
|
||||
proc stop(facet; reason: FacetStopReason) =
|
||||
let actor = facet.actor
|
||||
while facet.stopHandlers.len > 0:
|
||||
var c = whelp runNextFacetStop()
|
||||
c.facet = facet
|
||||
complete(c)
|
||||
while facet.stopCallbacks.len > 0:
|
||||
var cb = facet.stopCallbacks.pop()
|
||||
cb()
|
||||
while facet.children.len > 0:
|
||||
stop(facet.children.pop(), FacetStopReason.parentStopping)
|
||||
when traceSyndicate:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetstop)
|
||||
collectPath(act.facetstop.path, facet)
|
||||
act.facetStop.reason = reason
|
||||
actor.turn.desc.actions.add act
|
||||
if facet.parent.isNil:
|
||||
actor.root = nil
|
||||
stop(actor)
|
||||
|
||||
proc stop*(actor) =
|
||||
if not actor.root.isNil:
|
||||
stop(actor.root, FacetStopReason.actorStopping)
|
||||
actor.stopped = true
|
||||
|
||||
proc stopped*(actor): bool =
|
||||
actor.`stopped`
|
||||
|
||||
# proc stopFacetAction(reason: FacetStopReason) {.syndicate.} =
|
||||
# stop(c.facet, reason)
|
||||
|
||||
proc stopActorAction() {.cps: Cont.} =
|
||||
yieldToActions()
|
||||
activeFacet().actor.stop()
|
||||
|
||||
proc stopActor*(facet) =
|
||||
let c = whelp stopActorAction()
|
||||
c.facet = facet
|
||||
facet.actor.turn.actions.add(c)
|
||||
|
||||
proc stopActor*(cap) =
|
||||
## Stop an `Actor` from a `Cap`.
|
||||
stopActor(cap.relay)
|
||||
|
||||
type
|
||||
AssertionRef* = ref object
|
||||
value*: Value
|
||||
# if the Enity methods take a Value object then the generated
|
||||
# C code has "redefinition of struct" problems when orc is enabled
|
||||
|
||||
proc newCap*(facet; entity): Cap =
|
||||
doAssert entity.facet.isNil
|
||||
entity.facet = facet
|
||||
Cap(target: entity)
|
||||
|
||||
proc nextHandle(facet: Facet): Handle =
|
||||
inc(facet.actor.handleAllocator)
|
||||
facet.actor.handleAllocator
|
||||
|
||||
proc actor(cap): Actor = cap.relay.actor
|
||||
|
||||
type Bindings = Table[Value, Value]
|
||||
|
||||
proc attenuate(cap; att: seq[Caveat]): Cap =
|
||||
if att.len == 0: cap
|
||||
else: Cap(
|
||||
target: cap.target,
|
||||
attenuation: att & cap.attenuation)
|
||||
|
||||
proc match(bindings: var Bindings; p: Pattern; v: Value): bool =
|
||||
case p.orKind
|
||||
of PatternKind.Pdiscard: result = true
|
||||
of PatternKind.Patom:
|
||||
result = case p.patom
|
||||
of PAtom.Boolean: v.isBoolean
|
||||
of PAtom.Double: v.isDouble
|
||||
of PAtom.Signedinteger: v.isInteger
|
||||
of PAtom.String: v.isString
|
||||
of PAtom.Bytestring: v.isByteString
|
||||
of PAtom.Symbol: v.isSymbol
|
||||
of PatternKind.Pembedded:
|
||||
result = v.isEmbedded
|
||||
of PatternKind.Pbind:
|
||||
if match(bindings, p.pbind.pattern, v):
|
||||
bindings[p.pbind.pattern.toPreserves] = v
|
||||
result = true
|
||||
of PatternKind.Pand:
|
||||
for pp in p.pand.patterns:
|
||||
result = match(bindings, pp, v)
|
||||
if not result: break
|
||||
of PatternKind.Pnot:
|
||||
var b: Bindings
|
||||
result = not match(b, p.pnot.pattern, v)
|
||||
of PatternKind.Lit:
|
||||
result = p.lit.value == v
|
||||
of PatternKind.PCompound:
|
||||
case p.pcompound.orKind
|
||||
of PCompoundKind.rec:
|
||||
if v.isRecord and
|
||||
p.pcompound.rec.label == v.label and
|
||||
p.pcompound.rec.fields.len == v.arity:
|
||||
result = true
|
||||
for i, pp in p.pcompound.rec.fields:
|
||||
if not match(bindings, pp, v[i]):
|
||||
result = false
|
||||
break
|
||||
of PCompoundKind.arr:
|
||||
if v.isSequence and p.pcompound.arr.items.len == v.sequence.len:
|
||||
result = true
|
||||
for i, pp in p.pcompound.arr.items:
|
||||
if not match(bindings, pp, v[i]):
|
||||
result = false
|
||||
break
|
||||
of PCompoundKind.dict:
|
||||
if v.isDictionary:
|
||||
result = true
|
||||
for key, pp in p.pcompound.dict.entries:
|
||||
let vv = step(v, key)
|
||||
if vv.isNone or not match(bindings, pp, get vv):
|
||||
result = true
|
||||
break
|
||||
|
||||
proc match(p: Pattern; v: Value): Option[Bindings] =
|
||||
var b: Bindings
|
||||
if match(b, p, v):
|
||||
result = some b
|
||||
|
||||
proc instantiate(t: Template; bindings: Bindings): Value =
|
||||
case t.orKind
|
||||
of TemplateKind.Tattenuate:
|
||||
let v = instantiate(t.tattenuate.template, bindings)
|
||||
let cap = v.unembed(Cap)
|
||||
if cap.isNone:
|
||||
raise newException(ValueError, "Attempt to attenuate non-capability")
|
||||
result = attenuate(get cap, t.tattenuate.attenuation).embed
|
||||
of TemplateKind.TRef:
|
||||
let n = $t.tref.binding.int
|
||||
try: result = bindings[n.toPreserves]
|
||||
except KeyError:
|
||||
raise newException(ValueError, "unbound reference: " & n)
|
||||
of TemplateKind.Lit:
|
||||
result = t.lit.value
|
||||
of TemplateKind.Tcompound:
|
||||
case t.tcompound.orKind
|
||||
of TCompoundKind.rec:
|
||||
result = initRecord(t.tcompound.rec.label, t.tcompound.rec.fields.len)
|
||||
for i, tt in t.tcompound.rec.fields:
|
||||
result[i] = instantiate(tt, bindings)
|
||||
of TCompoundKind.arr:
|
||||
result = initSequence(t.tcompound.arr.items.len)
|
||||
for i, tt in t.tcompound.arr.items:
|
||||
result[i] = instantiate(tt, bindings)
|
||||
of TCompoundKind.dict:
|
||||
result = initDictionary()
|
||||
for key, tt in t.tcompound.dict.entries:
|
||||
result[key] = instantiate(tt, bindings)
|
||||
|
||||
proc rewrite(r: Rewrite; v: Value): Value =
|
||||
let bindings = match(r.pattern, v)
|
||||
if bindings.isSome:
|
||||
result = instantiate(r.template, get bindings)
|
||||
|
||||
proc examineAlternatives(cav: Caveat; v: Value): Value =
|
||||
case cav.orKind
|
||||
of CaveatKind.Rewrite:
|
||||
result = rewrite(cav.rewrite, v)
|
||||
of CaveatKind.Alts:
|
||||
for r in cav.alts.alternatives:
|
||||
result = rewrite(r, v)
|
||||
if not result.isFalse: break
|
||||
of CaveatKind.Reject: discard
|
||||
of CaveatKind.unknown: discard
|
||||
|
||||
proc runRewrites(v: Value; a: openarray[Caveat]): Value =
|
||||
result = v
|
||||
for stage in a:
|
||||
result = examineAlternatives(stage, result)
|
||||
if result.isFalse: break
|
||||
|
||||
proc setActions*(entity;
|
||||
publish = PublishProc();
|
||||
retract = RetractProc();
|
||||
message = MessageProc();
|
||||
sync = SyncProc();
|
||||
): Entity {.discardable} =
|
||||
## Set the action handlers for an `Entity`.
|
||||
result = entity
|
||||
result.publishImpl = publish
|
||||
result.retractImpl = retract
|
||||
result.messageImpl = message
|
||||
result.syncImpl = sync
|
||||
|
||||
proc publish(c: Cont; e: Entity; v: Value; h: Handle): Cont {.cpsMagic.} =
|
||||
if not e.publishImpl.fn.isNil:
|
||||
result = pass(c, e.publishImpl.call(e, v, h))
|
||||
|
||||
proc retract(c: Cont; e: Entity; h: Handle): Cont {.cpsMagic.} =
|
||||
if not e.retractImpl.fn.isNil:
|
||||
result = pass(c, e.retractImpl.call(e, h))
|
||||
|
||||
proc message(c: Cont; e: Entity; v: Value): Cont {.cpsMagic.} =
|
||||
if not e.messageImpl.fn.isNil:
|
||||
result = pass(c, e.messageImpl.call(e, v))
|
||||
|
||||
proc sync(c: Cont; e: Entity; p: Cap): Cont {.cpsMagic.} =
|
||||
if not e.syncImpl.fn.isNil:
|
||||
result = pass(c, e.syncImpl.call(e, p))
|
||||
|
||||
proc turnPublish(cap: Cap; val: Value; h: Handle) {.turnWork.} =
|
||||
when traceSyndicate:
|
||||
var traceEvent = TargetedTurnEvent(
|
||||
target: cap.traceTarget,
|
||||
detail: trace.TurnEvent(orKind: trace.TurnEventKind.assert)
|
||||
)
|
||||
traceEvent.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
|
||||
traceEvent.detail.assert = TurnEventAssert(
|
||||
assertion: AssertionDescription(orKind: AssertionDescriptionKind.value),
|
||||
handle: h,
|
||||
)
|
||||
traceEvent.detail.assert.assertion.value.value = val
|
||||
cap.actor.traceEnqueue(traceEvent)
|
||||
cap.relay.outbound[h] = OutboundAssertion(handle: h, peer: cap)
|
||||
yieldToActions()
|
||||
if activeTurn().rollback:
|
||||
cap.relay.outbound.del(h)
|
||||
else:
|
||||
when traceSyndicate:
|
||||
cap.actor.traceDequeue(traceEvent)
|
||||
cap.target.publish(val, h)
|
||||
cap.relay.outbound[h].established = true
|
||||
|
||||
proc turnRetract(cap: Cap; h: Handle) {.turnWork.} =
|
||||
when traceSyndicate:
|
||||
var traceEvent = TargetedTurnEvent(
|
||||
target: cap.traceTarget,
|
||||
detail: trace.TurnEvent(orKind: trace.TurnEventKind.retract)
|
||||
)
|
||||
traceEvent.detail.retract.handle = h
|
||||
cap.actor.traceEnqueue(traceEvent)
|
||||
yieldToActions()
|
||||
if not activeTurn().rollback:
|
||||
when traceSyndicate:
|
||||
cap.actor.traceDequeue(traceEvent)
|
||||
var e: OutboundAssertion
|
||||
if cap.relay.outbound.pop(h, e):
|
||||
cap.target.retract(h)
|
||||
|
||||
proc turnMessage(cap: Cap; val: Value) {.turnWork.} =
|
||||
var val = runRewrites(val, cap.attenuation)
|
||||
when traceSyndicate:
|
||||
var traceEvent = TargetedTurnEvent(
|
||||
target: cap.traceTarget,
|
||||
detail: trace.TurnEvent(orKind: trace.TurnEventKind.message)
|
||||
)
|
||||
traceEvent.detail.message.body.value.value = val
|
||||
cap.actor.traceEnqueue(traceEvent)
|
||||
yieldToActions()
|
||||
if not activeTurn().rollback:
|
||||
when traceSyndicate:
|
||||
cap.actor.traceDequeue(traceEvent)
|
||||
cap.target.message(val)
|
||||
|
||||
proc turnSync(cap: Cap; peer: Cap) {.turnWork.} =
|
||||
when traceSyndicate:
|
||||
var traceEvent = TargetedTurnEvent(
|
||||
target: cap.traceTarget,
|
||||
detail: trace.TurnEvent(orKind: trace.TurnEventKind.sync)
|
||||
)
|
||||
traceEvent.detail.sync.peer = peer.traceTarget
|
||||
cap.actor.traceEnqueue(traceEvent)
|
||||
yieldToActions()
|
||||
if not activeTurn().rollback:
|
||||
when traceSyndicate:
|
||||
cap.actor.traceDequeue(traceEvent)
|
||||
cap.target.sync(peer)
|
||||
|
||||
proc publish*(cap; val: Value): Handle =
|
||||
## Publish a `Value` to a `Cap` returning `Handle`
|
||||
## for later retraction.
|
||||
# The publish action on an Entity always goes through
|
||||
# here first.
|
||||
var val = runRewrites(val, cap.attenuation)
|
||||
# TODO: attenuation to nothing?
|
||||
result = cap.relay.nextHandle()
|
||||
cap.relay.queueWork(whelp turnPublish(cap, val, result))
|
||||
|
||||
proc publish*[T](cap; x: T): Handle =
|
||||
## Publish Preserves-convertable value to a `Cap`
|
||||
## returning `Handle` for later retraction.
|
||||
publish(cap, x.toPreserves)
|
||||
|
||||
proc retract*(cap; h: Handle) =
|
||||
## Retract a `Handle` from a `Cap`.
|
||||
cap.relay.queueWork(whelp turnRetract(cap, h))
|
||||
|
||||
proc message*(cap; val: Value) =
|
||||
var val = runRewrites(val, cap.attenuation)
|
||||
cap.relay.queueWork(whelp turnMessage(cap, val))
|
||||
|
||||
proc message*[T](cap; x: T) =
|
||||
message(cap, x.toPreserves)
|
||||
|
||||
proc sync*(cap, peer: Cap) =
|
||||
cap.relay.queueWork(whelp turnSync(cap, peer))
|
||||
|
||||
proc installStopHook(c: Cont, facet: Facet): Cont {.cpsMagic.} =
|
||||
facet.stopHandlers.add(c.fn)
|
||||
return c
|
||||
|
||||
proc addOnStopHandler(c: Cont; cb: Callback): Cont {.cpsMagic.} =
|
||||
c.facet.stopCallbacks.add(cb)
|
||||
result = c
|
||||
|
||||
proc onStop*(facet; cb: Callback) =
|
||||
facet.stopCallbacks.add(cb)
|
||||
|
||||
proc facetCall(prc: FacetProc; f: Facet) {.cps: Cont.} =
|
||||
prc(f)
|
||||
|
||||
proc facetCall(prc: FacetProc) {.cps: Cont.} =
|
||||
prc(activeFacet())
|
||||
|
||||
proc workCall(cb: Callback) {.cps: Cont.} =
|
||||
cb()
|
||||
|
||||
proc god(facet): Actor =
|
||||
## Return the parent of all actors associated with `facet`.
|
||||
var facet = facet
|
||||
while not facet.parent.isNil:
|
||||
facet = facet.parent
|
||||
facet.actor
|
||||
|
||||
proc newExternalTurn(facet; desc: Value) =
|
||||
result = Turn(facet: facet)
|
||||
let actor = facet.actor
|
||||
when traceSyndicate:
|
||||
result.desc.cause = TurnCause(orKind: TurnCauseKind.external))
|
||||
result.desc.cause.external description = desc
|
||||
|
||||
proc runExternalTurn*(facet: Facet; cb: proc ()) =
|
||||
echo "startExternalTurn"
|
||||
startExternalTurn(facet)
|
||||
facet.queueWork(whelp workCall(cb))
|
||||
while run(facet.actor): discard
|
||||
echo "runExternalTurn finished"
|
||||
assert facet.actor.turn.work.len == 0
|
||||
assert facet.actor.turn.actions.len == 0
|
||||
|
||||
proc newFacet(turn: Turn; actor; parent: Facet): Facet =
|
||||
inc(actor.facetIdAllocator)
|
||||
result = Facet(
|
||||
actor: actor,
|
||||
parent: parent,
|
||||
id: actor.facetIdAllocator.toPreserves,
|
||||
)
|
||||
if not parent.isNil:
|
||||
parent.children.add result
|
||||
when traceSyndicate:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetstart)
|
||||
collectPath(act.facetstart.path, result)
|
||||
actor.turn.desc.actions.add act
|
||||
|
||||
proc newActor(parent: Facet; name: string): Actor =
|
||||
result = Actor(id: name.toPreserves)
|
||||
result.root = newFacet(result, parent)
|
||||
when traceSyndicate:
|
||||
if parent.isNil:
|
||||
let path = getEnv("SYNDICATE_TRACE_FILE", "")
|
||||
case path
|
||||
of "": discard
|
||||
of "-": result.traceStream = newFileStream(stderr)
|
||||
else: result.traceStream = openFileStream(path, fmWrite)
|
||||
else:
|
||||
result.traceStream = parent.actor.traceStream
|
||||
|
||||
proc spawnActor*(parent: Facet; name: string; bootProc: FacetProc): Actor {.discardable.} =
|
||||
## Spawn a new `Actor` at `parent`.
|
||||
result = newActor(parent, name)
|
||||
var turn = newExternalTurn(result.root, "bootActor".toPreserves)
|
||||
turn.queueWork(whelp facetCall(bootProc))
|
||||
queueTurn(turn)
|
||||
|
||||
proc bootActor*(name: string; bootProc: FacetProc): Actor =
|
||||
## Boot a new `Actor`.
|
||||
result = newActor(nil, name)
|
||||
var turn = newExternalTurn(result.root, "bootActor".toPreserves)
|
||||
turn.queueWork(whelp facetCall(bootProc))
|
||||
queueTurn(turn)
|
||||
|
||||
proc run(turn: sink Turn) =
|
||||
try:
|
||||
while turn.work.len > 0:
|
||||
discard trampoline turn.work.pop()
|
||||
except CatchableError as err:
|
||||
terminate turn.actor(err)
|
||||
return
|
||||
var i: int
|
||||
while i < turn.actions.len:
|
||||
discard trampoline turn.actions[i]
|
||||
|
||||
proc runTurn*() =
|
||||
## Run one turn.
|
||||
turnQueue.pop().run()
|
|
@ -39,9 +39,9 @@ type
|
|||
RetractProc = proc (turn: var Turn; h: Handle) {.closure.}
|
||||
MessageProc = proc (turn: var Turn; v: Value) {.closure.}
|
||||
ClosureEntity = ref object of Entity
|
||||
publishImpl: PublishProc
|
||||
retractImpl: RetractProc
|
||||
messageImpl: MessageProc
|
||||
publishImpl*: PublishProc
|
||||
retractImpl*: RetractProc
|
||||
messageImpl*: MessageProc
|
||||
|
||||
method publish(e: ClosureEntity; turn: var Turn; a: AssertionRef; h: Handle) =
|
||||
if not e.publishImpl.isNil: e.publishImpl(turn, a.value, h)
|
||||
|
@ -190,5 +190,5 @@ macro during*(turn: untyped; ds: Cap; pattern: Pattern; publishBody: untyped) =
|
|||
|
||||
proc runActor*(name: string; bootProc: TurnAction) =
|
||||
## Boot an actor `Actor` and churn ioqueue once.
|
||||
let actor = bootActor(name, bootProc)
|
||||
ioqueue.run()
|
||||
discard bootActor(name, bootProc)
|
||||
actors.run()
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[hashes, monotimes, options, sets, tables, times]
|
||||
import std/[deques, hashes, monotimes, options, sets, tables, times]
|
||||
import pkg/cps
|
||||
import pkg/sys/ioqueue
|
||||
|
||||
import preserves
|
||||
import ../syndicate/protocols/[protocol, sturdy]
|
||||
|
||||
|
@ -14,17 +17,6 @@ when tracing:
|
|||
|
||||
export Handle
|
||||
|
||||
template generateIdType(typ: untyped) =
|
||||
type typ* = distinct Natural
|
||||
proc `==`*(x, y: typ): bool {.borrow.}
|
||||
proc `$`*(id: typ): string {.borrow.}
|
||||
|
||||
generateIdType(ActorId)
|
||||
generateIdType(FacetId)
|
||||
generateIdType(EndpointId)
|
||||
generateIdType(FieldId)
|
||||
generateIdType(TurnId)
|
||||
|
||||
type
|
||||
Oid = sturdy.Oid
|
||||
Caveat = sturdy.Caveat
|
||||
|
@ -37,11 +29,13 @@ type
|
|||
# C code has "redefinition of struct" problems when orc is enabled
|
||||
|
||||
Entity* = ref object of RootObj
|
||||
facet*: Facet
|
||||
oid*: Oid # oid is how Entities are identified over the wire
|
||||
|
||||
Cap* {.preservesEmbedded.} = ref object of EmbeddedObj
|
||||
relay*: Facet
|
||||
target*: Entity
|
||||
relay*: Facet
|
||||
# Entity has facet but a Cap is also scoped to a relay Facet
|
||||
attenuation*: Attenuation
|
||||
|
||||
Ref* {.deprecated: "Ref was renamed to Cap".} = Cap
|
||||
|
@ -53,6 +47,7 @@ type
|
|||
OutboundTable = Table[Handle, OutboundAssertion]
|
||||
|
||||
Actor* = ref object
|
||||
next: Actor
|
||||
name: string
|
||||
handleAllocator: ref Handle
|
||||
# a fresh actor gets a new ref Handle and
|
||||
|
@ -61,18 +56,15 @@ type
|
|||
exitReason: ref Exception
|
||||
exitHooks: seq[TurnAction]
|
||||
id: ActorId
|
||||
facetIdAllocator: uint
|
||||
exiting, exited: bool
|
||||
when tracing:
|
||||
turnIdAllocator: ref TurnId
|
||||
traceStream: FileStream
|
||||
|
||||
TurnAction* = proc (t: var Turn) {.closure.}
|
||||
|
||||
Queues = TableRef[Facet, seq[TurnAction]]
|
||||
|
||||
Turn* = object # an object that should remain on the stack
|
||||
facet: Facet
|
||||
queues: Queues # a ref object that can outlive Turn
|
||||
Turn* = ref object
|
||||
facet: Facet # active facet that may change during a turn
|
||||
work: Deque[tuple[facet: Facet, act: TurnAction]]
|
||||
effects: seq[Turn]
|
||||
when tracing:
|
||||
desc: TurnDescription
|
||||
|
||||
|
@ -87,20 +79,30 @@ type
|
|||
id: FacetId
|
||||
isAlive: bool
|
||||
|
||||
when tracing:
|
||||
var turnQueue {.threadvar.}: Deque[Turn]
|
||||
|
||||
proc nextTurnId(facet: Facet): TurnId =
|
||||
result = succ(facet.actor.turnIdAllocator[])
|
||||
facet.actor.turnIdAllocator[] = result
|
||||
when tracing:
|
||||
proc openTraceStream: FileStream =
|
||||
let path = getEnv("SYNDICATE_TRACE_FILE")
|
||||
case path
|
||||
of "": stderr.writeLine "$SYNDICATE_TRACE_FILE unset"
|
||||
of "-": result = newFileStream(stderr)
|
||||
else: result = openFileStream(path, fmWrite)
|
||||
|
||||
let traceStream = openTraceStream()
|
||||
var turnIdAllocator: uint
|
||||
|
||||
proc nextTurnId(): TurnId =
|
||||
inc(turnIdAllocator)
|
||||
turnIdAllocator.toPreserves
|
||||
|
||||
proc trace(actor: Actor; act: ActorActivation) =
|
||||
if not actor.traceStream.isNil:
|
||||
if not traceStream.isNil:
|
||||
var entry = TraceEntry(
|
||||
timestamp: getTime().toUnixFloat(),
|
||||
actor: initRecord("named", actor.name.toPreserves),
|
||||
item: act)
|
||||
actor.traceStream.writeText entry.toPreserves
|
||||
actor.traceStream.writeLine()
|
||||
traceStream.write(entry.toPreserves)
|
||||
|
||||
proc path(facet: Facet): seq[trace.FacetId] =
|
||||
var f = facet
|
||||
|
@ -116,7 +118,7 @@ method sync*(e: Entity; turn: var Turn; peer: Cap) {.base.} = discard
|
|||
using
|
||||
actor: Actor
|
||||
facet: Facet
|
||||
turn: var Turn
|
||||
turn: Turn
|
||||
action: TurnAction
|
||||
|
||||
proc labels(f: Facet): string =
|
||||
|
@ -139,11 +141,14 @@ proc `$`*(r: Cap): string =
|
|||
proc `$`*(actor: Actor): string =
|
||||
"<Actor:" & actor.name & ">" # TODO: ambigous
|
||||
|
||||
proc attenuate(r: Cap; a: Attenuation): Cap =
|
||||
proc `$`*(t: Turn): string =
|
||||
"<Turn:" & $t.desc.id & ">"
|
||||
|
||||
proc attenuate*(r: Cap; a: Attenuation): Cap =
|
||||
if a.len == 0: result = r
|
||||
else: result = Cap(
|
||||
relay: r.relay,
|
||||
target: r.target,
|
||||
relay: r.relay,
|
||||
attenuation: a & r.attenuation)
|
||||
|
||||
proc hash*(facet): Hash =
|
||||
|
@ -151,17 +156,51 @@ proc hash*(facet): Hash =
|
|||
|
||||
proc hash*(r: Cap): Hash = !$(r.relay.hash !& r.target.unsafeAddr.hash)
|
||||
|
||||
proc actor*(turn): Actor = turn.facet.actor
|
||||
|
||||
proc nextHandle(facet: Facet): Handle =
|
||||
result = succ(facet.actor.handleAllocator[])
|
||||
facet.actor.handleAllocator[] = result
|
||||
|
||||
proc facet*(turn: var Turn): Facet = turn.facet
|
||||
proc queueWork*(turn: var Turn; facet: Facet; act: TurnAction) =
|
||||
assert not facet.isNil
|
||||
turn.work.addLast((facet, act,))
|
||||
|
||||
proc enqueue(turn: var Turn; target: Facet; action: TurnAction) =
|
||||
if target in turn.queues:
|
||||
turn.queues[target].add action
|
||||
proc queueTurn*(facet: Facet; act: TurnAction) =
|
||||
var turn = Turn(facet: facet)
|
||||
assert not facet.isNil
|
||||
turn.work.addLast((facet, act,))
|
||||
when tracing:
|
||||
turn.desc.id = nextTurnId()
|
||||
turnQueue.addLast(turn)
|
||||
|
||||
proc queueTurn*(prev: var Turn; facet: Facet; act: TurnAction) =
|
||||
var next = Turn(facet: facet)
|
||||
assert not facet.isNil
|
||||
next.work.addLast((facet, act,))
|
||||
when tracing:
|
||||
next.desc.id = nextTurnId()
|
||||
next.desc.cause = TurnCause(orKind: TurnCauseKind.turn)
|
||||
next.desc.cause.turn.id = prev.desc.id
|
||||
turnQueue.addLast(next)
|
||||
|
||||
proc run*(facet: Facet; action: TurnAction) = queueTurn(facet, action)
|
||||
## Alias to queueTurn_.
|
||||
|
||||
proc facet*(turn: Turn): Facet = turn.facet
|
||||
|
||||
proc queueEffect*(turn: var Turn; target: Facet; act: TurnAction) =
|
||||
if target.actor == turn.facet.actor:
|
||||
turn.work.addLast((target, act,))
|
||||
else:
|
||||
turn.queues[target] = @[action]
|
||||
var next = Turn(facet: target)
|
||||
assert not target.isNil
|
||||
next.work.addLast((target, act,))
|
||||
when tracing:
|
||||
next.desc.id = nextTurnId()
|
||||
next.desc.cause = TurnCause(orKind: TurnCauseKind.turn)
|
||||
next.desc.cause.turn.id = turn.desc.id
|
||||
turn.effects.add next
|
||||
|
||||
type Bindings = Table[Value, Value]
|
||||
|
||||
|
@ -275,19 +314,19 @@ proc runRewrites*(a: Attenuation; v: Value): Value =
|
|||
result = examineAlternatives(stage, result)
|
||||
if result.isFalse: break
|
||||
|
||||
proc publish(turn: var Turn; r: Cap; v: Value; h: Handle) =
|
||||
var a = runRewrites(r.attenuation, v)
|
||||
proc publish(turn: var Turn; cap: Cap; v: Value; h: Handle) =
|
||||
var a = runRewrites(cap.attenuation, v)
|
||||
if not a.isFalse:
|
||||
let e = OutboundAssertion(handle: h, peer: r)
|
||||
let e = OutboundAssertion(handle: h, peer: cap)
|
||||
turn.facet.outbound[h] = e
|
||||
enqueue(turn, r.relay) do (turn: var Turn):
|
||||
queueEffect(turn, cap.relay) do (turn: var Turn):
|
||||
e.established = true
|
||||
publish(r.target, turn, AssertionRef(value: a), e.handle)
|
||||
publish(cap.target, turn, AssertionRef(value: a), e.handle)
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
|
||||
act.enqueue.event.target.actor = turn.facet.actor.id.toPreserves
|
||||
act.enqueue.event.target.facet = turn.facet.id.toPreserves
|
||||
act.enqueue.event.target.oid = r.target.oid.toPreserves
|
||||
act.enqueue.event.target.oid = cap.target.oid.toPreserves
|
||||
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
|
||||
act.enqueue.event.detail.assert.assertion.value.value =
|
||||
mapEmbeds(v) do (cap: Value) -> Value: discard
|
||||
|
@ -302,7 +341,7 @@ proc publish*[T](turn: var Turn; r: Cap; a: T): Handle {.discardable.} =
|
|||
publish(turn, r, a.toPreserves)
|
||||
|
||||
proc retract(turn: var Turn; e: OutboundAssertion) =
|
||||
enqueue(turn, e.peer.relay) do (turn: var Turn):
|
||||
queueEffect(turn, e.peer.relay) do (turn: var Turn):
|
||||
if e.established:
|
||||
e.established = false
|
||||
e.peer.target.retract(turn, e.handle)
|
||||
|
@ -315,7 +354,7 @@ proc retract*(turn: var Turn; h: Handle) =
|
|||
proc message*(turn: var Turn; r: Cap; v: Value) =
|
||||
var a = runRewrites(r.attenuation, v)
|
||||
if not a.isFalse:
|
||||
enqueue(turn, r.relay) do (turn: var Turn):
|
||||
queueEffect(turn, r.relay) do (turn: var Turn):
|
||||
r.target.message(turn, AssertionRef(value: a))
|
||||
|
||||
proc message*[T](turn: var Turn; r: Cap; v: T) =
|
||||
|
@ -325,7 +364,7 @@ proc sync(turn: var Turn; e: Entity; peer: Cap) =
|
|||
e.sync(turn, peer)
|
||||
|
||||
proc sync*(turn: var Turn; r, peer: Cap) =
|
||||
enqueue(turn, r.relay) do (turn: var Turn):
|
||||
queueEffect(turn, r.relay) do (turn: var Turn):
|
||||
sync(turn, r.target, peer)
|
||||
|
||||
proc replace*[T](turn: var Turn; cap: Cap; h: Handle; v: T): Handle =
|
||||
|
@ -342,11 +381,10 @@ proc replace*[T](turn: var Turn; cap: Cap; h: var Handle; v: T): Handle {.discar
|
|||
|
||||
proc stop*(turn: var Turn)
|
||||
|
||||
proc run*(facet; action: TurnAction; zombieTurn = false)
|
||||
|
||||
proc newFacet(actor; parent: Facet; initialAssertions: OutboundTable): Facet =
|
||||
inc actor.facetIdAllocator
|
||||
result = Facet(
|
||||
id: getMonoTime().ticks.FacetId,
|
||||
id: actor.facetIdAllocator.toPreserves,
|
||||
actor: actor,
|
||||
parent: parent,
|
||||
outbound: initialAssertions,
|
||||
|
@ -358,9 +396,12 @@ proc newFacet(actor; parent: Facet): Facet =
|
|||
newFacet(actor, parent, initialAssertions)
|
||||
|
||||
proc isInert(facet): bool =
|
||||
result = facet.children.len == 0 and
|
||||
(facet.outbound.len == 0 or facet.parent.isNil) and
|
||||
facet.inertCheckPreventers == 0
|
||||
let
|
||||
noKids = facet.children.len == 0
|
||||
noOutboundHandles = facet.outbound.len == 0
|
||||
isRootFacet = facet.parent.isNil
|
||||
noInertCheckPreventers = facet.inertCheckPreventers == 0
|
||||
noKids and (noOutboundHandles or isRootFacet) and noInertCheckPreventers
|
||||
|
||||
proc preventInertCheck*(facet): (proc()) {.discardable.} =
|
||||
var armed = true
|
||||
|
@ -371,12 +412,6 @@ proc preventInertCheck*(facet): (proc()) {.discardable.} =
|
|||
dec facet.inertCheckPreventers
|
||||
result = disarm
|
||||
|
||||
proc inFacet(turn: var Turn; facet; act: TurnAction) =
|
||||
## Call an action with a facet using a temporary `Turn`
|
||||
## that shares the `Queues` of the calling `Turn`.
|
||||
var t = Turn(facet: facet, queues: turn.queues)
|
||||
act(t)
|
||||
|
||||
proc terminate(actor; turn; reason: ref Exception)
|
||||
|
||||
proc terminate(facet; turn: var Turn; orderly: bool) =
|
||||
|
@ -385,8 +420,7 @@ proc terminate(facet; turn: var Turn; orderly: bool) =
|
|||
let parent = facet.parent
|
||||
if not parent.isNil:
|
||||
parent.children.excl facet
|
||||
block:
|
||||
var turn = Turn(facet: facet, queues: turn.queues)
|
||||
queueWork(turn, facet) do (turn: var Turn):
|
||||
while facet.children.len > 0:
|
||||
facet.children.pop.terminate(turn, orderly)
|
||||
if orderly:
|
||||
|
@ -405,67 +439,70 @@ proc terminate(facet; turn: var Turn; orderly: bool) =
|
|||
turn.desc.actions.add act
|
||||
|
||||
proc stopIfInertAfter(action: TurnAction): TurnAction =
|
||||
# TODO: verify this
|
||||
proc wrapper(turn: var Turn) =
|
||||
action(turn)
|
||||
enqueue(turn, turn.facet) do (turn: var Turn):
|
||||
queueEffect(turn, turn.facet) do (turn: var Turn):
|
||||
if (not turn.facet.parent.isNil and
|
||||
(not turn.facet.parent.isAlive)) or
|
||||
turn.facet.isInert:
|
||||
stop(turn)
|
||||
wrapper
|
||||
|
||||
proc newFacet*(turn: var Turn): Facet = newFacet(turn.facet.actor, turn.facet)
|
||||
proc newFacet(turn: var Turn): Facet = newFacet(turn.facet.actor, turn.facet)
|
||||
|
||||
proc inFacet*(turn: var Turn; bootProc: TurnAction): Facet =
|
||||
result = newFacet(turn)
|
||||
let facet = turn.facet
|
||||
turn.facet = result
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetstart)
|
||||
act.facetstart.path.add result.path
|
||||
turn.desc.actions.add act
|
||||
inFacet(turn, result, stopIfInertAfter(bootProc))
|
||||
stopIfInertAfter(bootProc)(turn)
|
||||
turn.facet = facet
|
||||
|
||||
proc facet*(turn: var Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc)
|
||||
proc facet(turn: var Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc)
|
||||
|
||||
proc newActor(name: string; handleAlloc: ref Handle): Actor =
|
||||
let
|
||||
now = getTime()
|
||||
seed = now.toUnix * 1_000_000_000 + now.nanosecond
|
||||
proc newActor(name: string; parent: Facet): Actor =
|
||||
result = Actor(
|
||||
name: name,
|
||||
id: ActorId(seed),
|
||||
handleAllocator: handleAlloc,
|
||||
id: name.toPreserves,
|
||||
)
|
||||
result.root = newFacet(result, nil)
|
||||
if parent.isNil:
|
||||
new result.handleAllocator
|
||||
else:
|
||||
result.handleAllocator = parent.actor.handleAllocator
|
||||
result.root = newFacet(result, parent)
|
||||
when tracing:
|
||||
var act = ActorActivation(orKind: ActorActivationKind.start)
|
||||
act.start.actorName = Name(orKind: NameKind.named)
|
||||
act.start.actorName.named.name = name.toPreserves
|
||||
trace(result, act)
|
||||
|
||||
proc run(actor; bootProc: TurnAction; initialAssertions: OutboundTable) =
|
||||
run(newFacet(actor, actor.root, initialAssertions), stopIfInertAfter(bootProc))
|
||||
proc run(actor: Actor; bootProc: TurnAction; initialAssertions: OutboundTable) =
|
||||
queueTurn(newFacet(actor, actor.root, initialAssertions), stopIfInertAfter(bootProc))
|
||||
|
||||
proc bootActor*(name: string; bootProc: TurnAction): Actor =
|
||||
var initialAssertions: OutboundTable
|
||||
result = newActor(name, new(ref Handle))
|
||||
proc bootActor*(name: string; bootProc: TurnAction): Actor {.discardable.} =
|
||||
## Boot a top-level actor.
|
||||
result = newActor(name, nil)
|
||||
new result.handleAllocator
|
||||
var turn = Turn(facet: result.root)
|
||||
assert not result.root.isNil
|
||||
turn.work.addLast((result.root, bootProc,))
|
||||
when tracing:
|
||||
new result.turnIdAllocator
|
||||
let path = getEnv("SYNDICATE_TRACE_FILE", "/tmp/" & name & ".trace.pr")
|
||||
case path
|
||||
of "": stderr.writeLine "$SYNDICATE_TRACE_FILE unset, not tracing actor ", name
|
||||
of "-": result.traceStream = newFileStream(stderr)
|
||||
else: result.traceStream = openFileStream(path, fmWrite)
|
||||
run(result, bootProc, initialAssertions)
|
||||
turn.desc.id = nextTurnId()
|
||||
turn.desc.cause = TurnCause(orKind: TurnCauseKind.external)
|
||||
turn.desc.cause.external.description = "bootActor".toPreserves
|
||||
turnQueue.addLast turn
|
||||
|
||||
proc spawnActor*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
||||
let actor = newActor(name, turn.facet.actor.handleAllocator)
|
||||
enqueue(turn, turn.facet) do (turn: var Turn):
|
||||
let actor = newActor(name, turn.facet)
|
||||
queueEffect(turn, actor.root) do (turn: var Turn):
|
||||
var newOutBound: Table[Handle, OutboundAssertion]
|
||||
for key in initialAssertions:
|
||||
discard turn.facet.outbound.pop(key, newOutbound[key])
|
||||
when tracing:
|
||||
actor.turnIdAllocator = turn.facet.actor.turnIdAllocator
|
||||
actor.traceStream = turn.facet.actor.traceStream
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.spawn)
|
||||
act.spawn.id = actor.id.toPreserves
|
||||
turn.desc.actions.add act
|
||||
|
@ -475,9 +512,30 @@ proc spawnActor*(name: string; turn: var Turn; bootProc: TurnAction; initialAsse
|
|||
proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
||||
spawnActor(name, turn, bootProc, initialAssertions)
|
||||
|
||||
type StopOnRetract = ref object of Entity
|
||||
|
||||
method retract*(e: StopOnRetract; turn: var Turn; h: Handle) =
|
||||
stop(turn)
|
||||
|
||||
proc halfLink(facet, other: Facet) =
|
||||
let h = facet.nextHandle()
|
||||
facet.outbound[h] = OutboundAssertion(
|
||||
handle: h,
|
||||
peer: Cap(relay: other, target: StopOnRetract(facet: facet)),
|
||||
established: true,
|
||||
)
|
||||
|
||||
proc spawnLink*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
||||
result = spawnActor(name, turn, bootProc, initialAssertions)
|
||||
halfLink(turn.facet, result.root)
|
||||
halfLink(result.root, turn.facet)
|
||||
|
||||
var inertActor {.threadvar.}: Actor
|
||||
|
||||
proc newInertCap*(): Cap =
|
||||
let a = bootActor("inert") do (turn: var Turn): turn.stop()
|
||||
Cap(relay: a.root)
|
||||
if inertActor.isNil:
|
||||
inertActor = bootActor("inert") do (turn: var Turn): turn.stop()
|
||||
Cap(relay: inertActor.root)
|
||||
|
||||
proc atExit*(actor; action) = actor.exitHooks.add action
|
||||
|
||||
|
@ -493,114 +551,66 @@ proc terminate(actor; turn; reason: ref Exception) =
|
|||
trace(actor, act)
|
||||
for hook in actor.exitHooks: hook(turn)
|
||||
proc finish(turn: var Turn) =
|
||||
assert not actor.root.isNil, actor.name
|
||||
actor.root.terminate(turn, reason.isNil)
|
||||
actor.exited = true
|
||||
block:
|
||||
run(actor.root, finish, true)
|
||||
queueTurn(actor.root, finish)
|
||||
|
||||
proc terminate*(facet; e: ref Exception) =
|
||||
run(facet.actor.root) do (turn: var Turn):
|
||||
facet.actor.terminate(turn, e)
|
||||
|
||||
#[
|
||||
proc asyncCheck*(facet: Facet; fut: FutureBase) =
|
||||
## Sets a callback on `fut` which propagates exceptions to `facet`.
|
||||
addCallback(fut) do ():
|
||||
if fut.failed: terminate(facet, fut.error)
|
||||
template recallFacet(turn: var Turn; body: untyped): untyped =
|
||||
let facet = turn.facet
|
||||
block:
|
||||
body
|
||||
assert facet.actor == turn.facet.actor, "turn of " & $facet.actor & " ended at " & $turn.facet.actor
|
||||
turn.facet = facet
|
||||
|
||||
proc asyncCheck*(turn; fut: FutureBase) =
|
||||
## Sets a callback on `fut` which propagates exceptions to the facet of `turn`.
|
||||
asyncCheck(turn.facet, fut)
|
||||
]#
|
||||
|
||||
template tryFacet(facet; body: untyped) =
|
||||
try: body
|
||||
except CatchableError as err: terminate(facet, err)
|
||||
|
||||
proc run*(facet; action: TurnAction; zombieTurn = false) =
|
||||
if true and zombieTurn or (facet.actor.exitReason.isNil and facet.isAlive):
|
||||
tryFacet(facet):
|
||||
var queues = newTable[Facet, seq[TurnAction]]()
|
||||
block:
|
||||
var turn = Turn(facet: facet, queues: queues)
|
||||
action(turn)
|
||||
when tracing:
|
||||
turn.desc.id = facet.nextTurnId.toPreserves
|
||||
facet.actor.trace ActorActivation(
|
||||
orKind: ActorActivationKind.turn, turn: turn.desc)
|
||||
for facet, queue in queues:
|
||||
for action in queue: run(facet, action)
|
||||
|
||||
proc run*(cap: Cap; action: TurnAction) =
|
||||
## Convenience proc to run a `TurnAction` in the scope of a `Cap`.
|
||||
run(cap.relay, action)
|
||||
|
||||
#[
|
||||
proc addCallback*(fut: FutureBase; facet: Facet; act: TurnAction) =
|
||||
## Add a callback to a `Future` that will be called at a later `Turn`
|
||||
## within the context of `facet`.
|
||||
addCallback(fut) do ():
|
||||
if fut.failed: terminate(facet, fut.error)
|
||||
else:
|
||||
when tracing:
|
||||
run(facet) do (turn: var Turn):
|
||||
turn.desc.cause = TurnCause(orKind: TurnCauseKind.external)
|
||||
turn.desc.cause.external.description = "Future".toPreserves
|
||||
act(turn)
|
||||
proc stopNow(turn: var Turn) =
|
||||
let caller = turn.facet
|
||||
recallFacet turn:
|
||||
while caller.children.len > 0:
|
||||
var child = caller.children.pop()
|
||||
if child.actor == caller.actor:
|
||||
turn.facet = child
|
||||
stopNow(turn)
|
||||
else:
|
||||
run(facet, act)
|
||||
|
||||
proc addCallback*(fut: FutureBase; turn: var Turn; act: TurnAction) =
|
||||
## Add a callback to a `Future` that will be called at a later `Turn`
|
||||
## with the same context as the current.
|
||||
if fut.failed:
|
||||
terminate(turn.facet, fut.error)
|
||||
elif fut.finished:
|
||||
enqueue(turn, turn.facet, act)
|
||||
else:
|
||||
addCallback(fut, turn.facet, act)
|
||||
|
||||
proc addCallback*[T](fut: Future[T]; turn: var Turn; act: proc (t: var Turn, x: T) {.closure.}) =
|
||||
addCallback(fut, turn) do (turn: var Turn):
|
||||
if fut.failed: terminate(turn.facet, fut.error)
|
||||
else:
|
||||
when tracing:
|
||||
turn.desc.cause = TurnCause(orKind: TurnCauseKind.external)
|
||||
turn.desc.cause.external.description = "Future".toPreserves
|
||||
act(turn, read fut)
|
||||
]#
|
||||
queueEffect(turn, child, stopNow)
|
||||
caller.terminate(turn, true)
|
||||
|
||||
proc stop*(turn: var Turn, facet: Facet) =
|
||||
if facet.parent.isNil:
|
||||
facet.terminate(turn, true)
|
||||
else:
|
||||
enqueue(turn, facet.parent) do (turn: var Turn):
|
||||
facet.terminate(turn, true)
|
||||
queueEffect(turn, facet, stopNow)
|
||||
|
||||
proc stop*(turn: var Turn) =
|
||||
stop(turn, turn.facet)
|
||||
|
||||
proc onStop*(facet: Facet; act: TurnAction) =
|
||||
## Add a `proc (turn: var Turn)` action to `facet` to be called as it stops.
|
||||
assert not facet.isNil
|
||||
add(facet.shutdownActions, act)
|
||||
|
||||
proc stopActor*(facet: Facet) =
|
||||
run(facet) do (turn: var Turn):
|
||||
let actor = facet.actor
|
||||
enqueue(turn, actor.root) do (turn: var Turn):
|
||||
terminate(actor, turn, nil)
|
||||
|
||||
proc stopActor*(turn: var Turn) =
|
||||
stopActor(turn.facet)
|
||||
proc onStop*(turn: var Turn; act: TurnAction) =
|
||||
onStop(turn.facet, act)
|
||||
|
||||
proc stop*(actor: Actor) =
|
||||
stopActor(actor.root)
|
||||
queueTurn(actor.root) do (turn: var Turn):
|
||||
assert(not turn.facet.isNil)
|
||||
stop(turn)
|
||||
|
||||
proc freshen*(turn: var Turn, act: TurnAction) =
|
||||
assert(turn.queues.len == 0, "Attempt to freshen a non-stale Turn")
|
||||
proc stopActor*(facet: Facet) =
|
||||
stop(facet.actor)
|
||||
|
||||
proc stopActor*(turn: var Turn) =
|
||||
assert(not turn.facet.isNil)
|
||||
assert(not turn.facet.actor.isNil)
|
||||
assert(not turn.facet.actor.root.isNil)
|
||||
stop(turn, turn.facet.actor.root)
|
||||
|
||||
proc freshen*(turn: var Turn, act: TurnAction) {.deprecated.} =
|
||||
run(turn.facet, act)
|
||||
|
||||
proc newCap*(relay: Facet; e: Entity): Cap =
|
||||
proc newCap*(relay: Facet; e: Entity): Cap {.deprecated.} =
|
||||
Cap(relay: relay, target: e)
|
||||
|
||||
proc newCap*(turn; e: Entity): Cap =
|
||||
|
@ -622,3 +632,32 @@ proc running*(actor): bool =
|
|||
result = not actor.exited
|
||||
if not (result or actor.exitReason.isNil):
|
||||
raise actor.exitReason
|
||||
|
||||
proc run(turn: var Turn) =
|
||||
while turn.work.len > 0:
|
||||
var (facet, act) = turn.work.popFirst()
|
||||
assert not act.isNil
|
||||
turn.facet = facet
|
||||
act(turn)
|
||||
when tracing:
|
||||
var act = ActorActivation(orKind: ActorActivationKind.turn)
|
||||
act.turn = move turn.desc
|
||||
trace(turn.facet.actor, act)
|
||||
turn.facet = nil # invalidate the turn
|
||||
# TODO: catch exceptions here
|
||||
for eff in turn.effects.mitems:
|
||||
turnQueue.addLast(move eff)
|
||||
|
||||
proc run* =
|
||||
## Run actors to completion
|
||||
var ready: seq[Continuation]
|
||||
while true:
|
||||
while turnQueue.len > 0:
|
||||
var turn = turnQueue.popFirst()
|
||||
# TODO: check if actor is still valid
|
||||
run turn
|
||||
ioqueue.poll(ready)
|
||||
if ready.len == 0: break
|
||||
while ready.len > 0:
|
||||
discard trampoline:
|
||||
ready.pop()
|
||||
|
|
|
@ -95,10 +95,10 @@ when defined(linux):
|
|||
discard close(fd)
|
||||
driver.timers.excl(fd)
|
||||
|
||||
proc spawnTimerActor*(ds: Cap; turn: var Turn): Actor {.discardable.} =
|
||||
proc spawnTimerActor*(turn: var Turn; ds: Cap): Actor {.discardable.} =
|
||||
## Spawn a timer actor that responds to
|
||||
## dataspace observations of timeouts on `ds`.
|
||||
spawnActor("timers", turn) do (turn: var Turn):
|
||||
spawnLink("timers", turn) do (turn: var Turn):
|
||||
let driver = spawnTimerDriver(turn.facet, ds)
|
||||
let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()})
|
||||
during(turn, ds, pat) do (deadline: float):
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
import std/[hashes, tables]
|
||||
|
||||
from ./actors import Cap, hash
|
||||
import ./actors
|
||||
from ./protocols/sturdy import Oid
|
||||
|
||||
proc hash(r: Cap): Hash = !$(r.relay.hash !& r.target.unsafeAddr.hash)
|
||||
|
|
|
@ -158,8 +158,8 @@ using
|
|||
|
||||
proc lookupLocal(relay; oid: Oid): Cap =
|
||||
let sym = relay.exported.grab oid
|
||||
if sym.isNil: newInertCap()
|
||||
else: sym.cap
|
||||
if not sym.isNil:
|
||||
result = sym.cap
|
||||
|
||||
proc isInert(r: Cap): bool =
|
||||
r.target.isNil
|
||||
|
@ -177,9 +177,11 @@ proc rewriteCapIn(relay; facet; n: WireRef, imported: var seq[WireSymbol]): Cap
|
|||
imported.add e
|
||||
result = e.cap
|
||||
of WireRefKind.yours:
|
||||
let r = relay.lookupLocal(n.yours.oid)
|
||||
if n.yours.attenuation.len == 0 or r.isInert: result = r
|
||||
else: raiseAssert "attenuation not implemented"
|
||||
result = relay.lookupLocal(n.yours.oid)
|
||||
if result.isNil:
|
||||
result = newInertCap()
|
||||
elif n.yours.attenuation.len > 0:
|
||||
result = attenuate(result, n.yours.attenuation)
|
||||
|
||||
proc rewriteIn(relay; facet; v: Value):
|
||||
tuple[rewritten: Assertion; imported: seq[WireSymbol]] =
|
||||
|
@ -212,14 +214,13 @@ proc dispatch(relay: Relay; turn: var Turn; cap: Cap; event: Event) =
|
|||
turn.message(cap, a)
|
||||
|
||||
of EventKind.Sync:
|
||||
discard # TODO
|
||||
#[
|
||||
var imported: seq[WireSymbol]
|
||||
let k = relay.rewriteCapIn(turn, evenr.sync.peer, imported)
|
||||
turn.sync(cap) do (turn: var Turn):
|
||||
turn.message(k, true)
|
||||
for e in imported: relay.imported.del e
|
||||
]#
|
||||
var
|
||||
(v, imported) = rewriteIn(relay, turn.facet, event.sync.peer)
|
||||
peer = unembed(v, Cap)
|
||||
if peer.isSome:
|
||||
turn.message(get peer, true)
|
||||
for e in imported: relay.imported.drop e
|
||||
|
||||
proc dispatch(relay: Relay; v: Value) =
|
||||
trace "S: ", v
|
||||
|
@ -231,10 +232,8 @@ proc dispatch(relay: Relay; v: Value) =
|
|||
# https://synit.org/book/protocol.html#turn-packets
|
||||
for te in pkt.turn:
|
||||
let r = lookupLocal(relay, te.oid.Oid)
|
||||
if not r.isInert:
|
||||
if not r.isNil:
|
||||
dispatch(relay, t, r, te.event)
|
||||
else:
|
||||
stderr.writeLine("discarding event for unknown Cap; ", te.event)
|
||||
of PacketKind.Error:
|
||||
# https://synit.org/book/protocol.html#error-packets
|
||||
when defined(posix):
|
||||
|
@ -262,7 +261,7 @@ type
|
|||
nextLocalOid*: Option[Oid]
|
||||
|
||||
proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) =
|
||||
spawnActor(name, turn) do (turn: var Turn):
|
||||
spawnLink(name, turn) do (turn: var Turn):
|
||||
let relay = Relay(
|
||||
facet: turn.facet,
|
||||
packetWriter: opts.packetWriter,
|
||||
|
@ -301,25 +300,26 @@ when defined(posix):
|
|||
import pkg/sys/[files, handles, sockets]
|
||||
export transportAddress.Unix
|
||||
|
||||
type StdioControlEntity = ref object of Entity
|
||||
buf: ref seq[byte]
|
||||
type StdioEntity = ref object of Entity
|
||||
relay: Relay
|
||||
stdin: AsyncFile
|
||||
alive: bool
|
||||
|
||||
method message(entity: StdioControlEntity; turn: var Turn; ass: AssertionRef) =
|
||||
method message(entity: StdioEntity; turn: var Turn; ass: AssertionRef) =
|
||||
if ass.value.preservesTo(ForceDisconnect).isSome:
|
||||
close(entity.stdin)
|
||||
entity.alive = false
|
||||
|
||||
proc loop(entity: StdioControlEntity) {.asyncio.} =
|
||||
new entity.buf
|
||||
entity.buf[].setLen(0x1000)
|
||||
while true:
|
||||
let n = read(entity.stdin, entity.buf)
|
||||
proc loop(entity: StdioEntity) {.asyncio.} =
|
||||
let buf = new seq[byte]
|
||||
entity.alive = true
|
||||
while entity.alive:
|
||||
buf[].setLen(0x1000)
|
||||
let n = read(entity.stdin, buf)
|
||||
if n == 0:
|
||||
stderr.writeLine "empty read on stdin, stopping actor"
|
||||
stopActor(entity.relay.facet)
|
||||
stopActor(entity.facet)
|
||||
else:
|
||||
entity.relay.recv(entity.buf[], 0..<n)
|
||||
entity.relay.recv(buf[], 0..<n)
|
||||
close(entity.stdin)
|
||||
|
||||
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Stdio) =
|
||||
## Connect to an external dataspace over stdio.
|
||||
|
@ -342,15 +342,17 @@ when defined(posix):
|
|||
if flags < 0: raiseOSError(osLastError())
|
||||
if fcntl(fd.cint, F_SETFL, flags or O_NONBLOCK) < 0:
|
||||
raiseOSError(osLastError())
|
||||
let entity = StdioControlEntity(
|
||||
relay: relay, stdin: newAsyncFile(FD fd))
|
||||
let entity = StdioEntity(
|
||||
facet: turn.facet, relay: relay, stdin: newAsyncFile(FD fd))
|
||||
onStop(entity.facet) do (turn: var Turn):
|
||||
entity.alive = false
|
||||
discard trampoline:
|
||||
whelp loop(entity)
|
||||
publish(turn, ds, TransportConnection(
|
||||
`addr`: ta.toPreserves,
|
||||
control: newCap(entity, turn),
|
||||
resolved: relay.peer.accepted,
|
||||
))
|
||||
discard trampoline:
|
||||
whelp loop(entity)
|
||||
|
||||
proc connectStdio*(turn: var Turn; ds: Cap) =
|
||||
## Connect to an external dataspace over stdin and stdout.
|
||||
|
@ -360,54 +362,45 @@ when defined(posix):
|
|||
TcpEntity = ref object of Entity
|
||||
relay: Relay
|
||||
sock: AsyncConn[sockets.Protocol.TCP]
|
||||
buf: ref seq[byte]
|
||||
alive: bool
|
||||
|
||||
UnixEntity = ref object of Entity
|
||||
relay: Relay
|
||||
sock: AsyncConn[sockets.Protocol.Unix]
|
||||
buf: ref seq[byte]
|
||||
alive: bool
|
||||
|
||||
SocketEntity = TcpEntity | UnixEntity
|
||||
|
||||
method message(entity: SocketEntity; turn: var Turn; ass: AssertionRef) =
|
||||
if ass.value.preservesTo(ForceDisconnect).isSome:
|
||||
reset entity.alive
|
||||
close(entity.sock)
|
||||
|
||||
template socketLoop() {.dirty.}=
|
||||
new entity.buf
|
||||
entity.buf[].setLen(0x1000)
|
||||
entity.alive = not entity.alive
|
||||
while entity.alive:
|
||||
let n = read(entity.sock, entity.buf)
|
||||
if n < 0: raiseOSError(osLastError())
|
||||
elif n == 0:
|
||||
stderr.writeLine "empty read on socket, stopping actor"
|
||||
stopActor(entity.relay.facet)
|
||||
else:
|
||||
entity.relay.recv(entity.buf[], 0..<n)
|
||||
stderr.writeLine "breaking socketLoop"
|
||||
|
||||
proc loop(entity: TcpEntity) {.asyncio.} =
|
||||
socketLoop()
|
||||
proc loop(entity: UnixEntity) {.asyncio.} =
|
||||
socketLoop()
|
||||
entity.alive = false
|
||||
|
||||
type ShutdownEntity = ref object of Entity
|
||||
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
|
||||
stopActor(turn)
|
||||
stopActor(e.facet)
|
||||
|
||||
template bootSocketEntity() {.dirty.} =
|
||||
proc publish(turn: var Turn) =
|
||||
proc setup(turn: var Turn) {.closure.} =
|
||||
proc kill(turn: var Turn) =
|
||||
entity.alive = false
|
||||
onStop(turn, kill)
|
||||
publish(turn, ds, TransportConnection(
|
||||
`addr`: ta.toPreserves,
|
||||
control: newCap(entity, turn),
|
||||
resolved: entity.relay.peer.accepted,
|
||||
))
|
||||
run(entity.relay.facet, publish)
|
||||
loop(entity)
|
||||
run(entity.relay.facet, setup)
|
||||
let buf = new seq[byte]
|
||||
entity.alive = true
|
||||
while entity.alive:
|
||||
buf[].setLen(0x1000)
|
||||
let n = read(entity.sock, buf)
|
||||
if n < 0: raiseOSError(osLastError())
|
||||
elif n == 0:
|
||||
stopActor(entity.facet)
|
||||
else:
|
||||
entity.relay.recv(buf[], 0..<n)
|
||||
close(entity.sock)
|
||||
|
||||
proc boot(entity: TcpEntity; ta: transportAddress.Tcp; ds: Cap) {.asyncio.} =
|
||||
entity.sock = connectTcpAsync(ta.host, Port ta.port)
|
||||
|
@ -426,10 +419,8 @@ when defined(posix):
|
|||
initialOid: 0.Oid.some,
|
||||
)
|
||||
spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay):
|
||||
entity.facet = turn.facet
|
||||
entity.relay = relay
|
||||
atExit(turn.facet.actor) do (turn: var Turn):
|
||||
entity.alive = false
|
||||
close(entity.sock)
|
||||
discard trampoline:
|
||||
whelp boot(entity, ta, ds)
|
||||
|
||||
|
@ -485,61 +476,57 @@ proc connectRoute(turn: var Turn; ds: Cap; route: Route; transOff: int) =
|
|||
type StepCallback = proc (turn: var Turn; step: Value; origin, next: Cap) {.closure.}
|
||||
|
||||
proc spawnStepResolver(turn: var Turn; ds: Cap; stepType: Value; cb: StepCallback) =
|
||||
spawnActor($stepType & "-step", turn) do (turn: var Turn):
|
||||
let stepPat = grabRecord(stepType, grab())
|
||||
let pat = ?Observe(pattern: ResolvedPathStep?:{1: stepPat}) ?? {0: grabLit(), 1: grab()}
|
||||
during(turn, ds, pat) do (origin: Cap; stepDetail: Literal[Value]):
|
||||
let step = toRecord(stepType, stepDetail.value)
|
||||
proc duringCallback(turn: var Turn; ass: Value; h: Handle): TurnAction =
|
||||
var res = ass.preservesTo Resolved
|
||||
if res.isSome:
|
||||
if res.get.orKind == ResolvedKind.accepted and
|
||||
res.get.accepted.responderSession of Cap:
|
||||
cb(turn, step, origin, res.get.accepted.responderSession.Cap)
|
||||
else:
|
||||
publish(turn, ds, ResolvedPathStep(
|
||||
origin: origin, pathStep: step, resolved: res.get))
|
||||
proc action(turn: var Turn) =
|
||||
stop(turn)
|
||||
result = action
|
||||
publish(turn, origin, Resolve(
|
||||
step: step, observer: newCap(turn, during(duringCallback))))
|
||||
let stepPat = grabRecord(stepType, grab())
|
||||
let pat = ?Observe(pattern: ResolvedPathStep?:{1: stepPat}) ?? {0: grabLit(), 1: grab()}
|
||||
during(turn, ds, pat) do (origin: Cap; stepDetail: Literal[Value]):
|
||||
let step = toRecord(stepType, stepDetail.value)
|
||||
proc duringCallback(turn: var Turn; ass: Value; h: Handle): TurnAction =
|
||||
var res = ass.preservesTo Resolved
|
||||
if res.isSome:
|
||||
if res.get.orKind == ResolvedKind.accepted and
|
||||
res.get.accepted.responderSession of Cap:
|
||||
cb(turn, step, origin, res.get.accepted.responderSession.Cap)
|
||||
else:
|
||||
publish(turn, ds, ResolvedPathStep(
|
||||
origin: origin, pathStep: step, resolved: res.get))
|
||||
proc action(turn: var Turn) =
|
||||
stop(turn)
|
||||
result = action
|
||||
publish(turn, origin, Resolve(
|
||||
step: step, observer: newCap(turn, during(duringCallback))))
|
||||
|
||||
proc spawnRelays*(turn: var Turn; ds: Cap) =
|
||||
## Spawn actors that manage routes and appeasing gatekeepers.
|
||||
spawnActor("transport-connector", turn) do (turn: var Turn):
|
||||
let pat = ?Observe(pattern: !TransportConnection) ?? { 0: grab() }
|
||||
# Use a generic pattern and type matching
|
||||
# in the during handler because it is easy.
|
||||
let transPat = ?Observe(pattern: !TransportConnection) ?? { 0: grab() }
|
||||
# Use a generic pattern and type matching
|
||||
# in the during handler because it is easy.
|
||||
|
||||
let stdioPat = ?Observe(pattern: TransportConnection?:{0: ?:Stdio})
|
||||
during(turn, ds, stdioPat) do:
|
||||
connectTransport(turn, ds, Stdio())
|
||||
let stdioPat = ?Observe(pattern: TransportConnection?:{0: ?:Stdio})
|
||||
during(turn, ds, stdioPat) do:
|
||||
connectTransport(turn, ds, Stdio())
|
||||
|
||||
# TODO: tcp pattern
|
||||
during(turn, ds, transPat) do (ta: Literal[transportAddress.Tcp]):
|
||||
try: connectTransport(turn, ds, ta.value)
|
||||
except CatchableError as e:
|
||||
publish(turn, ds, TransportConnection(
|
||||
`addr`: ta.toPreserve,
|
||||
resolved: rejected(embed e),
|
||||
))
|
||||
|
||||
# TODO: tcp pattern
|
||||
during(turn, ds, pat) do (ta: Literal[transportAddress.Tcp]):
|
||||
try: connectTransport(turn, ds, ta.value)
|
||||
except CatchableError as e:
|
||||
publish(turn, ds, TransportConnection(
|
||||
`addr`: ta.toPreserve,
|
||||
resolved: rejected(embed e),
|
||||
))
|
||||
# TODO: unix pattern
|
||||
during(turn, ds, transPat) do (ta: Literal[transportAddress.Unix]):
|
||||
try: connectTransport(turn, ds, ta.value)
|
||||
except CatchableError as e:
|
||||
publish(turn, ds, TransportConnection(
|
||||
`addr`: ta.toPreserve,
|
||||
resolved: rejected(embed e),
|
||||
))
|
||||
|
||||
# TODO: unix pattern
|
||||
during(turn, ds, pat) do (ta: Literal[transportAddress.Unix]):
|
||||
try: connectTransport(turn, ds, ta.value)
|
||||
except CatchableError as e:
|
||||
publish(turn, ds, TransportConnection(
|
||||
`addr`: ta.toPreserve,
|
||||
resolved: rejected(embed e),
|
||||
))
|
||||
|
||||
spawnActor("path-resolver", turn) do (turn: var Turn):
|
||||
let pat = ?Observe(pattern: !ResolvePath) ?? {0: grab()}
|
||||
during(turn, ds, pat) do (route: Literal[Route]):
|
||||
for i, transAddr in route.value.transports:
|
||||
connectRoute(turn, ds, route.value, i)
|
||||
let resolvePat = ?Observe(pattern: !ResolvePath) ?? {0: grab()}
|
||||
during(turn, ds, resolvePat) do (route: Literal[Route]):
|
||||
for i, transAddr in route.value.transports:
|
||||
connectRoute(turn, ds, route.value, i)
|
||||
|
||||
spawnStepResolver(turn, ds, "ref".toSymbol) do (
|
||||
turn: var Turn, step: Value, origin: Cap, next: Cap):
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
# Package
|
||||
|
||||
version = "20240304"
|
||||
version = "20240308"
|
||||
author = "Emery Hemingway"
|
||||
description = "Syndicated actors for conversational concurrency"
|
||||
license = "Unlicense"
|
||||
|
|
Loading…
Reference in New Issue