107 lines
3.7 KiB
Nim
107 lines
3.7 KiB
Nim
|
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||
|
# SPDX-License-Identifier: Unlicense
|
||
|
|
||
|
import std/[options, tables, times]
|
||
|
import preserves, syndicate,
|
||
|
syndicate/relays,
|
||
|
syndicate/actors/timers
|
||
|
|
||
|
import ../schema/[assertions, config]
|
||
|
|
||
|
type PulseEntity {.final.} = ref object of Entity
  ## An entity that asserts and retracts observers on a pulse.
  self: Cap
    ## Capability for this entity; used as the observer of timer
    ## assertions and as the peer of sync requests.
  timers: Cap
    ## Capability that timer observations are published to.
  target: Entity
    ## Entity that the diverted observers are published to and
    ## retracted from.
  period: float
    ## Offset added to the current Unix time (seconds) when the next
    ## pulse is scheduled.
  timerHandle: Handle
    ## Handle of the most recently published timer observation.
  observers: Table[Handle, AssertionRef]
    ## Diverted observer assertions, keyed by the handle they were
    ## published with.
  observePattern: Pattern
    ## Pattern that decides which assertions are diverted.
  observing: bool
    ## Set when a pulse publishes the observers to `target` and cleared
    ## when they are retracted again.
|
||
|
|
||
|
proc schedule(turn: var Turn; pulse: PulseEntity) =
  ## Arm the timer for the next pulse and remember its handle in
  ## `pulse.timerHandle`.
  ##
  ## The next pulse is scheduled `pulse.period` after the current time,
  ## not after the moment the previous pulse was due.
  let
    now = getTime().toUnixFloat()
    deadline = now + pulse.period
    wakeup = Observe(
      pattern: LaterThan ?: { 0: ?deadline },
      observer: pulse.self,
    )
  pulse.timerHandle = publish(turn, pulse.timers, wakeup)
|
||
|
|
||
|
method publish(pulse: PulseEntity; turn: var Turn; ass: AssertionRef;
               h: Handle) =
  ## Publish observers in response to <later-than …> assertions.
  ##
  ## The timer observation that fired is retracted first. While diverted
  ## observers remain, the next pulse is scheduled, every stored observer
  ## is published to the target, and a sync with the target is requested
  ## with this entity as the peer. The `ass` and `h` arguments are not
  ## inspected.
  pulse.timers.target.retract(turn, pulse.timerHandle)
  if pulse.observers.len == 0:
    # NOTE(review): assumes a timer event can still be delivered after the
    # last observer was retracted. Re-arming here would keep pulsing with
    # nothing to publish, and the next diverted observer would arm a
    # second timer and overwrite `timerHandle`.
    return
  schedule(turn, pulse)
  pulse.observing = true
  for handle, observation in pulse.observers.pairs:
    pulse.target.publish(turn, observation, handle)
  pulse.target.sync(turn, pulse.self)
|
||
|
|
||
|
method message(pulse: PulseEntity; turn: var Turn; v: AssertionRef) =
  ## Close the current pulse: clear the `observing` flag and retract every
  ## stored observer from the target.
  ##
  ## This is the reaction to the sync requested when the observers were
  ## published. The message body `v` is not inspected.
  pulse.observing = false
  for handle in keys(pulse.observers):
    pulse.target.retract(turn, handle)
|
||
|
|
||
|
type ProxyEntity {.final.} = ref object of Entity
  ## A proxy `Entity` that diverts observers to a `PulseEntity` and
  ## forwards everything else to that entity's target.
  pulse: PulseEntity
    ## Holds the diverted observers, the timer state and the target.
|
||
|
|
||
|
method publish(proxy: ProxyEntity; turn: var Turn; ass: AssertionRef;
               h: Handle) =
  ## Divert observations to the pulse and proxy every other assertion.
  ##
  ## An assertion matching the pulse's `observePattern` is stored under its
  ## handle instead of being forwarded; the first stored observer also
  ## schedules the first pulse.
  let pulse = proxy.pulse
  if not pulse.observePattern.matches(ass.value):
    # Not an observation: pass it straight through to the target.
    pulse.target.publish(turn, ass, h)
    return
  if pulse.observers.len == 0:
    schedule(turn, pulse)
  pulse.observers[h] = ass
|
||
|
|
||
|
method retract(proxy: ProxyEntity; turn: var Turn; h: Handle) =
  ## Retract a proxied assertion or forget a diverted observer.
  ##
  ## A handle that is not a stored observer is retracted from the target
  ## directly. A stored observer is dropped, retracted from the target only
  ## if a pulse currently has it published, and the timer observation is
  ## retracted once no observers are left.
  let pulse = proxy.pulse
  if h notin pulse.observers:
    pulse.target.retract(turn, h)
    return
  pulse.observers.del(h)
  if pulse.observing:
    pulse.target.retract(turn, h)
  if pulse.observers.len == 0:
    pulse.timers.target.retract(turn, pulse.timerHandle)
|
||
|
|
||
|
method message(proxy: ProxyEntity; turn: var Turn; v: AssertionRef) =
  ## Proxy messages: hand `v` to the pulse target unchanged.
  let target = proxy.pulse.target
  target.message(turn, v)
|
||
|
|
||
|
method sync(proxy: ProxyEntity; turn: var Turn; peer: Cap) =
  ## Proxy sync: pass the request for `peer` on to the pulse target.
  let target = proxy.pulse.target
  target.sync(turn, peer)
|
||
|
|
||
|
proc newProxyEntity(turn: var Turn; timers, ds: Cap;
                    period: float): ProxyEntity =
  ## Create a proxy whose pulse publishes to the target of `ds`, publishes
  ## its timer observations to `timers`, and diverts assertions matching
  ## `Observe`.
  ##
  ## The pulse also receives a capability to itself, created in `turn`.
  let pulse = PulseEntity(
    target: ds.target,
    timers: timers,
    observePattern: ?:Observe,
    period: period,
  )
  pulse.self = newCap(turn, pulse)
  ProxyEntity(pulse: pulse)
|
||
|
|
||
|
proc spawnPulseActor*(turn: var Turn; root: Cap): Actor =
  ## Spawn an actor that retracts and re-asserts observers on
  ## a timed pulse. Requires a timer service on the `root` capability.
  ##
  ## While `root` holds an assertion matching `PulseArguments`, the captured
  ## capability `ds` is watched for `grabPeriod` matches. Each captured
  ## period gets a new proxy entity, announced at `ds` as a `Pulse`
  ## assertion. Periods below `minPeriodSec` are reported on stderr and
  ## otherwise ignored.
  const minPeriodSec = 0.000_1 ## Smallest accepted pulse period.
  spawn("pulse", turn) do (turn: var Turn):
    # NOTE(review): presumably this captures field 0 (the period) of the
    # `Pulse` patterns that are being observed — confirm against the schema.
    let grabPeriod = ?Observe(pattern: !Pulse) ?? { 0: grab() }
    during(turn, root, ?:PulseArguments) do (ds: Cap):
      during(turn, ds, grabPeriod) do (lit: Literal[float]):
        if lit.value < minPeriodSec:
          stderr.writeLine("pulse period is too small: ", lit.value, "s")
        else:
          # `root` is passed as the timers capability of the new proxy.
          let
            proxyCap = newCap(turn, newProxyEntity(turn, root, ds, lit.value))
            pulse = Pulse(periodSec: lit.value, proxy: embed proxyCap)
          discard publish(turn, ds, pulse)
|
||
|
|
||
|
when isMainModule:
  # Standalone entry point: start the timers on the root capability,
  # connect stdio to it, then spawn the pulse actor on the same root.
  runActor("main") do (turn: var Turn; root: Cap):
    turn.spawnTimers(root)
    turn.connectStdio(root)
    discard turn.spawnPulseActor(root)
|