# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense

import std/[options, tables, times]
import preserves, preserves/sugar
import syndicate, syndicate/drivers/timers
import ../schema/[assertions, config]

type PulseEntity {.final.} = ref object of Entity
  ## An entity that asserts and retracts observers on a pulse.
  self, timers: Cap
    ## `self` is a capability for this entity (set in `newProxyEntity`);
    ## `timers` is where timer observations are published.
  target: Entity
    ## The entity that assertions are ultimately forwarded to.
  period: float
    ## Pulse period in seconds; added to the current Unix time.
  timerHandle: Handle
    ## Handle of the most recently published timer observation.
  observers: Table[Handle, AssertionRef]
    ## Diverted observation assertions, keyed by their handle.
  observePattern: Pattern
    ## Pattern used to decide whether an assertion is an observation.
  observing: bool
    ## Set when a pulse publishes the observers,
    ## cleared when the following message retracts them.

proc schedule(turn: Turn; pulse: PulseEntity) =
  ## Schedule the next pulse.
  ## The next pulse will be scheduled using the current time as
  ## reference point and not the moment of the previous pulse.
  ##
  ## Publishes an `Observe` of `LaterThan` to `pulse.timers` with
  ## `pulse.self` as the observer and records the resulting handle
  ## in `pulse.timerHandle`.
  let deadline = getTime().toUnixFloat() + pulse.period
  pulse.timerHandle = publish(turn, pulse.timers, Observe(
      pattern: LaterThan ?: { 0: ?deadline },
      observer: pulse.self,
    ))

method publish(pulse: PulseEntity; turn: Turn; ass: AssertionRef; h: Handle) =
  ## Publish observers in response to assertions.
  ##
  ## The incoming `ass` and `h` are not inspected; any assertion
  ## arriving here triggers a pulse.
  # Drop the timer observation that just fired and arm the next one.
  pulse.timers.target.retract(turn, pulse.timerHandle)
  schedule(turn, pulse)
  pulse.observing = true
  # Loop variables are named distinctly so they do not shadow `h`.
  for handle, assertion in pulse.observers.pairs:
    pulse.target.publish(turn, assertion, handle)
  # Ask the target to sync back to this entity; the observers are
  # retracted again in `message`.
  pulse.target.sync(turn, pulse.self)

method message(pulse: PulseEntity; turn: Turn; v: AssertionRef) =
  ## Retract observers in response to a sync message.
  ##
  ## The message body `v` is not inspected.
  pulse.observing = false
  for handle in pulse.observers.keys:
    pulse.target.retract(turn, handle)

type ProxyEntity {.final.} = ref object of Entity
  ## A proxy `Entity` that diverts observers to a `PulseEntity`.
  pulse: PulseEntity

method publish(proxy: ProxyEntity; turn: Turn; ass: AssertionRef; h: Handle) =
  ## Proxy assertions that are not observations.
  ##
  ## Assertions matching `observePattern` are held back in
  ## `pulse.observers` instead of being forwarded; the first one
  ## held starts the pulse timer.
  if proxy.pulse.observePattern.matches ass.value:
    if proxy.pulse.observers.len == 0:
      schedule(turn, proxy.pulse)
    proxy.pulse.observers[h] = ass
  else:
    proxy.pulse.target.publish(turn, ass, h)

method retract(proxy: ProxyEntity; turn: Turn; h: Handle) =
  ## Retract proxied assertions.
  ##
  ## A held observer is removed from `pulse.observers` and only
  ## retracted from the target while a pulse has it published.
  ## Removing the last observer also retracts the timer observation.
  ## Any other handle is retracted from the target directly.
  var removed: AssertionRef
  if proxy.pulse.observers.pop(h, removed):
    if proxy.pulse.observing:
      proxy.pulse.target.retract(turn, h)
    if proxy.pulse.observers.len == 0:
      proxy.pulse.timers.target.retract(turn, proxy.pulse.timerHandle)
  else:
    proxy.pulse.target.retract(turn, h)

method message(proxy: ProxyEntity; turn: Turn; v: AssertionRef) =
  ## Proxy messages.
  proxy.pulse.target.message(turn, v)

method sync(proxy: ProxyEntity; turn: Turn; peer: Cap) =
  ## Proxy sync.
  proxy.pulse.target.sync(turn, peer)

proc newProxyEntity(turn: Turn; timers, ds: Cap; period: float): ProxyEntity =
  ## Create a `ProxyEntity` in front of the target of `ds`, together
  ## with the `PulseEntity` that holds its diverted observers.
  ## `timers` receives the timer observations and `period` is the
  ## pulse period in seconds.
  new result
  result.pulse = PulseEntity(
      target: ds.target,
      timers: timers,
      observePattern: ?:Observe,
      period: period,
    )
  # The pulse needs a capability to itself to act as timer observer
  # and sync peer.
  result.pulse.self = newCap(turn, result.pulse)

proc spawnPulseActor*(turn: Turn; root: Cap): Actor =
  ## Spawn an actor that retracts and re-asserts observers on
  ## a timed pulse. Requires a timer service on the `root` capability.
  ##
  ## Periods below 0.0001 seconds are rejected with a message on
  ## `stderr` and no proxy is published for them.
  spawnActor(turn, "pulse") do (turn: Turn):
    let grabPeriod = observePattern(!Pulse, { @[%0]: grab() })
    during(turn, root, ?:PulseArguments) do (ds: Cap):
      during(turn, ds, grabPeriod) do (lit: Literal[float]):
        if lit.value < 0.000_1:
          stderr.writeLine("pulse period is too small: ", lit.value, "s")
        else:
          let
            proxyCap = newCap(
              turn, newProxyEntity(turn, root, ds, lit.value))
            pulse = Pulse(periodSec: lit.value, proxy: embed proxyCap)
          discard publish(turn, ds, pulse)

when isMainModule:
  import syndicate/relays
  runActor("main") do (turn: Turn):
    resolveEnvironment(turn) do (turn: Turn; ds: Cap):
      discard spawnPulseActor(turn, ds)