diff --git a/src/syndicate.nim b/src/syndicate.nim index e200f8b..e2e4852 100644 --- a/src/syndicate.nim +++ b/src/syndicate.nim @@ -35,21 +35,21 @@ proc `??`*(pat: Pattern; bindings: openArray[(int, Pattern)]): Pattern {.inline. patterns.inject(pat, bindings) type - PublishProc = proc (turn: Turn; v: Value; h: Handle) {.closure.} - RetractProc = proc (turn: Turn; h: Handle) {.closure.} - MessageProc = proc (turn: Turn; v: Value) {.closure.} + PublishProc = proc (turn: var Turn; v: Value; h: Handle) {.closure.} + RetractProc = proc (turn: var Turn; h: Handle) {.closure.} + MessageProc = proc (turn: var Turn; v: Value) {.closure.} ClosureEntity = ref object of Entity publishImpl*: PublishProc retractImpl*: RetractProc messageImpl*: MessageProc -method publish(e: ClosureEntity; turn: Turn; a: AssertionRef; h: Handle) = +method publish(e: ClosureEntity; turn: var Turn; a: AssertionRef; h: Handle) = if not e.publishImpl.isNil: e.publishImpl(turn, a.value, h) -method retract(e: ClosureEntity; turn: Turn; h: Handle) = +method retract(e: ClosureEntity; turn: var Turn; h: Handle) = if not e.retractImpl.isNil: e.retractImpl(turn, h) -method message(e: ClosureEntity; turn: Turn; a: AssertionRef) = +method message(e: ClosureEntity; turn: var Turn; a: AssertionRef) = if not e.messageImpl.isNil: e.messageImpl(turn, a.value) proc argumentCount(handler: NimNode): int = @@ -94,7 +94,7 @@ proc wrapPublishHandler(turn, handler: NimNode): NimNode = handlerSym = genSym(nskProc, "publish") bindingsSym = ident"bindings" quote do: - proc `handlerSym`(`turn`: Turn; `bindingsSym`: Value; `handleSym`: Handle) = + proc `handlerSym`(`turn`: var Turn; `bindingsSym`: Value; `handleSym`: Handle) = `varSection` if fromPreserves(`valuesSym`, bindings): `publishBody` @@ -106,7 +106,7 @@ proc wrapMessageHandler(turn, handler: NimNode): NimNode = handlerSym = genSym(nskProc, "message") bindingsSym = ident"bindings" quote do: - proc `handlerSym`(`turn`: Turn; `bindingsSym`: Value) = + proc `handlerSym`(`turn`: var Turn; `bindingsSym`: Value) = `varSection` if fromPreserves(`valuesSym`, bindings): `body` @@ -120,17 +120,17 @@ proc wrapDuringHandler(turn, entryBody, exitBody: NimNode): NimNode = duringSym = genSym(nskProc, "during") if exitBody.isNil: quote do: - proc `duringSym`(`turn`: Turn; `bindingsSym`: Value; `handleSym`: Handle): TurnAction = + proc `duringSym`(`turn`: var Turn; `bindingsSym`: Value; `handleSym`: Handle): TurnAction = `varSection` if fromPreserves(`valuesSym`, `bindingsSym`): `publishBody` else: quote do: - proc `duringSym`(`turn`: Turn; `bindingsSym`: Value; `handleSym`: Handle): TurnAction = + proc `duringSym`(`turn`: var Turn; `bindingsSym`: Value; `handleSym`: Handle): TurnAction = `varSection` if fromPreserves(`valuesSym`, `bindingsSym`): `publishBody` - proc action(`turn`: Turn) = + proc action(`turn`: var Turn) = `exitBody` result = action diff --git a/src/syndicate/actors.nim b/src/syndicate/actors.nim index a929440..8156312 100644 --- a/src/syndicate/actors.nim +++ b/src/syndicate/actors.nim @@ -59,7 +59,7 @@ type facetIdAllocator: uint exiting, exited: bool - TurnAction* = proc (t: Turn) {.closure.} + TurnAction* = proc (t: var Turn) {.closure.} Turn* = ref object facet: Facet # active facet that may change during a turn @@ -110,15 +110,15 @@ when tracing: result.add f.id.toPreserves f = f.parent -method publish*(e: Entity; turn: Turn; v: AssertionRef; h: Handle) {.base.} = discard -method retract*(e: Entity; turn: Turn; h: Handle) {.base.} = discard -method message*(e: Entity; turn: Turn; v: AssertionRef) {.base.} = discard -method sync*(e: Entity; turn: Turn; peer: Cap) {.base.} = discard +method publish*(e: Entity; turn: var Turn; v: AssertionRef; h: Handle) {.base.} = discard +method retract*(e: Entity; turn: var Turn; h: Handle) {.base.} = discard +method message*(e: Entity; turn: var Turn; v: AssertionRef) {.base.} = discard +method sync*(e: Entity; turn: var Turn; peer: Cap) {.base.} = discard using actor: Actor facet: Facet - turn: Turn + turn: var Turn action: TurnAction proc labels(f: Facet): string = @@ -156,11 +156,13 @@ proc hash*(facet): Hash = proc hash*(r: Cap): Hash = !$(r.relay.hash !& r.target.unsafeAddr.hash) +proc actor*(turn): Actor = turn.facet.actor + proc nextHandle(facet: Facet): Handle = result = succ(facet.actor.handleAllocator[]) facet.actor.handleAllocator[] = result -proc queueWork*(turn: Turn; facet: Facet; act: TurnAction) = +proc queueWork*(turn: var Turn; facet: Facet; act: TurnAction) = assert not facet.isNil turn.work.addLast((facet, act,)) @@ -172,7 +174,7 @@ proc queueTurn*(facet: Facet; act: TurnAction) = turn.desc.id = nextTurnId() turnQueue.addLast(turn) -proc queueTurn*(prev: Turn; facet: Facet; act: TurnAction) = +proc queueTurn*(prev: var Turn; facet: Facet; act: TurnAction) = var next = Turn(facet: facet) assert not facet.isNil next.work.addLast((facet, act,)) @@ -309,12 +311,12 @@ proc runRewrites*(a: Attenuation; v: Value): Value = result = examineAlternatives(stage, result) if result.isFalse: break -proc publish(turn: Turn; cap: Cap; v: Value; h: Handle) = +proc publish(turn: var Turn; cap: Cap; v: Value; h: Handle) = var a = runRewrites(cap.attenuation, v) if not a.isFalse: let e = OutboundAssertion(handle: h, peer: cap) turn.facet.outbound[h] = e - queueEffect(turn, cap.relay) do (turn: Turn): + queueEffect(turn, cap.relay) do (turn: var Turn): e.established = true publish(cap.target, turn, AssertionRef(value: a), e.handle) when tracing: @@ -328,54 +330,54 @@ proc publish(turn: Turn; cap: Cap; v: Value; h: Handle) = act.enqueue.event.detail.assert.handle = h turn.desc.actions.add act -proc publish*(turn: Turn; r: Cap; a: Value): Handle {.discardable.} = +proc publish*(turn: var Turn; r: Cap; a: Value): Handle {.discardable.} = result = turn.facet.nextHandle() publish(turn, r, a, result) -proc publish*[T](turn: Turn; r: Cap; a: T): Handle {.discardable.} = +proc publish*[T](turn: var Turn; r: Cap; a: T): Handle {.discardable.} = publish(turn, r, a.toPreserves) -proc retract(turn: Turn; e: OutboundAssertion) = - queueEffect(turn, e.peer.relay) do (turn: Turn): +proc retract(turn: var Turn; e: OutboundAssertion) = + queueEffect(turn, e.peer.relay) do (turn: var Turn): if e.established: e.established = false e.peer.target.retract(turn, e.handle) -proc retract*(turn: Turn; h: Handle) = +proc retract*(turn: var Turn; h: Handle) = var e: OutboundAssertion if turn.facet.outbound.pop(h, e): turn.retract(e) -proc message*(turn: Turn; r: Cap; v: Value) = +proc message*(turn: var Turn; r: Cap; v: Value) = var a = runRewrites(r.attenuation, v) if not a.isFalse: - queueEffect(turn, r.relay) do (turn: Turn): + queueEffect(turn, r.relay) do (turn: var Turn): r.target.message(turn, AssertionRef(value: a)) proc message*[T](turn: Turn; r: Cap; v: T) = queueEffect(turn, r.relay) do (turn: Turn): message(turn, r, v.toPreserves) -proc sync(turn: Turn; e: Entity; peer: Cap) = +proc sync(turn: var Turn; e: Entity; peer: Cap) = e.sync(turn, peer) -proc sync*(turn: Turn; r, peer: Cap) = - queueEffect(turn, r.relay) do (turn: Turn): +proc sync*(turn: var Turn; r, peer: Cap) = + queueEffect(turn, r.relay) do (turn: var Turn): sync(turn, r.target, peer) -proc replace*[T](turn: Turn; cap: Cap; h: Handle; v: T): Handle = +proc replace*[T](turn: var Turn; cap: Cap; h: Handle; v: T): Handle = result = publish(turn, cap, v) if h != default(Handle): retract(turn, h) -proc replace*[T](turn: Turn; cap: Cap; h: var Handle; v: T): Handle {.discardable.} = +proc replace*[T](turn: var Turn; cap: Cap; h: var Handle; v: T): Handle {.discardable.} = var old = h h = publish(turn, cap, v) if old != default(Handle): retract(turn, old) h -proc stop*(turn: Turn) +proc stop*(turn: var Turn) proc newFacet(actor; parent: Facet; initialAssertions: OutboundTable): Facet = inc actor.facetIdAllocator @@ -410,13 +412,13 @@ proc preventInertCheck*(facet): (proc()) {.discardable.} = proc terminate(actor; turn; reason: ref Exception) -proc terminate(facet; turn: Turn; orderly: bool) = +proc terminate(facet; turn: var Turn; orderly: bool) = if facet.isAlive: facet.isAlive = false let parent = facet.parent if not parent.isNil: parent.children.excl facet - queueWork(turn, facet) do (turn: Turn): + queueWork(turn, facet) do (turn: var Turn): while facet.children.len > 0: facet.children.pop.terminate(turn, orderly) if orderly: @@ -428,7 +430,6 @@ proc terminate(facet; turn: Turn; orderly: bool) = if parent.isInert: parent.terminate(turn, true) else: - stderr.writeLine "terminate facet is terminating ", facet.actor.name terminate(facet.actor, turn, nil) when tracing: var act = ActionDescription(orKind: ActionDescriptionKind.facetStop) @@ -437,19 +438,18 @@ proc terminate(facet; turn: Turn; orderly: bool) = proc stopIfInertAfter(action: TurnAction): TurnAction = # TODO: verify this - proc wrapper(turn: Turn) = + proc wrapper(turn: var Turn) = action(turn) - queueEffect(turn, turn.facet) do (turn: Turn): + queueEffect(turn, turn.facet) do (turn: var Turn): if (not turn.facet.parent.isNil and (not turn.facet.parent.isAlive)) or turn.facet.isInert: - stderr.writeLine("stopIfInertAfter stops turn") stop(turn) wrapper -proc newFacet(turn: Turn): Facet = newFacet(turn.facet.actor, turn.facet) +proc newFacet(turn: var Turn): Facet = newFacet(turn.facet.actor, turn.facet) -proc inFacet*(turn: Turn; bootProc: TurnAction): Facet = +proc inFacet*(turn: var Turn; bootProc: TurnAction): Facet = result = newFacet(turn) let facet = turn.facet turn.facet = result @@ -460,7 +460,7 @@ proc inFacet*(turn: Turn; bootProc: TurnAction): Facet = stopIfInertAfter(bootProc)(turn) turn.facet = facet -proc facet(turn: Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc) +proc facet(turn: var Turn; bootProc: TurnAction): Facet {.deprecated.} = inFacet(turn, bootProc) proc newActor(name: string; parent: Facet): Actor = result = Actor( @@ -494,9 +494,9 @@ proc bootActor*(name: string; bootProc: TurnAction): Actor {.discardable.} = turn.desc.cause.external.description = "bootActor".toPreserves turnQueue.addLast turn -proc spawnActor*(name: string; turn: Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} = +proc spawnActor*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} = let actor = newActor(name, turn.facet) - queueEffect(turn, actor.root) do (turn: Turn): + queueEffect(turn, actor.root) do (turn: var Turn): var newOutBound: Table[Handle, OutboundAssertion] for key in initialAssertions: discard turn.facet.outbound.pop(key, newOutbound[key]) @@ -507,13 +507,12 @@ proc spawnActor*(name: string; turn: Turn; bootProc: TurnAction; initialAssertio run(actor, bootProc, newOutBound) actor -proc spawn*(name: string; turn: Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} = +proc spawn*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} = spawnActor(name, turn, bootProc, initialAssertions) type StopOnRetract = ref object of Entity -method retract*(e: StopOnRetract; turn: Turn; h: Handle) = - stderr.writeLIne "StopOnRetract stops turn ", turn, " of ", turn.facet.actor +method retract*(e: StopOnRetract; turn: var Turn; h: Handle) = stop(turn) proc halfLink(facet, other: Facet) = @@ -524,7 +523,7 @@ proc halfLink(facet, other: Facet) = established: true, ) -proc spawnLink*(name: string; turn: Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} = +proc spawnLink*(name: string; turn: var Turn; bootProc: TurnAction; initialAssertions = initHashSet[Handle]()): Actor {.discardable.} = result = spawnActor(name, turn, bootProc, initialAssertions) halfLink(turn.facet, result.root) halfLink(result.root, turn.facet) @@ -547,24 +546,24 @@ proc terminate(actor; turn; reason: ref Exception) = act.stop.status.error.message = reason.msg trace(actor, act) for hook in actor.exitHooks: hook(turn) - proc finish(turn: Turn) = + proc finish(turn: var Turn) = assert not actor.root.isNil, actor.name actor.root.terminate(turn, reason.isNil) actor.exited = true queueTurn(actor.root, finish) proc terminate*(facet; e: ref Exception) = - run(facet.actor.root) do (turn: Turn): + run(facet.actor.root) do (turn: var Turn): facet.actor.terminate(turn, e) -template recallFacet(turn: Turn; body: untyped): untyped = +template recallFacet(turn: var Turn; body: untyped): untyped = let facet = turn.facet block: body assert facet.actor == turn.facet.actor, "turn of " & $facet.actor & " ended at " & $turn.facet.actor turn.facet = facet -proc stopNow(turn: Turn) = +proc stopNow(turn: var Turn) = let caller = turn.facet recallFacet turn: while caller.children.len > 0: @@ -576,35 +575,35 @@ proc stopNow(turn: Turn) = queueEffect(turn, child, stopNow) caller.terminate(turn, true) -proc stop*(turn: Turn, facet: Facet) = +proc stop*(turn: var Turn, facet: Facet) = queueEffect(turn, facet, stopNow) -proc stop*(turn: Turn) = +proc stop*(turn: var Turn) = stop(turn, turn.facet) proc onStop*(facet: Facet; act: TurnAction) = - ## Add a `proc (turn: Turn)` action to `facet` to be called as it stops. + ## Add a `proc (turn: var Turn)` action to `facet` to be called as it stops. assert not facet.isNil add(facet.shutdownActions, act) -proc onStop*(turn: Turn; act: TurnAction) = +proc onStop*(turn: var Turn; act: TurnAction) = onStop(turn.facet, act) proc stop*(actor: Actor) = - queueTurn(actor.root) do (turn: Turn): + queueTurn(actor.root) do (turn: var Turn): assert(not turn.facet.isNil) stop(turn) proc stopActor*(facet: Facet) = stop(facet.actor) -proc stopActor*(turn: Turn) = +proc stopActor*(turn: var Turn) = assert(not turn.facet.isNil) assert(not turn.facet.actor.isNil) assert(not turn.facet.actor.root.isNil) stop(turn, turn.facet.actor.root) -proc freshen*(turn: Turn, act: TurnAction) {.deprecated.} = +proc freshen*(turn: var Turn, act: TurnAction) {.deprecated.} = run(turn.facet, act) proc newCap*(relay: Facet; e: Entity): Cap {.deprecated.} = @@ -619,10 +618,10 @@ proc newCap*(e: Entity; turn): Cap = type SyncContinuation {.final.} = ref object of Entity action: TurnAction -method message(entity: SyncContinuation; turn: Turn; v: AssertionRef) = +method message(entity: SyncContinuation; turn: var Turn; v: AssertionRef) = entity.action(turn) -proc sync*(turn: Turn; refer: Cap; act: TurnAction) = +proc sync*(turn: var Turn; refer: Cap; act: TurnAction) = sync(turn, refer, newCap(turn, SyncContinuation(action: act))) proc running*(actor): bool = @@ -630,7 +629,7 @@ proc running*(actor): bool = if not (result or actor.exitReason.isNil): raise actor.exitReason -proc run(turn: Turn) = +proc run(turn: var Turn) = while turn.work.len > 0: var (facet, act) = turn.work.popFirst() assert not act.isNil diff --git a/src/syndicate/actors/timers.nim b/src/syndicate/actors/timers.nim index ace9f4a..fb53004 100644 --- a/src/syndicate/actors/timers.nim +++ b/src/syndicate/actors/timers.nim @@ -59,7 +59,7 @@ when defined(linux): proc spawnTimerDriver(facet: Facet; cap: Cap): TimerDriver = let driver = TimerDriver(facet: facet, target: cap) - facet.onStop do (turn: Turn): + facet.onStop do (turn: var Turn): for fd in driver.timers: unregister(FD fd) discard close(fd) @@ -89,16 +89,16 @@ when defined(linux): wait(FD fd, Read) if deadline in driver.deadlines: # Check if the deadline is still observed. - proc turnWork(turn: Turn) = + proc turnWork(turn: var Turn) = discard publish(turn, driver.target, LaterThan(seconds: deadline)) run(driver.facet, turnWork) discard close(fd) driver.timers.excl(fd) - proc spawnTimerActor*(turn: Turn; ds: Cap): Actor {.discardable.} = + proc spawnTimerActor*(turn: var Turn; ds: Cap): Actor {.discardable.} = ## Spawn a timer actor that responds to ## dataspace observations of timeouts on `ds`. - spawnLink("timers", turn) do (turn: Turn): + spawnLink("timers", turn) do (turn: var Turn): let driver = spawnTimerDriver(turn.facet, ds) let pat = inject(grab Observe(pattern: dropType LaterThan), {0: grabLit()}) during(turn, ds, pat) do (deadline: float): @@ -108,7 +108,7 @@ when defined(linux): discard change(driver.deadlines, deadline, -1, clamp = true) # TODO: retract assertions that are unobserved. -proc after*(turn: Turn; ds: Cap; dur: Duration; act: TurnAction) = +proc after*(turn: var Turn; ds: Cap; dur: Duration; act: TurnAction) = ## Execute `act` after some duration of time. var later = clock_realtime().toFloat() + dur.inMilliseconds.float / 1_000.0 onPublish(turn, ds, grab LaterThan(seconds: later)): diff --git a/src/syndicate/dataspaces.nim b/src/syndicate/dataspaces.nim index 2cc9ebf..477535a 100644 --- a/src/syndicate/dataspaces.nim +++ b/src/syndicate/dataspaces.nim @@ -16,14 +16,14 @@ type index: Index handleMap: Table[Handle, Assertion] -method publish(ds: Dataspace; turn: Turn; a: AssertionRef; h: Handle) = +method publish(ds: Dataspace; turn: var Turn; a: AssertionRef; h: Handle) = if add(ds.index, turn, a.value): var obs = a.value.preservesTo(Observe) if obs.isSome and obs.get.observer of Cap: ds.index.add(turn, obs.get.pattern, Cap(obs.get.observer)) ds.handleMap[h] = a.value -method retract(ds: Dataspace; turn: Turn; h: Handle) = +method retract(ds: Dataspace; turn: var Turn; h: Handle) = let v = ds.handleMap[h] if remove(ds.index, turn, v): ds.handleMap.del h @@ -31,20 +31,20 @@ method retract(ds: Dataspace; turn: Turn; h: Handle) = if obs.isSome and obs.get.observer of Cap: ds.index.remove(turn, obs.get.pattern, Cap(obs.get.observer)) -method message(ds: Dataspace; turn: Turn; a: AssertionRef) = +method message(ds: Dataspace; turn: var Turn; a: AssertionRef) = ds.index.deliverMessage(turn, a.value) -proc newDataspace*(turn: Turn): Cap = +proc newDataspace*(turn: var Turn): Cap = newCap(turn, Dataspace(index: initIndex())) -type BootProc = proc (turn: Turn; ds: Cap) {.closure.} -type DeprecatedBootProc = proc (ds: Cap; turn: Turn) {.closure.} +type BootProc = proc (turn: var Turn; ds: Cap) {.closure.} +type DeprecatedBootProc = proc (ds: Cap; turn: var Turn) {.closure.} proc bootDataspace*(name: string; bootProc: BootProc): Actor = - bootActor(name) do (turn: Turn): + bootActor(name) do (turn: var Turn): discard turn.facet.preventInertCheck() bootProc(turn, newDataspace(turn)) proc bootDataspace*(name: string; bootProc: DeprecatedBootProc): Actor {.deprecated.} = - bootDataspace(name) do (turn: Turn, ds: Cap): + bootDataspace(name) do (turn: var Turn, ds: Cap): bootProc(ds, turn) diff --git a/src/syndicate/durings.nim b/src/syndicate/durings.nim index 2304c64..0beac46 100644 --- a/src/syndicate/durings.nim +++ b/src/syndicate/durings.nim @@ -6,7 +6,7 @@ import preserves import ./actors, ./patterns, ./protocols/dataspace type - DuringProc* = proc (turn: Turn; a: Value; h: Handle): TurnAction + DuringProc* = proc (turn: var Turn; a: Value; h: Handle): TurnAction DuringActionKind = enum null, dead, act DuringAction = object case kind: DuringActionKind @@ -17,7 +17,7 @@ type cb: DuringProc assertionMap: Table[Handle, DuringAction] -method publish(de: DuringEntity; turn: Turn; a: AssertionRef; h: Handle) = +method publish(de: DuringEntity; turn: var Turn; a: AssertionRef; h: Handle) = let action = de.cb(turn, a.value, h) # assert(not action.isNil "should have put in a no-op action") let g = de.assertionMap.getOrDefault h @@ -30,7 +30,7 @@ method publish(de: DuringEntity; turn: Turn; a: AssertionRef; h: Handle) = of act: raiseAssert("during: duplicate handle in publish: " & $h) -method retract(de: DuringEntity; turn: Turn; h: Handle) = +method retract(de: DuringEntity; turn: var Turn; h: Handle) = let g = de.assertionMap.getOrDefault h case g.kind of null: @@ -44,5 +44,5 @@ method retract(de: DuringEntity; turn: Turn; h: Handle) = proc during*(cb: DuringProc): DuringEntity = DuringEntity(cb: cb) -proc observe*(turn: Turn; ds: Cap; pat: Pattern; e: Entity): Handle = +proc observe*(turn: var Turn; ds: Cap; pat: Pattern; e: Entity): Handle = publish(turn, ds, Observe(pattern: pat, observer: newCap(turn, e))) diff --git a/src/syndicate/relays.nim b/src/syndicate/relays.nim index e185c28..bbc65e4 100644 --- a/src/syndicate/relays.nim +++ b/src/syndicate/relays.nim @@ -27,8 +27,8 @@ type Turn = syndicate.Turn WireRef = sturdy.WireRef - PacketWriter = proc (turn: Turn; buf: seq[byte]) {.closure.} - RelaySetup = proc (turn: Turn; relay: Relay) {.closure.} + PacketWriter = proc (turn: var Turn; buf: seq[byte]) {.closure.} + RelaySetup = proc (turn: var Turn; relay: Relay) {.closure.} Relay* = ref object facet: Facet @@ -56,20 +56,20 @@ type proc releaseCapOut(r: Relay; e: WireSymbol) = r.exported.drop e -method publish(spe: SyncPeerEntity; t: Turn; a: AssertionRef; h: Handle) = +method publish(spe: SyncPeerEntity; t: var Turn; a: AssertionRef; h: Handle) = spe.handleMap[h] = publish(t, spe.peer, a.value) -method retract(se: SyncPeerEntity; t: Turn; h: Handle) = +method retract(se: SyncPeerEntity; t: var Turn; h: Handle) = var other: Handle if se.handleMap.pop(h, other): retract(t, other) -method message(se: SyncPeerEntity; t: Turn; a: AssertionRef) = +method message(se: SyncPeerEntity; t: var Turn; a: AssertionRef) = if not se.e.isNil: se.relay.releaseCapOut(se.e) message(t, se.peer, a.value) -method sync(se: SyncPeerEntity; t: Turn; peer: Cap) = +method sync(se: SyncPeerEntity; t: var Turn; peer: Cap) = sync(t, se.peer, peer) proc newSyncPeerEntity(r: Relay; p: Cap): SyncPeerEntity = @@ -106,40 +106,40 @@ proc deregister(relay: Relay; h: Handle) = if relay.outboundAssertions.pop(h, outbound): for e in outbound: releaseCapOut(relay, e) -proc send(relay: Relay; turn: Turn; rOid: protocol.Oid; m: Event) = +proc send(relay: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) = # TODO: don't send right away. var pendingTurn: protocol.Turn pendingTurn.add TurnEvent(oid: rOid, event: m) - relay.facet.run do (turn: Turn): + relay.facet.run do (turn: var Turn): var pkt = Packet( orKind: PacketKind.Turn, turn: pendingTurn) trace "C: ", pkt relay.packetWriter(turn, encode pkt) -proc send(re: RelayEntity; turn: Turn; ev: Event) = +proc send(re: RelayEntity; turn: var Turn; ev: Event) = send(re.relay, turn, protocol.Oid re.oid, ev) -method publish(re: RelayEntity; t: Turn; a: AssertionRef; h: Handle) = +method publish(re: RelayEntity; t: var Turn; a: AssertionRef; h: Handle) = re.send(t, Event( orKind: EventKind.Assert, `assert`: protocol.Assert( assertion: re.relay.register(a.value, h).rewritten, handle: h))) -method retract(re: RelayEntity; t: Turn; h: Handle) = +method retract(re: RelayEntity; t: var Turn; h: Handle) = re.relay.deregister h re.send(t, Event( orKind: EventKind.Retract, retract: Retract(handle: h))) -method message(re: RelayEntity; turn: Turn; msg: AssertionRef) = +method message(re: RelayEntity; turn: var Turn; msg: AssertionRef) = var (value, exported) = rewriteOut(re.relay, msg.value) assert(len(exported) == 0, "cannot send a reference in a message") if len(exported) == 0: re.send(turn, Event(orKind: EventKind.Message, message: Message(body: value))) -method sync(re: RelayEntity; turn: Turn; peer: Cap) = +method sync(re: RelayEntity; turn: var Turn; peer: Cap) = var peerEntity = newSyncPeerEntity(re.relay, peer) exported: seq[WireSymbol] @@ -193,7 +193,7 @@ proc rewriteIn(relay; facet; v: Value): proc close(r: Relay) = discard -proc dispatch(relay: Relay; turn: Turn; cap: Cap; event: Event) = +proc dispatch(relay: Relay; turn: var Turn; cap: Cap; event: Event) = case event.orKind of EventKind.Assert: let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion) @@ -216,14 +216,14 @@ proc dispatch(relay: Relay; turn: Turn; cap: Cap; event: Event) = #[ var imported: seq[WireSymbol] let k = relay.rewriteCapIn(turn, evenr.sync.peer, imported) - turn.sync(cap) do (turn: Turn): + turn.sync(cap) do (turn: var Turn): turn.message(k, true) for e in imported: relay.imported.del e ]# proc dispatch(relay: Relay; v: Value) = trace "S: ", v - run(relay.facet) do (t: Turn): + run(relay.facet) do (t: var Turn): var pkt: Packet if pkt.fromPreserves(v): case pkt.orKind @@ -261,10 +261,8 @@ type initialCap*: Cap nextLocalOid*: Option[Oid] -proc spawnRelay(name: string; turn: Turn; opts: RelayActorOptions; setup: RelaySetup) = - stderr.writeLine "calling spawnActor for relay ", name - spawnLink(name, turn) do (turn: Turn): - stderr.writeLine "executing body of spawned actor relay ", name +proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) = + spawnLink(name, turn) do (turn: var Turn): let relay = Relay( facet: turn.facet, packetWriter: opts.packetWriter, @@ -308,7 +306,7 @@ when defined(posix): stdin: AsyncFile alive: bool - method message(entity: StdioEntity; turn: Turn; ass: AssertionRef) = + method message(entity: StdioEntity; turn: var Turn; ass: AssertionRef) = if ass.value.preservesTo(ForceDisconnect).isSome: entity.alive = false @@ -324,21 +322,20 @@ when defined(posix): entity.relay.recv(buf[], 0.. 0 stack[stack.high] -proc modify(node: Node; turn: Turn; outerValue: Value; event: EventKind; +proc modify(node: Node; turn: var Turn; outerValue: Value; event: EventKind; modCont: ContinuationProc; modLeaf: LeafProc; modObs: ObserverProc) = - proc walk(cont: Continuation; turn: Turn) = + proc walk(cont: Continuation; turn: var Turn) = modCont(cont, outerValue) for constPaths, constValMap in cont.leafMap.pairs: let constVals = projectPaths(outerValue, constPaths) @@ -142,7 +142,7 @@ proc modify(node: Node; turn: Turn; outerValue: Value; event: EventKind; constValMap.del(get constVals) - proc walk(node: Node; turn: Turn; termStack: TermStack) = + proc walk(node: Node; turn: var Turn; termStack: TermStack) = walk(node.continuation, turn) for selector, table in node.edges: let @@ -227,7 +227,7 @@ proc getEndpoints(leaf: Leaf; capturePaths: Paths): ObserverGroup = if captures.isSome: discard result.cachedCaptures.change(get captures, +1) -proc add*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) = +proc add*(index: var Index; turn: var Turn; pattern: Pattern; observer: Cap) = let cont = index.root.extend(pattern) analysis = analyse pattern @@ -240,7 +240,7 @@ proc add*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) = captureMap[capture] = publish(turn, observer, capture) endpoints.observers[observer] = captureMap -proc remove*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) = +proc remove*(index: var Index; turn: var Turn; pattern: Pattern; observer: Cap) = let cont = index.root.extend(pattern) analysis = analyse pattern @@ -260,7 +260,7 @@ proc remove*(index: var Index; turn: Turn; pattern: Pattern; observer: Cap) = if constValMap.len == 0: cont.leafMap.del(analysis.constPaths) -proc adjustAssertion(index: var Index; turn: Turn; outerValue: Value; delta: int): bool = +proc adjustAssertion(index: var Index; turn: var Turn; outerValue: Value; delta: int): bool = case index.allAssertions.change(outerValue, delta) of cdAbsentToPresent: result = true @@ -268,7 +268,7 @@ proc adjustAssertion(index: var Index; turn: Turn; outerValue: Value; delta: int c.cache.incl(v) proc modLeaf(l: Leaf; v: Value) = l.cache.incl(v) - proc modObserver(turn: Turn; group: ObserverGroup; vs: seq[Value]) = + proc modObserver(turn: var Turn; group: ObserverGroup; vs: seq[Value]) = let change = group.cachedCaptures.change(vs, +1) if change == cdAbsentToPresent: for (observer, captureMap) in group.observers.pairs: @@ -281,7 +281,7 @@ proc adjustAssertion(index: var Index; turn: Turn; outerValue: Value; delta: int c.cache.excl(v) proc modLeaf(l: Leaf; v: Value) = l.cache.excl(v) - proc modObserver(turn: Turn; group: ObserverGroup; vs: seq[Value]) = + proc modObserver(turn: var Turn; group: ObserverGroup; vs: seq[Value]) = if group.cachedCaptures.change(vs, -1) == cdPresentToAbsent: for (observer, captureMap) in group.observers.pairs: var h: Handle @@ -293,12 +293,12 @@ proc adjustAssertion(index: var Index; turn: Turn; outerValue: Value; delta: int proc continuationNoop(c: Continuation; v: Value) = discard proc leafNoop(l: Leaf; v: Value) = discard -proc add*(index: var Index; turn: Turn; v: Value): bool = +proc add*(index: var Index; turn: var Turn; v: Value): bool = adjustAssertion(index, turn, v, +1) -proc remove*(index: var Index; turn: Turn; v: Value): bool = +proc remove*(index: var Index; turn: var Turn; v: Value): bool = adjustAssertion(index, turn, v, -1) -proc deliverMessage*(index: var Index; turn: Turn; v: Value) = - proc observersCb(turn: Turn; group: ObserverGroup; vs: seq[Value]) = +proc deliverMessage*(index: var Index; turn: var Turn; v: Value) = + proc observersCb(turn: var Turn; group: ObserverGroup; vs: seq[Value]) = for observer in group.observers.keys: message(turn, observer, vs) index.root.modify(turn, v, messageEvent, continuationNoop, leafNoop, observersCb) diff --git a/tests/test_timers.nim b/tests/test_timers.nim index 0dc3de2..8c2165c 100644 --- a/tests/test_timers.nim +++ b/tests/test_timers.nim @@ -5,21 +5,21 @@ import std/times import pkg/sys/ioqueue import syndicate, syndicate/actors/timers -let actor = bootActor("timer-test") do (turn: Turn): +let actor = bootActor("timer-test") do (turn: var Turn): let timers = newDataspace(turn) spawnTimerActor(timers, turn) onPublish(turn, timers, ?LaterThan(seconds: 1356100000)): echo "now in 13th bʼakʼtun" - after(turn, timers, initDuration(seconds = 3)) do (turn: Turn): + after(turn, timers, initDuration(seconds = 3)) do (turn: var Turn): echo "third timer expired" stopActor(turn) - after(turn, timers, initDuration(seconds = 1)) do (turn: Turn): + after(turn, timers, initDuration(seconds = 1)) do (turn: var Turn): echo "first timer expired" - after(turn, timers, initDuration(seconds = 2)) do (turn: Turn): + after(turn, timers, initDuration(seconds = 2)) do (turn: var Turn): echo "second timer expired" echo "single run of ioqueue"