Revert to var Turn
This commit is contained in:
parent
9d975bab56
commit
cf395dbfa4
|
@ -35,21 +35,21 @@ proc `??`*(pat: Pattern; bindings: openArray[(int, Pattern)]): Pattern {.inline.
|
||||||
patterns.inject(pat, bindings)
|
patterns.inject(pat, bindings)
|
||||||
|
|
||||||
type
|
type
|
||||||
PublishProc = proc (turn: Turn; v: Value; h: Handle) {.closure.}
|
PublishProc = proc (turn: var Turn; v: Value; h: Handle) {.closure.}
|
||||||
RetractProc = proc (turn: Turn; h: Handle) {.closure.}
|
RetractProc = proc (turn: var Turn; h: Handle) {.closure.}
|
||||||
MessageProc = proc (turn: Turn; v: Value) {.closure.}
|
MessageProc = proc (turn: var Turn; v: Value) {.closure.}
|
||||||
ClosureEntity = ref object of Entity
|
ClosureEntity = ref object of Entity
|
||||||
publishImpl*: PublishProc
|
publishImpl*: PublishProc
|
||||||
retractImpl*: RetractProc
|
retractImpl*: RetractProc
|
||||||
messageImpl*: MessageProc
|
messageImpl*: MessageProc
|
||||||
|
|
||||||
method publish(e: ClosureEntity; turn: Turn; a: AssertionRef; h: Handle) =
|
method publish(e: ClosureEntity; turn: var Turn; a: AssertionRef; h: Handle) =
|
||||||
if not e.publishImpl.isNil: e.publishImpl(turn, a.value, h)
|
if not e.publishImpl.isNil: e.publishImpl(turn, a.value, h)
|
||||||
|
|
||||||
method retract(e: ClosureEntity; turn: Turn; h: Handle) =
|
method retract(e: ClosureEntity; turn: var Turn; h: Handle) =
|
||||||
if not e.retractImpl.isNil: e.retractImpl(turn, h)
|
if not e.retractImpl.isNil: e.retractImpl(turn, h)
|
||||||
|
|
||||||
method message(e: ClosureEntity; turn: Turn; a: AssertionRef) =
|
method message(e: ClosureEntity; turn: var Turn; a: AssertionRef) =
|
||||||
if not e.messageImpl.isNil: e.messageImpl(turn, a.value)
|
if not e.messageImpl.isNil: e.messageImpl(turn, a.value)
|
||||||
|
|
||||||
proc argumentCount(handler: NimNode): int =
|
proc argumentCount(handler: NimNode): int =
|
||||||
|
@ -94,7 +94,7 @@ proc wrapPublishHandler(turn, handler: NimNode): NimNode =
|
||||||
handlerSym = genSym(nskProc, "publish")
|
handlerSym = genSym(nskProc, "publish")
|
||||||
bindingsSym = ident"bindings"
|
bindingsSym = ident"bindings"
|
||||||
quote do:
|
quote do:
|
||||||
proc `handlerSym`(`turn`: Turn; `bindingsSym`: Value; `handleSym`: Handle) =
|
proc `handlerSym`(`turn`: var Turn; `bindingsSym`: Value; `handleSym`: Handle) =
|
||||||
`varSection`
|
`varSection`
|
||||||
if fromPreserves(`valuesSym`, bindings):
|
if fromPreserves(`valuesSym`, bindings):
|
||||||
`publishBody`
|
`publishBody`
|
||||||
|
@ -106,7 +106,7 @@ proc wrapMessageHandler(turn, handler: NimNode): NimNode =
|
||||||
handlerSym = genSym(nskProc, "message")
|
handlerSym = genSym(nskProc, "message")
|
||||||
bindingsSym = ident"bindings"
|
bindingsSym = ident"bindings"
|
||||||
quote do:
|
quote do:
|
||||||
proc `handlerSym`(`turn`: Turn; `bindingsSym`: Value) =
|
proc `handlerSym`(`turn`: var Turn; `bindingsSym`: Value) =
|
||||||
`varSection`
|
`varSection`
|
||||||
if fromPreserves(`valuesSym`, bindings):
|
if fromPreserves(`valuesSym`, bindings):
|
||||||
`body`
|
`body`
|
||||||
|
@ -120,17 +120,17 @@ proc wrapDuringHandler(turn, entryBody, exitBody: NimNode): NimNode =
|
||||||
duringSym = genSym(nskProc, "during")
|
duringSym = genSym(nskProc, "during")
|
||||||
if exitBody.isNil:
|
if exitBody.isNil:
|
||||||
quote do:
|
quote do:
|
||||||
proc `duringSym`(`turn`: Turn; `bindingsSym`: Value; `handleSym`: Handle): TurnAction =
|
proc `duringSym`(`turn`: var Turn; `bindingsSym`: Value; `handleSym`: Handle): TurnAction =
|
||||||
`varSection`
|
`varSection`
|
||||||
if fromPreserves(`valuesSym`, `bindingsSym`):
|
if fromPreserves(`valuesSym`, `bindingsSym`):
|
||||||
`publishBody`
|
`publishBody`
|
||||||
else:
|
else:
|
||||||
quote do:
|
quote do:
|
||||||
proc `duringSym`(`turn`: Turn; `bindingsSym`: Value; `handleSym`: Handle): TurnAction =
|
proc `duringSym`(`turn`: var Turn; `bindingsSym`: Value; `handleSym`: Handle): TurnAction =
|
||||||
`varSection`
|
`varSection`
|
||||||
if fromPreserves(`valuesSym`, `bindingsSym`):
|
if fromPreserves(`valuesSym`, `bindingsSym`):
|
||||||
`publishBody`
|
`publishBody`
|
||||||
proc action(`turn`: Turn) =
|
proc action(`turn`: var Turn) =
|
||||||
`exitBody`
|
`exitBody`
|
||||||
result = action
|
result = action
|
||||||
|
|
||||||
|
|
|
@ -59,7 +59,7 @@ type
|
||||||
facetIdAllocator: uint
|
facetIdAllocator: uint
|
||||||
exiting, exited: bool
|
exiting, exited: bool
|
||||||
|
|
||||||
TurnAction* = proc (t: Turn) {.closure.}
|
TurnAction* = proc (t: var Turn) {.closure.}
|
||||||
|
|
||||||
Turn* = ref object
|
Turn* = ref object
|
||||||
facet: Facet # active facet that may change during a turn
|
facet: Facet # active facet that may change during a turn
|
||||||
|
@ -110,15 +110,15 @@ when tracing:
|
||||||
result.add f.id.toPreserves
|
result.add f.id.toPreserves
|
||||||
f = f.parent
|
f = f.parent
|
||||||
|
|
||||||
method publish*(e: Entity; turn: Turn; v: AssertionRef; h: Handle) {.base.} = discard
|
method publish*(e: Entity; turn: var Turn; v: AssertionRef; h: Handle) {.base.} = discard
|
||||||
method retract*(e: Entity; turn: Turn; h: Handle) {.base.} = discard
|
method retract*(e: Entity; turn: var Turn; h: Handle) {.base.} = discard
|
||||||
method message*(e: Entity; turn: Turn; v: AssertionRef) {.base.} = discard
|
method message*(e: Entity; turn: var Turn; v: AssertionRef) {.base.} = discard
|
||||||
method sync*(e: Entity; turn: Turn; peer: Cap) {.base.} = discard
|
method sync*(e: Entity; turn: var Turn; peer: Cap) {.base.} = discard
|
||||||
|
|
||||||
using
|
using
|
||||||
actor: Actor
|
actor: Actor
|
||||||
facet: Facet
|
facet: Facet
|
||||||
turn: Turn
|
turn: var Turn
|
||||||
action: TurnAction
|
action: TurnAction
|
||||||
|
|
||||||
proc labels(f: Facet): string =
|
proc labels(f: Facet): string =
|
||||||
|
@ -156,11 +156,13 @@ proc hash*(facet): Hash =
|
||||||
|
|
||||||
proc hash*(r: Cap): Hash = !$(r.relay.hash !& r.target.unsafeAddr.hash)
|
proc hash*(r: Cap): Hash = !$(r.relay.hash !& r.target.unsafeAddr.hash)
|
||||||
|
|
||||||
|
proc actor*(turn): Actor = turn.facet.actor
|
||||||
|
|
||||||
proc nextHandle(facet: Facet): Handle =
|
proc nextHandle(facet: Facet): Handle =
|
||||||
result = succ(facet.actor.handleAllocator[])
|
result = succ(facet.actor.handleAllocator[])
|
||||||
facet.actor.handleAllocator[] = result
|
facet.actor.handleAllocator[] = result
|
||||||
|
|
||||||
proc queueWork*(turn: Turn; facet: Facet; act: TurnAction) =
|
proc queueWork*(turn: var Turn; facet: Facet; act: TurnAction) =
|
||||||
assert not facet.isNil
|
assert not facet.isNil
|
||||||
turn.work.addLast((facet, act,))
|
turn.work.addLast((facet, act,))
|
||||||
|
|
||||||
|
@ -172,7 +174,7 @@ proc queueTurn*(facet: Facet; act: TurnAction) =
|
||||||
turn.desc.id = nextTurnId()
|
turn.desc.id = nextTurnId()
|
||||||
turnQueue.addLast(turn)
|
turnQueue.addLast(turn)
|
||||||
|
|
||||||
proc queueTurn*(prev: Turn; facet: Facet; act: TurnAction) =
|
proc queueTurn*(prev: var Turn; facet: Facet; act: TurnAction) =
|
||||||
var next = Turn(facet: facet)
|
var next = Turn(facet: facet)
|
||||||
assert not facet.isNil
|
assert not facet.isNil
|
||||||
next.work.addLast((facet, act,))
|
next.work.addLast((facet, act,))
|
||||||
|
@ -309,12 +311,12 @@ proc runRewrites*(a: Attenuation; v: Value): Value =
|
||||||
result = examineAlternatives(stage, result)
|
result = examineAlternatives(stage, result)
|
||||||
if result.isFalse: break
|
if result.isFalse: break
|
||||||
|
|
||||||
proc publish(turn: Turn; cap: Cap; v: Value; h: Handle) =
|
proc publish(turn: var Turn; cap: Cap; v: Value; h: Handle) =
|
||||||
var a = runRewrites(cap.attenuation, v)
|
var a = runRewrites(cap.attenuation, v)
|
||||||
if not a.isFalse:
|
if not a.isFalse:
|
||||||
let e = OutboundAssertion(handle: h, peer: cap)
|
let e = OutboundAssertion(handle: h, peer: cap)
|
||||||
turn.facet.outbound[h] = e
|
turn.facet.outbound[h] = e
|
||||||
queueEffect(turn, cap.relay) do (turn: Turn):
|
queueEffect(turn, cap.relay) do (turn: var Turn):
|
||||||
e.established = true
|
e.established = true
|
||||||
publish(cap.target, turn, AssertionRef(value: a), e.handle)
|
publish(cap.target, turn, AssertionRef(value: a), e.handle)
|
||||||
when tracing:
|
when tracing:
|
||||||
|
@ -328,54 +330,54 @@ proc publish(turn: Turn; cap: Cap; v: Value; h: Handle) =
|
||||||
act.enqueue.event.detail.assert.handle = h
|
act.enqueue.event.detail.assert.handle = h
|
||||||
turn.desc.actions.add act
|
turn.desc.actions.add act
|
||||||
|
|
||||||
proc publish*(turn: Turn; r: Cap; a: Value): Handle {.discardable.} =
|
proc publish*(turn: var Turn; r: Cap; a: Value): Handle {.discardable.} =
|
||||||
result = turn.facet.nextHandle()
|
result = turn.facet.nextHandle()
|
||||||
publish(turn, r, a, result)
|
publish(turn, r, a, result)
|
||||||
|
|
||||||
proc publish*[T](turn: Turn; r: Cap; a: T): Handle {.discardable.} =
|
proc publish*[T](turn: var Turn; r: Cap; a: T): Handle {.discardable.} =
|
||||||
publish(turn, r, a.toPreserves)
|
publish(turn, r, a.toPreserves)
|
||||||
|
|
||||||
proc retract(turn: Turn; e: OutboundAssertion) =
|
proc retract(turn: var Turn; e: OutboundAssertion) =
|
||||||
queueEffect(turn, e.peer.relay) do (turn: Turn):
|
queueEffect(turn, e.peer.relay) do (turn: var Turn):
|
||||||
if e.established:
|
if e.established:
|
||||||
e.established = false
|
e.established = false
|
||||||
e.peer.target.retract(turn, e.handle)
|
e.peer.target.retract(turn, e.handle)
|
||||||
|
|
||||||
proc retract*(turn: Turn; h: Handle) =
|
proc retract*(turn: var Turn; h: Handle) =
|
||||||
var e: OutboundAssertion
|
var e: OutboundAssertion
|
||||||
if turn.facet.outbound.pop(h, e):
|
if turn.facet.outbound.pop(h, e):
|
||||||
turn.retract(e)
|
turn.retract(e)
|
||||||
|
|
||||||
proc message*(turn: Turn; r: Cap; v: Value) =
|
proc message*(turn: var Turn; r: Cap; v: Value) =
|
||||||
var a = runRewrites(r.attenuation, v)
|
var a = runRewrites(r.attenuation, v)
|
||||||
if not a.isFalse:
|
if not a.isFalse:
|
||||||
queueEffect(turn, r.relay) do (turn: Turn):
|
queueEffect(turn, r.relay) do (turn: var Turn):
|
||||||
r.target.message(turn, AssertionRef(value: a))
|
r.target.message(turn, AssertionRef(value: a))
|
||||||
|
|
||||||
proc message*[T](turn: Turn; r: Cap; v: T) =
|
proc message*[T](turn: Turn; r: Cap; v: T) =
|
||||||
queueEffect(turn, r.relay) do (turn: Turn):
|
queueEffect(turn, r.relay) do (turn: Turn):
|
||||||
message(turn, r, v.toPreserves)
|
message(turn, r, v.toPreserves)
|
||||||
|
|
||||||
proc sync(turn: Turn; e: Entity; peer: Cap) =
|
proc sync(turn: var Turn; e: Entity; peer: Cap) =
|
||||||
e.sync(turn, peer)
|
e.sync(turn, peer)
|
||||||
|
|
||||||
proc sync*(turn: Turn; r, peer: Cap) =
|
proc sync*(turn: var Turn; r, peer: Cap) =
|
||||||
queueEffect(turn, r.relay) do (turn: Turn):
|
queueEffect(turn, r.relay) do (turn: var Turn):
|
||||||
sync(turn, r.target, peer)
|
sync(turn, r.target, peer)
|
||||||
|
|
||||||
proc replace*[T](turn: Turn; cap: Cap; h: Handle; v: T): Handle =
|
proc replace*[T](turn: var Turn; cap: Cap; h: Handle; v: T): Handle =
|
||||||
result = publish(turn, cap, v)
|
result = publish(turn, cap, v)
|
||||||
if h != default(Handle):
|
if h != default(Handle):
|
||||||
retract(turn, h)
|
retract(turn, h)
|
||||||
|
|
||||||
proc replace*[T](turn: Turn; cap: Cap; h: var Handle; v: T): Handle {.discardable.} =
|
proc replace*[T](turn: var Turn; cap: Cap; h: var Handle; v: T): Handle {.discardable.} =
|
||||||
var old = h
|
var old = h
|
||||||
h = publish(turn, cap, v)
|
h = publish(turn, cap, v)
|
||||||
if old != default(Handle):
|
if old != default(Handle):
|
||||||
retract(turn, old)
|
retract(turn, old)
|
||||||
h
|
h
|
||||||
|
|
||||||
proc stop*(turn: Turn)
|
proc stop*(turn: var Turn)
|
||||||
|
|
||||||
proc newFacet(actor; parent: Facet; initialAssertions: OutboundTable): Facet =
|
proc newFacet(actor; parent: Facet; initialAssertions: OutboundTable): Facet =
|
||||||
inc actor.facetIdAllocator
|
inc actor.facetIdAllocator
|
||||||
|
@ -410,13 +412,13 @@ proc preventInertCheck*(facet): (proc()) {.discardable.} =
|
||||||
|
|
||||||
proc terminate(actor; turn; reason: ref Exception)
|
proc terminate(actor; turn; reason: ref Exception)
|
||||||
|
|
||||||
proc terminate(facet; turn: Turn; orderly: bool) =
|
proc terminate(facet; turn: var Turn; orderly: bool) =
|
||||||
if facet.isAlive:
|
if facet.isAlive:
|
||||||
facet.isAlive = false
|
facet.isAlive = false
|
||||||
let parent = facet.parent
|
let parent = facet.parent
|
||||||
if not parent.isNil:
|
if not parent.isNil:
|
||||||
parent.children.excl facet
|
parent.children.excl facet
|
||||||
queueWork(turn, facet) do (turn: Turn):
|
queueWork(turn, facet) do (turn: var Turn):
|
||||||
while facet.children.len > 0:
|
while facet.children.len > 0:
|
||||||
facet.children.pop.terminate(turn, orderly)
|
facet.children.pop.terminate(turn, orderly)
|
||||||
if orderly:
|
if orderly:
|
||||||
|
@ -428,7 +430,6 @@ proc terminate(facet; turn: Turn; orderly: bool) =
|
||||||
if parent.isInert:
|
if parent.isInert:
|
||||||
parent.terminate(turn, true)
|
parent.terminate(turn, true)
|
||||||
else:
|
else:
|
||||||
stderr.writeLine "terminate facet is terminating ", facet.actor.name
|
|
||||||
terminate(facet.actor, turn, nil)
|
terminate(facet.actor, turn, nil)
|
||||||
when tracing:
|
when tracing:
|
||||||
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
|
var act = ActionDescription(orKind: ActionDescriptionKind.facetStop)
|
||||||
|
@ -437,19 +438,18 @@ proc terminate(facet; turn: Turn; orderly: bool) =
|
||||||
|
|
||||||
proc stopIfInertAfter(action: TurnAction): TurnAction =
|
proc stopIfInertAfter(action: TurnAction): TurnAction =
|
||||||
# TODO: verify this
|
# TODO: verify this
|
||||||
proc wrapper(turn: Turn) =
|
proc wrapper(turn: var Turn) =
|
||||||
action(turn)
|
action(turn)
|
||||||
queueEffect(turn, turn.facet) do (turn: Turn):
|
queueEffect(turn, turn.facet) do (turn: var Turn):
|
||||||
if (not turn.facet.parent.isNil and
|
if (not turn.facet.parent.isNil and
|
||||||
(not turn.facet.parent.isAlive)) or
|
(not turn.facet.parent.isAlive)) or
|
||||||
turn.facet.isInert:
|
turn.facet.isInert:
|
||||||
stderr.writeLine("stopIfInertAfter stops turn")
|
|
||||||
stop(turn)
|
stop(turn)
|
||||||
wrapper
|
wrapper
|
||||||
|
|
||||||
proc newFacet(turn: Turn): Facet = newFacet(turn.facet.actor, turn.facet)
|
proc newFacet(turn: var Turn): Facet = newFacet(turn.facet.actor, turn.facet)
|
||||||
|
|
||||||
proc inFacet*(turn: Turn; bootProc: TurnAction): Facet =
|
proc inFacet*(turn: var Turn; bootProc: TurnAction): Facet =
|
||||||
result = newFacet(turn)
|
result = newFacet(turn)
|
||||||
let facet = turn.facet
|
let facet = turn.facet
|
||||||
turn.facet = result
|
turn.facet = result
|
||||||
|
@ -460,7 +460,7 @@ proc inFacet*(turn: Turn; bootProc: TurnAction): Facet =
|
||||||
stopIfInertAfter(bootProc)(turn)
|
stopIfInertAfter(bootProc)(turn)
|
||||||
turn.facet = facet
|
turn.facet = facet
|
||||||
|
|
||||||
proc facet(turn: Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc)
|
proc facet(turn: var Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc)
|
||||||
|
|
||||||
proc newActor(name: string; parent: Facet): Actor =
|
proc newActor(name: string; parent: Facet): Actor =
|
||||||
result = Actor(
|
result = Actor(
|
||||||
|
@ -494,9 +494,9 @@ proc bootActor*(name: string; bootProc: TurnAction): Actor {.discardable.} =
|
||||||
turn.desc.cause.external.description = "bootActor".toPreserves
|
turn.desc.cause.external.description = "bootActor".toPreserves
|
||||||
turnQueue.addLast turn
|
turnQueue.addLast turn
|
||||||
|
|
||||||
proc spawnActor*(name: string; turn: Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
proc spawnActor*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
||||||
let actor = newActor(name, turn.facet)
|
let actor = newActor(name, turn.facet)
|
||||||
queueEffect(turn, actor.root) do (turn: Turn):
|
queueEffect(turn, actor.root) do (turn: var Turn):
|
||||||
var newOutBound: Table[Handle, OutboundAssertion]
|
var newOutBound: Table[Handle, OutboundAssertion]
|
||||||
for key in initialAssertions:
|
for key in initialAssertions:
|
||||||
discard turn.facet.outbound.pop(key, newOutbound[key])
|
discard turn.facet.outbound.pop(key, newOutbound[key])
|
||||||
|
@ -507,13 +507,12 @@ proc spawnActor*(name: string; turn: Turn; bootProc: TurnAction; initialAssertio
|
||||||
run(actor, bootProc, newOutBound)
|
run(actor, bootProc, newOutBound)
|
||||||
actor
|
actor
|
||||||
|
|
||||||
proc spawn*(name: string; turn: Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
||||||
spawnActor(name, turn, bootProc, initialAssertions)
|
spawnActor(name, turn, bootProc, initialAssertions)
|
||||||
|
|
||||||
type StopOnRetract = ref object of Entity
|
type StopOnRetract = ref object of Entity
|
||||||
|
|
||||||
method retract*(e: StopOnRetract; turn: Turn; h: Handle) =
|
method retract*(e: StopOnRetract; turn: var Turn; h: Handle) =
|
||||||
stderr.writeLIne "StopOnRetract stops turn ", turn, " of ", turn.facet.actor
|
|
||||||
stop(turn)
|
stop(turn)
|
||||||
|
|
||||||
proc halfLink(facet, other: Facet) =
|
proc halfLink(facet, other: Facet) =
|
||||||
|
@ -524,7 +523,7 @@ proc halfLink(facet, other: Facet) =
|
||||||
established: true,
|
established: true,
|
||||||
)
|
)
|
||||||
|
|
||||||
proc spawnLink*(name: string; turn: Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
proc spawnLink*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} =
|
||||||
result = spawnActor(name, turn, bootProc, initialAssertions)
|
result = spawnActor(name, turn, bootProc, initialAssertions)
|
||||||
halfLink(turn.facet, result.root)
|
halfLink(turn.facet, result.root)
|
||||||
halfLink(result.root, turn.facet)
|
halfLink(result.root, turn.facet)
|
||||||
|
@ -547,24 +546,24 @@ proc terminate(actor; turn; reason: ref Exception) =
|
||||||
act.stop.status.error.message = reason.msg
|
act.stop.status.error.message = reason.msg
|
||||||
trace(actor, act)
|
trace(actor, act)
|
||||||
for hook in actor.exitHooks: hook(turn)
|
for hook in actor.exitHooks: hook(turn)
|
||||||
proc finish(turn: Turn) =
|
proc finish(turn: var Turn) =
|
||||||
assert not actor.root.isNil, actor.name
|
assert not actor.root.isNil, actor.name
|
||||||
actor.root.terminate(turn, reason.isNil)
|
actor.root.terminate(turn, reason.isNil)
|
||||||
actor.exited = true
|
actor.exited = true
|
||||||
queueTurn(actor.root, finish)
|
queueTurn(actor.root, finish)
|
||||||
|
|
||||||
proc terminate*(facet; e: ref Exception) =
|
proc terminate*(facet; e: ref Exception) =
|
||||||
run(facet.actor.root) do (turn: Turn):
|
run(facet.actor.root) do (turn: var Turn):
|
||||||
facet.actor.terminate(turn, e)
|
facet.actor.terminate(turn, e)
|
||||||
|
|
||||||
template recallFacet(turn: Turn; body: untyped): untyped =
|
template recallFacet(turn: var Turn; body: untyped): untyped =
|
||||||
let facet = turn.facet
|
let facet = turn.facet
|
||||||
block:
|
block:
|
||||||
body
|
body
|
||||||
assert facet.actor == turn.facet.actor, "turn of " & $facet.actor & " ended at " & $turn.facet.actor
|
assert facet.actor == turn.facet.actor, "turn of " & $facet.actor & " ended at " & $turn.facet.actor
|
||||||
turn.facet = facet
|
turn.facet = facet
|
||||||
|
|
||||||
proc stopNow(turn: Turn) =
|
proc stopNow(turn: var Turn) =
|
||||||
let caller = turn.facet
|
let caller = turn.facet
|
||||||
recallFacet turn:
|
recallFacet turn:
|
||||||
while caller.children.len > 0:
|
while caller.children.len > 0:
|
||||||
|
@ -576,35 +575,35 @@ proc stopNow(turn: Turn) =
|
||||||
queueEffect(turn, child, stopNow)
|
queueEffect(turn, child, stopNow)
|
||||||
caller.terminate(turn, true)
|
caller.terminate(turn, true)
|
||||||
|
|
||||||
proc stop*(turn: Turn, facet: Facet) =
|
proc stop*(turn: var Turn, facet: Facet) =
|
||||||
queueEffect(turn, facet, stopNow)
|
queueEffect(turn, facet, stopNow)
|
||||||
|
|
||||||
proc stop*(turn: Turn) =
|
proc stop*(turn: var Turn) =
|
||||||
stop(turn, turn.facet)
|
stop(turn, turn.facet)
|
||||||
|
|
||||||
proc onStop*(facet: Facet; act: TurnAction) =
|
proc onStop*(facet: Facet; act: TurnAction) =
|
||||||
## Add a `proc (turn: Turn)` action to `facet` to be called as it stops.
|
## Add a `proc (turn: var Turn)` action to `facet` to be called as it stops.
|
||||||
assert not facet.isNil
|
assert not facet.isNil
|
||||||
add(facet.shutdownActions, act)
|
add(facet.shutdownActions, act)
|
||||||
|
|
||||||
proc onStop*(turn: Turn; act: TurnAction) =
|
proc onStop*(turn: var Turn; act: TurnAction) =
|
||||||
onStop(turn.facet, act)
|
onStop(turn.facet, act)
|
||||||
|
|
||||||
proc stop*(actor: Actor) =
|
proc stop*(actor: Actor) =
|
||||||
queueTurn(actor.root) do (turn: Turn):
|
queueTurn(actor.root) do (turn: var Turn):
|
||||||
assert(not turn.facet.isNil)
|
assert(not turn.facet.isNil)
|
||||||
stop(turn)
|
stop(turn)
|
||||||
|
|
||||||
proc stopActor*(facet: Facet) =
|
proc stopActor*(facet: Facet) =
|
||||||
stop(facet.actor)
|
stop(facet.actor)
|
||||||
|
|
||||||
proc stopActor*(turn: Turn) =
|
proc stopActor*(turn: var Turn) =
|
||||||
assert(not turn.facet.isNil)
|
assert(not turn.facet.isNil)
|
||||||
assert(not turn.facet.actor.isNil)
|
assert(not turn.facet.actor.isNil)
|
||||||
assert(not turn.facet.actor.root.isNil)
|
assert(not turn.facet.actor.root.isNil)
|
||||||
stop(turn, turn.facet.actor.root)
|
stop(turn, turn.facet.actor.root)
|
||||||
|
|
||||||
proc freshen*(turn: Turn, act: TurnAction) {.deprecated.} =
|
proc freshen*(turn: var Turn, act: TurnAction) {.deprecated.} =
|
||||||
run(turn.facet, act)
|
run(turn.facet, act)
|
||||||
|
|
||||||
proc newCap*(relay: Facet; e: Entity): Cap {.deprecated.} =
|
proc newCap*(relay: Facet; e: Entity): Cap {.deprecated.} =
|
||||||
|
@ -619,10 +618,10 @@ proc newCap*(e: Entity; turn): Cap =
|
||||||
type SyncContinuation {.final.} = ref object of Entity
|
type SyncContinuation {.final.} = ref object of Entity
|
||||||
action: TurnAction
|
action: TurnAction
|
||||||
|
|
||||||
method message(entity: SyncContinuation; turn: Turn; v: AssertionRef) =
|
method message(entity: SyncContinuation; turn: var Turn; v: AssertionRef) =
|
||||||
entity.action(turn)
|
entity.action(turn)
|
||||||
|
|
||||||
proc sync*(turn: Turn; refer: Cap; act: TurnAction) =
|
proc sync*(turn: var Turn; refer: Cap; act: TurnAction) =
|
||||||
sync(turn, refer, newCap(turn, SyncContinuation(action: act)))
|
sync(turn, refer, newCap(turn, SyncContinuation(action: act)))
|
||||||
|
|
||||||
proc running*(actor): bool =
|
proc running*(actor): bool =
|
||||||
|
@ -630,7 +629,7 @@ proc running*(actor): bool =
|
||||||
if not (result or actor.exitReason.isNil):
|
if not (result or actor.exitReason.isNil):
|
||||||
raise actor.exitReason
|
raise actor.exitReason
|
||||||
|
|
||||||
proc run(turn: Turn) =
|
proc run(turn: var Turn) =
|
||||||
while turn.work.len > 0:
|
while turn.work.len > 0:
|
||||||
var (facet, act) = turn.work.popFirst()
|
var (facet, act) = turn.work.popFirst()
|
||||||
assert not act.isNil
|
assert not act.isNil
|
||||||
|
|
|
@ -59,7 +59,7 @@ when defined(linux):
|
||||||
|
|
||||||
proc spawnTimerDriver(facet: Facet; cap: Cap): TimerDriver =
|
proc spawnTimerDriver(facet: Facet; cap: Cap): TimerDriver =
|
||||||
let driver = TimerDriver(facet: facet, target: cap)
|
let driver = TimerDriver(facet: facet, target: cap)
|
||||||
facet.onStop do (turn: Turn):
|
facet.onStop do (turn: var Turn):
|
||||||
for fd in driver.timers:
|
for fd in driver.timers:
|
||||||
unregister(FD fd)
|
unregister(FD fd)
|
||||||
discard close(fd)
|
discard close(fd)
|
||||||
|
@ -89,16 +89,16 @@ when defined(linux):
|
||||||
wait(FD fd, Read)
|
wait(FD fd, Read)
|
||||||
if deadline in driver.deadlines:
|
if deadline in driver.deadlines:
|
||||||
# Check if the deadline is still observed.
|
# Check if the deadline is still observed.
|
||||||
proc turnWork(turn: Turn) =
|
proc turnWork(turn: var Turn) =
|
||||||
discard publish(turn, driver.target, LaterThan(seconds: deadline))
|
discard publish(turn, driver.target, LaterThan(seconds: deadline))
|
||||||
run(driver.facet, turnWork)
|
run(driver.facet, turnWork)
|
||||||
discard close(fd)
|
discard close(fd)
|
||||||
driver.timers.excl(fd)
|
driver.timers.excl(fd)
|
||||||
|
|
||||||
proc spawnTimerActor*(turn: Turn; ds: Cap): Actor {.discardable.} =
|
proc spawnTimerActor*(turn: var Turn; ds: Cap): Actor {.discardable.} =
|
||||||
## Spawn a timer actor that responds to
|
## Spawn a timer actor that responds to
|
||||||
## dataspace observations of timeouts on `ds`.
|
## dataspace observations of timeouts on `ds`.
|
||||||
spawnLink("timers", turn) do (turn: Turn):
|
spawnLink("timers", turn) do (turn: var Turn):
|
||||||
let driver = spawnTimerDriver(turn.facet, ds)
|
let driver = spawnTimerDriver(turn.facet, ds)
|
||||||
let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()})
|
let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()})
|
||||||
during(turn, ds, pat) do (deadline: float):
|
during(turn, ds, pat) do (deadline: float):
|
||||||
|
@ -108,7 +108,7 @@ when defined(linux):
|
||||||
discard change(driver.deadlines, deadline, -1, clamp = true)
|
discard change(driver.deadlines, deadline, -1, clamp = true)
|
||||||
# TODO: retract assertions that are unobserved.
|
# TODO: retract assertions that are unobserved.
|
||||||
|
|
||||||
proc after*(turn: Turn; ds: Cap; dur: Duration; act: TurnAction) =
|
proc after*(turn: var Turn; ds: Cap; dur: Duration; act: TurnAction) =
|
||||||
## Execute `act` after some duration of time.
|
## Execute `act` after some duration of time.
|
||||||
var later = clock_realtime().toFloat() + dur.inMilliseconds.float / 1_000.0
|
var later = clock_realtime().toFloat() + dur.inMilliseconds.float / 1_000.0
|
||||||
onPublish(turn, ds, grab LaterThan(seconds: later)):
|
onPublish(turn, ds, grab LaterThan(seconds: later)):
|
||||||
|
|
|
@ -16,14 +16,14 @@ type
|
||||||
index: Index
|
index: Index
|
||||||
handleMap: Table[Handle, Assertion]
|
handleMap: Table[Handle, Assertion]
|
||||||
|
|
||||||
method publish(ds: Dataspace; turn: Turn; a: AssertionRef; h: Handle) =
|
method publish(ds: Dataspace; turn: var Turn; a: AssertionRef; h: Handle) =
|
||||||
if add(ds.index, turn, a.value):
|
if add(ds.index, turn, a.value):
|
||||||
var obs = a.value.preservesTo(Observe)
|
var obs = a.value.preservesTo(Observe)
|
||||||
if obs.isSome and obs.get.observer of Cap:
|
if obs.isSome and obs.get.observer of Cap:
|
||||||
ds.index.add(turn, obs.get.pattern, Cap(obs.get.observer))
|
ds.index.add(turn, obs.get.pattern, Cap(obs.get.observer))
|
||||||
ds.handleMap[h] = a.value
|
ds.handleMap[h] = a.value
|
||||||
|
|
||||||
method retract(ds: Dataspace; turn: Turn; h: Handle) =
|
method retract(ds: Dataspace; turn: var Turn; h: Handle) =
|
||||||
let v = ds.handleMap[h]
|
let v = ds.handleMap[h]
|
||||||
if remove(ds.index, turn, v):
|
if remove(ds.index, turn, v):
|
||||||
ds.handleMap.del h
|
ds.handleMap.del h
|
||||||
|
@ -31,20 +31,20 @@ method retract(ds: Dataspace; turn: Turn; h: Handle) =
|
||||||
if obs.isSome and obs.get.observer of Cap:
|
if obs.isSome and obs.get.observer of Cap:
|
||||||
ds.index.remove(turn, obs.get.pattern, Cap(obs.get.observer))
|
ds.index.remove(turn, obs.get.pattern, Cap(obs.get.observer))
|
||||||
|
|
||||||
method message(ds: Dataspace; turn: Turn; a: AssertionRef) =
|
method message(ds: Dataspace; turn: var Turn; a: AssertionRef) =
|
||||||
ds.index.deliverMessage(turn, a.value)
|
ds.index.deliverMessage(turn, a.value)
|
||||||
|
|
||||||
proc newDataspace*(turn: Turn): Cap =
|
proc newDataspace*(turn: var Turn): Cap =
|
||||||
newCap(turn, Dataspace(index: initIndex()))
|
newCap(turn, Dataspace(index: initIndex()))
|
||||||
|
|
||||||
type BootProc = proc (turn: Turn; ds: Cap) {.closure.}
|
type BootProc = proc (turn: var Turn; ds: Cap) {.closure.}
|
||||||
type DeprecatedBootProc = proc (ds: Cap; turn: Turn) {.closure.}
|
type DeprecatedBootProc = proc (ds: Cap; turn: var Turn) {.closure.}
|
||||||
|
|
||||||
proc bootDataspace*(name: string; bootProc: BootProc): Actor =
|
proc bootDataspace*(name: string; bootProc: BootProc): Actor =
|
||||||
bootActor(name) do (turn: Turn):
|
bootActor(name) do (turn: var Turn):
|
||||||
discard turn.facet.preventInertCheck()
|
discard turn.facet.preventInertCheck()
|
||||||
bootProc(turn, newDataspace(turn))
|
bootProc(turn, newDataspace(turn))
|
||||||
|
|
||||||
proc bootDataspace*(name: string; bootProc: DeprecatedBootProc): Actor {.deprecated.} =
|
proc bootDataspace*(name: string; bootProc: DeprecatedBootProc): Actor {.deprecated.} =
|
||||||
bootDataspace(name) do (turn: Turn, ds: Cap):
|
bootDataspace(name) do (turn: var Turn, ds: Cap):
|
||||||
bootProc(ds, turn)
|
bootProc(ds, turn)
|
||||||
|
|
|
@ -6,7 +6,7 @@ import preserves
|
||||||
import ./actors, ./patterns, ./protocols/dataspace
|
import ./actors, ./patterns, ./protocols/dataspace
|
||||||
|
|
||||||
type
|
type
|
||||||
DuringProc* = proc (turn: Turn; a: Value; h: Handle): TurnAction
|
DuringProc* = proc (turn: var Turn; a: Value; h: Handle): TurnAction
|
||||||
DuringActionKind = enum null, dead, act
|
DuringActionKind = enum null, dead, act
|
||||||
DuringAction = object
|
DuringAction = object
|
||||||
case kind: DuringActionKind
|
case kind: DuringActionKind
|
||||||
|
@ -17,7 +17,7 @@ type
|
||||||
cb: DuringProc
|
cb: DuringProc
|
||||||
assertionMap: Table[Handle, DuringAction]
|
assertionMap: Table[Handle, DuringAction]
|
||||||
|
|
||||||
method publish(de: DuringEntity; turn: Turn; a: AssertionRef; h: Handle) =
|
method publish(de: DuringEntity; turn: var Turn; a: AssertionRef; h: Handle) =
|
||||||
let action = de.cb(turn, a.value, h)
|
let action = de.cb(turn, a.value, h)
|
||||||
# assert(not action.isNil "should have put in a no-op action")
|
# assert(not action.isNil "should have put in a no-op action")
|
||||||
let g = de.assertionMap.getOrDefault h
|
let g = de.assertionMap.getOrDefault h
|
||||||
|
@ -30,7 +30,7 @@ method publish(de: DuringEntity; turn: Turn; a: AssertionRef; h: Handle) =
|
||||||
of act:
|
of act:
|
||||||
raiseAssert("during: duplicate handle in publish: " & $h)
|
raiseAssert("during: duplicate handle in publish: " & $h)
|
||||||
|
|
||||||
method retract(de: DuringEntity; turn: Turn; h: Handle) =
|
method retract(de: DuringEntity; turn: var Turn; h: Handle) =
|
||||||
let g = de.assertionMap.getOrDefault h
|
let g = de.assertionMap.getOrDefault h
|
||||||
case g.kind
|
case g.kind
|
||||||
of null:
|
of null:
|
||||||
|
@ -44,5 +44,5 @@ method retract(de: DuringEntity; turn: Turn; h: Handle) =
|
||||||
|
|
||||||
proc during*(cb: DuringProc): DuringEntity = DuringEntity(cb: cb)
|
proc during*(cb: DuringProc): DuringEntity = DuringEntity(cb: cb)
|
||||||
|
|
||||||
proc observe*(turn: Turn; ds: Cap; pat: Pattern; e: Entity): Handle =
|
proc observe*(turn: var Turn; ds: Cap; pat: Pattern; e: Entity): Handle =
|
||||||
publish(turn, ds, Observe(pattern: pat, observer: newCap(turn, e)))
|
publish(turn, ds, Observe(pattern: pat, observer: newCap(turn, e)))
|
||||||
|
|
|
@ -27,8 +27,8 @@ type
|
||||||
Turn = syndicate.Turn
|
Turn = syndicate.Turn
|
||||||
WireRef = sturdy.WireRef
|
WireRef = sturdy.WireRef
|
||||||
|
|
||||||
PacketWriter = proc (turn: Turn; buf: seq[byte]) {.closure.}
|
PacketWriter = proc (turn: var Turn; buf: seq[byte]) {.closure.}
|
||||||
RelaySetup = proc (turn: Turn; relay: Relay) {.closure.}
|
RelaySetup = proc (turn: var Turn; relay: Relay) {.closure.}
|
||||||
|
|
||||||
Relay* = ref object
|
Relay* = ref object
|
||||||
facet: Facet
|
facet: Facet
|
||||||
|
@ -56,20 +56,20 @@ type
|
||||||
proc releaseCapOut(r: Relay; e: WireSymbol) =
|
proc releaseCapOut(r: Relay; e: WireSymbol) =
|
||||||
r.exported.drop e
|
r.exported.drop e
|
||||||
|
|
||||||
method publish(spe: SyncPeerEntity; t: Turn; a: AssertionRef; h: Handle) =
|
method publish(spe: SyncPeerEntity; t: var Turn; a: AssertionRef; h: Handle) =
|
||||||
spe.handleMap[h] = publish(t, spe.peer, a.value)
|
spe.handleMap[h] = publish(t, spe.peer, a.value)
|
||||||
|
|
||||||
method retract(se: SyncPeerEntity; t: Turn; h: Handle) =
|
method retract(se: SyncPeerEntity; t: var Turn; h: Handle) =
|
||||||
var other: Handle
|
var other: Handle
|
||||||
if se.handleMap.pop(h, other):
|
if se.handleMap.pop(h, other):
|
||||||
retract(t, other)
|
retract(t, other)
|
||||||
|
|
||||||
method message(se: SyncPeerEntity; t: Turn; a: AssertionRef) =
|
method message(se: SyncPeerEntity; t: var Turn; a: AssertionRef) =
|
||||||
if not se.e.isNil:
|
if not se.e.isNil:
|
||||||
se.relay.releaseCapOut(se.e)
|
se.relay.releaseCapOut(se.e)
|
||||||
message(t, se.peer, a.value)
|
message(t, se.peer, a.value)
|
||||||
|
|
||||||
method sync(se: SyncPeerEntity; t: Turn; peer: Cap) =
|
method sync(se: SyncPeerEntity; t: var Turn; peer: Cap) =
|
||||||
sync(t, se.peer, peer)
|
sync(t, se.peer, peer)
|
||||||
|
|
||||||
proc newSyncPeerEntity(r: Relay; p: Cap): SyncPeerEntity =
|
proc newSyncPeerEntity(r: Relay; p: Cap): SyncPeerEntity =
|
||||||
|
@ -106,40 +106,40 @@ proc deregister(relay: Relay; h: Handle) =
|
||||||
if relay.outboundAssertions.pop(h, outbound):
|
if relay.outboundAssertions.pop(h, outbound):
|
||||||
for e in outbound: releaseCapOut(relay, e)
|
for e in outbound: releaseCapOut(relay, e)
|
||||||
|
|
||||||
proc send(relay: Relay; turn: Turn; rOid: protocol.Oid; m: Event) =
|
proc send(relay: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
|
||||||
# TODO: don't send right away.
|
# TODO: don't send right away.
|
||||||
var pendingTurn: protocol.Turn
|
var pendingTurn: protocol.Turn
|
||||||
pendingTurn.add TurnEvent(oid: rOid, event: m)
|
pendingTurn.add TurnEvent(oid: rOid, event: m)
|
||||||
relay.facet.run do (turn: Turn):
|
relay.facet.run do (turn: var Turn):
|
||||||
var pkt = Packet(
|
var pkt = Packet(
|
||||||
orKind: PacketKind.Turn,
|
orKind: PacketKind.Turn,
|
||||||
turn: pendingTurn)
|
turn: pendingTurn)
|
||||||
trace "C: ", pkt
|
trace "C: ", pkt
|
||||||
relay.packetWriter(turn, encode pkt)
|
relay.packetWriter(turn, encode pkt)
|
||||||
|
|
||||||
proc send(re: RelayEntity; turn: Turn; ev: Event) =
|
proc send(re: RelayEntity; turn: var Turn; ev: Event) =
|
||||||
send(re.relay, turn, protocol.Oid re.oid, ev)
|
send(re.relay, turn, protocol.Oid re.oid, ev)
|
||||||
|
|
||||||
method publish(re: RelayEntity; t: Turn; a: AssertionRef; h: Handle) =
|
method publish(re: RelayEntity; t: var Turn; a: AssertionRef; h: Handle) =
|
||||||
re.send(t, Event(
|
re.send(t, Event(
|
||||||
orKind: EventKind.Assert,
|
orKind: EventKind.Assert,
|
||||||
`assert`: protocol.Assert(
|
`assert`: protocol.Assert(
|
||||||
assertion: re.relay.register(a.value, h).rewritten,
|
assertion: re.relay.register(a.value, h).rewritten,
|
||||||
handle: h)))
|
handle: h)))
|
||||||
|
|
||||||
method retract(re: RelayEntity; t: Turn; h: Handle) =
|
method retract(re: RelayEntity; t: var Turn; h: Handle) =
|
||||||
re.relay.deregister h
|
re.relay.deregister h
|
||||||
re.send(t, Event(
|
re.send(t, Event(
|
||||||
orKind: EventKind.Retract,
|
orKind: EventKind.Retract,
|
||||||
retract: Retract(handle: h)))
|
retract: Retract(handle: h)))
|
||||||
|
|
||||||
method message(re: RelayEntity; turn: Turn; msg: AssertionRef) =
|
method message(re: RelayEntity; turn: var Turn; msg: AssertionRef) =
|
||||||
var (value, exported) = rewriteOut(re.relay, msg.value)
|
var (value, exported) = rewriteOut(re.relay, msg.value)
|
||||||
assert(len(exported) == 0, "cannot send a reference in a message")
|
assert(len(exported) == 0, "cannot send a reference in a message")
|
||||||
if len(exported) == 0:
|
if len(exported) == 0:
|
||||||
re.send(turn, Event(orKind: EventKind.Message, message: Message(body: value)))
|
re.send(turn, Event(orKind: EventKind.Message, message: Message(body: value)))
|
||||||
|
|
||||||
method sync(re: RelayEntity; turn: Turn; peer: Cap) =
|
method sync(re: RelayEntity; turn: var Turn; peer: Cap) =
|
||||||
var
|
var
|
||||||
peerEntity = newSyncPeerEntity(re.relay, peer)
|
peerEntity = newSyncPeerEntity(re.relay, peer)
|
||||||
exported: seq[WireSymbol]
|
exported: seq[WireSymbol]
|
||||||
|
@ -193,7 +193,7 @@ proc rewriteIn(relay; facet; v: Value):
|
||||||
|
|
||||||
proc close(r: Relay) = discard
|
proc close(r: Relay) = discard
|
||||||
|
|
||||||
proc dispatch(relay: Relay; turn: Turn; cap: Cap; event: Event) =
|
proc dispatch(relay: Relay; turn: var Turn; cap: Cap; event: Event) =
|
||||||
case event.orKind
|
case event.orKind
|
||||||
of EventKind.Assert:
|
of EventKind.Assert:
|
||||||
let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion)
|
let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion)
|
||||||
|
@ -216,14 +216,14 @@ proc dispatch(relay: Relay; turn: Turn; cap: Cap; event: Event) =
|
||||||
#[
|
#[
|
||||||
var imported: seq[WireSymbol]
|
var imported: seq[WireSymbol]
|
||||||
let k = relay.rewriteCapIn(turn, evenr.sync.peer, imported)
|
let k = relay.rewriteCapIn(turn, evenr.sync.peer, imported)
|
||||||
turn.sync(cap) do (turn: Turn):
|
turn.sync(cap) do (turn: var Turn):
|
||||||
turn.message(k, true)
|
turn.message(k, true)
|
||||||
for e in imported: relay.imported.del e
|
for e in imported: relay.imported.del e
|
||||||
]#
|
]#
|
||||||
|
|
||||||
proc dispatch(relay: Relay; v: Value) =
|
proc dispatch(relay: Relay; v: Value) =
|
||||||
trace "S: ", v
|
trace "S: ", v
|
||||||
run(relay.facet) do (t: Turn):
|
run(relay.facet) do (t: var Turn):
|
||||||
var pkt: Packet
|
var pkt: Packet
|
||||||
if pkt.fromPreserves(v):
|
if pkt.fromPreserves(v):
|
||||||
case pkt.orKind
|
case pkt.orKind
|
||||||
|
@ -261,10 +261,8 @@ type
|
||||||
initialCap*: Cap
|
initialCap*: Cap
|
||||||
nextLocalOid*: Option[Oid]
|
nextLocalOid*: Option[Oid]
|
||||||
|
|
||||||
proc spawnRelay(name: string; turn: Turn; opts: RelayActorOptions; setup: RelaySetup) =
|
proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) =
|
||||||
stderr.writeLine "calling spawnActor for relay ", name
|
spawnLink(name, turn) do (turn: var Turn):
|
||||||
spawnLink(name, turn) do (turn: Turn):
|
|
||||||
stderr.writeLine "executing body of spawned actor relay ", name
|
|
||||||
let relay = Relay(
|
let relay = Relay(
|
||||||
facet: turn.facet,
|
facet: turn.facet,
|
||||||
packetWriter: opts.packetWriter,
|
packetWriter: opts.packetWriter,
|
||||||
|
@ -308,7 +306,7 @@ when defined(posix):
|
||||||
stdin: AsyncFile
|
stdin: AsyncFile
|
||||||
alive: bool
|
alive: bool
|
||||||
|
|
||||||
method message(entity: StdioEntity; turn: Turn; ass: AssertionRef) =
|
method message(entity: StdioEntity; turn: var Turn; ass: AssertionRef) =
|
||||||
if ass.value.preservesTo(ForceDisconnect).isSome:
|
if ass.value.preservesTo(ForceDisconnect).isSome:
|
||||||
entity.alive = false
|
entity.alive = false
|
||||||
|
|
||||||
|
@ -324,21 +322,20 @@ when defined(posix):
|
||||||
entity.relay.recv(buf[], 0..<n)
|
entity.relay.recv(buf[], 0..<n)
|
||||||
close(entity.stdin)
|
close(entity.stdin)
|
||||||
|
|
||||||
proc connectTransport(turn: Turn; ds: Cap; ta: transportAddress.Stdio) =
|
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Stdio) =
|
||||||
## Connect to an external dataspace over stdio.
|
## Connect to an external dataspace over stdio.
|
||||||
proc stdoutWriter(turn: Turn; buf: seq[byte]) =
|
proc stdoutWriter(turn: var Turn; buf: seq[byte]) =
|
||||||
## Blocking write to stdout.
|
## Blocking write to stdout.
|
||||||
let n = writeBytes(stdout, buf, 0, buf.len)
|
let n = writeBytes(stdout, buf, 0, buf.len)
|
||||||
flushFile(stdout)
|
flushFile(stdout)
|
||||||
if n != buf.len:
|
if n != buf.len:
|
||||||
stderr.writeLine "short write to stdout, stopping actor"
|
|
||||||
stopActor(turn)
|
stopActor(turn)
|
||||||
var opts = RelayActorOptions(
|
var opts = RelayActorOptions(
|
||||||
packetWriter: stdoutWriter,
|
packetWriter: stdoutWriter,
|
||||||
initialCap: ds,
|
initialCap: ds,
|
||||||
initialOid: 0.Oid.some,
|
initialOid: 0.Oid.some,
|
||||||
)
|
)
|
||||||
spawnRelay("stdio", turn, opts) do (turn: Turn; relay: Relay):
|
spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay):
|
||||||
let
|
let
|
||||||
facet = turn.facet
|
facet = turn.facet
|
||||||
fd = stdin.getOsFileHandle()
|
fd = stdin.getOsFileHandle()
|
||||||
|
@ -348,8 +345,7 @@ when defined(posix):
|
||||||
raiseOSError(osLastError())
|
raiseOSError(osLastError())
|
||||||
let entity = StdioEntity(
|
let entity = StdioEntity(
|
||||||
facet: turn.facet, relay: relay, stdin: newAsyncFile(FD fd))
|
facet: turn.facet, relay: relay, stdin: newAsyncFile(FD fd))
|
||||||
onStop(entity.facet) do (turn: Turn):
|
onStop(entity.facet) do (turn: var Turn):
|
||||||
stderr.writeLine "entity for stdio stopped, set alive to false"
|
|
||||||
entity.alive = false
|
entity.alive = false
|
||||||
discard trampoline:
|
discard trampoline:
|
||||||
whelp loop(entity)
|
whelp loop(entity)
|
||||||
|
@ -359,7 +355,7 @@ when defined(posix):
|
||||||
resolved: relay.peer.accepted,
|
resolved: relay.peer.accepted,
|
||||||
))
|
))
|
||||||
|
|
||||||
proc connectStdio*(turn: Turn; ds: Cap) =
|
proc connectStdio*(turn: var Turn; ds: Cap) =
|
||||||
## Connect to an external dataspace over stdin and stdout.
|
## Connect to an external dataspace over stdin and stdout.
|
||||||
connectTransport(turn, ds, transportAddress.Stdio())
|
connectTransport(turn, ds, transportAddress.Stdio())
|
||||||
|
|
||||||
|
@ -376,17 +372,17 @@ when defined(posix):
|
||||||
|
|
||||||
SocketEntity = TcpEntity | UnixEntity
|
SocketEntity = TcpEntity | UnixEntity
|
||||||
|
|
||||||
method message(entity: SocketEntity; turn: Turn; ass: AssertionRef) =
|
method message(entity: SocketEntity; turn: var Turn; ass: AssertionRef) =
|
||||||
if ass.value.preservesTo(ForceDisconnect).isSome:
|
if ass.value.preservesTo(ForceDisconnect).isSome:
|
||||||
entity.alive = false
|
entity.alive = false
|
||||||
|
|
||||||
type ShutdownEntity = ref object of Entity
|
type ShutdownEntity = ref object of Entity
|
||||||
method retract(e: ShutdownEntity; turn: Turn; h: Handle) =
|
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
|
||||||
stopActor(e.facet)
|
stopActor(e.facet)
|
||||||
|
|
||||||
template bootSocketEntity() {.dirty.} =
|
template bootSocketEntity() {.dirty.} =
|
||||||
proc setup(turn: Turn) {.closure.} =
|
proc setup(turn: var Turn) {.closure.} =
|
||||||
proc kill(turn: Turn) =
|
proc kill(turn: var Turn) =
|
||||||
entity.alive = false
|
entity.alive = false
|
||||||
onStop(turn, kill)
|
onStop(turn, kill)
|
||||||
publish(turn, ds, TransportConnection(
|
publish(turn, ds, TransportConnection(
|
||||||
|
@ -416,30 +412,28 @@ when defined(posix):
|
||||||
bootSocketEntity()
|
bootSocketEntity()
|
||||||
|
|
||||||
template spawnSocketRelay() {.dirty.} =
|
template spawnSocketRelay() {.dirty.} =
|
||||||
proc writeConn(turn: Turn; buf: seq[byte]) =
|
proc writeConn(turn: var Turn; buf: seq[byte]) =
|
||||||
stderr.writeLine "writing to socket…"
|
|
||||||
discard trampoline:
|
discard trampoline:
|
||||||
whelp write(entity.sock, buf)
|
whelp write(entity.sock, buf)
|
||||||
stderr.writeLine "maybe wrote to to socket"
|
|
||||||
var ops = RelayActorOptions(
|
var ops = RelayActorOptions(
|
||||||
packetWriter: writeConn,
|
packetWriter: writeConn,
|
||||||
initialOid: 0.Oid.some,
|
initialOid: 0.Oid.some,
|
||||||
)
|
)
|
||||||
spawnRelay("socket", turn, ops) do (turn: Turn; relay: Relay):
|
spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay):
|
||||||
entity.facet = turn.facet
|
entity.facet = turn.facet
|
||||||
entity.relay = relay
|
entity.relay = relay
|
||||||
discard trampoline:
|
discard trampoline:
|
||||||
whelp boot(entity, ta, ds)
|
whelp boot(entity, ta, ds)
|
||||||
|
|
||||||
proc connectTransport(turn: Turn; ds: Cap; ta: transportAddress.Tcp) =
|
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Tcp) =
|
||||||
let entity = TcpEntity()
|
let entity = TcpEntity()
|
||||||
spawnSocketRelay()
|
spawnSocketRelay()
|
||||||
|
|
||||||
proc connectTransport(turn: Turn; ds: Cap; ta: transportAddress.Unix) =
|
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Unix) =
|
||||||
let entity = UnixEntity()
|
let entity = UnixEntity()
|
||||||
spawnSocketRelay()
|
spawnSocketRelay()
|
||||||
|
|
||||||
proc walk(turn: Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) =
|
proc walk(turn: var Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) =
|
||||||
if stepOff < route.pathSteps.len:
|
if stepOff < route.pathSteps.len:
|
||||||
let
|
let
|
||||||
step = route.pathSteps[stepOff]
|
step = route.pathSteps[stepOff]
|
||||||
|
@ -462,7 +456,7 @@ proc walk(turn: Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) =
|
||||||
resolved: origin.accepted,
|
resolved: origin.accepted,
|
||||||
))
|
))
|
||||||
|
|
||||||
proc connectRoute(turn: Turn; ds: Cap; route: Route; transOff: int) =
|
proc connectRoute(turn: var Turn; ds: Cap; route: Route; transOff: int) =
|
||||||
let rejectPat = TransportConnection ?: {
|
let rejectPat = TransportConnection ?: {
|
||||||
0: ?route.transports[transOff],
|
0: ?route.transports[transOff],
|
||||||
2: ?:Rejected,
|
2: ?:Rejected,
|
||||||
|
@ -480,14 +474,14 @@ proc connectRoute(turn: Turn; ds: Cap; route: Route; transOff: int) =
|
||||||
onPublish(turn, ds, acceptPat) do (origin: Cap):
|
onPublish(turn, ds, acceptPat) do (origin: Cap):
|
||||||
walk(turn, ds, origin, route, transOff, 0)
|
walk(turn, ds, origin, route, transOff, 0)
|
||||||
|
|
||||||
type StepCallback = proc (turn: Turn; step: Value; origin, next: Cap) {.closure.}
|
type StepCallback = proc (turn: var Turn; step: Value; origin, next: Cap) {.closure.}
|
||||||
|
|
||||||
proc spawnStepResolver(turn: Turn; ds: Cap; stepType: Value; cb: StepCallback) =
|
proc spawnStepResolver(turn: var Turn; ds: Cap; stepType: Value; cb: StepCallback) =
|
||||||
let stepPat = grabRecord(stepType, grab())
|
let stepPat = grabRecord(stepType, grab())
|
||||||
let pat = ?Observe(pattern: ResolvedPathStep?:{1: stepPat}) ?? {0: grabLit(), 1: grab()}
|
let pat = ?Observe(pattern: ResolvedPathStep?:{1: stepPat}) ?? {0: grabLit(), 1: grab()}
|
||||||
during(turn, ds, pat) do (origin: Cap; stepDetail: Literal[Value]):
|
during(turn, ds, pat) do (origin: Cap; stepDetail: Literal[Value]):
|
||||||
let step = toRecord(stepType, stepDetail.value)
|
let step = toRecord(stepType, stepDetail.value)
|
||||||
proc duringCallback(turn: Turn; ass: Value; h: Handle): TurnAction =
|
proc duringCallback(turn: var Turn; ass: Value; h: Handle): TurnAction =
|
||||||
var res = ass.preservesTo Resolved
|
var res = ass.preservesTo Resolved
|
||||||
if res.isSome:
|
if res.isSome:
|
||||||
if res.get.orKind == ResolvedKind.accepted and
|
if res.get.orKind == ResolvedKind.accepted and
|
||||||
|
@ -496,15 +490,14 @@ proc spawnStepResolver(turn: Turn; ds: Cap; stepType: Value; cb: StepCallback) =
|
||||||
else:
|
else:
|
||||||
publish(turn, ds, ResolvedPathStep(
|
publish(turn, ds, ResolvedPathStep(
|
||||||
origin: origin, pathStep: step, resolved: res.get))
|
origin: origin, pathStep: step, resolved: res.get))
|
||||||
proc action(turn: Turn) =
|
proc action(turn: var Turn) =
|
||||||
stop(turn)
|
stop(turn)
|
||||||
result = action
|
result = action
|
||||||
publish(turn, origin, Resolve(
|
publish(turn, origin, Resolve(
|
||||||
step: step, observer: newCap(turn, during(duringCallback))))
|
step: step, observer: newCap(turn, during(duringCallback))))
|
||||||
|
|
||||||
proc spawnRelays*(turn: Turn; ds: Cap) =
|
proc spawnRelays*(turn: var Turn; ds: Cap) =
|
||||||
## Spawn actors that manage routes and appeasing gatekeepers.
|
## Spawn actors that manage routes and appeasing gatekeepers.
|
||||||
stderr.writeLine "spawnRelays body is running"
|
|
||||||
let transPat = ?Observe(pattern: !TransportConnection) ?? { 0: grab() }
|
let transPat = ?Observe(pattern: !TransportConnection) ?? { 0: grab() }
|
||||||
# Use a generic pattern and type matching
|
# Use a generic pattern and type matching
|
||||||
# in the during handler because it is easy.
|
# in the during handler because it is easy.
|
||||||
|
@ -537,11 +530,11 @@ proc spawnRelays*(turn: Turn; ds: Cap) =
|
||||||
connectRoute(turn, ds, route.value, i)
|
connectRoute(turn, ds, route.value, i)
|
||||||
|
|
||||||
spawnStepResolver(turn, ds, "ref".toSymbol) do (
|
spawnStepResolver(turn, ds, "ref".toSymbol) do (
|
||||||
turn: Turn, step: Value, origin: Cap, next: Cap):
|
turn: var Turn, step: Value, origin: Cap, next: Cap):
|
||||||
publish(turn, ds, ResolvedPathStep(
|
publish(turn, ds, ResolvedPathStep(
|
||||||
origin: origin, pathStep: step, resolved: next.accepted))
|
origin: origin, pathStep: step, resolved: next.accepted))
|
||||||
|
|
||||||
type BootProc* = proc (turn: Turn; ds: Cap) {.closure.}
|
type BootProc* = proc (turn: var Turn; ds: Cap) {.closure.}
|
||||||
|
|
||||||
const defaultRoute* = "<route [<stdio>]>"
|
const defaultRoute* = "<route [<stdio>]>"
|
||||||
|
|
||||||
|
@ -560,12 +553,12 @@ proc envRoute*: Route =
|
||||||
if not result.fromPreserves(pr):
|
if not result.fromPreserves(pr):
|
||||||
raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr)
|
raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr)
|
||||||
|
|
||||||
proc resolve*(turn: Turn; ds: Cap; route: Route; bootProc: BootProc) =
|
proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
|
||||||
## Resolve `route` within `ds` and call `bootProc` with resolved capabilities.
|
## Resolve `route` within `ds` and call `bootProc` with resolved capabilities.
|
||||||
during(turn, ds, ResolvePath ?: {0: ?route, 3: ?:ResolvedAccepted}) do (dst: Cap):
|
during(turn, ds, ResolvePath ?: {0: ?route, 3: ?:ResolvedAccepted}) do (dst: Cap):
|
||||||
bootProc(turn, dst)
|
bootProc(turn, dst)
|
||||||
|
|
||||||
proc resolveEnvironment*(turn: Turn; bootProc: BootProc) =
|
proc resolveEnvironment*(turn: var Turn; bootProc: BootProc) =
|
||||||
## Resolve a capability from the calling environment
|
## Resolve a capability from the calling environment
|
||||||
## and call `bootProc`. See envRoute_.
|
## and call `bootProc`. See envRoute_.
|
||||||
let
|
let
|
||||||
|
|
|
@ -67,7 +67,7 @@ func isEmpty(cont: Continuation): bool =
|
||||||
type
|
type
|
||||||
ContinuationProc = proc (c: Continuation; v: Value) {.closure.}
|
ContinuationProc = proc (c: Continuation; v: Value) {.closure.}
|
||||||
LeafProc = proc (l: Leaf; v: Value) {.closure.}
|
LeafProc = proc (l: Leaf; v: Value) {.closure.}
|
||||||
ObserverProc = proc (turn: Turn; group: ObserverGroup; vs: seq[Value]) {.closure.}
|
ObserverProc = proc (turn: var Turn; group: ObserverGroup; vs: seq[Value]) {.closure.}
|
||||||
|
|
||||||
proc getLeaves(cont: Continuation; constPaths: Paths): LeafMap =
|
proc getLeaves(cont: Continuation; constPaths: Paths): LeafMap =
|
||||||
result = cont.leafMap.getOrDefault(constPaths)
|
result = cont.leafMap.getOrDefault(constPaths)
|
||||||
|
@ -114,10 +114,10 @@ proc top(stack: TermStack): Value =
|
||||||
assert stack.len > 0
|
assert stack.len > 0
|
||||||
stack[stack.high]
|
stack[stack.high]
|
||||||
|
|
||||||
proc modify(node: Node; turn: Turn; outerValue: Value; event: EventKind;
|
proc modify(node: Node; turn: var Turn; outerValue: Value; event: EventKind;
|
||||||
modCont: ContinuationProc; modLeaf: LeafProc; modObs: ObserverProc) =
|
modCont: ContinuationProc; modLeaf: LeafProc; modObs: ObserverProc) =
|
||||||
|
|
||||||
proc walk(cont: Continuation; turn: Turn) =
|
proc walk(cont: Continuation; turn: var Turn) =
|
||||||
modCont(cont, outerValue)
|
modCont(cont, outerValue)
|
||||||
for constPaths, constValMap in cont.leafMap.pairs:
|
for constPaths, constValMap in cont.leafMap.pairs:
|
||||||
let constVals = projectPaths(outerValue, constPaths)
|
let constVals = projectPaths(outerValue, constPaths)
|
||||||
|
@ -142,7 +142,7 @@ proc modify(node: Node; turn: Turn; outerValue: Value; event: EventKind;
|
||||||
constValMap.del(get constVals)
|
constValMap.del(get constVals)
|
||||||
|
|
||||||
|
|
||||||
proc walk(node: Node; turn: Turn; termStack: TermStack) =
|
proc walk(node: Node; turn: var Turn; termStack: TermStack) =
|
||||||
walk(node.continuation, turn)
|
walk(node.continuation, turn)
|
||||||
for selector, table in node.edges:
|
for selector, table in node.edges:
|
||||||
let
|
let
|
||||||
|
@ -227,7 +227,7 @@ proc getEndpoints(leaf: Leaf; capturePaths: Paths): ObserverGroup =
|
||||||
if captures.isSome:
|
if captures.isSome:
|
||||||
discard result.cachedCaptures.change(get captures, +1)
|
discard result.cachedCaptures.change(get captures, +1)
|
||||||
|
|
||||||
proc add*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) =
|
proc add*(index: var Index; turn: var Turn; pattern: Pattern; observer: Cap) =
|
||||||
let
|
let
|
||||||
cont = index.root.extend(pattern)
|
cont = index.root.extend(pattern)
|
||||||
analysis = analyse pattern
|
analysis = analyse pattern
|
||||||
|
@ -240,7 +240,7 @@ proc add*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) =
|
||||||
captureMap[capture] = publish(turn, observer, capture)
|
captureMap[capture] = publish(turn, observer, capture)
|
||||||
endpoints.observers[observer] = captureMap
|
endpoints.observers[observer] = captureMap
|
||||||
|
|
||||||
proc remove*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) =
|
proc remove*(index: var Index; turn: var Turn; pattern: Pattern; observer: Cap) =
|
||||||
let
|
let
|
||||||
cont = index.root.extend(pattern)
|
cont = index.root.extend(pattern)
|
||||||
analysis = analyse pattern
|
analysis = analyse pattern
|
||||||
|
@ -260,7 +260,7 @@ proc remove*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) =
|
||||||
if constValMap.len == 0:
|
if constValMap.len == 0:
|
||||||
cont.leafMap.del(analysis.constPaths)
|
cont.leafMap.del(analysis.constPaths)
|
||||||
|
|
||||||
proc adjustAssertion(index: var Index; turn: Turn; outerValue: Value; delta: int): bool =
|
proc adjustAssertion(index: var Index; turn: var Turn; outerValue: Value; delta: int): bool =
|
||||||
case index.allAssertions.change(outerValue, delta)
|
case index.allAssertions.change(outerValue, delta)
|
||||||
of cdAbsentToPresent:
|
of cdAbsentToPresent:
|
||||||
result = true
|
result = true
|
||||||
|
@ -268,7 +268,7 @@ proc adjustAssertion(index: var Index; turn: Turn; outerValue: Value; delta: int
|
||||||
c.cache.incl(v)
|
c.cache.incl(v)
|
||||||
proc modLeaf(l: Leaf; v: Value) =
|
proc modLeaf(l: Leaf; v: Value) =
|
||||||
l.cache.incl(v)
|
l.cache.incl(v)
|
||||||
proc modObserver(turn: Turn; group: ObserverGroup; vs: seq[Value]) =
|
proc modObserver(turn: var Turn; group: ObserverGroup; vs: seq[Value]) =
|
||||||
let change = group.cachedCaptures.change(vs, +1)
|
let change = group.cachedCaptures.change(vs, +1)
|
||||||
if change == cdAbsentToPresent:
|
if change == cdAbsentToPresent:
|
||||||
for (observer, captureMap) in group.observers.pairs:
|
for (observer, captureMap) in group.observers.pairs:
|
||||||
|
@ -281,7 +281,7 @@ proc adjustAssertion(index: var Index; turn: Turn; outerValue: Value; delta: int
|
||||||
c.cache.excl(v)
|
c.cache.excl(v)
|
||||||
proc modLeaf(l: Leaf; v: Value) =
|
proc modLeaf(l: Leaf; v: Value) =
|
||||||
l.cache.excl(v)
|
l.cache.excl(v)
|
||||||
proc modObserver(turn: Turn; group: ObserverGroup; vs: seq[Value]) =
|
proc modObserver(turn: var Turn; group: ObserverGroup; vs: seq[Value]) =
|
||||||
if group.cachedCaptures.change(vs, -1) == cdPresentToAbsent:
|
if group.cachedCaptures.change(vs, -1) == cdPresentToAbsent:
|
||||||
for (observer, captureMap) in group.observers.pairs:
|
for (observer, captureMap) in group.observers.pairs:
|
||||||
var h: Handle
|
var h: Handle
|
||||||
|
@ -293,12 +293,12 @@ proc adjustAssertion(index: var Index; turn: Turn; outerValue: Value; delta: int
|
||||||
proc continuationNoop(c: Continuation; v: Value) = discard
|
proc continuationNoop(c: Continuation; v: Value) = discard
|
||||||
proc leafNoop(l: Leaf; v: Value) = discard
|
proc leafNoop(l: Leaf; v: Value) = discard
|
||||||
|
|
||||||
proc add*(index: var Index; turn: Turn; v: Value): bool =
|
proc add*(index: var Index; turn: var Turn; v: Value): bool =
|
||||||
adjustAssertion(index, turn, v, +1)
|
adjustAssertion(index, turn, v, +1)
|
||||||
proc remove*(index: var Index; turn: Turn; v: Value): bool =
|
proc remove*(index: var Index; turn: var Turn; v: Value): bool =
|
||||||
adjustAssertion(index, turn, v, -1)
|
adjustAssertion(index, turn, v, -1)
|
||||||
|
|
||||||
proc deliverMessage*(index: var Index; turn: Turn; v: Value) =
|
proc deliverMessage*(index: var Index; turn: var Turn; v: Value) =
|
||||||
proc observersCb(turn: Turn; group: ObserverGroup; vs: seq[Value]) =
|
proc observersCb(turn: var Turn; group: ObserverGroup; vs: seq[Value]) =
|
||||||
for observer in group.observers.keys: message(turn, observer, vs)
|
for observer in group.observers.keys: message(turn, observer, vs)
|
||||||
index.root.modify(turn, v, messageEvent, continuationNoop, leafNoop, observersCb)
|
index.root.modify(turn, v, messageEvent, continuationNoop, leafNoop, observersCb)
|
||||||
|
|
|
@ -5,21 +5,21 @@ import std/times
|
||||||
import pkg/sys/ioqueue
|
import pkg/sys/ioqueue
|
||||||
import syndicate, syndicate/actors/timers
|
import syndicate, syndicate/actors/timers
|
||||||
|
|
||||||
let actor = bootActor("timer-test") do (turn: Turn):
|
let actor = bootActor("timer-test") do (turn: var Turn):
|
||||||
let timers = newDataspace(turn)
|
let timers = newDataspace(turn)
|
||||||
spawnTimerActor(timers, turn)
|
spawnTimerActor(timers, turn)
|
||||||
|
|
||||||
onPublish(turn, timers, ?LaterThan(seconds: 1356100000)):
|
onPublish(turn, timers, ?LaterThan(seconds: 1356100000)):
|
||||||
echo "now in 13th bʼakʼtun"
|
echo "now in 13th bʼakʼtun"
|
||||||
|
|
||||||
after(turn, timers, initDuration(seconds = 3)) do (turn: Turn):
|
after(turn, timers, initDuration(seconds = 3)) do (turn: var Turn):
|
||||||
echo "third timer expired"
|
echo "third timer expired"
|
||||||
stopActor(turn)
|
stopActor(turn)
|
||||||
|
|
||||||
after(turn, timers, initDuration(seconds = 1)) do (turn: Turn):
|
after(turn, timers, initDuration(seconds = 1)) do (turn: var Turn):
|
||||||
echo "first timer expired"
|
echo "first timer expired"
|
||||||
|
|
||||||
after(turn, timers, initDuration(seconds = 2)) do (turn: Turn):
|
after(turn, timers, initDuration(seconds = 2)) do (turn: var Turn):
|
||||||
echo "second timer expired"
|
echo "second timer expired"
|
||||||
|
|
||||||
echo "single run of ioqueue"
|
echo "single run of ioqueue"
|
||||||
|
|
Loading…
Reference in New Issue