Replace "var Turn" with "Turn"
This commit is contained in:
parent
7e15b37f44
commit
81792ac4ce
28
lock.json
28
lock.json
|
@ -55,23 +55,6 @@
|
|||
"srcDir": "",
|
||||
"url": "https://github.com/ehmry/hashlib/archive/f9455d4be988e14e3dc7933eb7cc7d7c4820b7ac.tar.gz"
|
||||
},
|
||||
{
|
||||
"date": "2024-04-22T13:22:01+02:00",
|
||||
"deepClone": false,
|
||||
"fetchLFS": false,
|
||||
"fetchSubmodules": true,
|
||||
"hash": "sha256-ROpWQYIhgDRJAOJd1dAGkJCVxI2Tz8xoHYjLTQDfXaA=",
|
||||
"leaveDotGit": false,
|
||||
"method": "git",
|
||||
"packages": [
|
||||
"preserves"
|
||||
],
|
||||
"path": "/nix/store/ichqy1plhjd6yql7y3dh540cshq07xv7-preserves-nim",
|
||||
"rev": "fd498c6457cb9ad2f3179daa40da69eec00326dd",
|
||||
"sha256": "182xvw04vjw83mlcrkwkip29b44h0v8dapg2014k9011h90mdsj4",
|
||||
"srcDir": "src",
|
||||
"url": "https://git.syndicate-lang.org/ehmry/preserves-nim.git"
|
||||
},
|
||||
{
|
||||
"method": "fetchzip",
|
||||
"packages": [
|
||||
|
@ -94,6 +77,17 @@
|
|||
"srcDir": "src",
|
||||
"url": "https://github.com/zevv/npeg/archive/ec0cc6e64ea4c62d2aa382b176a4838474238f8d.tar.gz"
|
||||
},
|
||||
{
|
||||
"method": "fetchzip",
|
||||
"packages": [
|
||||
"preserves"
|
||||
],
|
||||
"path": "/nix/store/hzb7af7lbd4kgd5y4hbgxv1lswig36yj-source",
|
||||
"rev": "fd498c6457cb9ad2f3179daa40da69eec00326dd",
|
||||
"sha256": "182xvw04vjw83mlcrkwkip29b44h0v8dapg2014k9011h90mdsj4",
|
||||
"srcDir": "src",
|
||||
"url": "https://git.syndicate-lang.org/ehmry/preserves-nim/archive/fd498c6457cb9ad2f3179daa40da69eec00326dd.tar.gz"
|
||||
},
|
||||
{
|
||||
"method": "fetchzip",
|
||||
"packages": [
|
||||
|
|
|
@ -31,21 +31,21 @@ proc `?:`*(typ: static typedesc; bindings: sink openArray[(Value, Pattern)]): Pa
|
|||
patterns.grab(typ, bindings)
|
||||
|
||||
type
|
||||
PublishProc = proc (turn: var Turn; v: Value; h: Handle) {.closure.}
|
||||
RetractProc = proc (turn: var Turn; h: Handle) {.closure.}
|
||||
MessageProc = proc (turn: var Turn; v: Value) {.closure.}
|
||||
PublishProc = proc (turn: Turn; v: Value; h: Handle) {.closure.}
|
||||
RetractProc = proc (turn: Turn; h: Handle) {.closure.}
|
||||
MessageProc = proc (turn: Turn; v: Value) {.closure.}
|
||||
ClosureEntity = ref object of Entity
|
||||
publishImpl*: PublishProc
|
||||
retractImpl*: RetractProc
|
||||
messageImpl*: MessageProc
|
||||
|
||||
method publish(e: ClosureEntity; turn: var Turn; a: AssertionRef; h: Handle) =
|
||||
method publish(e: ClosureEntity; turn: Turn; a: AssertionRef; h: Handle) =
|
||||
if not e.publishImpl.isNil: e.publishImpl(turn, a.value, h)
|
||||
|
||||
method retract(e: ClosureEntity; turn: var Turn; h: Handle) =
|
||||
method retract(e: ClosureEntity; turn: Turn; h: Handle) =
|
||||
if not e.retractImpl.isNil: e.retractImpl(turn, h)
|
||||
|
||||
method message(e: ClosureEntity; turn: var Turn; a: AssertionRef) =
|
||||
method message(e: ClosureEntity; turn: Turn; a: AssertionRef) =
|
||||
if not e.messageImpl.isNil: e.messageImpl(turn, a.value)
|
||||
|
||||
proc argumentCount(handler: NimNode): int =
|
||||
|
@ -90,7 +90,7 @@ proc wrapPublishHandler(turn, handler: NimNode): NimNode =
|
|||
handlerSym = genSym(nskProc, "publish")
|
||||
bindingsSym = ident"bindings"
|
||||
quote do:
|
||||
proc `handlerSym`(`turn`: var Turn; `bindingsSym`: Value; `handleSym`: Handle) =
|
||||
proc `handlerSym`(`turn`: Turn; `bindingsSym`: Value; `handleSym`: Handle) =
|
||||
`varSection`
|
||||
if fromPreserves(`valuesSym`, bindings):
|
||||
`publishBody`
|
||||
|
@ -102,7 +102,7 @@ proc wrapMessageHandler(turn, handler: NimNode): NimNode =
|
|||
handlerSym = genSym(nskProc, "message")
|
||||
bindingsSym = ident"bindings"
|
||||
quote do:
|
||||
proc `handlerSym`(`turn`: var Turn; `bindingsSym`: Value) =
|
||||
proc `handlerSym`(`turn`: Turn; `bindingsSym`: Value) =
|
||||
`varSection`
|
||||
if fromPreserves(`valuesSym`, bindings):
|
||||
`body`
|
||||
|
@ -116,17 +116,17 @@ proc wrapDuringHandler(turn, entryBody, exitBody: NimNode): NimNode =
|
|||
duringSym = genSym(nskProc, "during")
|
||||
if exitBody.isNil:
|
||||
quote do:
|
||||
proc `duringSym`(`turn`: var Turn; `bindingsSym`: Value; `handleSym`: Handle): TurnAction =
|
||||
proc `duringSym`(`turn`: Turn; `bindingsSym`: Value; `handleSym`: Handle): TurnAction =
|
||||
`varSection`
|
||||
if fromPreserves(`valuesSym`, `bindingsSym`):
|
||||
`publishBody`
|
||||
else:
|
||||
quote do:
|
||||
proc `duringSym`(`turn`: var Turn; `bindingsSym`: Value; `handleSym`: Handle): TurnAction =
|
||||
proc `duringSym`(`turn`: Turn; `bindingsSym`: Value; `handleSym`: Handle): TurnAction =
|
||||
`varSection`
|
||||
if fromPreserves(`valuesSym`, `bindingsSym`):
|
||||
`publishBody`
|
||||
proc action(`turn`: var Turn) =
|
||||
proc action(`turn`: Turn) =
|
||||
`exitBody`
|
||||
result = action
|
||||
|
||||
|
|
|
@ -64,12 +64,13 @@ type
|
|||
facetIdAllocator: uint
|
||||
exiting, exited: bool
|
||||
|
||||
TurnAction* = proc (t: var Turn) {.closure.}
|
||||
TurnAction* = proc (t: Turn) {.closure.}
|
||||
|
||||
Turn* = ref object
|
||||
Turn* = var TurnRef
|
||||
TurnRef* = ref object
|
||||
facet: Facet # active facet that may change during a turn
|
||||
work: Deque[tuple[facet: Facet, act: TurnAction]]
|
||||
effects: Table[Actor, Turn]
|
||||
effects: Table[Actor, TurnRef]
|
||||
when tracing:
|
||||
desc: TurnDescription
|
||||
|
||||
|
@ -84,7 +85,7 @@ type
|
|||
id: FacetId
|
||||
isAlive: bool
|
||||
|
||||
var turnQueue {.threadvar.}: Deque[Turn]
|
||||
var turnQueue {.threadvar.}: Deque[TurnRef]
|
||||
|
||||
when tracing:
|
||||
when defined(solo5):
|
||||
|
@ -152,7 +153,7 @@ converter toFacet(t: Turn): Facet = t.facet
|
|||
using
|
||||
actor: Actor
|
||||
facet: Facet
|
||||
turn: var Turn
|
||||
turn: Turn
|
||||
action: TurnAction
|
||||
|
||||
proc labels(f: Facet): string =
|
||||
|
@ -201,27 +202,27 @@ proc nextHandle(facet: Facet): Handle =
|
|||
result = succ(facet.actor.handleAllocator[])
|
||||
facet.actor.handleAllocator[] = result
|
||||
|
||||
template recallFacet(turn: var Turn; body: untyped): untyped =
|
||||
template recallFacet(turn: Turn; body: untyped): untyped =
|
||||
let facet = turn.facet
|
||||
block:
|
||||
body
|
||||
assert facet.actor == turn.facet.actor
|
||||
turn.facet = facet
|
||||
|
||||
proc queueWork*(turn: var Turn; facet: Facet; act: TurnAction) =
|
||||
proc queueWork*(turn: Turn; facet: Facet; act: TurnAction) =
|
||||
assert not facet.isNil
|
||||
turn.work.addLast((facet, act,))
|
||||
|
||||
proc queueTurn*(facet: Facet; act: TurnAction) =
|
||||
var turn = Turn(facet: facet)
|
||||
var turn = TurnRef(facet: facet)
|
||||
assert not facet.isNil
|
||||
turn.work.addLast((facet, act,))
|
||||
when tracing:
|
||||
turn.desc.id = nextTurnId()
|
||||
turnQueue.addLast(turn)
|
||||
|
||||
proc queueTurn*(prev: var Turn; facet: Facet; act: TurnAction) =
|
||||
var next = Turn(facet: facet)
|
||||
proc queueTurn*(prev: Turn; facet: Facet; act: TurnAction) =
|
||||
var next = TurnRef(facet: facet)
|
||||
assert not facet.isNil
|
||||
next.work.addLast((facet, act,))
|
||||
when tracing:
|
||||
|
@ -235,14 +236,14 @@ proc run*(facet: Facet; action: TurnAction) = queueTurn(facet, action)
|
|||
|
||||
proc facet*(turn: Turn): Facet = turn.facet
|
||||
|
||||
proc queueEffect*(turn: var Turn; target: Facet; act: TurnAction) =
|
||||
proc queueEffect*(turn: Turn; target: Facet; act: TurnAction) =
|
||||
let fremd = target.actor
|
||||
if fremd == turn.facet.actor:
|
||||
turn.work.addLast((target, act,))
|
||||
else:
|
||||
var fremdTurn = turn.effects.getOrDefault(fremd)
|
||||
if fremdTurn.isNil:
|
||||
fremdTurn = Turn(facet: target)
|
||||
fremdTurn = TurnRef(facet: target)
|
||||
turn.effects[fremd] = fremdTurn
|
||||
when tracing:
|
||||
fremdTurn.desc.id = nextTurnId()
|
||||
|
@ -362,9 +363,9 @@ proc runRewrites*(a: Attenuation; v: Value): Value =
|
|||
result = examineAlternatives(stage, result)
|
||||
if result.isFalse: break
|
||||
|
||||
method publish*(e: Entity; turn: var Turn; v: AssertionRef; h: Handle) {.base.} = discard
|
||||
method publish*(e: Entity; turn: Turn; v: AssertionRef; h: Handle) {.base.} = discard
|
||||
|
||||
proc publish(turn: var Turn; cap: Cap; v: Value; h: Handle) =
|
||||
proc publish(turn: Turn; cap: Cap; v: Value; h: Handle) =
|
||||
var a = runRewrites(cap.attenuation, v)
|
||||
if not a.isFalse:
|
||||
let e = OutboundAssertion(handle: h, peer: cap)
|
||||
|
@ -379,42 +380,42 @@ proc publish(turn: var Turn; cap: Cap; v: Value; h: Handle) =
|
|||
mapEmbeds(v) do (cap: Value) -> Value: discard
|
||||
act.enqueue.event.detail.assert.handle = h
|
||||
turn.desc.actions.add act
|
||||
queueEffect(turn, cap.relay) do (turn: var Turn):
|
||||
queueEffect(turn, cap.relay) do (turn: Turn):
|
||||
e.established = true
|
||||
when tracing:
|
||||
turn.desc.actions.add act.toDequeue
|
||||
publish(cap.target, turn, AssertionRef(value: a), e.handle)
|
||||
|
||||
proc publish*(turn: var Turn; r: Cap; a: Value): Handle {.discardable.} =
|
||||
proc publish*(turn: Turn; r: Cap; a: Value): Handle {.discardable.} =
|
||||
result = turn.facet.nextHandle()
|
||||
publish(turn, r, a, result)
|
||||
|
||||
proc publish*[T](turn: var Turn; r: Cap; a: T): Handle {.discardable.} =
|
||||
proc publish*[T](turn: Turn; r: Cap; a: T): Handle {.discardable.} =
|
||||
publish(turn, r, a.toPreserves)
|
||||
|
||||
method retract*(e: Entity; turn: var Turn; h: Handle) {.base.} = discard
|
||||
method retract*(e: Entity; turn: Turn; h: Handle) {.base.} = discard
|
||||
|
||||
proc retract(turn: var Turn; e: OutboundAssertion) =
|
||||
proc retract(turn: Turn; e: OutboundAssertion) =
|
||||
when tracing:
|
||||
var act = initEnqueue(turn, e.peer)
|
||||
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.retract)
|
||||
act.enqueue.event.detail.retract.handle = e.handle
|
||||
turn.desc.actions.add act
|
||||
queueEffect(turn, e.peer.relay) do (turn: var Turn):
|
||||
queueEffect(turn, e.peer.relay) do (turn: Turn):
|
||||
when tracing:
|
||||
turn.desc.actions.add act.toDequeue
|
||||
if e.established:
|
||||
e.established = false
|
||||
e.peer.target.retract(turn, e.handle)
|
||||
|
||||
proc retract*(turn: var Turn; h: Handle) =
|
||||
proc retract*(turn: Turn; h: Handle) =
|
||||
var e: OutboundAssertion
|
||||
if turn.facet.outbound.pop(h, e):
|
||||
turn.retract(e)
|
||||
|
||||
method message*(e: Entity; turn: var Turn; v: AssertionRef) {.base.} = discard
|
||||
method message*(e: Entity; turn: Turn; v: AssertionRef) {.base.} = discard
|
||||
|
||||
proc message*(turn: var Turn; r: Cap; v: Value) =
|
||||
proc message*(turn: Turn; r: Cap; v: Value) =
|
||||
var a = runRewrites(r.attenuation, v)
|
||||
if not a.isFalse:
|
||||
when tracing:
|
||||
|
@ -423,43 +424,43 @@ proc message*(turn: var Turn; r: Cap; v: Value) =
|
|||
act.enqueue.event.detail.message.body.value.value =
|
||||
mapEmbeds(a) do (cap: Value) -> Value: discard
|
||||
turn.desc.actions.add act
|
||||
queueEffect(turn, r.relay) do (turn: var Turn):
|
||||
queueEffect(turn, r.relay) do (turn: Turn):
|
||||
when tracing:
|
||||
turn.desc.actions.add act.toDequeue
|
||||
r.target.message(turn, AssertionRef(value: a))
|
||||
|
||||
proc message*[T](turn: var Turn; r: Cap; v: T) =
|
||||
proc message*[T](turn: Turn; r: Cap; v: T) =
|
||||
message(turn, r, v.toPreserves)
|
||||
|
||||
method sync*(e: Entity; turn: var Turn; peer: Cap) {.base.} =
|
||||
queueTurn(e.facet) do (turn: var Turn):
|
||||
method sync*(e: Entity; turn: Turn; peer: Cap) {.base.} =
|
||||
queueTurn(e.facet) do (turn: Turn):
|
||||
message(turn, peer, true.toPreserves)
|
||||
# complete sync on a later turn
|
||||
|
||||
proc sync*(turn: var Turn; r, peer: Cap) =
|
||||
proc sync*(turn: Turn; r, peer: Cap) =
|
||||
when tracing:
|
||||
var act = initEnqueue(turn, peer)
|
||||
act.enqueue.event.detail = trace.TurnEvent(orKind: TurnEventKind.sync)
|
||||
act.enqueue.event.detail.sync.peer = peer.toTraceTarget
|
||||
turn.desc.actions.add act
|
||||
queueEffect(turn, r.relay) do (turn: var Turn):
|
||||
queueEffect(turn, r.relay) do (turn: Turn):
|
||||
when tracing:
|
||||
turn.desc.actions.add act.toDequeue
|
||||
r.target.sync(turn, peer)
|
||||
|
||||
proc replace*[T](turn: var Turn; cap: Cap; h: Handle; v: T): Handle =
|
||||
proc replace*[T](turn: Turn; cap: Cap; h: Handle; v: T): Handle =
|
||||
result = publish(turn, cap, v)
|
||||
if h != default(Handle):
|
||||
retract(turn, h)
|
||||
|
||||
proc replace*[T](turn: var Turn; cap: Cap; h: var Handle; v: T): Handle {.discardable.} =
|
||||
proc replace*[T](turn: Turn; cap: Cap; h: var Handle; v: T): Handle {.discardable.} =
|
||||
var old = h
|
||||
h = publish(turn, cap, v)
|
||||
if old != default(Handle):
|
||||
retract(turn, old)
|
||||
h
|
||||
|
||||
proc stop*(turn: var Turn)
|
||||
proc stop*(turn: Turn)
|
||||
|
||||
proc newFacet(actor; parent: Facet; initialAssertions: OutboundTable): Facet =
|
||||
inc actor.facetIdAllocator
|
||||
|
@ -488,7 +489,7 @@ proc preventInertCheck*(turn: Turn) =
|
|||
|
||||
proc terminateActor(turn; reason: ref Exception)
|
||||
|
||||
proc terminateFacetOrderly(turn: var Turn) =
|
||||
proc terminateFacetOrderly(turn: Turn) =
|
||||
let facet = turn.facet
|
||||
if facet.isAlive:
|
||||
facet.isAlive = false
|
||||
|
@ -501,7 +502,7 @@ proc terminateFacetOrderly(turn: var Turn) =
|
|||
retract(turn, e)
|
||||
clear facet.outbound
|
||||
|
||||
proc inertCheck(turn: var Turn) =
|
||||
proc inertCheck(turn: Turn) =
|
||||
if (not turn.facet.parent.isNil and
|
||||
(not turn.facet.parent.isAlive)) or
|
||||
turn.facet.isInert:
|
||||
|
@ -512,7 +513,7 @@ proc inertCheck(turn: var Turn) =
|
|||
turn.desc.actions.add act
|
||||
stop(turn)
|
||||
|
||||
proc terminateFacet(turn: var Turn) =
|
||||
proc terminateFacet(turn: Turn) =
|
||||
let facet = turn.facet
|
||||
for child in facet.children:
|
||||
queueWork(turn, child, terminateFacetOrderly)
|
||||
|
@ -523,14 +524,14 @@ proc terminateFacet(turn: var Turn) =
|
|||
# self-termination
|
||||
|
||||
proc stopIfInertAfter(action: TurnAction): TurnAction =
|
||||
proc work(turn: var Turn) =
|
||||
proc work(turn: Turn) =
|
||||
queueEffect(turn, turn.facet, inertCheck)
|
||||
action(turn)
|
||||
work
|
||||
|
||||
proc newFacet(turn: var Turn): Facet = newFacet(turn.facet.actor, turn.facet)
|
||||
proc newFacet(turn: Turn): Facet = newFacet(turn.facet.actor, turn.facet)
|
||||
|
||||
proc inFacet*(turn: var Turn; bootProc: TurnAction): Facet {.discardable.} =
|
||||
proc inFacet*(turn: Turn; bootProc: TurnAction): Facet {.discardable.} =
|
||||
result = newFacet(turn)
|
||||
recallFacet turn:
|
||||
turn.facet = result
|
||||
|
@ -563,7 +564,7 @@ proc bootActor*(name: string; bootProc: TurnAction): Actor {.discardable.} =
|
|||
## Boot a top-level actor.
|
||||
result = newActor(name, nil)
|
||||
new result.handleAllocator
|
||||
var turn = Turn(facet: result.root)
|
||||
var turn = TurnRef(facet: result.root)
|
||||
assert not result.root.isNil
|
||||
turn.work.addLast((result.root, bootProc,))
|
||||
when tracing:
|
||||
|
@ -572,9 +573,9 @@ proc bootActor*(name: string; bootProc: TurnAction): Actor {.discardable.} =
|
|||
turn.desc.cause.external.description = "bootActor".toPreserves
|
||||
turnQueue.addLast turn
|
||||
|
||||
proc spawnActor*(turn: var Turn; name: string; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
||||
proc spawnActor*(turn: Turn; name: string; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
||||
let actor = newActor(name, turn.facet)
|
||||
queueEffect(turn, actor.root) do (turn: var Turn):
|
||||
queueEffect(turn, actor.root) do (turn: Turn):
|
||||
var newOutBound: Table[Handle, OutboundAssertion]
|
||||
for key in initialAssertions:
|
||||
discard turn.facet.outbound.pop(key, newOutbound[key])
|
||||
|
@ -585,12 +586,12 @@ proc spawnActor*(turn: var Turn; name: string; bootProc: TurnAction; initialAsse
|
|||
run(actor, bootProc, newOutBound)
|
||||
actor
|
||||
|
||||
proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
||||
proc spawn*(name: string; turn: Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
||||
spawnActor(turn, name, bootProc, initialAssertions)
|
||||
|
||||
type StopOnRetract = ref object of Entity
|
||||
|
||||
method retract*(e: StopOnRetract; turn: var Turn; h: Handle) =
|
||||
method retract*(e: StopOnRetract; turn: Turn; h: Handle) =
|
||||
stop(turn)
|
||||
|
||||
proc halfLink(facet, other: Facet) =
|
||||
|
@ -601,7 +602,7 @@ proc halfLink(facet, other: Facet) =
|
|||
established: true,
|
||||
)
|
||||
|
||||
proc linkActor*(turn: var Turn; name: string; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
||||
proc linkActor*(turn: Turn; name: string; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
||||
result = spawnActor(turn, name, bootProc, initialAssertions)
|
||||
halfLink(turn.facet, result.root)
|
||||
halfLink(result.root, turn.facet)
|
||||
|
@ -610,7 +611,7 @@ var inertActor {.threadvar.}: Actor
|
|||
|
||||
proc newInertCap*(): Cap =
|
||||
if inertActor.isNil:
|
||||
inertActor = bootActor("inert") do (turn: var Turn): turn.stop()
|
||||
inertActor = bootActor("inert") do (turn: Turn): turn.stop()
|
||||
Cap(relay: inertActor.root)
|
||||
|
||||
proc atExit*(actor; action) = actor.exitHooks.add action
|
||||
|
@ -633,7 +634,7 @@ proc terminateActor(turn; reason: ref Exception) =
|
|||
if reason.isNil:
|
||||
terminateActor(turn, err)
|
||||
return
|
||||
proc finish(turn: var Turn) =
|
||||
proc finish(turn: Turn) =
|
||||
assert not actor.root.isNil, actor.name
|
||||
terminateFacet(turn)
|
||||
actor.root = nil
|
||||
|
@ -641,14 +642,14 @@ proc terminateActor(turn; reason: ref Exception) =
|
|||
queueTurn(actor.root, finish)
|
||||
|
||||
proc terminateFacet*(facet; e: ref Exception) =
|
||||
run(facet.actor.root) do (turn: var Turn):
|
||||
run(facet.actor.root) do (turn: Turn):
|
||||
terminateActor(turn, e)
|
||||
|
||||
proc terminate*(turn: var Turn; e: ref Exception) =
|
||||
proc terminate*(turn: Turn; e: ref Exception) =
|
||||
terminateActor(turn, e)
|
||||
|
||||
proc stop*(turn: var Turn, facet: Facet) =
|
||||
queueEffect(turn, facet) do (turn: var Turn):
|
||||
proc stop*(turn: Turn, facet: Facet) =
|
||||
queueEffect(turn, facet) do (turn: Turn):
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
|
||||
act.facetstop.path = facet.path
|
||||
|
@ -656,17 +657,17 @@ proc stop*(turn: var Turn, facet: Facet) =
|
|||
turn.desc.actions.add act
|
||||
terminateFacet(turn)
|
||||
|
||||
proc stop*(turn: var Turn) =
|
||||
proc stop*(turn: Turn) =
|
||||
stop(turn, turn.facet)
|
||||
|
||||
proc stop*(facet: Facet) =
|
||||
run(facet, stop)
|
||||
|
||||
proc onStop*(facet: Facet; act: TurnAction) =
|
||||
## Add a `proc (turn: var Turn)` action to `facet` to be called as it stops.
|
||||
## Add a `proc (turn: Turn)` action to `facet` to be called as it stops.
|
||||
add(facet.shutdownActions, act)
|
||||
|
||||
proc onStop*(turn: var Turn; act: TurnAction) =
|
||||
proc onStop*(turn: Turn; act: TurnAction) =
|
||||
onStop(turn.facet, act)
|
||||
|
||||
proc isAlive(actor): bool =
|
||||
|
@ -674,7 +675,7 @@ proc isAlive(actor): bool =
|
|||
|
||||
proc stop*(actor: Actor) =
|
||||
if actor.isAlive:
|
||||
queueTurn(actor.root) do (turn: var Turn):
|
||||
queueTurn(actor.root) do (turn: Turn):
|
||||
assert(not turn.facet.isNil)
|
||||
when tracing:
|
||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
|
||||
|
@ -686,10 +687,10 @@ proc stop*(actor: Actor) =
|
|||
proc stopActor*(facet: Facet) =
|
||||
stop(facet.actor)
|
||||
|
||||
proc stopActor*(turn: var Turn) =
|
||||
proc stopActor*(turn: Turn) =
|
||||
stop(turn, turn.facet.actor.root)
|
||||
|
||||
proc freshen*(turn: var Turn, act: TurnAction) {.deprecated.} =
|
||||
proc freshen*(turn: Turn, act: TurnAction) {.deprecated.} =
|
||||
run(turn.facet, act)
|
||||
|
||||
proc newCap*(relay: Facet; entity: Entity): Cap =
|
||||
|
@ -706,10 +707,10 @@ proc newCap*(e: Entity; turn): Cap =
|
|||
type SyncContinuation {.final.} = ref object of Entity
|
||||
action: TurnAction
|
||||
|
||||
method message(entity: SyncContinuation; turn: var Turn; v: AssertionRef) =
|
||||
method message(entity: SyncContinuation; turn: Turn; v: AssertionRef) =
|
||||
entity.action(turn)
|
||||
|
||||
proc sync*(turn: var Turn; refer: Cap; act: TurnAction) =
|
||||
proc sync*(turn: Turn; refer: Cap; act: TurnAction) =
|
||||
sync(turn, refer, newCap(turn, SyncContinuation(action: act)))
|
||||
|
||||
proc running*(actor): bool =
|
||||
|
@ -717,7 +718,7 @@ proc running*(actor): bool =
|
|||
if not (result or actor.exitReason.isNil):
|
||||
raise actor.exitReason
|
||||
|
||||
proc run(turn: var Turn) =
|
||||
proc run(turn: Turn) =
|
||||
while turn.work.len > 0:
|
||||
var (facet, act) = turn.work.popFirst()
|
||||
assert not act.isNil
|
||||
|
|
|
@ -16,14 +16,14 @@ type
|
|||
index: Index
|
||||
handleMap: Table[Handle, Assertion]
|
||||
|
||||
method publish(ds: Dataspace; turn: var Turn; a: AssertionRef; h: Handle) =
|
||||
method publish(ds: Dataspace; turn: Turn; a: AssertionRef; h: Handle) =
|
||||
if add(ds.index, turn, a.value):
|
||||
var obs = a.value.preservesTo(Observe)
|
||||
if obs.isSome and obs.get.observer of Cap:
|
||||
ds.index.add(turn, obs.get.pattern, Cap(obs.get.observer))
|
||||
ds.handleMap[h] = a.value
|
||||
|
||||
method retract(ds: Dataspace; turn: var Turn; h: Handle) =
|
||||
method retract(ds: Dataspace; turn: Turn; h: Handle) =
|
||||
let v = ds.handleMap[h]
|
||||
if remove(ds.index, turn, v):
|
||||
ds.handleMap.del h
|
||||
|
@ -31,20 +31,20 @@ method retract(ds: Dataspace; turn: var Turn; h: Handle) =
|
|||
if obs.isSome and obs.get.observer of Cap:
|
||||
ds.index.remove(turn, obs.get.pattern, Cap(obs.get.observer))
|
||||
|
||||
method message(ds: Dataspace; turn: var Turn; a: AssertionRef) =
|
||||
method message(ds: Dataspace; turn: Turn; a: AssertionRef) =
|
||||
ds.index.deliverMessage(turn, a.value)
|
||||
|
||||
proc newDataspace*(turn: var Turn): Cap =
|
||||
proc newDataspace*(turn: Turn): Cap =
|
||||
newCap(turn, Dataspace(index: initIndex()))
|
||||
|
||||
type BootProc = proc (turn: var Turn; ds: Cap) {.closure.}
|
||||
type DeprecatedBootProc = proc (ds: Cap; turn: var Turn) {.closure.}
|
||||
type BootProc = proc (turn: Turn; ds: Cap) {.closure.}
|
||||
type DeprecatedBootProc = proc (ds: Cap; turn: Turn) {.closure.}
|
||||
|
||||
proc bootDataspace*(name: string; bootProc: BootProc): Actor =
|
||||
bootActor(name) do (turn: var Turn):
|
||||
bootActor(name) do (turn: Turn):
|
||||
turn.preventInertCheck()
|
||||
bootProc(turn, newDataspace(turn))
|
||||
|
||||
proc bootDataspace*(name: string; bootProc: DeprecatedBootProc): Actor {.deprecated.} =
|
||||
bootDataspace(name) do (turn: var Turn, ds: Cap):
|
||||
bootDataspace(name) do (turn: Turn, ds: Cap):
|
||||
bootProc(ds, turn)
|
||||
|
|
|
@ -184,7 +184,7 @@ proc match(driver: Driver; req: HttpRequest): Option[HttpBinding] =
|
|||
if result.isNone or b.strongerThan(result.get):
|
||||
result = some b
|
||||
|
||||
method message(e: Exchange; turn: var Turn; a: AssertionRef) =
|
||||
method message(e: Exchange; turn: Turn; a: AssertionRef) =
|
||||
# Send responses back into a connection.
|
||||
var res: HttpResponse
|
||||
if e.mode != HttpResponseKind.done and res.fromPreserves a.value:
|
||||
|
@ -235,7 +235,7 @@ method message(e: Exchange; turn: var Turn; a: AssertionRef) =
|
|||
# stop the facet scoped to the exchange
|
||||
# so that the response capability is withdrawn
|
||||
|
||||
proc service(turn: var Turn; exch: Exchange) =
|
||||
proc service(turn: Turn; exch: Exchange) =
|
||||
## Service an HTTP message exchange.
|
||||
var binding = exch.ses.driver.match exch.req
|
||||
if binding.isNone:
|
||||
|
@ -251,7 +251,7 @@ proc service(turn: var Turn; exch: Exchange) =
|
|||
res: embed cap,
|
||||
))
|
||||
const timeout = initDuration(seconds = 4)
|
||||
after(turn, exch.ses.driver.timers, timeout) do (turn: var Turn):
|
||||
after(turn, exch.ses.driver.timers, timeout) do (turn: Turn):
|
||||
if not exch.active:
|
||||
var res = HttpResponse(orKind: HttpResponseKind.status)
|
||||
res.status.code = 504
|
||||
|
@ -262,18 +262,18 @@ proc service(turn: var Turn; exch: Exchange) =
|
|||
|
||||
proc service(ses: Session) =
|
||||
## Service a connection to an HTTP client.
|
||||
ses.facet.onStop do (turn: var Turn):
|
||||
ses.facet.onStop do (turn: Turn):
|
||||
close ses.conn
|
||||
ses.conn.onClosed do ():
|
||||
stop ses.facet
|
||||
ses.conn.onReceivedPartial do (data: seq[byte]; ctx: MessageContext; eom: bool):
|
||||
ses.facet.run do (turn: var Turn):
|
||||
ses.facet.run do (turn: Turn):
|
||||
var (n, req) = parseRequest(ses.conn, cast[string](data))
|
||||
if n > 0:
|
||||
inc(ses.driver.sequenceNumber)
|
||||
req.sequenceNumber = ses.driver.sequenceNumber
|
||||
req.port = BiggestInt ses.port
|
||||
inFacet(turn) do (turn: var Turn):
|
||||
inFacet(turn) do (turn: Turn):
|
||||
preventInertCheck(turn)
|
||||
# start a new facet for this message exchange
|
||||
turn.service Exchange(
|
||||
|
@ -291,18 +291,18 @@ proc newListener(port: Port): Listener =
|
|||
lp.with port
|
||||
listen newPreconnection(local=[lp])
|
||||
|
||||
proc httpListen(turn: var Turn; driver: Driver; port: Port): Listener =
|
||||
proc httpListen(turn: Turn; driver: Driver; port: Port): Listener =
|
||||
let facet = turn.facet
|
||||
var listener = newListener(port)
|
||||
preventInertCheck(turn)
|
||||
listener.onListenError do (err: ref Exception):
|
||||
terminateFacet(facet, err)
|
||||
facet.onStop do (turn: var Turn):
|
||||
facet.onStop do (turn: Turn):
|
||||
stop listener
|
||||
listener.onConnectionReceived do (conn: Connection):
|
||||
driver.facet.run do (turn: var Turn):
|
||||
driver.facet.run do (turn: Turn):
|
||||
# start a new turn
|
||||
linkActor(turn, "http-conn") do (turn: var Turn):
|
||||
linkActor(turn, "http-conn") do (turn: Turn):
|
||||
preventInertCheck(turn)
|
||||
let facet = turn.facet
|
||||
conn.onConnectionError do (err: ref Exception):
|
||||
|
@ -317,7 +317,7 @@ proc httpListen(turn: var Turn; driver: Driver; port: Port): Listener =
|
|||
)
|
||||
listener
|
||||
|
||||
proc httpDriver(turn: var Turn; ds: Cap) =
|
||||
proc httpDriver(turn: Turn; ds: Cap) =
|
||||
let driver = Driver(facet: turn.facet, ds: ds, timers: turn.newDataspace)
|
||||
spawnTimerDriver(turn, driver.timers)
|
||||
|
||||
|
@ -338,6 +338,6 @@ proc httpDriver(turn: var Turn; ds: Cap) =
|
|||
do:
|
||||
stop(l)
|
||||
|
||||
proc spawnHttpDriver*(turn: var Turn; ds: Cap): Actor {.discardable.} =
|
||||
spawnActor(turn, "http-driver") do (turn: var Turn):
|
||||
proc spawnHttpDriver*(turn: Turn; ds: Cap): Actor {.discardable.} =
|
||||
spawnActor(turn, "http-driver") do (turn: Turn):
|
||||
httpDriver(turn, ds)
|
||||
|
|
|
@ -38,7 +38,7 @@ when defined(solo5):
|
|||
let facet = driver.deadlines.getOrDefault(deadline)
|
||||
if not facet.isNil:
|
||||
# check if the deadline is still observed
|
||||
proc turnWork(turn: var Turn) =
|
||||
proc turnWork(turn: Turn) =
|
||||
discard publish(turn, driver.target, LaterThan(seconds: deadline))
|
||||
run(facet, turnWork)
|
||||
|
||||
|
@ -92,7 +92,7 @@ else:
|
|||
|
||||
proc spawnTimerDriver(facet: Facet; cap: Cap): TimerDriver =
|
||||
let driver = TimerDriver(facet: facet, target: cap)
|
||||
facet.onStop do (turn: var Turn):
|
||||
facet.onStop do (turn: Turn):
|
||||
for fd in driver.timers:
|
||||
unregister(FD fd)
|
||||
discard close(fd)
|
||||
|
@ -123,16 +123,16 @@ else:
|
|||
let facet = driver.deadlines.getOrDefault(deadline)
|
||||
if not facet.isNil:
|
||||
# Check if the deadline is still observed.
|
||||
proc turnWork(turn: var Turn) =
|
||||
proc turnWork(turn: Turn) =
|
||||
discard publish(turn, driver.target, LaterThan(seconds: deadline))
|
||||
run(facet, turnWork)
|
||||
discard close(fd)
|
||||
driver.timers.excl(fd)
|
||||
|
||||
proc spawnTimerDriver*(turn: var Turn; ds: Cap): Actor {.discardable.} =
|
||||
proc spawnTimerDriver*(turn: Turn; ds: Cap): Actor {.discardable.} =
|
||||
## Spawn a timer actor that responds to
|
||||
## dataspace observations of timeouts on `ds`.
|
||||
linkActor(turn, "timers") do (turn: var Turn):
|
||||
linkActor(turn, "timers") do (turn: Turn):
|
||||
let driver = spawnTimerDriver(turn.facet, ds)
|
||||
let pat = observePattern(!LaterThan, {@[0.toPreserves]: grabLit()})
|
||||
during(turn, ds, pat) do (deadline: float):
|
||||
|
@ -141,7 +141,7 @@ proc spawnTimerDriver*(turn: var Turn; ds: Cap): Actor {.discardable.} =
|
|||
do:
|
||||
driver.deadlines.del deadline
|
||||
|
||||
proc after*(turn: var Turn; ds: Cap; dur: Duration; act: TurnAction) =
|
||||
proc after*(turn: Turn; ds: Cap; dur: Duration; act: TurnAction) =
|
||||
## Execute `act` after some duration of time.
|
||||
var later = wallFloat() + dur.inMilliseconds.float / 1_000.0
|
||||
onPublish(turn, ds, ?LaterThan(seconds: later)):
|
||||
|
|
|
@ -6,7 +6,7 @@ import preserves
|
|||
import ./actors, ./patterns, ./protocols/dataspace
|
||||
|
||||
type
|
||||
DuringProc* = proc (turn: var Turn; a: Value; h: Handle): TurnAction
|
||||
DuringProc* = proc (turn: Turn; a: Value; h: Handle): TurnAction
|
||||
DuringActionKind = enum null, dead, act
|
||||
DuringAction = object
|
||||
case kind: DuringActionKind
|
||||
|
@ -17,8 +17,8 @@ type
|
|||
cb: DuringProc
|
||||
assertionMap: Table[Handle, DuringAction]
|
||||
|
||||
method publish(de: DuringEntity; turn: var Turn; a: AssertionRef; h: Handle) =
|
||||
discard inFacet(turn) do (turn: var Turn):
|
||||
method publish(de: DuringEntity; turn: Turn; a: AssertionRef; h: Handle) =
|
||||
discard inFacet(turn) do (turn: Turn):
|
||||
let action = de.cb(turn, a.value, h)
|
||||
# assert(not action.isNil "should have put in a no-op action")
|
||||
let g = de.assertionMap.getOrDefault h
|
||||
|
@ -31,7 +31,7 @@ method publish(de: DuringEntity; turn: var Turn; a: AssertionRef; h: Handle) =
|
|||
of act:
|
||||
raiseAssert("during: duplicate handle in publish: " & $h)
|
||||
|
||||
method retract(de: DuringEntity; turn: var Turn; h: Handle) =
|
||||
method retract(de: DuringEntity; turn: Turn; h: Handle) =
|
||||
let g = de.assertionMap.getOrDefault h
|
||||
case g.kind
|
||||
of null:
|
||||
|
@ -45,5 +45,5 @@ method retract(de: DuringEntity; turn: var Turn; h: Handle) =
|
|||
|
||||
proc during*(cb: DuringProc): DuringEntity = DuringEntity(cb: cb)
|
||||
|
||||
proc observe*(turn: var Turn; ds: Cap; pat: Pattern; e: Entity): Handle =
|
||||
proc observe*(turn: Turn; ds: Cap; pat: Pattern; e: Entity): Handle =
|
||||
publish(turn, ds, Observe(pattern: pat, observer: newCap(turn, e)))
|
||||
|
|
|
@ -29,8 +29,8 @@ type
|
|||
Turn = syndicate.Turn
|
||||
WireRef = sturdy.WireRef
|
||||
|
||||
PacketWriter = proc (turn: var Turn; buf: seq[byte]) {.closure.}
|
||||
RelaySetup = proc (turn: var Turn; relay: Relay) {.closure.}
|
||||
PacketWriter = proc (turn: Turn; buf: seq[byte]) {.closure.}
|
||||
RelaySetup = proc (turn: Turn; relay: Relay) {.closure.}
|
||||
|
||||
Relay* = ref object
|
||||
facet: Facet
|
||||
|
@ -59,20 +59,20 @@ type
|
|||
proc releaseCapOut(r: Relay; e: WireSymbol) =
|
||||
r.exported.drop e
|
||||
|
||||
method publish(spe: SyncPeerEntity; t: var Turn; a: AssertionRef; h: Handle) =
|
||||
method publish(spe: SyncPeerEntity; t: Turn; a: AssertionRef; h: Handle) =
|
||||
spe.handleMap[h] = publish(t, spe.peer, a.value)
|
||||
|
||||
method retract(se: SyncPeerEntity; t: var Turn; h: Handle) =
|
||||
method retract(se: SyncPeerEntity; t: Turn; h: Handle) =
|
||||
var other: Handle
|
||||
if se.handleMap.pop(h, other):
|
||||
retract(t, other)
|
||||
|
||||
method message(se: SyncPeerEntity; t: var Turn; a: AssertionRef) =
|
||||
method message(se: SyncPeerEntity; t: Turn; a: AssertionRef) =
|
||||
if not se.e.isNil:
|
||||
se.relay.releaseCapOut(se.e)
|
||||
message(t, se.peer, a.value)
|
||||
|
||||
method sync(se: SyncPeerEntity; t: var Turn; peer: Cap) =
|
||||
method sync(se: SyncPeerEntity; t: Turn; peer: Cap) =
|
||||
sync(t, se.peer, peer)
|
||||
|
||||
proc newSyncPeerEntity(r: Relay; p: Cap): SyncPeerEntity =
|
||||
|
@ -109,10 +109,10 @@ proc deregister(relay: Relay; h: Handle) =
|
|||
if relay.outboundAssertions.pop(h, outbound):
|
||||
for e in outbound: releaseCapOut(relay, e)
|
||||
|
||||
proc send(relay: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
|
||||
proc send(relay: Relay; turn: Turn; rOid: protocol.Oid; m: Event) =
|
||||
# TODO: don't send right away.
|
||||
relay.pendingTurn.add TurnEvent(oid: rOid, event: m)
|
||||
queueEffect(turn, relay.facet) do (turn: var Turn):
|
||||
queueEffect(turn, relay.facet) do (turn: Turn):
|
||||
if relay.pendingTurn.len > 0:
|
||||
var pkt = Packet(
|
||||
orKind: PacketKind.Turn,
|
||||
|
@ -120,29 +120,29 @@ proc send(relay: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
|
|||
trace "C: ", pkt
|
||||
relay.packetWriter(turn, encode pkt)
|
||||
|
||||
proc send(re: RelayEntity; turn: var Turn; ev: Event) =
|
||||
proc send(re: RelayEntity; turn: Turn; ev: Event) =
|
||||
send(re.relay, turn, protocol.Oid re.oid, ev)
|
||||
|
||||
method publish(re: RelayEntity; t: var Turn; a: AssertionRef; h: Handle) =
|
||||
method publish(re: RelayEntity; t: Turn; a: AssertionRef; h: Handle) =
|
||||
re.send(t, Event(
|
||||
orKind: EventKind.Assert,
|
||||
`assert`: protocol.Assert(
|
||||
assertion: re.relay.register(a.value, h).rewritten,
|
||||
handle: h)))
|
||||
|
||||
method retract(re: RelayEntity; t: var Turn; h: Handle) =
|
||||
method retract(re: RelayEntity; t: Turn; h: Handle) =
|
||||
re.relay.deregister h
|
||||
re.send(t, Event(
|
||||
orKind: EventKind.Retract,
|
||||
retract: Retract(handle: h)))
|
||||
|
||||
method message(re: RelayEntity; turn: var Turn; msg: AssertionRef) =
|
||||
method message(re: RelayEntity; turn: Turn; msg: AssertionRef) =
|
||||
var (value, exported) = rewriteOut(re.relay, msg.value)
|
||||
assert(len(exported) == 0, "cannot send a reference in a message")
|
||||
if len(exported) == 0:
|
||||
re.send(turn, Event(orKind: EventKind.Message, message: Message(body: value)))
|
||||
|
||||
method sync(re: RelayEntity; turn: var Turn; peer: Cap) =
|
||||
method sync(re: RelayEntity; turn: Turn; peer: Cap) =
|
||||
var
|
||||
peerEntity = newSyncPeerEntity(re.relay, peer)
|
||||
exported: seq[WireSymbol]
|
||||
|
@ -195,7 +195,7 @@ proc rewriteIn(relay; facet; v: Value):
|
|||
|
||||
proc close(r: Relay) = discard
|
||||
|
||||
proc dispatch(relay: Relay; turn: var Turn; cap: Cap; event: Event) =
|
||||
proc dispatch(relay: Relay; turn: Turn; cap: Cap; event: Event) =
|
||||
case event.orKind
|
||||
of EventKind.Assert:
|
||||
let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion)
|
||||
|
@ -214,7 +214,7 @@ proc dispatch(relay: Relay; turn: var Turn; cap: Cap; event: Event) =
|
|||
turn.message(cap, a)
|
||||
|
||||
of EventKind.Sync:
|
||||
turn.sync(cap) do (turn: var Turn):
|
||||
turn.sync(cap) do (turn: Turn):
|
||||
var
|
||||
(v, imported) = rewriteIn(relay, turn.facet, event.sync.peer)
|
||||
peer = unembed(v, Cap)
|
||||
|
@ -224,7 +224,7 @@ proc dispatch(relay: Relay; turn: var Turn; cap: Cap; event: Event) =
|
|||
|
||||
proc dispatch(relay: Relay; v: Value) =
|
||||
trace "S: ", v
|
||||
run(relay.facet) do (t: var Turn):
|
||||
run(relay.facet) do (t: Turn):
|
||||
var pkt: Packet
|
||||
if pkt.fromPreserves(v):
|
||||
case pkt.orKind
|
||||
|
@ -265,8 +265,8 @@ type
|
|||
initialCap*: Cap
|
||||
nextLocalOid*: Option[Oid]
|
||||
|
||||
proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) =
|
||||
linkActor(turn, name) do (turn: var Turn):
|
||||
proc spawnRelay(name: string; turn: Turn; opts: RelayActorOptions; setup: RelaySetup) =
|
||||
linkActor(turn, name) do (turn: Turn):
|
||||
turn.preventInertCheck()
|
||||
let relay = Relay(
|
||||
facet: turn.facet,
|
||||
|
@ -300,7 +300,7 @@ proc accepted(cap: Cap): Resolved =
|
|||
result.accepted.responderSession = cap
|
||||
|
||||
type ShutdownEntity = ref object of Entity
|
||||
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
|
||||
method retract(e: ShutdownEntity; turn: Turn; h: Handle) =
|
||||
stopActor(e.facet)
|
||||
|
||||
when defined(posix):
|
||||
|
@ -314,7 +314,7 @@ when defined(posix):
|
|||
stdin: AsyncFile
|
||||
alive: bool
|
||||
|
||||
method message(entity: StdioEntity; turn: var Turn; ass: AssertionRef) =
|
||||
method message(entity: StdioEntity; turn: Turn; ass: AssertionRef) =
|
||||
if ass.value.preservesTo(ForceDisconnect).isSome:
|
||||
entity.alive = false
|
||||
|
||||
|
@ -329,10 +329,10 @@ when defined(posix):
|
|||
else:
|
||||
entity.relay.recv(buf[], 0..<n)
|
||||
|
||||
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Stdio) =
|
||||
proc connectTransport(turn: Turn; ds: Cap; ta: transportAddress.Stdio) =
|
||||
## Connect to an external dataspace over stdio.
|
||||
let localDataspace = newDataspace(turn)
|
||||
proc stdoutWriter(turn: var Turn; buf: seq[byte]) =
|
||||
proc stdoutWriter(turn: Turn; buf: seq[byte]) =
|
||||
## Blocking write to stdout.
|
||||
let n = writeBytes(stdout, buf, 0, buf.len)
|
||||
flushFile(stdout)
|
||||
|
@ -343,7 +343,7 @@ when defined(posix):
|
|||
initialCap: localDataspace,
|
||||
initialOid: 0.Oid.some,
|
||||
)
|
||||
spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay):
|
||||
spawnRelay("stdio", turn, opts) do (turn: Turn; relay: Relay):
|
||||
let
|
||||
facet = turn.facet
|
||||
fd = stdin.getOsFileHandle()
|
||||
|
@ -353,7 +353,7 @@ when defined(posix):
|
|||
raiseOSError(osLastError())
|
||||
let entity = StdioEntity(
|
||||
facet: turn.facet, relay: relay, stdin: newAsyncFile(FD fd))
|
||||
onStop(entity.facet) do (turn: var Turn):
|
||||
onStop(entity.facet) do (turn: Turn):
|
||||
entity.alive = false
|
||||
close(entity.stdin)
|
||||
# Close stdin to remove it from the ioqueue
|
||||
|
@ -365,7 +365,7 @@ when defined(posix):
|
|||
resolved: localDataspace.accepted,
|
||||
))
|
||||
|
||||
proc connectStdio*(turn: var Turn; ds: Cap) =
|
||||
proc connectStdio*(turn: Turn; ds: Cap) =
|
||||
## Connect to an external dataspace over stdin and stdout.
|
||||
connectTransport(turn, ds, transportAddress.Stdio())
|
||||
|
||||
|
@ -382,13 +382,13 @@ when defined(posix):
|
|||
|
||||
SocketEntity = TcpEntity | UnixEntity
|
||||
|
||||
method message(entity: SocketEntity; turn: var Turn; ass: AssertionRef) =
|
||||
method message(entity: SocketEntity; turn: Turn; ass: AssertionRef) =
|
||||
if ass.value.preservesTo(ForceDisconnect).isSome:
|
||||
entity.alive = false
|
||||
|
||||
template bootSocketEntity() {.dirty.} =
|
||||
proc setup(turn: var Turn) {.closure.} =
|
||||
proc kill(turn: var Turn) =
|
||||
proc setup(turn: Turn) {.closure.} =
|
||||
proc kill(turn: Turn) =
|
||||
if entity.alive:
|
||||
entity.alive = false
|
||||
close(entity.sock)
|
||||
|
@ -421,24 +421,24 @@ when defined(posix):
|
|||
bootSocketEntity()
|
||||
|
||||
template spawnSocketRelay() {.dirty.} =
|
||||
proc writeConn(turn: var Turn; buf: seq[byte]) =
|
||||
proc writeConn(turn: Turn; buf: seq[byte]) =
|
||||
discard trampoline:
|
||||
whelp write(entity.sock, buf)
|
||||
var ops = RelayActorOptions(
|
||||
packetWriter: writeConn,
|
||||
initialOid: 0.Oid.some,
|
||||
)
|
||||
spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay):
|
||||
spawnRelay("socket", turn, ops) do (turn: Turn; relay: Relay):
|
||||
entity.facet = turn.facet
|
||||
entity.relay = relay
|
||||
discard trampoline:
|
||||
whelp boot(entity, ta, ds)
|
||||
|
||||
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Tcp) =
|
||||
proc connectTransport(turn: Turn; ds: Cap; ta: transportAddress.Tcp) =
|
||||
let entity = TcpEntity()
|
||||
spawnSocketRelay()
|
||||
|
||||
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Unix) =
|
||||
proc connectTransport(turn: Turn; ds: Cap; ta: transportAddress.Unix) =
|
||||
let entity = UnixEntity()
|
||||
spawnSocketRelay()
|
||||
|
||||
|
@ -453,14 +453,14 @@ elif defined(solo5):
|
|||
conn: Connection
|
||||
decoder: BufferedDecoder
|
||||
|
||||
method message(entity: TcpEntity; turn: var Turn; ass: AssertionRef) =
|
||||
method message(entity: TcpEntity; turn: Turn; ass: AssertionRef) =
|
||||
if ass.value.preservesTo(ForceDisconnect).isSome:
|
||||
entity.conn.abort()
|
||||
|
||||
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Tcp) =
|
||||
proc connectTransport(turn: Turn; ds: Cap; ta: transportAddress.Tcp) =
|
||||
let entity = TcpEntity(facet: turn.facet)
|
||||
|
||||
proc writeConn(turn: var Turn; buf: seq[byte]) =
|
||||
proc writeConn(turn: Turn; buf: seq[byte]) =
|
||||
assert not entity.conn.isNil
|
||||
entity.conn.batch:
|
||||
entity.conn.send(buf)
|
||||
|
@ -468,7 +468,7 @@ elif defined(solo5):
|
|||
packetWriter: writeConn,
|
||||
initialOid: 0.Oid.some,
|
||||
)
|
||||
spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay):
|
||||
spawnRelay("socket", turn, ops) do (turn: Turn; relay: Relay):
|
||||
entity.facet = turn.facet
|
||||
entity.relay = relay
|
||||
|
||||
|
@ -487,10 +487,10 @@ elif defined(solo5):
|
|||
var preconn = newPreconnection(
|
||||
remote=[ep], transport=tp.some)
|
||||
entity.conn = preconn.initiate()
|
||||
entity.facet.onStop do (turn: var Turn):
|
||||
entity.facet.onStop do (turn: Turn):
|
||||
entity.conn.close()
|
||||
entity.conn.onConnectionError do (err: ref Exception):
|
||||
run(entity.facet) do (turn: var Turn):
|
||||
run(entity.facet) do (turn: Turn):
|
||||
terminate(turn, err)
|
||||
entity.conn.onClosed():
|
||||
stop(entity.facet)
|
||||
|
@ -501,7 +501,7 @@ elif defined(solo5):
|
|||
else:
|
||||
entity.conn.receive()
|
||||
entity.conn.onReady do ():
|
||||
entity.facet.run do (turn: var Turn):
|
||||
entity.facet.run do (turn: Turn):
|
||||
publish(turn, ds, TransportConnection(
|
||||
`addr`: ta.toPreserves,
|
||||
control: newCap(entity, turn),
|
||||
|
@ -509,7 +509,7 @@ elif defined(solo5):
|
|||
))
|
||||
entity.conn.receive()
|
||||
|
||||
proc walk(turn: var Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) =
|
||||
proc walk(turn: Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) =
|
||||
if stepOff < route.pathSteps.len:
|
||||
let
|
||||
step = route.pathSteps[stepOff]
|
||||
|
@ -532,7 +532,7 @@ proc walk(turn: var Turn; ds, origin: Cap; route: Route; transOff, stepOff: int)
|
|||
resolved: origin.accepted,
|
||||
))
|
||||
|
||||
proc connectRoute(turn: var Turn; ds: Cap; route: Route; transOff: int) =
|
||||
proc connectRoute(turn: Turn; ds: Cap; route: Route; transOff: int) =
|
||||
let rejectPat = TransportConnection ?: {
|
||||
0: ?route.transports[transOff],
|
||||
2: ?:Rejected,
|
||||
|
@ -550,25 +550,25 @@ proc connectRoute(turn: var Turn; ds: Cap; route: Route; transOff: int) =
|
|||
onPublish(turn, ds, acceptPat) do (origin: Cap):
|
||||
walk(turn, ds, origin, route, transOff, 0)
|
||||
|
||||
type StepCallback = proc (turn: var Turn; step: Value; origin: Cap; res: Resolved) {.closure.}
|
||||
type StepCallback = proc (turn: Turn; step: Value; origin: Cap; res: Resolved) {.closure.}
|
||||
|
||||
proc spawnStepResolver(turn: var Turn; ds: Cap; stepType: Value; cb: StepCallback) =
|
||||
proc spawnStepResolver(turn: Turn; ds: Cap; stepType: Value; cb: StepCallback) =
|
||||
let pat = observePattern(
|
||||
ResolvedPathStep?:{1: grabRecord(stepType)},
|
||||
{ @[0.toPreserve]: grabLit(), @[1.toPreserve]: grab() },
|
||||
)
|
||||
during(turn, ds, pat) do (origin: Cap; stepDetail: Literal[Value]):
|
||||
proc duringCallback(turn: var Turn; ass: Value; h: Handle): TurnAction =
|
||||
proc duringCallback(turn: Turn; ass: Value; h: Handle): TurnAction =
|
||||
var res: Resolved
|
||||
if res.fromPreserves ass:
|
||||
cb(turn, stepDetail.value, origin, res)
|
||||
proc action(turn: var Turn) =
|
||||
proc action(turn: Turn) =
|
||||
stop(turn)
|
||||
result = action
|
||||
publish(turn, origin, Resolve(
|
||||
step: stepDetail.value, observer: newCap(turn, during(duringCallback))))
|
||||
|
||||
proc spawnRelays*(turn: var Turn; ds: Cap) =
|
||||
proc spawnRelays*(turn: Turn; ds: Cap) =
|
||||
## Spawn actors that manage routes and appease gatekeepers.
|
||||
|
||||
let transPat = observePattern(!TransportConnection, { @[0.toPreserves]: grab() })
|
||||
|
@ -604,13 +604,13 @@ proc spawnRelays*(turn: var Turn; ds: Cap) =
|
|||
connectRoute(turn, ds, route.value, i)
|
||||
|
||||
spawnStepResolver(turn, ds, "ref".toSymbol) do (
|
||||
turn: var Turn, step: Value, origin: Cap, res: Resolved):
|
||||
turn: Turn, step: Value, origin: Cap, res: Resolved):
|
||||
publish(turn, ds, ResolvedPathStep(
|
||||
origin: origin, pathStep: step, resolved: res))
|
||||
|
||||
type BootProc* = proc (turn: var Turn; ds: Cap) {.closure.}
|
||||
type BootProc* = proc (turn: Turn; ds: Cap) {.closure.}
|
||||
|
||||
proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
|
||||
proc resolve*(turn: Turn; ds: Cap; route: Route; bootProc: BootProc) =
|
||||
## Resolve `route` within `ds` and call `bootProc` with resolved capabilities.
|
||||
let pat = ResolvePath ?: {0: ?route, 3: ?:ResolvedAccepted}
|
||||
during(turn, ds, ResolvePath ?: {0: ?route, 3: ?:ResolvedAccepted}) do (dst: Cap):
|
||||
|
@ -634,7 +634,7 @@ when defined(posix):
|
|||
if not result.fromPreserves(pr):
|
||||
raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr)
|
||||
|
||||
proc resolveEnvironment*(turn: var Turn; bootProc: BootProc) =
|
||||
proc resolveEnvironment*(turn: Turn; bootProc: BootProc) =
|
||||
## Resolve a capability from the calling environment
|
||||
## and call `bootProc`. See envRoute_.
|
||||
var resolved = false
|
||||
|
|
|
@ -63,7 +63,7 @@ func isEmpty(cont: Continuation): bool =
|
|||
type
|
||||
ContinuationProc = proc (c: Continuation; v: Value) {.closure.}
|
||||
LeafProc = proc (l: Leaf; v: Value) {.closure.}
|
||||
ObserverProc = proc (turn: var Turn; group: ObserverGroup; vs: seq[Value]) {.closure.}
|
||||
ObserverProc = proc (turn: Turn; group: ObserverGroup; vs: seq[Value]) {.closure.}
|
||||
|
||||
proc getLeaves(cont: Continuation; presentPaths, constPaths: Paths): LeafMap =
|
||||
result = cont.leafMap.getOrDefault(constPaths)
|
||||
|
@ -111,10 +111,10 @@ proc top(stack: TermStack): Value =
|
|||
assert stack.len > 0
|
||||
stack[stack.high]
|
||||
|
||||
proc modify(node: Node; turn: var Turn; outerValue: Value; event: EventKind;
|
||||
proc modify(node: Node; turn: Turn; outerValue: Value; event: EventKind;
|
||||
modCont: ContinuationProc; modLeaf: LeafProc; modObs: ObserverProc) =
|
||||
|
||||
proc walk(cont: Continuation; turn: var Turn) =
|
||||
proc walk(cont: Continuation; turn: Turn) =
|
||||
modCont(cont, outerValue)
|
||||
for constPaths, constValMap in cont.leafMap.pairs:
|
||||
let constVals = projectPaths(outerValue, constPaths)
|
||||
|
@ -139,7 +139,7 @@ proc modify(node: Node; turn: var Turn; outerValue: Value; event: EventKind;
|
|||
constValMap.del(get constVals)
|
||||
|
||||
|
||||
proc walk(node: Node; turn: var Turn; termStack: TermStack) =
|
||||
proc walk(node: Node; turn: Turn; termStack: TermStack) =
|
||||
walk(node.continuation, turn)
|
||||
for selector, table in node.edges:
|
||||
let
|
||||
|
@ -212,7 +212,7 @@ proc getEndpoints(leaf: Leaf; capturePaths: Paths): ObserverGroup =
|
|||
if captures.isSome:
|
||||
discard result.cachedCaptures.change(get captures, +1)
|
||||
|
||||
proc add*(index: var Index; turn: var Turn; pattern: Pattern; observer: Cap) =
|
||||
proc add*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) =
|
||||
let
|
||||
cont = index.root.extend(pattern)
|
||||
analysis = analyse pattern
|
||||
|
@ -225,7 +225,7 @@ proc add*(index: var Index; turn: var Turn; pattern: Pattern; observer: Cap) =
|
|||
captureMap[capture] = publish(turn, observer, capture)
|
||||
endpoints.observers[observer] = captureMap
|
||||
|
||||
proc remove*(index: var Index; turn: var Turn; pattern: Pattern; observer: Cap) =
|
||||
proc remove*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) =
|
||||
let
|
||||
cont = index.root.extend(pattern)
|
||||
analysis = analyse pattern
|
||||
|
@ -245,7 +245,7 @@ proc remove*(index: var Index; turn: var Turn; pattern: Pattern; observer: Cap)
|
|||
if constValMap.len == 0:
|
||||
cont.leafMap.del(analysis.constPaths)
|
||||
|
||||
proc adjustAssertion(index: var Index; turn: var Turn; outerValue: Value; delta: int): bool =
|
||||
proc adjustAssertion(index: var Index; turn: Turn; outerValue: Value; delta: int): bool =
|
||||
case index.allAssertions.change(outerValue, delta)
|
||||
of cdAbsentToPresent:
|
||||
result = true
|
||||
|
@ -253,7 +253,7 @@ proc adjustAssertion(index: var Index; turn: var Turn; outerValue: Value; delta:
|
|||
c.cache.incl(v)
|
||||
proc modLeaf(l: Leaf; v: Value) =
|
||||
l.cache.incl(v)
|
||||
proc modObserver(turn: var Turn; group: ObserverGroup; vs: seq[Value]) =
|
||||
proc modObserver(turn: Turn; group: ObserverGroup; vs: seq[Value]) =
|
||||
let change = group.cachedCaptures.change(vs, +1)
|
||||
if change == cdAbsentToPresent:
|
||||
for (observer, captureMap) in group.observers.pairs:
|
||||
|
@ -266,7 +266,7 @@ proc adjustAssertion(index: var Index; turn: var Turn; outerValue: Value; delta:
|
|||
c.cache.excl(v)
|
||||
proc modLeaf(l: Leaf; v: Value) =
|
||||
l.cache.excl(v)
|
||||
proc modObserver(turn: var Turn; group: ObserverGroup; vs: seq[Value]) =
|
||||
proc modObserver(turn: Turn; group: ObserverGroup; vs: seq[Value]) =
|
||||
if group.cachedCaptures.change(vs, -1) == cdPresentToAbsent:
|
||||
for (observer, captureMap) in group.observers.pairs:
|
||||
var h: Handle
|
||||
|
@ -278,12 +278,12 @@ proc adjustAssertion(index: var Index; turn: var Turn; outerValue: Value; delta:
|
|||
proc continuationNoop(c: Continuation; v: Value) = discard
|
||||
proc leafNoop(l: Leaf; v: Value) = discard
|
||||
|
||||
proc add*(index: var Index; turn: var Turn; v: Value): bool =
|
||||
proc add*(index: var Index; turn: Turn; v: Value): bool =
|
||||
adjustAssertion(index, turn, v, +1)
|
||||
proc remove*(index: var Index; turn: var Turn; v: Value): bool =
|
||||
proc remove*(index: var Index; turn: Turn; v: Value): bool =
|
||||
adjustAssertion(index, turn, v, -1)
|
||||
|
||||
proc deliverMessage*(index: var Index; turn: var Turn; v: Value) =
|
||||
proc observersCb(turn: var Turn; group: ObserverGroup; vs: seq[Value]) =
|
||||
proc deliverMessage*(index: var Index; turn: Turn; v: Value) =
|
||||
proc observersCb(turn: Turn; group: ObserverGroup; vs: seq[Value]) =
|
||||
for observer in group.observers.keys: message(turn, observer, vs)
|
||||
index.root.modify(turn, v, messageEvent, continuationNoop, leafNoop, observersCb)
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
# Package
|
||||
|
||||
version = "20240424"
|
||||
version = "20240430"
|
||||
author = "Emery Hemingway"
|
||||
description = "Syndicated actors for conversational concurrency"
|
||||
license = "Unlicense"
|
||||
|
|
|
@ -11,17 +11,17 @@ acquireDevices([("relay", netBasic)], netAcquireHook)
|
|||
type Netif {.preservesRecord: "netif"} = object
|
||||
device, ipAddr: string
|
||||
|
||||
proc spawnNetifActor(turn: var Turn; ds: Cap) =
|
||||
spawnActor(turn, "netif") do (turn: var Turn):
|
||||
proc spawnNetifActor(turn: Turn; ds: Cap) =
|
||||
spawnActor(turn, "netif") do (turn: Turn):
|
||||
let facet = turn.facet
|
||||
onInterfaceUp do (device: string; ip: IpAddress):
|
||||
run(facet) do (turn: var Turn):
|
||||
run(facet) do (turn: Turn):
|
||||
if not ip.isLinkLocal:
|
||||
discard publish(turn, ds, Netif(device: device, ipAddr: $ip))
|
||||
|
||||
runActor("relay-test") do (turn: var Turn):
|
||||
runActor("relay-test") do (turn: Turn):
|
||||
let root = turn.facet
|
||||
onStop(turn) do (turn: var Turn):
|
||||
onStop(turn) do (turn: Turn):
|
||||
quit()
|
||||
let ds = newDataspace(turn)
|
||||
spawnNetifActor(turn, ds)
|
||||
|
@ -33,7 +33,7 @@ runActor("relay-test") do (turn: var Turn):
|
|||
echo "parsed route ", route.toPreserves
|
||||
during(turn, ds, Netif?:{1: grab()}) do (ip: string):
|
||||
echo "Acquired address ", ip
|
||||
resolve(turn, ds, route) do (turn: var Turn; ds: Cap):
|
||||
resolve(turn, ds, route) do (turn: Turn; ds: Cap):
|
||||
echo "route resolved!"
|
||||
echo "stopping root facet"
|
||||
stop(turn, root)
|
||||
|
|
|
@ -7,19 +7,19 @@ import syndicate, syndicate/drivers/timers
|
|||
|
||||
acquireDevices()
|
||||
|
||||
runActor("timer-test") do (turn: var Turn):
|
||||
runActor("timer-test") do (turn: Turn):
|
||||
let timers = newDataspace(turn)
|
||||
spawnTimerDriver(turn, timers)
|
||||
|
||||
onPublish(turn, timers, ?LaterThan(seconds: 1356100000)):
|
||||
echo "now in 13th bʼakʼtun"
|
||||
|
||||
after(turn, timers, initDuration(seconds = 3)) do (turn: var Turn):
|
||||
after(turn, timers, initDuration(seconds = 3)) do (turn: Turn):
|
||||
echo "third timer expired"
|
||||
stopActor(turn)
|
||||
|
||||
after(turn, timers, initDuration(seconds = 1)) do (turn: var Turn):
|
||||
after(turn, timers, initDuration(seconds = 1)) do (turn: Turn):
|
||||
echo "first timer expired"
|
||||
|
||||
after(turn, timers, initDuration(seconds = 2)) do (turn: var Turn):
|
||||
after(turn, timers, initDuration(seconds = 2)) do (turn: Turn):
|
||||
echo "second timer expired"
|
||||
|
|
|
@ -14,7 +14,7 @@ type
|
|||
proc syncAndStop(facet: Facet; cap: Cap) =
|
||||
## Stop the actor responsible for `facet` after
|
||||
## synchronizing with `cap`.
|
||||
run(facet) do (turn: var Turn):
|
||||
run(facet) do (turn: Turn):
|
||||
sync(turn, cap, stopActor)
|
||||
|
||||
proc readStdin(facet: Facet; ds: Cap; username: string) {.asyncio.} =
|
||||
|
@ -37,11 +37,11 @@ proc readStdin(facet: Facet; ds: Cap; username: string) {.asyncio.} =
|
|||
return
|
||||
else:
|
||||
var msg = buf[][0..<n].strip
|
||||
proc send(turn: var Turn) =
|
||||
proc send(turn: Turn) =
|
||||
message(turn, ds, Says(who: username, what: msg))
|
||||
run(facet, send)
|
||||
|
||||
proc chat(turn: var Turn; ds: Cap; username: string) =
|
||||
proc chat(turn: Turn; ds: Cap; username: string) =
|
||||
during(turn, ds, ?:Present) do (who: string):
|
||||
echo who, " joined"
|
||||
do:
|
||||
|
@ -67,8 +67,8 @@ proc main =
|
|||
if username == "":
|
||||
stderr.writeLine "--user: unspecified"
|
||||
else:
|
||||
runActor("chat") do (turn: var Turn):
|
||||
resolveEnvironment(turn) do (turn: var Turn; ds: Cap):
|
||||
runActor("chat") do (turn: Turn):
|
||||
resolveEnvironment(turn) do (turn: Turn; ds: Cap):
|
||||
chat(turn, ds, username)
|
||||
|
||||
main()
|
||||
|
|
|
@ -6,7 +6,7 @@ import syndicate, syndicate/drivers/timers, preserves
|
|||
|
||||
var passCount = 0
|
||||
|
||||
runActor("timer-test") do (turn: var Turn):
|
||||
runActor("timer-test") do (turn: Turn):
|
||||
let timers = newDataspace(turn)
|
||||
spawnTimerDriver(turn, timers)
|
||||
|
||||
|
@ -14,17 +14,17 @@ runActor("timer-test") do (turn: var Turn):
|
|||
echo "now in 13th bʼakʼtun"
|
||||
inc passCount
|
||||
|
||||
after(turn, timers, initDuration(seconds = 3)) do (turn: var Turn):
|
||||
after(turn, timers, initDuration(seconds = 3)) do (turn: Turn):
|
||||
echo "third timer expired"
|
||||
assert passCount == 3
|
||||
inc passCount
|
||||
|
||||
after(turn, timers, initDuration(seconds = 1)) do (turn: var Turn):
|
||||
after(turn, timers, initDuration(seconds = 1)) do (turn: Turn):
|
||||
echo "first timer expired"
|
||||
assert passCount == 1
|
||||
inc passCount
|
||||
|
||||
after(turn, timers, initDuration(seconds = 2)) do (turn: var Turn):
|
||||
after(turn, timers, initDuration(seconds = 2)) do (turn: Turn):
|
||||
echo "second timer expired"
|
||||
assert passCount == 2
|
||||
inc passCount
|
||||
|
|
Loading…
Reference in New Issue