diff --git a/README.md b/README.md index 996252d..3cc4e54 100644 --- a/README.md +++ b/README.md @@ -153,6 +153,27 @@ $sqlspace ? [ ] ``` +### Pulse proxy + +A proxy actor that passes assertions and messages to a configured capability but only asserts observations on a a periodic pulse. +This can be used to implement polling behavior. + +``` +# Example config +let ?ds = dataspace + +> +? ?cap> [ + $cap +] + +$ds ? [ + $proxy ? [ + $log ! + ] +] +``` + ### SQLite Readonly access to SQLite databases. Asserts rows as records in response to SQL query assertions. Dynamic updates are not implemented. diff --git a/assertions.prs b/assertions.prs index a556e81..ed283c4 100644 --- a/assertions.prs +++ b/assertions.prs @@ -1,3 +1,8 @@ version 1. FileSystemUsage = . + +# This assertion publishes a dataspace that proxies assertions with +# an exception for which is pulsed every periodSec. +# The pulse resolution is no more than one millisecond. +Pulse = . diff --git a/config.prs b/config.prs index 4e1987d..fad1099 100644 --- a/config.prs +++ b/config.prs @@ -28,6 +28,10 @@ PostgreArguments = . PostgreConnectionParameter = [@key string @val string]. +PulseArguments = . + SqliteArguments = assertions. + pulse.timers.target.retract(turn, pulse.timerHandle) + schedule(turn, pulse) + pulse.observing = true + for h, a in pulse.observers.pairs: + pulse.target.publish(turn, a, h) + pulse.target.sync(turn, pulse.self) + +method message(pulse: PulseEntity; turn: var Turn; v: AssertionRef) = + ## Retract observers in response to a sync message. + pulse.observing = false + for h in pulse.observers.keys: + pulse.target.retract(turn, h) + +type ProxyEntity {.final.} = ref object of Entity + ## A proxy `Entity` that diverts observers to a `PulseEntity`. + pulse: PulseEntity + +method publish(proxy: ProxyEntity; turn: var Turn; ass: AssertionRef; h: Handle) = + ## Proxy assertions that are not observations. + if proxy.pulse.observePattern.matches ass.value: + if proxy.pulse.observers.len == 0: + schedule(turn, proxy.pulse) + proxy.pulse.observers[h] = ass + else: + proxy.pulse.target.publish(turn, ass, h) + +method retract(proxy: ProxyEntity; turn: var Turn; h: Handle) = + ## Retract proxied assertions. + var obs: AssertionRef + if proxy.pulse.observers.pop(h, obs): + if proxy.pulse.observing: + proxy.pulse.target.retract(turn, h) + if proxy.pulse.observers.len == 0: + proxy.pulse.timers.target.retract(turn, proxy.pulse.timerHandle) + else: + proxy.pulse.target.retract(turn, h) + +method message(proxy: ProxyEntity; turn: var Turn; v: AssertionRef) = + ## Proxy mesages. + proxy.pulse.target.message(turn, v) + +method sync(proxy: ProxyEntity; turn: var Turn; peer: Cap) = + ## Proxy sync. + proxy.pulse.target.sync(turn, peer) + +proc newProxyEntity(turn: var Turn; timers, ds: Cap; period: float): ProxyEntity = + new result + result.pulse = PulseEntity( + target: ds.target, + timers: timers, + observePattern: ?:Observe, + period: period, + ) + result.pulse.self = newCap(turn, result.pulse) + +proc spawnPulseActor*(turn: var Turn; root: Cap): Actor = + ## Spawn an actor that retracts and re-asserts observers on + ## a timed pulse. Requires a timer service on the `root` capability. + spawn("pulse", turn) do (turn: var Turn): + let grabPeriod = ?Observe(pattern: !Pulse) ?? { 0: grab() } + during(turn, root, ?:PulseArguments) do (ds: Cap): + during(turn, ds, grabPeriod) do (lit: Literal[float]): + if lit.value < 0.000_1: + stderr.writeLine("pulse period is too small: ", lit.value, "s") + else: + let proxyCap = newCap(turn, newProxyEntity(turn, root, ds, lit.value)) + var pulse = Pulse(periodSec: lit.value, proxy: embed proxyCap) + discard publish(turn, ds, pulse) + +when isMainModule: + runActor("main") do (turn: var Turn; root: Cap): + spawnTimers(turn, root) + connectStdio(turn, root) + discard spawnPulseActor(turn, root) diff --git a/syndicate_utils.nimble b/syndicate_utils.nimble index 36a17b7..9319c3b 100644 --- a/syndicate_utils.nimble +++ b/syndicate_utils.nimble @@ -1,6 +1,6 @@ # Package -version = "20240110" +version = "20240114" author = "Emery Hemingway" description = "Utilites for Syndicated Actors and Synit" license = "unlicense" @@ -10,4 +10,4 @@ bin = @["mintsturdyref", "mount_actor", "msg", "net_mapper", "preserve # Dependencies -requires "nim >= 2.0.0", "illwill", "syndicate >= 20240108", "ws" +requires "nim >= 2.0.0", "illwill", "syndicate >= 20240114", "ws"