From 152fb258f3959a0862147fd95f928463ee983cba Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Tue, 25 Jun 2024 18:11:27 +0300 Subject: [PATCH] Replace pulse actor protocol --- README.md | 38 ++++++-- config.prs | 16 +++ sbom.json | 16 +-- src/schema/config.nim | 16 ++- src/syndesizer/pulses.nim | 200 +++++++++++++++++++++++--------------- 5 files changed, 185 insertions(+), 101 deletions(-) diff --git a/README.md b/README.md index fefd54b..7cea921 100644 --- a/README.md +++ b/README.md @@ -158,23 +158,41 @@ let ?ds = dataspace ### Pulse proxy -A proxy actor that passes assertions and messages to a configured capability but only asserts observations on a a periodic pulse. -This can be used to implement polling behavior. +An actor that produces proxies that accept assertions put only forwards them during a pulse window. +This can be used to implement polling behavior or periodic service scheduling. ``` -# Example config -let ?ds = dataspace +#!/usr/bin/env -S syndicate-server --control --config + +let ?destination = dataspace +$destination ? ?x [ + $log ! + ?- $log ! +] + +? [ + $log ! + $pulsator +] > + +let ?resolver = <* $config [ >]> ? ?cap> [ - $cap + $log ! + $cap $resolver> ] -$ds ? [ - $proxy ? [ - $log ! - ] -] + ``` ### SQLite diff --git a/config.prs b/config.prs index f5821dd..79cade4 100644 --- a/config.prs +++ b/config.prs @@ -47,6 +47,22 @@ PostgreConnectionParameter = [@key string @val string]. PrinterStep = . +PulseStep = . +PulseDetail = { + + # Destination for assertions. + target: #:any + + # Interval in seconds at which assertions are forwarded. + interval: float + + # Period in seconds of assertion. + period: float + + # Dither the @interval with a Gaussian deviation of @dither. + dither: float +} . + PulseArguments = . diff --git a/sbom.json b/sbom.json index b136425..851bf80 100644 --- a/sbom.json +++ b/sbom.json @@ -7,7 +7,7 @@ "bom-ref": "pkg:nim/syndicate_utils", "name": "syndicate_utils", "description": "Utilites for Syndicated Actors and Synit", - "version": "20240624", + "version": "20240625", "authors": [ { "name": "Emery Hemingway" @@ -89,10 +89,10 @@ "type": "library", "bom-ref": "pkg:nim/syndicate", "name": "syndicate", - "version": "20240610", + "version": "trunk", "externalReferences": [ { - "url": "https://git.syndicate-lang.org/ehmry/syndicate-nim/archive/7bbcdb7e7705c2ab54ba0165565813d67aea48b0.tar.gz", + "url": "https://git.syndicate-lang.org/ehmry/syndicate-nim/archive/c21fdb5003417c99b8bb599df03fd7914cba7466.tar.gz", "type": "source-distribution" }, { @@ -107,23 +107,23 @@ }, { "name": "nix:fod:path", - "value": "/nix/store/mldff990wpr0v9v5qh6ggqjmc2mn3n8g-source" + "value": "/nix/store/lw30rzfxk35nzkkp4d53s9nr6xalkg8s-source" }, { "name": "nix:fod:rev", - "value": "7bbcdb7e7705c2ab54ba0165565813d67aea48b0" + "value": "c21fdb5003417c99b8bb599df03fd7914cba7466" }, { "name": "nix:fod:sha256", - "value": "0mb3mrj5dkkqm0xp5hg84c5naaci4mi6mv2jjznfi6i7swp3f7vs" + "value": "0f14w83hpjym23f12brrirqwlib9b7m52m0g63fzmrcl6ig9y915" }, { "name": "nix:fod:url", - "value": "https://git.syndicate-lang.org/ehmry/syndicate-nim/archive/7bbcdb7e7705c2ab54ba0165565813d67aea48b0.tar.gz" + "value": "https://git.syndicate-lang.org/ehmry/syndicate-nim/archive/c21fdb5003417c99b8bb599df03fd7914cba7466.tar.gz" }, { "name": "nix:fod:ref", - "value": "20240610" + "value": "trunk" }, { "name": "nix:fod:srcDir", diff --git a/src/schema/config.nim b/src/schema/config.nim index 2d505b5..7b72d34 100644 --- a/src/schema/config.nim +++ b/src/schema/config.nim @@ -3,6 +3,9 @@ import preserves type + PulseStep* {.preservesRecord: "pulse".} = object + `detail`*: PulseDetail + JsonTranslatorArgumentsField0* {.preservesDictionary.} = object `argv`*: seq[string] `dataspace`* {.preservesEmbedded.}: EmbeddedRef @@ -21,6 +24,12 @@ type `unixaddress`*: UnixAddress + PulseDetail* {.preservesDictionary.} = object + `dither`*: float + `interval`*: float + `period`*: float + `target`* {.preservesEmbedded.}: EmbeddedRef + Base64DecoderArgumentsField0* {.preservesDictionary.} = object `dataspace`* {.preservesEmbedded.}: EmbeddedRef @@ -107,7 +116,8 @@ type PrinterStep* {.preservesRecord: "printer".} = object `field0`*: PrinterStepField0 -proc `$`*(x: JsonTranslatorArguments | SocketAddress | Base64DecoderArguments | +proc `$`*(x: PulseStep | JsonTranslatorArguments | SocketAddress | PulseDetail | + Base64DecoderArguments | SqliteStep | XsltArguments | HttpClientStepDetail | @@ -126,7 +136,9 @@ proc `$`*(x: JsonTranslatorArguments | SocketAddress | Base64DecoderArguments | PrinterStep): string = `$`(toPreserves(x)) -proc encode*(x: JsonTranslatorArguments | SocketAddress | Base64DecoderArguments | +proc encode*(x: PulseStep | JsonTranslatorArguments | SocketAddress | + PulseDetail | + Base64DecoderArguments | SqliteStep | XsltArguments | HttpClientStepDetail | diff --git a/src/syndesizer/pulses.nim b/src/syndesizer/pulses.nim index 187592f..eccd07d 100644 --- a/src/syndesizer/pulses.nim +++ b/src/syndesizer/pulses.nim @@ -1,105 +1,143 @@ # SPDX-FileCopyrightText: ☭ Emery Hemingway # SPDX-License-Identifier: Unlicense -import std/[options, tables, times] -import preserves, preserves/sugar -import syndicate, syndicate/drivers/timers +import + std/[options, random, tables, times], + pkg/preserves, preserves/sugar, + pkg/syndicate, + pkg/syndicate/protocols/gatekeeper, + pkg/syndicate/drivers/timers import ../schema/[assertions, config] -type PulseEntity {.final.} = ref object of Entity - ## An entity that asserts and retracts observers on a pulse. - self, timers: Cap - target: Entity - period: float - timerHandle: Handle - observers: Table[Handle, AssertionRef] - observePattern: Pattern - observing: bool +type + ProxyEntity {.final.} = ref object of Entity + ## An entity that asserts and retracts observers on a pulse. + self, target: Cap + hold: Table[Handle, Forward] + isActive: bool + Forward = tuple + ass: Value + hand: Handle -proc schedule(turn: Turn; pulse: PulseEntity) = - ## Schedule the next pulse. - ## The next pulse will be schedule using the current time as - ## reference point and not the moment of the previous pulse. - let then = getTime().toUnixFloat()+pulse.period - pulse.timerHandle = publish(turn, pulse.timers, Observe( - pattern: LaterThan ?: { 0: ?then }, - observer: pulse.self, - )) +proc flipOn(proxy: ProxyEntity; turn: Turn) = + assert proxy.isActive == false + proxy.isActive = true + for fwd in proxy.hold.mvalues: + assert fwd.hand == 0.Handle + fwd.hand = publish(turn, proxy.target, fwd.ass) -method publish(pulse: PulseEntity; turn: Turn; ass: AssertionRef; h: Handle) = - ## Publish observers in reponse to assertions. - pulse.timers.target.retract(turn, pulse.timerHandle) - schedule(turn, pulse) - pulse.observing = true - for h, a in pulse.observers.pairs: - pulse.target.publish(turn, a, h) - pulse.target.sync(turn, pulse.self) - -method message(pulse: PulseEntity; turn: Turn; v: AssertionRef) = - ## Retract observers in response to a sync message. - pulse.observing = false - for h in pulse.observers.keys: - pulse.target.retract(turn, h) - -type ProxyEntity {.final.} = ref object of Entity - ## A proxy `Entity` that diverts observers to a `PulseEntity`. - pulse: PulseEntity +proc flipOff(proxy: ProxyEntity; turn: Turn) = + if proxy.isActive: + proxy.isActive = false + for fwd in proxy.hold.mvalues: + assert fwd.hand != 0.Handle + retract(turn, fwd.hand) + fwd.hand = 0.Handle method publish(proxy: ProxyEntity; turn: Turn; ass: AssertionRef; h: Handle) = - ## Proxy assertions that are not observations. - if proxy.pulse.observePattern.matches ass.value: - if proxy.pulse.observers.len == 0: - schedule(turn, proxy.pulse) - proxy.pulse.observers[h] = ass - else: - proxy.pulse.target.publish(turn, ass, h) + var fwd: Forward + fwd.ass = ass.value + if proxy.isActive: + fwd.hand = publish(turn, proxy.target, fwd.ass) + proxy.hold[h] = fwd method retract(proxy: ProxyEntity; turn: Turn; h: Handle) = - ## Retract proxied assertions. - var obs: AssertionRef - if proxy.pulse.observers.pop(h, obs): - if proxy.pulse.observing: - proxy.pulse.target.retract(turn, h) - if proxy.pulse.observers.len == 0: - proxy.pulse.timers.target.retract(turn, proxy.pulse.timerHandle) - else: - proxy.pulse.target.retract(turn, h) + var fwd: Forward + if proxy.hold.pop(h, fwd): + if fwd.hand > 0: + retract(turn, fwd.hand) method message(proxy: ProxyEntity; turn: Turn; v: AssertionRef) = - ## Proxy mesages. - proxy.pulse.target.message(turn, v) + ## Messages passthru. + message(turn, proxy.target, v.value) method sync(proxy: ProxyEntity; turn: Turn; peer: Cap) = - ## Proxy sync. - proxy.pulse.target.sync(turn, peer) + ## Sync passthru. + sync(turn, proxy.target, peer) -proc newProxyEntity(turn: Turn; timers, ds: Cap; period: float): ProxyEntity = - new result - result.pulse = PulseEntity( - target: ds.target, - timers: timers, - observePattern: ?:Observe, - period: period, +type + PulseEntity {.final.} = ref object of Entity + self, driver: Cap + proxy: ProxyEntity + detail: PulseDetail + timerHandle: Handle + +proc scheduleFlipOn(pulse: PulseEntity; turn: Turn) = + var period: float + while period <= 0.0: + period = gauss(mu = pulse.detail.interval, sigma = pulse.detail.dither) + replace(turn, pulse.driver, pulse.timerHandle, SetTimer( + label: true.toPreserves, + seconds: period, + kind: TimerKind.relative, + peer: pulse.self.embed, + )) + +proc scheduleFlipOff(pulse: PulseEntity; turn: Turn) = + replace(turn, pulse.driver, pulse.timerHandle, SetTimer( + label: false.toPreserves, + seconds: pulse.detail.period, + kind: TimerKind.relative, + peer: pulse.self.embed, + )) + +method message(pulse: PulseEntity; turn: Turn; v: AssertionRef) = + var exp: TimerExpired + if exp.fromPreserves(v.value): + if exp.label.isFalse: + pulse.scheduleFlipOn(turn) + pulse.proxy.flipOff(turn) + else: + pulse.scheduleFlipOff(turn) + pulse.proxy.flipOn(turn) + +proc stop(pulse: PulseEntity, turn: Turn) = + if pulse.proxy.isActive: + pulse.proxy.flipOff(turn) + retract(turn, pulse.timerHandle) + # TODO: is this automatic? + +proc newPulseEntity(turn: Turn; detail: PulseDetail; timerDriver: Cap): PulseEntity = + if not (detail.target of Cap): + raise newException(ValueError, "pulse target is not an embedded Cap") + result = PulseEntity( + facet: turn.facet, + driver: timerDriver, + detail: detail, + proxy: ProxyEntity( + facet: turn.facet, + target: detail.target.Cap, + ) ) - result.pulse.self = newCap(turn, result.pulse) + result.proxy.self = newCap(turn, result.proxy) + result.self = newCap(turn, result) -proc spawnPulseActor*(turn: Turn; root: Cap): Actor = - ## Spawn an actor that retracts and re-asserts observers on - ## a timed pulse. Requires a timer service on the `root` capability. +proc spawnPulseActor*(turn: Turn; relay: Cap): Actor {.discardable.} = spawnActor(turn, "pulse") do (turn: Turn): - let grabPeriod = observePattern(!Pulse, { @[%0]: grab() }) - during(turn, root, ?:PulseArguments) do (ds: Cap): - during(turn, ds, grabPeriod) do (lit: Literal[float]): - if lit.value < 0.000_1: - stderr.writeLine("pulse period is too small: ", lit.value, "s") - else: - let proxyCap = newCap(turn, newProxyEntity(turn, root, ds, lit.value)) - var pulse = Pulse(periodSec: lit.value, proxy: embed proxyCap) - discard publish(turn, ds, pulse) + let timerDriver = turn.newDataspace() + spawnTimerDriver(turn, timerDriver) + let resolvePat = Resolve?:{ 0: PulseStep.grabWithin, 1: grab() } + during(turn, relay, resolvePat) do (detail: PulseDetail; observer: Cap): + var pulse: PulseEntity + if detail.period < 0.000_0001 or + detail.interval < detail.period or + detail.interval < detail.dither: + var r = Resolved(orKind: ResolvedKind.Rejected) + r.rejected.detail = "invalid pulse parameters".toPreserves + discard publish(turn, observer, r) + else: + randomize() + pulse = turn.newPulseEntity(detail, timerDriver) + discard publish(turn, observer, ResolvedAccepted( + responderSession: pulse.proxy.self)) + pulse.scheduleFlipOn(turn) + do: + if not pulse.isNil: + pulse.stop(turn) when isMainModule: import syndicate/relays runActor("main") do (turn: Turn): - resolveEnvironment(turn) do (turn: Turn; ds: Cap): - discard spawnPulseActor(turn, ds) + resolveEnvironment(turn) do (turn: Turn; relay: Cap): + spawnPulseActor(turn, relay)