Use global turn queue

This commit is contained in:
Emery Hemingway 2024-03-08 12:01:20 +00:00
parent 15d2e8bfb4
commit 9d975bab56
11 changed files with 416 additions and 385 deletions

View File

@ -54,7 +54,7 @@ The Syndicate DSL can be entered using `runActor` which calls a Nim body with a
### Publish
``` nim
runActor("main") do (dataspace: Ref; turn: var Turn):
runActor("main") do (dataspace: Ref; turn: Turn):
let presenceHandle = publish(turn, dataspace, Present(username: "Judy"))
# publish <Present "Judy"> to the dataspace
# the assertion can be later retracted by handle
@ -67,7 +67,7 @@ runActor("main") do (dataspace: Ref; turn: var Turn):
We can react to assertions and messages within dataspaces using [patterns](https://synit.org/book/glossary.html#dataspace-pattern). Patterns are constructed using a Nim type and the `?` operator. Again a Nim type is used rather than a raw Preserves for schema consistency.
``` nim
runActor("main") do (dataspace: Ref; turn: var Turn):
runActor("main") do (dataspace: Ref; turn: Turn):
during(turn, dataspace, ?Present) do (who: string):
# This body is active when the ?Present pattern is matched.
# The Present type contains two atomic values that can be matched

View File

@ -35,21 +35,21 @@ proc `??`*(pat: Pattern; bindings: openArray[(int, Pattern)]): Pattern {.inline.
patterns.inject(pat, bindings)
type
PublishProc = proc (turn: var Turn; v: Value; h: Handle) {.closure.}
RetractProc = proc (turn: var Turn; h: Handle) {.closure.}
MessageProc = proc (turn: var Turn; v: Value) {.closure.}
PublishProc = proc (turn: Turn; v: Value; h: Handle) {.closure.}
RetractProc = proc (turn: Turn; h: Handle) {.closure.}
MessageProc = proc (turn: Turn; v: Value) {.closure.}
ClosureEntity = ref object of Entity
publishImpl: PublishProc
retractImpl: RetractProc
messageImpl: MessageProc
publishImpl*: PublishProc
retractImpl*: RetractProc
messageImpl*: MessageProc
method publish(e: ClosureEntity; turn: var Turn; a: AssertionRef; h: Handle) =
method publish(e: ClosureEntity; turn: Turn; a: AssertionRef; h: Handle) =
if not e.publishImpl.isNil: e.publishImpl(turn, a.value, h)
method retract(e: ClosureEntity; turn: var Turn; h: Handle) =
method retract(e: ClosureEntity; turn: Turn; h: Handle) =
if not e.retractImpl.isNil: e.retractImpl(turn, h)
method message(e: ClosureEntity; turn: var Turn; a: AssertionRef) =
method message(e: ClosureEntity; turn: Turn; a: AssertionRef) =
if not e.messageImpl.isNil: e.messageImpl(turn, a.value)
proc argumentCount(handler: NimNode): int =
@ -94,7 +94,7 @@ proc wrapPublishHandler(turn, handler: NimNode): NimNode =
handlerSym = genSym(nskProc, "publish")
bindingsSym = ident"bindings"
quote do:
proc `handlerSym`(`turn`: var Turn; `bindingsSym`: Value; `handleSym`: Handle) =
proc `handlerSym`(`turn`: Turn; `bindingsSym`: Value; `handleSym`: Handle) =
`varSection`
if fromPreserves(`valuesSym`, bindings):
`publishBody`
@ -106,7 +106,7 @@ proc wrapMessageHandler(turn, handler: NimNode): NimNode =
handlerSym = genSym(nskProc, "message")
bindingsSym = ident"bindings"
quote do:
proc `handlerSym`(`turn`: var Turn; `bindingsSym`: Value) =
proc `handlerSym`(`turn`: Turn; `bindingsSym`: Value) =
`varSection`
if fromPreserves(`valuesSym`, bindings):
`body`
@ -120,17 +120,17 @@ proc wrapDuringHandler(turn, entryBody, exitBody: NimNode): NimNode =
duringSym = genSym(nskProc, "during")
if exitBody.isNil:
quote do:
proc `duringSym`(`turn`: var Turn; `bindingsSym`: Value; `handleSym`: Handle): TurnAction =
proc `duringSym`(`turn`: Turn; `bindingsSym`: Value; `handleSym`: Handle): TurnAction =
`varSection`
if fromPreserves(`valuesSym`, `bindingsSym`):
`publishBody`
else:
quote do:
proc `duringSym`(`turn`: var Turn; `bindingsSym`: Value; `handleSym`: Handle): TurnAction =
proc `duringSym`(`turn`: Turn; `bindingsSym`: Value; `handleSym`: Handle): TurnAction =
`varSection`
if fromPreserves(`valuesSym`, `bindingsSym`):
`publishBody`
proc action(`turn`: var Turn) =
proc action(`turn`: Turn) =
`exitBody`
result = action
@ -190,5 +190,5 @@ macro during*(turn: untyped; ds: Cap; pattern: Pattern; publishBody: untyped) =
proc runActor*(name: string; bootProc: TurnAction) =
## Boot an actor `Actor` and churn ioqueue once.
let actor = bootActor(name, bootProc)
ioqueue.run()
discard bootActor(name, bootProc)
actors.run()

View File

@ -1,7 +1,10 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[hashes, monotimes, options, sets, tables, times]
import std/[deques, hashes, monotimes, options, sets, tables, times]
import pkg/cps
import pkg/sys/ioqueue
import preserves
import ../syndicate/protocols/[protocol, sturdy]
@ -14,17 +17,6 @@ when tracing:
export Handle
template generateIdType(typ: untyped) =
type typ* = distinct Natural
proc `==`*(x, y: typ): bool {.borrow.}
proc `$`*(id: typ): string {.borrow.}
generateIdType(ActorId)
generateIdType(FacetId)
generateIdType(EndpointId)
generateIdType(FieldId)
generateIdType(TurnId)
type
Oid = sturdy.Oid
Caveat = sturdy.Caveat
@ -37,11 +29,13 @@ type
# C code has "redefinition of struct" problems when orc is enabled
Entity* = ref object of RootObj
facet*: Facet
oid*: Oid # oid is how Entities are identified over the wire
Cap* {.preservesEmbedded.} = ref object of EmbeddedObj
relay*: Facet
target*: Entity
relay*: Facet
# Entity has facet but a Cap is also scoped to a relay Facet
attenuation*: Attenuation
Ref* {.deprecated: "Ref was renamed to Cap".} = Cap
@ -53,6 +47,7 @@ type
OutboundTable = Table[Handle, OutboundAssertion]
Actor* = ref object
next: Actor
name: string
handleAllocator: ref Handle
# a fresh actor gets a new ref Handle and
@ -61,18 +56,15 @@ type
exitReason: ref Exception
exitHooks: seq[TurnAction]
id: ActorId
facetIdAllocator: uint
exiting, exited: bool
when tracing:
turnIdAllocator: ref TurnId
traceStream: FileStream
TurnAction* = proc (t: var Turn) {.closure.}
TurnAction* = proc (t: Turn) {.closure.}
Queues = TableRef[Facet, seq[TurnAction]]
Turn* = object # an object that should remain on the stack
facet: Facet
queues: Queues # a ref object that can outlive Turn
Turn* = ref object
facet: Facet # active facet that may change during a turn
work: Deque[tuple[facet: Facet, act: TurnAction]]
effects: seq[Turn]
when tracing:
desc: TurnDescription
@ -87,20 +79,30 @@ type
id: FacetId
isAlive: bool
when tracing:
var turnQueue {.threadvar.}: Deque[Turn]
proc nextTurnId(facet: Facet): TurnId =
result = succ(facet.actor.turnIdAllocator[])
facet.actor.turnIdAllocator[] = result
when tracing:
proc openTraceStream: FileStream =
let path = getEnv("SYNDICATE_TRACE_FILE")
case path
of "": stderr.writeLine "$SYNDICATE_TRACE_FILE unset"
of "-": result = newFileStream(stderr)
else: result = openFileStream(path, fmWrite)
let traceStream = openTraceStream()
var turnIdAllocator: uint
proc nextTurnId(): TurnId =
inc(turnIdAllocator)
turnIdAllocator.toPreserves
proc trace(actor: Actor; act: ActorActivation) =
if not actor.traceStream.isNil:
if not traceStream.isNil:
var entry = TraceEntry(
timestamp: getTime().toUnixFloat(),
actor: initRecord("named", actor.name.toPreserves),
item: act)
actor.traceStream.writeText entry.toPreserves
actor.traceStream.writeLine()
traceStream.write(entry.toPreserves)
proc path(facet: Facet): seq[trace.FacetId] =
var f = facet
@ -108,15 +110,15 @@ when tracing:
result.add f.id.toPreserves
f = f.parent
method publish*(e: Entity; turn: var Turn; v: AssertionRef; h: Handle) {.base.} = discard
method retract*(e: Entity; turn: var Turn; h: Handle) {.base.} = discard
method message*(e: Entity; turn: var Turn; v: AssertionRef) {.base.} = discard
method sync*(e: Entity; turn: var Turn; peer: Cap) {.base.} = discard
method publish*(e: Entity; turn: Turn; v: AssertionRef; h: Handle) {.base.} = discard
method retract*(e: Entity; turn: Turn; h: Handle) {.base.} = discard
method message*(e: Entity; turn: Turn; v: AssertionRef) {.base.} = discard
method sync*(e: Entity; turn: Turn; peer: Cap) {.base.} = discard
using
actor: Actor
facet: Facet
turn: var Turn
turn: Turn
action: TurnAction
proc labels(f: Facet): string =
@ -139,11 +141,14 @@ proc `$`*(r: Cap): string =
proc `$`*(actor: Actor): string =
"<Actor:" & actor.name & ">" # TODO: ambigous
proc `$`*(t: Turn): string =
"<Turn:" & $t.desc.id & ">"
proc attenuate(r: Cap; a: Attenuation): Cap =
if a.len == 0: result = r
else: result = Cap(
relay: r.relay,
target: r.target,
relay: r.relay,
attenuation: a & r.attenuation)
proc hash*(facet): Hash =
@ -155,13 +160,42 @@ proc nextHandle(facet: Facet): Handle =
result = succ(facet.actor.handleAllocator[])
facet.actor.handleAllocator[] = result
proc facet*(turn: var Turn): Facet = turn.facet
proc queueWork*(turn: Turn; facet: Facet; act: TurnAction) =
assert not facet.isNil
turn.work.addLast((facet, act,))
proc enqueue(turn: var Turn; target: Facet; action: TurnAction) =
if target in turn.queues:
turn.queues[target].add action
else:
turn.queues[target] = @[action]
proc queueTurn*(facet: Facet; act: TurnAction) =
var turn = Turn(facet: facet)
assert not facet.isNil
turn.work.addLast((facet, act,))
when tracing:
turn.desc.id = nextTurnId()
turnQueue.addLast(turn)
proc queueTurn*(prev: Turn; facet: Facet; act: TurnAction) =
var next = Turn(facet: facet)
assert not facet.isNil
next.work.addLast((facet, act,))
when tracing:
next.desc.id = nextTurnId()
next.desc.cause = TurnCause(orKind: TurnCauseKind.turn)
next.desc.cause.turn.id = prev.desc.id
turnQueue.addLast(next)
proc run*(facet: Facet; action: TurnAction) = queueTurn(facet, action)
## Alias to queueTurn_.
proc facet*(turn: Turn): Facet = turn.facet
proc queueEffect*(turn: Turn; target: Facet; act: TurnAction) =
var next = Turn(facet: target)
assert not target.isNil
next.work.addLast((target, act,))
when tracing:
next.desc.id = nextTurnId()
next.desc.cause = TurnCause(orKind: TurnCauseKind.turn)
next.desc.cause.turn.id = turn.desc.id
turn.effects.add next
type Bindings = Table[Value, Value]
@ -275,78 +309,78 @@ proc runRewrites*(a: Attenuation; v: Value): Value =
result = examineAlternatives(stage, result)
if result.isFalse: break
proc publish(turn: var Turn; r: Cap; v: Value; h: Handle) =
var a = runRewrites(r.attenuation, v)
proc publish(turn: Turn; cap: Cap; v: Value; h: Handle) =
var a = runRewrites(cap.attenuation, v)
if not a.isFalse:
let e = OutboundAssertion(handle: h, peer: r)
let e = OutboundAssertion(handle: h, peer: cap)
turn.facet.outbound[h] = e
enqueue(turn, r.relay) do (turn: var Turn):
queueEffect(turn, cap.relay) do (turn: Turn):
e.established = true
publish(r.target, turn, AssertionRef(value: a), e.handle)
publish(cap.target, turn, AssertionRef(value: a), e.handle)
when tracing:
var act = ActionDescription(orKind: ActionDescriptionKind.enqueue)
act.enqueue.event.target.actor = turn.facet.actor.id.toPreserves
act.enqueue.event.target.facet = turn.facet.id.toPreserves
act.enqueue.event.target.oid = r.target.oid.toPreserves
act.enqueue.event.target.oid = cap.target.oid.toPreserves
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.assert)
act.enqueue.event.detail.assert.assertion.value.value =
mapEmbeds(v) do (cap: Value) -> Value: discard
act.enqueue.event.detail.assert.handle = h
turn.desc.actions.add act
proc publish*(turn: var Turn; r: Cap; a: Value): Handle {.discardable.} =
proc publish*(turn: Turn; r: Cap; a: Value): Handle {.discardable.} =
result = turn.facet.nextHandle()
publish(turn, r, a, result)
proc publish*[T](turn: var Turn; r: Cap; a: T): Handle {.discardable.} =
proc publish*[T](turn: Turn; r: Cap; a: T): Handle {.discardable.} =
publish(turn, r, a.toPreserves)
proc retract(turn: var Turn; e: OutboundAssertion) =
enqueue(turn, e.peer.relay) do (turn: var Turn):
proc retract(turn: Turn; e: OutboundAssertion) =
queueEffect(turn, e.peer.relay) do (turn: Turn):
if e.established:
e.established = false
e.peer.target.retract(turn, e.handle)
proc retract*(turn: var Turn; h: Handle) =
proc retract*(turn: Turn; h: Handle) =
var e: OutboundAssertion
if turn.facet.outbound.pop(h, e):
turn.retract(e)
proc message*(turn: var Turn; r: Cap; v: Value) =
proc message*(turn: Turn; r: Cap; v: Value) =
var a = runRewrites(r.attenuation, v)
if not a.isFalse:
enqueue(turn, r.relay) do (turn: var Turn):
queueEffect(turn, r.relay) do (turn: Turn):
r.target.message(turn, AssertionRef(value: a))
proc message*[T](turn: var Turn; r: Cap; v: T) =
message(turn, r, v.toPreserves)
proc message*[T](turn: Turn; r: Cap; v: T) =
queueEffect(turn, r.relay) do (turn: Turn):
message(turn, r, v.toPreserves)
proc sync(turn: var Turn; e: Entity; peer: Cap) =
proc sync(turn: Turn; e: Entity; peer: Cap) =
e.sync(turn, peer)
proc sync*(turn: var Turn; r, peer: Cap) =
enqueue(turn, r.relay) do (turn: var Turn):
proc sync*(turn: Turn; r, peer: Cap) =
queueEffect(turn, r.relay) do (turn: Turn):
sync(turn, r.target, peer)
proc replace*[T](turn: var Turn; cap: Cap; h: Handle; v: T): Handle =
proc replace*[T](turn: Turn; cap: Cap; h: Handle; v: T): Handle =
result = publish(turn, cap, v)
if h != default(Handle):
retract(turn, h)
proc replace*[T](turn: var Turn; cap: Cap; h: var Handle; v: T): Handle {.discardable.} =
proc replace*[T](turn: Turn; cap: Cap; h: var Handle; v: T): Handle {.discardable.} =
var old = h
h = publish(turn, cap, v)
if old != default(Handle):
retract(turn, old)
h
proc stop*(turn: var Turn)
proc run*(facet; action: TurnAction; zombieTurn = false)
proc stop*(turn: Turn)
proc newFacet(actor; parent: Facet; initialAssertions: OutboundTable): Facet =
inc actor.facetIdAllocator
result = Facet(
id: getMonoTime().ticks.FacetId,
id: actor.facetIdAllocator.toPreserves,
actor: actor,
parent: parent,
outbound: initialAssertions,
@ -358,9 +392,12 @@ proc newFacet(actor; parent: Facet): Facet =
newFacet(actor, parent, initialAssertions)
proc isInert(facet): bool =
result = facet.children.len == 0 and
(facet.outbound.len == 0 or facet.parent.isNil) and
facet.inertCheckPreventers == 0
let
noKids = facet.children.len == 0
noOutboundHandles = facet.outbound.len == 0
isRootFacet = facet.parent.isNil
noInertCheckPreventers = facet.inertCheckPreventers == 0
noKids and (noOutboundHandles or isRootFacet) and noInertCheckPreventers
proc preventInertCheck*(facet): (proc()) {.discardable.} =
var armed = true
@ -371,22 +408,15 @@ proc preventInertCheck*(facet): (proc()) {.discardable.} =
dec facet.inertCheckPreventers
result = disarm
proc inFacet(turn: var Turn; facet; act: TurnAction) =
## Call an action with a facet using a temporary `Turn`
## that shares the `Queues` of the calling `Turn`.
var t = Turn(facet: facet, queues: turn.queues)
act(t)
proc terminate(actor; turn; reason: ref Exception)
proc terminate(facet; turn: var Turn; orderly: bool) =
proc terminate(facet; turn: Turn; orderly: bool) =
if facet.isAlive:
facet.isAlive = false
let parent = facet.parent
if not parent.isNil:
parent.children.excl facet
block:
var turn = Turn(facet: facet, queues: turn.queues)
queueWork(turn, facet) do (turn: Turn):
while facet.children.len > 0:
facet.children.pop.terminate(turn, orderly)
if orderly:
@ -398,6 +428,7 @@ proc terminate(facet; turn: var Turn; orderly: bool) =
if parent.isInert:
parent.terminate(turn, true)
else:
stderr.writeLine "terminate facet is terminating ", facet.actor.name
terminate(facet.actor, turn, nil)
when tracing:
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
@ -405,78 +436,102 @@ proc terminate(facet; turn: var Turn; orderly: bool) =
turn.desc.actions.add act
proc stopIfInertAfter(action: TurnAction): TurnAction =
proc wrapper(turn: var Turn) =
# TODO: verify this
proc wrapper(turn: Turn) =
action(turn)
enqueue(turn, turn.facet) do (turn: var Turn):
queueEffect(turn, turn.facet) do (turn: Turn):
if (not turn.facet.parent.isNil and
(not turn.facet.parent.isAlive)) or
turn.facet.isInert:
stderr.writeLine("stopIfInertAfter stops turn")
stop(turn)
wrapper
proc newFacet*(turn: var Turn): Facet = newFacet(turn.facet.actor, turn.facet)
proc newFacet(turn: Turn): Facet = newFacet(turn.facet.actor, turn.facet)
proc inFacet*(turn: var Turn; bootProc: TurnAction): Facet =
proc inFacet*(turn: Turn; bootProc: TurnAction): Facet =
result = newFacet(turn)
let facet = turn.facet
turn.facet = result
when tracing:
var act = ActionDescription(orKind: ActionDescriptionKind.facetstart)
act.facetstart.path.add result.path
turn.desc.actions.add act
inFacet(turn, result, stopIfInertAfter(bootProc))
stopIfInertAfter(bootProc)(turn)
turn.facet = facet
proc facet*(turn: var Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc)
proc facet(turn: Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc)
proc newActor(name: string; handleAlloc: ref Handle): Actor =
let
now = getTime()
seed = now.toUnix * 1_000_000_000 + now.nanosecond
proc newActor(name: string; parent: Facet): Actor =
result = Actor(
name: name,
id: ActorId(seed),
handleAllocator: handleAlloc,
id: name.toPreserves,
)
result.root = newFacet(result, nil)
if parent.isNil:
new result.handleAllocator
else:
result.handleAllocator = parent.actor.handleAllocator
result.root = newFacet(result, parent)
when tracing:
var act = ActorActivation(orKind: ActorActivationKind.start)
act.start.actorName = Name(orKind: NameKind.named)
act.start.actorName.named.name = name.toPreserves
trace(result, act)
proc run(actor; bootProc: TurnAction; initialAssertions: OutboundTable) =
run(newFacet(actor, actor.root, initialAssertions), stopIfInertAfter(bootProc))
proc run(actor: Actor; bootProc: TurnAction; initialAssertions: OutboundTable) =
queueTurn(newFacet(actor, actor.root, initialAssertions), stopIfInertAfter(bootProc))
proc bootActor*(name: string; bootProc: TurnAction): Actor =
var initialAssertions: OutboundTable
result = newActor(name, new(ref Handle))
proc bootActor*(name: string; bootProc: TurnAction): Actor {.discardable.} =
## Boot a top-level actor.
result = newActor(name, nil)
new result.handleAllocator
var turn = Turn(facet: result.root)
assert not result.root.isNil
turn.work.addLast((result.root, bootProc,))
when tracing:
new result.turnIdAllocator
let path = getEnv("SYNDICATE_TRACE_FILE", "/tmp/" & name & ".trace.pr")
case path
of "": stderr.writeLine "$SYNDICATE_TRACE_FILE unset, not tracing actor ", name
of "-": result.traceStream = newFileStream(stderr)
else: result.traceStream = openFileStream(path, fmWrite)
run(result, bootProc, initialAssertions)
turn.desc.id = nextTurnId()
turn.desc.cause = TurnCause(orKind: TurnCauseKind.external)
turn.desc.cause.external.description = "bootActor".toPreserves
turnQueue.addLast turn
proc spawnActor*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
let actor = newActor(name, turn.facet.actor.handleAllocator)
enqueue(turn, turn.facet) do (turn: var Turn):
proc spawnActor*(name: string; turn: Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
let actor = newActor(name, turn.facet)
queueEffect(turn, actor.root) do (turn: Turn):
var newOutBound: Table[Handle, OutboundAssertion]
for key in initialAssertions:
discard turn.facet.outbound.pop(key, newOutbound[key])
when tracing:
actor.turnIdAllocator = turn.facet.actor.turnIdAllocator
actor.traceStream = turn.facet.actor.traceStream
var act = ActionDescription(orKind: ActionDescriptionKind.spawn)
act.spawn.id = actor.id.toPreserves
turn.desc.actions.add act
run(actor, bootProc, newOutBound)
actor
proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
proc spawn*(name: string; turn: Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
spawnActor(name, turn, bootProc, initialAssertions)
type StopOnRetract = ref object of Entity
method retract*(e: StopOnRetract; turn: Turn; h: Handle) =
stderr.writeLIne "StopOnRetract stops turn ", turn, " of ", turn.facet.actor
stop(turn)
proc halfLink(facet, other: Facet) =
let h = facet.nextHandle()
facet.outbound[h] = OutboundAssertion(
handle: h,
peer: Cap(relay: other, target: StopOnRetract(facet: facet)),
established: true,
)
proc spawnLink*(name: string; turn: Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
result = spawnActor(name, turn, bootProc, initialAssertions)
halfLink(turn.facet, result.root)
halfLink(result.root, turn.facet)
proc newInertCap*(): Cap =
let a = bootActor("inert") do (turn: var Turn): turn.stop()
new result
let a = bootActor("inert") do (turn: Turn): turn.stop()
Cap(relay: a.root)
proc atExit*(actor; action) = actor.exitHooks.add action
@ -492,115 +547,67 @@ proc terminate(actor; turn; reason: ref Exception) =
act.stop.status.error.message = reason.msg
trace(actor, act)
for hook in actor.exitHooks: hook(turn)
proc finish(turn: var Turn) =
proc finish(turn: Turn) =
assert not actor.root.isNil, actor.name
actor.root.terminate(turn, reason.isNil)
actor.exited = true
block:
run(actor.root, finish, true)
queueTurn(actor.root, finish)
proc terminate*(facet; e: ref Exception) =
run(facet.actor.root) do (turn: var Turn):
run(facet.actor.root) do (turn: Turn):
facet.actor.terminate(turn, e)
#[
proc asyncCheck*(facet: Facet; fut: FutureBase) =
## Sets a callback on `fut` which propagates exceptions to `facet`.
addCallback(fut) do ():
if fut.failed: terminate(facet, fut.error)
template recallFacet(turn: Turn; body: untyped): untyped =
let facet = turn.facet
block:
body
assert facet.actor == turn.facet.actor, "turn of " & $facet.actor & " ended at " & $turn.facet.actor
turn.facet = facet
proc asyncCheck*(turn; fut: FutureBase) =
## Sets a callback on `fut` which propagates exceptions to the facet of `turn`.
asyncCheck(turn.facet, fut)
]#
template tryFacet(facet; body: untyped) =
try: body
except CatchableError as err: terminate(facet, err)
proc run*(facet; action: TurnAction; zombieTurn = false) =
if true and zombieTurn or (facet.actor.exitReason.isNil and facet.isAlive):
tryFacet(facet):
var queues = newTable[Facet, seq[TurnAction]]()
block:
var turn = Turn(facet: facet, queues: queues)
action(turn)
when tracing:
turn.desc.id = facet.nextTurnId.toPreserves
facet.actor.trace ActorActivation(
orKind: ActorActivationKind.turn, turn: turn.desc)
for facet, queue in queues:
for action in queue: run(facet, action)
proc run*(cap: Cap; action: TurnAction) =
## Convenience proc to run a `TurnAction` in the scope of a `Cap`.
run(cap.relay, action)
#[
proc addCallback*(fut: FutureBase; facet: Facet; act: TurnAction) =
## Add a callback to a `Future` that will be called at a later `Turn`
## within the context of `facet`.
addCallback(fut) do ():
if fut.failed: terminate(facet, fut.error)
else:
when tracing:
run(facet) do (turn: var Turn):
turn.desc.cause = TurnCause(orKind: TurnCauseKind.external)
turn.desc.cause.external.description = "Future".toPreserves
act(turn)
proc stopNow(turn: Turn) =
let caller = turn.facet
recallFacet turn:
while caller.children.len > 0:
var child = caller.children.pop()
if child.actor == caller.actor:
turn.facet = child
stopNow(turn)
else:
run(facet, act)
queueEffect(turn, child, stopNow)
caller.terminate(turn, true)
proc addCallback*(fut: FutureBase; turn: var Turn; act: TurnAction) =
## Add a callback to a `Future` that will be called at a later `Turn`
## with the same context as the current.
if fut.failed:
terminate(turn.facet, fut.error)
elif fut.finished:
enqueue(turn, turn.facet, act)
else:
addCallback(fut, turn.facet, act)
proc stop*(turn: Turn, facet: Facet) =
queueEffect(turn, facet, stopNow)
proc addCallback*[T](fut: Future[T]; turn: var Turn; act: proc (t: var Turn, x: T) {.closure.}) =
addCallback(fut, turn) do (turn: var Turn):
if fut.failed: terminate(turn.facet, fut.error)
else:
when tracing:
turn.desc.cause = TurnCause(orKind: TurnCauseKind.external)
turn.desc.cause.external.description = "Future".toPreserves
act(turn, read fut)
]#
proc stop*(turn: var Turn, facet: Facet) =
if facet.parent.isNil:
facet.terminate(turn, true)
else:
enqueue(turn, facet.parent) do (turn: var Turn):
facet.terminate(turn, true)
proc stop*(turn: var Turn) =
proc stop*(turn: Turn) =
stop(turn, turn.facet)
proc onStop*(facet: Facet; act: TurnAction) =
## Add a `proc (turn: var Turn)` action to `facet` to be called as it stops.
## Add a `proc (turn: Turn)` action to `facet` to be called as it stops.
assert not facet.isNil
add(facet.shutdownActions, act)
proc stopActor*(facet: Facet) =
run(facet) do (turn: var Turn):
let actor = facet.actor
enqueue(turn, actor.root) do (turn: var Turn):
terminate(actor, turn, nil)
proc stopActor*(turn: var Turn) =
stopActor(turn.facet)
proc onStop*(turn: Turn; act: TurnAction) =
onStop(turn.facet, act)
proc stop*(actor: Actor) =
stopActor(actor.root)
queueTurn(actor.root) do (turn: Turn):
assert(not turn.facet.isNil)
stop(turn)
proc freshen*(turn: var Turn, act: TurnAction) =
assert(turn.queues.len == 0, "Attempt to freshen a non-stale Turn")
proc stopActor*(facet: Facet) =
stop(facet.actor)
proc stopActor*(turn: Turn) =
assert(not turn.facet.isNil)
assert(not turn.facet.actor.isNil)
assert(not turn.facet.actor.root.isNil)
stop(turn, turn.facet.actor.root)
proc freshen*(turn: Turn, act: TurnAction) {.deprecated.} =
run(turn.facet, act)
proc newCap*(relay: Facet; e: Entity): Cap =
proc newCap*(relay: Facet; e: Entity): Cap {.deprecated.} =
Cap(relay: relay, target: e)
proc newCap*(turn; e: Entity): Cap =
@ -612,13 +619,42 @@ proc newCap*(e: Entity; turn): Cap =
type SyncContinuation {.final.} = ref object of Entity
action: TurnAction
method message(entity: SyncContinuation; turn: var Turn; v: AssertionRef) =
method message(entity: SyncContinuation; turn: Turn; v: AssertionRef) =
entity.action(turn)
proc sync*(turn: var Turn; refer: Cap; act: TurnAction) =
proc sync*(turn: Turn; refer: Cap; act: TurnAction) =
sync(turn, refer, newCap(turn, SyncContinuation(action: act)))
proc running*(actor): bool =
result = not actor.exited
if not (result or actor.exitReason.isNil):
raise actor.exitReason
proc run(turn: Turn) =
while turn.work.len > 0:
var (facet, act) = turn.work.popFirst()
assert not act.isNil
turn.facet = facet
act(turn)
when tracing:
var act = ActorActivation(orKind: ActorActivationKind.turn)
act.turn = move turn.desc
trace(turn.facet.actor, act)
turn.facet = nil # invalidate the turn
# TODO: catch exceptions here
while turn.effects.len > 0:
turnQueue.addLast turn.effects.pop()
proc run* =
## Run actors to completion
var ready: seq[Continuation]
while true:
while turnQueue.len > 0:
var turn = turnQueue.popFirst()
# TODO: check if actor is still valid
run turn
ioqueue.poll(ready)
if ready.len == 0: break
while ready.len > 0:
discard trampoline:
ready.pop()

View File

@ -59,7 +59,7 @@ when defined(linux):
proc spawnTimerDriver(facet: Facet; cap: Cap): TimerDriver =
let driver = TimerDriver(facet: facet, target: cap)
facet.onStop do (turn: var Turn):
facet.onStop do (turn: Turn):
for fd in driver.timers:
unregister(FD fd)
discard close(fd)
@ -89,16 +89,16 @@ when defined(linux):
wait(FD fd, Read)
if deadline in driver.deadlines:
# Check if the deadline is still observed.
proc turnWork(turn: var Turn) =
proc turnWork(turn: Turn) =
discard publish(turn, driver.target, LaterThan(seconds: deadline))
run(driver.facet, turnWork)
discard close(fd)
driver.timers.excl(fd)
proc spawnTimerActor*(ds: Cap; turn: var Turn): Actor {.discardable.} =
proc spawnTimerActor*(turn: Turn; ds: Cap): Actor {.discardable.} =
## Spawn a timer actor that responds to
## dataspace observations of timeouts on `ds`.
spawnActor("timers", turn) do (turn: var Turn):
spawnLink("timers", turn) do (turn: Turn):
let driver = spawnTimerDriver(turn.facet, ds)
let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()})
during(turn, ds, pat) do (deadline: float):
@ -108,7 +108,7 @@ when defined(linux):
discard change(driver.deadlines, deadline, -1, clamp = true)
# TODO: retract assertions that are unobserved.
proc after*(turn: var Turn; ds: Cap; dur: Duration; act: TurnAction) =
proc after*(turn: Turn; ds: Cap; dur: Duration; act: TurnAction) =
## Execute `act` after some duration of time.
var later = clock_realtime().toFloat() + dur.inMilliseconds.float / 1_000.0
onPublish(turn, ds, grab LaterThan(seconds: later)):

View File

@ -16,14 +16,14 @@ type
index: Index
handleMap: Table[Handle, Assertion]
method publish(ds: Dataspace; turn: var Turn; a: AssertionRef; h: Handle) =
method publish(ds: Dataspace; turn: Turn; a: AssertionRef; h: Handle) =
if add(ds.index, turn, a.value):
var obs = a.value.preservesTo(Observe)
if obs.isSome and obs.get.observer of Cap:
ds.index.add(turn, obs.get.pattern, Cap(obs.get.observer))
ds.handleMap[h] = a.value
method retract(ds: Dataspace; turn: var Turn; h: Handle) =
method retract(ds: Dataspace; turn: Turn; h: Handle) =
let v = ds.handleMap[h]
if remove(ds.index, turn, v):
ds.handleMap.del h
@ -31,20 +31,20 @@ method retract(ds: Dataspace; turn: var Turn; h: Handle) =
if obs.isSome and obs.get.observer of Cap:
ds.index.remove(turn, obs.get.pattern, Cap(obs.get.observer))
method message(ds: Dataspace; turn: var Turn; a: AssertionRef) =
method message(ds: Dataspace; turn: Turn; a: AssertionRef) =
ds.index.deliverMessage(turn, a.value)
proc newDataspace*(turn: var Turn): Cap =
proc newDataspace*(turn: Turn): Cap =
newCap(turn, Dataspace(index: initIndex()))
type BootProc = proc (turn: var Turn; ds: Cap) {.closure.}
type DeprecatedBootProc = proc (ds: Cap; turn: var Turn) {.closure.}
type BootProc = proc (turn: Turn; ds: Cap) {.closure.}
type DeprecatedBootProc = proc (ds: Cap; turn: Turn) {.closure.}
proc bootDataspace*(name: string; bootProc: BootProc): Actor =
bootActor(name) do (turn: var Turn):
bootActor(name) do (turn: Turn):
discard turn.facet.preventInertCheck()
bootProc(turn, newDataspace(turn))
proc bootDataspace*(name: string; bootProc: DeprecatedBootProc): Actor {.deprecated.} =
bootDataspace(name) do (turn: var Turn, ds: Cap):
bootDataspace(name) do (turn: Turn, ds: Cap):
bootProc(ds, turn)

View File

@ -6,7 +6,7 @@ import preserves
import ./actors, ./patterns, ./protocols/dataspace
type
DuringProc* = proc (turn: var Turn; a: Value; h: Handle): TurnAction
DuringProc* = proc (turn: Turn; a: Value; h: Handle): TurnAction
DuringActionKind = enum null, dead, act
DuringAction = object
case kind: DuringActionKind
@ -17,7 +17,7 @@ type
cb: DuringProc
assertionMap: Table[Handle, DuringAction]
method publish(de: DuringEntity; turn: var Turn; a: AssertionRef; h: Handle) =
method publish(de: DuringEntity; turn: Turn; a: AssertionRef; h: Handle) =
let action = de.cb(turn, a.value, h)
# assert(not action.isNil "should have put in a no-op action")
let g = de.assertionMap.getOrDefault h
@ -30,7 +30,7 @@ method publish(de: DuringEntity; turn: var Turn; a: AssertionRef; h: Handle) =
of act:
raiseAssert("during: duplicate handle in publish: " & $h)
method retract(de: DuringEntity; turn: var Turn; h: Handle) =
method retract(de: DuringEntity; turn: Turn; h: Handle) =
let g = de.assertionMap.getOrDefault h
case g.kind
of null:
@ -44,5 +44,5 @@ method retract(de: DuringEntity; turn: var Turn; h: Handle) =
proc during*(cb: DuringProc): DuringEntity = DuringEntity(cb: cb)
proc observe*(turn: var Turn; ds: Cap; pat: Pattern; e: Entity): Handle =
proc observe*(turn: Turn; ds: Cap; pat: Pattern; e: Entity): Handle =
publish(turn, ds, Observe(pattern: pat, observer: newCap(turn, e)))

View File

@ -3,7 +3,7 @@
import std/[hashes, tables]
from ./actors import Cap, hash
import ./actors
from ./protocols/sturdy import Oid
proc hash(r: Cap): Hash = !$(r.relay.hash !& r.target.unsafeAddr.hash)

View File

@ -27,8 +27,8 @@ type
Turn = syndicate.Turn
WireRef = sturdy.WireRef
PacketWriter = proc (turn: var Turn; buf: seq[byte]) {.closure.}
RelaySetup = proc (turn: var Turn; relay: Relay) {.closure.}
PacketWriter = proc (turn: Turn; buf: seq[byte]) {.closure.}
RelaySetup = proc (turn: Turn; relay: Relay) {.closure.}
Relay* = ref object
facet: Facet
@ -56,20 +56,20 @@ type
proc releaseCapOut(r: Relay; e: WireSymbol) =
r.exported.drop e
method publish(spe: SyncPeerEntity; t: var Turn; a: AssertionRef; h: Handle) =
method publish(spe: SyncPeerEntity; t: Turn; a: AssertionRef; h: Handle) =
spe.handleMap[h] = publish(t, spe.peer, a.value)
method retract(se: SyncPeerEntity; t: var Turn; h: Handle) =
method retract(se: SyncPeerEntity; t: Turn; h: Handle) =
var other: Handle
if se.handleMap.pop(h, other):
retract(t, other)
method message(se: SyncPeerEntity; t: var Turn; a: AssertionRef) =
method message(se: SyncPeerEntity; t: Turn; a: AssertionRef) =
if not se.e.isNil:
se.relay.releaseCapOut(se.e)
message(t, se.peer, a.value)
method sync(se: SyncPeerEntity; t: var Turn; peer: Cap) =
method sync(se: SyncPeerEntity; t: Turn; peer: Cap) =
sync(t, se.peer, peer)
proc newSyncPeerEntity(r: Relay; p: Cap): SyncPeerEntity =
@ -106,40 +106,40 @@ proc deregister(relay: Relay; h: Handle) =
if relay.outboundAssertions.pop(h, outbound):
for e in outbound: releaseCapOut(relay, e)
proc send(relay: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
proc send(relay: Relay; turn: Turn; rOid: protocol.Oid; m: Event) =
# TODO: don't send right away.
var pendingTurn: protocol.Turn
pendingTurn.add TurnEvent(oid: rOid, event: m)
relay.facet.run do (turn: var Turn):
relay.facet.run do (turn: Turn):
var pkt = Packet(
orKind: PacketKind.Turn,
turn: pendingTurn)
trace "C: ", pkt
relay.packetWriter(turn, encode pkt)
proc send(re: RelayEntity; turn: var Turn; ev: Event) =
proc send(re: RelayEntity; turn: Turn; ev: Event) =
send(re.relay, turn, protocol.Oid re.oid, ev)
method publish(re: RelayEntity; t: var Turn; a: AssertionRef; h: Handle) =
method publish(re: RelayEntity; t: Turn; a: AssertionRef; h: Handle) =
re.send(t, Event(
orKind: EventKind.Assert,
`assert`: protocol.Assert(
assertion: re.relay.register(a.value, h).rewritten,
handle: h)))
method retract(re: RelayEntity; t: var Turn; h: Handle) =
method retract(re: RelayEntity; t: Turn; h: Handle) =
re.relay.deregister h
re.send(t, Event(
orKind: EventKind.Retract,
retract: Retract(handle: h)))
method message(re: RelayEntity; turn: var Turn; msg: AssertionRef) =
method message(re: RelayEntity; turn: Turn; msg: AssertionRef) =
var (value, exported) = rewriteOut(re.relay, msg.value)
assert(len(exported) == 0, "cannot send a reference in a message")
if len(exported) == 0:
re.send(turn, Event(orKind: EventKind.Message, message: Message(body: value)))
method sync(re: RelayEntity; turn: var Turn; peer: Cap) =
method sync(re: RelayEntity; turn: Turn; peer: Cap) =
var
peerEntity = newSyncPeerEntity(re.relay, peer)
exported: seq[WireSymbol]
@ -193,7 +193,7 @@ proc rewriteIn(relay; facet; v: Value):
proc close(r: Relay) = discard
proc dispatch(relay: Relay; turn: var Turn; cap: Cap; event: Event) =
proc dispatch(relay: Relay; turn: Turn; cap: Cap; event: Event) =
case event.orKind
of EventKind.Assert:
let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion)
@ -216,14 +216,14 @@ proc dispatch(relay: Relay; turn: var Turn; cap: Cap; event: Event) =
#[
var imported: seq[WireSymbol]
let k = relay.rewriteCapIn(turn, evenr.sync.peer, imported)
turn.sync(cap) do (turn: var Turn):
turn.sync(cap) do (turn: Turn):
turn.message(k, true)
for e in imported: relay.imported.del e
]#
proc dispatch(relay: Relay; v: Value) =
trace "S: ", v
run(relay.facet) do (t: var Turn):
run(relay.facet) do (t: Turn):
var pkt: Packet
if pkt.fromPreserves(v):
case pkt.orKind
@ -261,8 +261,10 @@ type
initialCap*: Cap
nextLocalOid*: Option[Oid]
proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) =
spawnActor(name, turn) do (turn: var Turn):
proc spawnRelay(name: string; turn: Turn; opts: RelayActorOptions; setup: RelaySetup) =
stderr.writeLine "calling spawnActor for relay ", name
spawnLink(name, turn) do (turn: Turn):
stderr.writeLine "executing body of spawned actor relay ", name
let relay = Relay(
facet: turn.facet,
packetWriter: opts.packetWriter,
@ -301,40 +303,42 @@ when defined(posix):
import pkg/sys/[files, handles, sockets]
export transportAddress.Unix
type StdioControlEntity = ref object of Entity
buf: ref seq[byte]
type StdioEntity = ref object of Entity
relay: Relay
stdin: AsyncFile
alive: bool
method message(entity: StdioControlEntity; turn: var Turn; ass: AssertionRef) =
method message(entity: StdioEntity; turn: Turn; ass: AssertionRef) =
if ass.value.preservesTo(ForceDisconnect).isSome:
close(entity.stdin)
entity.alive = false
proc loop(entity: StdioControlEntity) {.asyncio.} =
new entity.buf
entity.buf[].setLen(0x1000)
while true:
let n = read(entity.stdin, entity.buf)
proc loop(entity: StdioEntity) {.asyncio.} =
let buf = new seq[byte]
entity.alive = true
while entity.alive:
buf[].setLen(0x1000)
let n = read(entity.stdin, buf)
if n == 0:
stderr.writeLine "empty read on stdin, stopping actor"
stopActor(entity.relay.facet)
stopActor(entity.facet)
else:
entity.relay.recv(entity.buf[], 0..<n)
entity.relay.recv(buf[], 0..<n)
close(entity.stdin)
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Stdio) =
proc connectTransport(turn: Turn; ds: Cap; ta: transportAddress.Stdio) =
## Connect to an external dataspace over stdio.
proc stdoutWriter(turn: var Turn; buf: seq[byte]) =
proc stdoutWriter(turn: Turn; buf: seq[byte]) =
## Blocking write to stdout.
let n = writeBytes(stdout, buf, 0, buf.len)
flushFile(stdout)
if n != buf.len:
stderr.writeLine "short write to stdout, stopping actor"
stopActor(turn)
var opts = RelayActorOptions(
packetWriter: stdoutWriter,
initialCap: ds,
initialOid: 0.Oid.some,
)
spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay):
spawnRelay("stdio", turn, opts) do (turn: Turn; relay: Relay):
let
facet = turn.facet
fd = stdin.getOsFileHandle()
@ -342,17 +346,20 @@ when defined(posix):
if flags < 0: raiseOSError(osLastError())
if fcntl(fd.cint, F_SETFL, flags or O_NONBLOCK) < 0:
raiseOSError(osLastError())
let entity = StdioControlEntity(
relay: relay, stdin: newAsyncFile(FD fd))
let entity = StdioEntity(
facet: turn.facet, relay: relay, stdin: newAsyncFile(FD fd))
onStop(entity.facet) do (turn: Turn):
stderr.writeLine "entity for stdio stopped, set alive to false"
entity.alive = false
discard trampoline:
whelp loop(entity)
publish(turn, ds, TransportConnection(
`addr`: ta.toPreserves,
control: newCap(entity, turn),
resolved: relay.peer.accepted,
))
discard trampoline:
whelp loop(entity)
proc connectStdio*(turn: var Turn; ds: Cap) =
proc connectStdio*(turn: Turn; ds: Cap) =
## Connect to an external dataspace over stdin and stdout.
connectTransport(turn, ds, transportAddress.Stdio())
@ -360,54 +367,45 @@ when defined(posix):
TcpEntity = ref object of Entity
relay: Relay
sock: AsyncConn[sockets.Protocol.TCP]
buf: ref seq[byte]
alive: bool
UnixEntity = ref object of Entity
relay: Relay
sock: AsyncConn[sockets.Protocol.Unix]
buf: ref seq[byte]
alive: bool
SocketEntity = TcpEntity | UnixEntity
method message(entity: SocketEntity; turn: var Turn; ass: AssertionRef) =
method message(entity: SocketEntity; turn: Turn; ass: AssertionRef) =
if ass.value.preservesTo(ForceDisconnect).isSome:
reset entity.alive
close(entity.sock)
template socketLoop() {.dirty.}=
new entity.buf
entity.buf[].setLen(0x1000)
entity.alive = not entity.alive
while entity.alive:
let n = read(entity.sock, entity.buf)
if n < 0: raiseOSError(osLastError())
elif n == 0:
stderr.writeLine "empty read on socket, stopping actor"
stopActor(entity.relay.facet)
else:
entity.relay.recv(entity.buf[], 0..<n)
stderr.writeLine "breaking socketLoop"
proc loop(entity: TcpEntity) {.asyncio.} =
socketLoop()
proc loop(entity: UnixEntity) {.asyncio.} =
socketLoop()
entity.alive = false
type ShutdownEntity = ref object of Entity
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
stopActor(turn)
method retract(e: ShutdownEntity; turn: Turn; h: Handle) =
stopActor(e.facet)
template bootSocketEntity() {.dirty.} =
proc publish(turn: var Turn) =
proc setup(turn: Turn) {.closure.} =
proc kill(turn: Turn) =
entity.alive = false
onStop(turn, kill)
publish(turn, ds, TransportConnection(
`addr`: ta.toPreserves,
control: newCap(entity, turn),
resolved: entity.relay.peer.accepted,
))
run(entity.relay.facet, publish)
loop(entity)
run(entity.relay.facet, setup)
let buf = new seq[byte]
entity.alive = true
while entity.alive:
buf[].setLen(0x1000)
let n = read(entity.sock, buf)
if n < 0: raiseOSError(osLastError())
elif n == 0:
stopActor(entity.facet)
else:
entity.relay.recv(buf[], 0..<n)
close(entity.sock)
proc boot(entity: TcpEntity; ta: transportAddress.Tcp; ds: Cap) {.asyncio.} =
entity.sock = connectTcpAsync(ta.host, Port ta.port)
@ -418,30 +416,30 @@ when defined(posix):
bootSocketEntity()
template spawnSocketRelay() {.dirty.} =
proc writeConn(turn: var Turn; buf: seq[byte]) =
proc writeConn(turn: Turn; buf: seq[byte]) =
stderr.writeLine "writing to socket…"
discard trampoline:
whelp write(entity.sock, buf)
stderr.writeLine "maybe wrote to to socket"
var ops = RelayActorOptions(
packetWriter: writeConn,
initialOid: 0.Oid.some,
)
spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay):
spawnRelay("socket", turn, ops) do (turn: Turn; relay: Relay):
entity.facet = turn.facet
entity.relay = relay
atExit(turn.facet.actor) do (turn: var Turn):
entity.alive = false
close(entity.sock)
discard trampoline:
whelp boot(entity, ta, ds)
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Tcp) =
proc connectTransport(turn: Turn; ds: Cap; ta: transportAddress.Tcp) =
let entity = TcpEntity()
spawnSocketRelay()
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Unix) =
proc connectTransport(turn: Turn; ds: Cap; ta: transportAddress.Unix) =
let entity = UnixEntity()
spawnSocketRelay()
proc walk(turn: var Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) =
proc walk(turn: Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) =
if stepOff < route.pathSteps.len:
let
step = route.pathSteps[stepOff]
@ -464,7 +462,7 @@ proc walk(turn: var Turn; ds, origin: Cap; route: Route; transOff, stepOff: int)
resolved: origin.accepted,
))
proc connectRoute(turn: var Turn; ds: Cap; route: Route; transOff: int) =
proc connectRoute(turn: Turn; ds: Cap; route: Route; transOff: int) =
let rejectPat = TransportConnection ?: {
0: ?route.transports[transOff],
2: ?:Rejected,
@ -482,71 +480,68 @@ proc connectRoute(turn: var Turn; ds: Cap; route: Route; transOff: int) =
onPublish(turn, ds, acceptPat) do (origin: Cap):
walk(turn, ds, origin, route, transOff, 0)
type StepCallback = proc (turn: var Turn; step: Value; origin, next: Cap) {.closure.}
type StepCallback = proc (turn: Turn; step: Value; origin, next: Cap) {.closure.}
proc spawnStepResolver(turn: var Turn; ds: Cap; stepType: Value; cb: StepCallback) =
spawnActor($stepType & "-step", turn) do (turn: var Turn):
let stepPat = grabRecord(stepType, grab())
let pat = ?Observe(pattern: ResolvedPathStep?:{1: stepPat}) ?? {0: grabLit(), 1: grab()}
during(turn, ds, pat) do (origin: Cap; stepDetail: Literal[Value]):
let step = toRecord(stepType, stepDetail.value)
proc duringCallback(turn: var Turn; ass: Value; h: Handle): TurnAction =
var res = ass.preservesTo Resolved
if res.isSome:
if res.get.orKind == ResolvedKind.accepted and
res.get.accepted.responderSession of Cap:
cb(turn, step, origin, res.get.accepted.responderSession.Cap)
else:
publish(turn, ds, ResolvedPathStep(
origin: origin, pathStep: step, resolved: res.get))
proc action(turn: var Turn) =
stop(turn)
result = action
publish(turn, origin, Resolve(
step: step, observer: newCap(turn, during(duringCallback))))
proc spawnStepResolver(turn: Turn; ds: Cap; stepType: Value; cb: StepCallback) =
let stepPat = grabRecord(stepType, grab())
let pat = ?Observe(pattern: ResolvedPathStep?:{1: stepPat}) ?? {0: grabLit(), 1: grab()}
during(turn, ds, pat) do (origin: Cap; stepDetail: Literal[Value]):
let step = toRecord(stepType, stepDetail.value)
proc duringCallback(turn: Turn; ass: Value; h: Handle): TurnAction =
var res = ass.preservesTo Resolved
if res.isSome:
if res.get.orKind == ResolvedKind.accepted and
res.get.accepted.responderSession of Cap:
cb(turn, step, origin, res.get.accepted.responderSession.Cap)
else:
publish(turn, ds, ResolvedPathStep(
origin: origin, pathStep: step, resolved: res.get))
proc action(turn: Turn) =
stop(turn)
result = action
publish(turn, origin, Resolve(
step: step, observer: newCap(turn, during(duringCallback))))
proc spawnRelays*(turn: var Turn; ds: Cap) =
proc spawnRelays*(turn: Turn; ds: Cap) =
## Spawn actors that manage routes and appeasing gatekeepers.
spawnActor("transport-connector", turn) do (turn: var Turn):
let pat = ?Observe(pattern: !TransportConnection) ?? { 0: grab() }
# Use a generic pattern and type matching
# in the during handler because it is easy.
stderr.writeLine "spawnRelays body is running"
let transPat = ?Observe(pattern: !TransportConnection) ?? { 0: grab() }
# Use a generic pattern and type matching
# in the during handler because it is easy.
let stdioPat = ?Observe(pattern: TransportConnection?:{0: ?:Stdio})
during(turn, ds, stdioPat) do:
connectTransport(turn, ds, Stdio())
let stdioPat = ?Observe(pattern: TransportConnection?:{0: ?:Stdio})
during(turn, ds, stdioPat) do:
connectTransport(turn, ds, Stdio())
# TODO: tcp pattern
during(turn, ds, transPat) do (ta: Literal[transportAddress.Tcp]):
try: connectTransport(turn, ds, ta.value)
except CatchableError as e:
publish(turn, ds, TransportConnection(
`addr`: ta.toPreserve,
resolved: rejected(embed e),
))
# TODO: tcp pattern
during(turn, ds, pat) do (ta: Literal[transportAddress.Tcp]):
try: connectTransport(turn, ds, ta.value)
except CatchableError as e:
publish(turn, ds, TransportConnection(
`addr`: ta.toPreserve,
resolved: rejected(embed e),
))
# TODO: unix pattern
during(turn, ds, transPat) do (ta: Literal[transportAddress.Unix]):
try: connectTransport(turn, ds, ta.value)
except CatchableError as e:
publish(turn, ds, TransportConnection(
`addr`: ta.toPreserve,
resolved: rejected(embed e),
))
# TODO: unix pattern
during(turn, ds, pat) do (ta: Literal[transportAddress.Unix]):
try: connectTransport(turn, ds, ta.value)
except CatchableError as e:
publish(turn, ds, TransportConnection(
`addr`: ta.toPreserve,
resolved: rejected(embed e),
))
spawnActor("path-resolver", turn) do (turn: var Turn):
let pat = ?Observe(pattern: !ResolvePath) ?? {0: grab()}
during(turn, ds, pat) do (route: Literal[Route]):
for i, transAddr in route.value.transports:
connectRoute(turn, ds, route.value, i)
let resolvePat = ?Observe(pattern: !ResolvePath) ?? {0: grab()}
during(turn, ds, resolvePat) do (route: Literal[Route]):
for i, transAddr in route.value.transports:
connectRoute(turn, ds, route.value, i)
spawnStepResolver(turn, ds, "ref".toSymbol) do (
turn: var Turn, step: Value, origin: Cap, next: Cap):
turn: Turn, step: Value, origin: Cap, next: Cap):
publish(turn, ds, ResolvedPathStep(
origin: origin, pathStep: step, resolved: next.accepted))
type BootProc* = proc (turn: var Turn; ds: Cap) {.closure.}
type BootProc* = proc (turn: Turn; ds: Cap) {.closure.}
const defaultRoute* = "<route [<stdio>]>"
@ -565,12 +560,12 @@ proc envRoute*: Route =
if not result.fromPreserves(pr):
raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr)
proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
proc resolve*(turn: Turn; ds: Cap; route: Route; bootProc: BootProc) =
## Resolve `route` within `ds` and call `bootProc` with resolved capabilities.
during(turn, ds, ResolvePath ?: {0: ?route, 3: ?:ResolvedAccepted}) do (dst: Cap):
bootProc(turn, dst)
proc resolveEnvironment*(turn: var Turn; bootProc: BootProc) =
proc resolveEnvironment*(turn: Turn; bootProc: BootProc) =
## Resolve a capability from the calling environment
## and call `bootProc`. See envRoute_.
let

View File

@ -67,7 +67,7 @@ func isEmpty(cont: Continuation): bool =
type
ContinuationProc = proc (c: Continuation; v: Value) {.closure.}
LeafProc = proc (l: Leaf; v: Value) {.closure.}
ObserverProc = proc (turn: var Turn; group: ObserverGroup; vs: seq[Value]) {.closure.}
ObserverProc = proc (turn: Turn; group: ObserverGroup; vs: seq[Value]) {.closure.}
proc getLeaves(cont: Continuation; constPaths: Paths): LeafMap =
result = cont.leafMap.getOrDefault(constPaths)
@ -114,10 +114,10 @@ proc top(stack: TermStack): Value =
assert stack.len > 0
stack[stack.high]
proc modify(node: Node; turn: var Turn; outerValue: Value; event: EventKind;
proc modify(node: Node; turn: Turn; outerValue: Value; event: EventKind;
modCont: ContinuationProc; modLeaf: LeafProc; modObs: ObserverProc) =
proc walk(cont: Continuation; turn: var Turn) =
proc walk(cont: Continuation; turn: Turn) =
modCont(cont, outerValue)
for constPaths, constValMap in cont.leafMap.pairs:
let constVals = projectPaths(outerValue, constPaths)
@ -142,7 +142,7 @@ proc modify(node: Node; turn: var Turn; outerValue: Value; event: EventKind;
constValMap.del(get constVals)
proc walk(node: Node; turn: var Turn; termStack: TermStack) =
proc walk(node: Node; turn: Turn; termStack: TermStack) =
walk(node.continuation, turn)
for selector, table in node.edges:
let
@ -227,7 +227,7 @@ proc getEndpoints(leaf: Leaf; capturePaths: Paths): ObserverGroup =
if captures.isSome:
discard result.cachedCaptures.change(get captures, +1)
proc add*(index: var Index; turn: var Turn; pattern: Pattern; observer: Cap) =
proc add*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) =
let
cont = index.root.extend(pattern)
analysis = analyse pattern
@ -240,7 +240,7 @@ proc add*(index: var Index; turn: var Turn; pattern: Pattern; observer: Cap) =
captureMap[capture] = publish(turn, observer, capture)
endpoints.observers[observer] = captureMap
proc remove*(index: var Index; turn: var Turn; pattern: Pattern; observer: Cap) =
proc remove*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) =
let
cont = index.root.extend(pattern)
analysis = analyse pattern
@ -260,7 +260,7 @@ proc remove*(index: var Index; turn: var Turn; pattern: Pattern; observer: Cap)
if constValMap.len == 0:
cont.leafMap.del(analysis.constPaths)
proc adjustAssertion(index: var Index; turn: var Turn; outerValue: Value; delta: int): bool =
proc adjustAssertion(index: var Index; turn: Turn; outerValue: Value; delta: int): bool =
case index.allAssertions.change(outerValue, delta)
of cdAbsentToPresent:
result = true
@ -268,7 +268,7 @@ proc adjustAssertion(index: var Index; turn: var Turn; outerValue: Value; delta:
c.cache.incl(v)
proc modLeaf(l: Leaf; v: Value) =
l.cache.incl(v)
proc modObserver(turn: var Turn; group: ObserverGroup; vs: seq[Value]) =
proc modObserver(turn: Turn; group: ObserverGroup; vs: seq[Value]) =
let change = group.cachedCaptures.change(vs, +1)
if change == cdAbsentToPresent:
for (observer, captureMap) in group.observers.pairs:
@ -281,7 +281,7 @@ proc adjustAssertion(index: var Index; turn: var Turn; outerValue: Value; delta:
c.cache.excl(v)
proc modLeaf(l: Leaf; v: Value) =
l.cache.excl(v)
proc modObserver(turn: var Turn; group: ObserverGroup; vs: seq[Value]) =
proc modObserver(turn: Turn; group: ObserverGroup; vs: seq[Value]) =
if group.cachedCaptures.change(vs, -1) == cdPresentToAbsent:
for (observer, captureMap) in group.observers.pairs:
var h: Handle
@ -293,12 +293,12 @@ proc adjustAssertion(index: var Index; turn: var Turn; outerValue: Value; delta:
proc continuationNoop(c: Continuation; v: Value) = discard
proc leafNoop(l: Leaf; v: Value) = discard
proc add*(index: var Index; turn: var Turn; v: Value): bool =
proc add*(index: var Index; turn: Turn; v: Value): bool =
adjustAssertion(index, turn, v, +1)
proc remove*(index: var Index; turn: var Turn; v: Value): bool =
proc remove*(index: var Index; turn: Turn; v: Value): bool =
adjustAssertion(index, turn, v, -1)
proc deliverMessage*(index: var Index; turn: var Turn; v: Value) =
proc observersCb(turn: var Turn; group: ObserverGroup; vs: seq[Value]) =
proc deliverMessage*(index: var Index; turn: Turn; v: Value) =
proc observersCb(turn: Turn; group: ObserverGroup; vs: seq[Value]) =
for observer in group.observers.keys: message(turn, observer, vs)
index.root.modify(turn, v, messageEvent, continuationNoop, leafNoop, observersCb)

View File

@ -1,6 +1,6 @@
# Package
version = "20240304"
version = "20240308"
author = "Emery Hemingway"
description = "Syndicated actors for conversational concurrency"
license = "Unlicense"

View File

@ -5,21 +5,21 @@ import std/times
import pkg/sys/ioqueue
import syndicate, syndicate/actors/timers
let actor = bootActor("timer-test") do (turn: var Turn):
let actor = bootActor("timer-test") do (turn: Turn):
let timers = newDataspace(turn)
spawnTimerActor(timers, turn)
onPublish(turn, timers, ?LaterThan(seconds: 1356100000)):
echo "now in 13th bʼakʼtun"
after(turn, timers, initDuration(seconds = 3)) do (turn: var Turn):
after(turn, timers, initDuration(seconds = 3)) do (turn: Turn):
echo "third timer expired"
stopActor(turn)
after(turn, timers, initDuration(seconds = 1)) do (turn: var Turn):
after(turn, timers, initDuration(seconds = 1)) do (turn: Turn):
echo "first timer expired"
after(turn, timers, initDuration(seconds = 2)) do (turn: var Turn):
after(turn, timers, initDuration(seconds = 2)) do (turn: Turn):
echo "second timer expired"
echo "single run of ioqueue"