Replace pulse actor protocol

This commit is contained in:
Emery Hemingway 2024-06-25 18:11:27 +03:00
parent b6e36b7e1c
commit 152fb258f3
5 changed files with 185 additions and 101 deletions

View File

@ -158,23 +158,41 @@ let ?ds = dataspace
### Pulse proxy ### Pulse proxy
A proxy actor that passes assertions and messages to a configured capability but only asserts observations on a a periodic pulse. An actor that produces proxies that accept assertions put only forwards them during a pulse window.
This can be used to implement polling behavior. This can be used to implement polling behavior or periodic service scheduling.
``` ```
# Example config #!/usr/bin/env -S syndicate-server --control --config
let ?ds = dataspace
let ?destination = dataspace
$destination ? ?x [
$log ! <log "destination" { +: $x }>
?- $log ! <log "destination" { -: $x }>
]
? <pulsator ?pulsator> [
$log ! <log "pulsator" { line: $pulsator }>
$pulsator <greeting "hello world">
]
<require-service <daemon syndesizer>> <require-service <daemon syndesizer>>
let ?resolver = <* $config [<rewrite <accepted ?cap> <pulsator $cap>>]>
? <service-object <daemon syndesizer> ?cap> [ ? <service-object <daemon syndesizer> ?cap> [
$cap <pulse {dataspace: $ds}> $log ! <log "service-object" { line: $cap }>
$cap <resolve <pulse {
target: $destination
interval: 4.0 # Interval between pulses.
period: 1.0 # Duration of pulse window.
dither: 2.0 # Gaussian deviation applied to each interval.
}> $resolver>
] ]
$ds ? <pulse 3600.0 ?proxy> [ <daemon syndesizer {
$proxy ? <assertion-updated-hourly ?value> [ argv: [ "/bin/syndesizer" ]
$log ! <log "-" {assertion-updated-hourly: $value}> clearEnv: #t
] protocol: application/syndicate
] }>
``` ```
### SQLite ### SQLite

View File

@ -47,6 +47,22 @@ PostgreConnectionParameter = [@key string @val string].
PrinterStep = <printer {}> . PrinterStep = <printer {}> .
PulseStep = <pulse @detail PulseDetail> .
PulseDetail = {
# Destination for assertions.
target: #:any
# Interval in seconds at which assertions are forwarded.
interval: float
# Period in seconds of assertion.
period: float
# Dither the @interval with a Gaussian deviation of @dither.
dither: float
} .
PulseArguments = <pulse { PulseArguments = <pulse {
dataspace: #:any dataspace: #:any
}>. }>.

View File

@ -7,7 +7,7 @@
"bom-ref": "pkg:nim/syndicate_utils", "bom-ref": "pkg:nim/syndicate_utils",
"name": "syndicate_utils", "name": "syndicate_utils",
"description": "Utilites for Syndicated Actors and Synit", "description": "Utilites for Syndicated Actors and Synit",
"version": "20240624", "version": "20240625",
"authors": [ "authors": [
{ {
"name": "Emery Hemingway" "name": "Emery Hemingway"
@ -89,10 +89,10 @@
"type": "library", "type": "library",
"bom-ref": "pkg:nim/syndicate", "bom-ref": "pkg:nim/syndicate",
"name": "syndicate", "name": "syndicate",
"version": "20240610", "version": "trunk",
"externalReferences": [ "externalReferences": [
{ {
"url": "https://git.syndicate-lang.org/ehmry/syndicate-nim/archive/7bbcdb7e7705c2ab54ba0165565813d67aea48b0.tar.gz", "url": "https://git.syndicate-lang.org/ehmry/syndicate-nim/archive/c21fdb5003417c99b8bb599df03fd7914cba7466.tar.gz",
"type": "source-distribution" "type": "source-distribution"
}, },
{ {
@ -107,23 +107,23 @@
}, },
{ {
"name": "nix:fod:path", "name": "nix:fod:path",
"value": "/nix/store/mldff990wpr0v9v5qh6ggqjmc2mn3n8g-source" "value": "/nix/store/lw30rzfxk35nzkkp4d53s9nr6xalkg8s-source"
}, },
{ {
"name": "nix:fod:rev", "name": "nix:fod:rev",
"value": "7bbcdb7e7705c2ab54ba0165565813d67aea48b0" "value": "c21fdb5003417c99b8bb599df03fd7914cba7466"
}, },
{ {
"name": "nix:fod:sha256", "name": "nix:fod:sha256",
"value": "0mb3mrj5dkkqm0xp5hg84c5naaci4mi6mv2jjznfi6i7swp3f7vs" "value": "0f14w83hpjym23f12brrirqwlib9b7m52m0g63fzmrcl6ig9y915"
}, },
{ {
"name": "nix:fod:url", "name": "nix:fod:url",
"value": "https://git.syndicate-lang.org/ehmry/syndicate-nim/archive/7bbcdb7e7705c2ab54ba0165565813d67aea48b0.tar.gz" "value": "https://git.syndicate-lang.org/ehmry/syndicate-nim/archive/c21fdb5003417c99b8bb599df03fd7914cba7466.tar.gz"
}, },
{ {
"name": "nix:fod:ref", "name": "nix:fod:ref",
"value": "20240610" "value": "trunk"
}, },
{ {
"name": "nix:fod:srcDir", "name": "nix:fod:srcDir",

View File

@ -3,6 +3,9 @@ import
preserves preserves
type type
PulseStep* {.preservesRecord: "pulse".} = object
`detail`*: PulseDetail
JsonTranslatorArgumentsField0* {.preservesDictionary.} = object JsonTranslatorArgumentsField0* {.preservesDictionary.} = object
`argv`*: seq[string] `argv`*: seq[string]
`dataspace`* {.preservesEmbedded.}: EmbeddedRef `dataspace`* {.preservesEmbedded.}: EmbeddedRef
@ -21,6 +24,12 @@ type
`unixaddress`*: UnixAddress `unixaddress`*: UnixAddress
PulseDetail* {.preservesDictionary.} = object
`dither`*: float
`interval`*: float
`period`*: float
`target`* {.preservesEmbedded.}: EmbeddedRef
Base64DecoderArgumentsField0* {.preservesDictionary.} = object Base64DecoderArgumentsField0* {.preservesDictionary.} = object
`dataspace`* {.preservesEmbedded.}: EmbeddedRef `dataspace`* {.preservesEmbedded.}: EmbeddedRef
@ -107,7 +116,8 @@ type
PrinterStep* {.preservesRecord: "printer".} = object PrinterStep* {.preservesRecord: "printer".} = object
`field0`*: PrinterStepField0 `field0`*: PrinterStepField0
proc `$`*(x: JsonTranslatorArguments | SocketAddress | Base64DecoderArguments | proc `$`*(x: PulseStep | JsonTranslatorArguments | SocketAddress | PulseDetail |
Base64DecoderArguments |
SqliteStep | SqliteStep |
XsltArguments | XsltArguments |
HttpClientStepDetail | HttpClientStepDetail |
@ -126,7 +136,9 @@ proc `$`*(x: JsonTranslatorArguments | SocketAddress | Base64DecoderArguments |
PrinterStep): string = PrinterStep): string =
`$`(toPreserves(x)) `$`(toPreserves(x))
proc encode*(x: JsonTranslatorArguments | SocketAddress | Base64DecoderArguments | proc encode*(x: PulseStep | JsonTranslatorArguments | SocketAddress |
PulseDetail |
Base64DecoderArguments |
SqliteStep | SqliteStep |
XsltArguments | XsltArguments |
HttpClientStepDetail | HttpClientStepDetail |

View File

@ -1,105 +1,143 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway # SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense # SPDX-License-Identifier: Unlicense
import std/[options, tables, times] import
import preserves, preserves/sugar std/[options, random, tables, times],
import syndicate, syndicate/drivers/timers pkg/preserves, preserves/sugar,
pkg/syndicate,
pkg/syndicate/protocols/gatekeeper,
pkg/syndicate/drivers/timers
import ../schema/[assertions, config] import ../schema/[assertions, config]
type PulseEntity {.final.} = ref object of Entity type
## An entity that asserts and retracts observers on a pulse. ProxyEntity {.final.} = ref object of Entity
self, timers: Cap ## An entity that asserts and retracts observers on a pulse.
target: Entity self, target: Cap
period: float hold: Table[Handle, Forward]
timerHandle: Handle isActive: bool
observers: Table[Handle, AssertionRef] Forward = tuple
observePattern: Pattern ass: Value
observing: bool hand: Handle
proc schedule(turn: Turn; pulse: PulseEntity) = proc flipOn(proxy: ProxyEntity; turn: Turn) =
## Schedule the next pulse. assert proxy.isActive == false
## The next pulse will be schedule using the current time as proxy.isActive = true
## reference point and not the moment of the previous pulse. for fwd in proxy.hold.mvalues:
let then = getTime().toUnixFloat()+pulse.period assert fwd.hand == 0.Handle
pulse.timerHandle = publish(turn, pulse.timers, Observe( fwd.hand = publish(turn, proxy.target, fwd.ass)
pattern: LaterThan ?: { 0: ?then },
observer: pulse.self,
))
method publish(pulse: PulseEntity; turn: Turn; ass: AssertionRef; h: Handle) = proc flipOff(proxy: ProxyEntity; turn: Turn) =
## Publish observers in reponse to <later-than …> assertions. if proxy.isActive:
pulse.timers.target.retract(turn, pulse.timerHandle) proxy.isActive = false
schedule(turn, pulse) for fwd in proxy.hold.mvalues:
pulse.observing = true assert fwd.hand != 0.Handle
for h, a in pulse.observers.pairs: retract(turn, fwd.hand)
pulse.target.publish(turn, a, h) fwd.hand = 0.Handle
pulse.target.sync(turn, pulse.self)
method message(pulse: PulseEntity; turn: Turn; v: AssertionRef) =
## Retract observers in response to a sync message.
pulse.observing = false
for h in pulse.observers.keys:
pulse.target.retract(turn, h)
type ProxyEntity {.final.} = ref object of Entity
## A proxy `Entity` that diverts observers to a `PulseEntity`.
pulse: PulseEntity
method publish(proxy: ProxyEntity; turn: Turn; ass: AssertionRef; h: Handle) = method publish(proxy: ProxyEntity; turn: Turn; ass: AssertionRef; h: Handle) =
## Proxy assertions that are not observations. var fwd: Forward
if proxy.pulse.observePattern.matches ass.value: fwd.ass = ass.value
if proxy.pulse.observers.len == 0: if proxy.isActive:
schedule(turn, proxy.pulse) fwd.hand = publish(turn, proxy.target, fwd.ass)
proxy.pulse.observers[h] = ass proxy.hold[h] = fwd
else:
proxy.pulse.target.publish(turn, ass, h)
method retract(proxy: ProxyEntity; turn: Turn; h: Handle) = method retract(proxy: ProxyEntity; turn: Turn; h: Handle) =
## Retract proxied assertions. var fwd: Forward
var obs: AssertionRef if proxy.hold.pop(h, fwd):
if proxy.pulse.observers.pop(h, obs): if fwd.hand > 0:
if proxy.pulse.observing: retract(turn, fwd.hand)
proxy.pulse.target.retract(turn, h)
if proxy.pulse.observers.len == 0:
proxy.pulse.timers.target.retract(turn, proxy.pulse.timerHandle)
else:
proxy.pulse.target.retract(turn, h)
method message(proxy: ProxyEntity; turn: Turn; v: AssertionRef) = method message(proxy: ProxyEntity; turn: Turn; v: AssertionRef) =
## Proxy mesages. ## Messages passthru.
proxy.pulse.target.message(turn, v) message(turn, proxy.target, v.value)
method sync(proxy: ProxyEntity; turn: Turn; peer: Cap) = method sync(proxy: ProxyEntity; turn: Turn; peer: Cap) =
## Proxy sync. ## Sync passthru.
proxy.pulse.target.sync(turn, peer) sync(turn, proxy.target, peer)
proc newProxyEntity(turn: Turn; timers, ds: Cap; period: float): ProxyEntity = type
new result PulseEntity {.final.} = ref object of Entity
result.pulse = PulseEntity( self, driver: Cap
target: ds.target, proxy: ProxyEntity
timers: timers, detail: PulseDetail
observePattern: ?:Observe, timerHandle: Handle
period: period,
proc scheduleFlipOn(pulse: PulseEntity; turn: Turn) =
var period: float
while period <= 0.0:
period = gauss(mu = pulse.detail.interval, sigma = pulse.detail.dither)
replace(turn, pulse.driver, pulse.timerHandle, SetTimer(
label: true.toPreserves,
seconds: period,
kind: TimerKind.relative,
peer: pulse.self.embed,
))
proc scheduleFlipOff(pulse: PulseEntity; turn: Turn) =
replace(turn, pulse.driver, pulse.timerHandle, SetTimer(
label: false.toPreserves,
seconds: pulse.detail.period,
kind: TimerKind.relative,
peer: pulse.self.embed,
))
method message(pulse: PulseEntity; turn: Turn; v: AssertionRef) =
var exp: TimerExpired
if exp.fromPreserves(v.value):
if exp.label.isFalse:
pulse.scheduleFlipOn(turn)
pulse.proxy.flipOff(turn)
else:
pulse.scheduleFlipOff(turn)
pulse.proxy.flipOn(turn)
proc stop(pulse: PulseEntity, turn: Turn) =
if pulse.proxy.isActive:
pulse.proxy.flipOff(turn)
retract(turn, pulse.timerHandle)
# TODO: is this automatic?
proc newPulseEntity(turn: Turn; detail: PulseDetail; timerDriver: Cap): PulseEntity =
if not (detail.target of Cap):
raise newException(ValueError, "pulse target is not an embedded Cap")
result = PulseEntity(
facet: turn.facet,
driver: timerDriver,
detail: detail,
proxy: ProxyEntity(
facet: turn.facet,
target: detail.target.Cap,
)
) )
result.pulse.self = newCap(turn, result.pulse) result.proxy.self = newCap(turn, result.proxy)
result.self = newCap(turn, result)
proc spawnPulseActor*(turn: Turn; root: Cap): Actor = proc spawnPulseActor*(turn: Turn; relay: Cap): Actor {.discardable.} =
## Spawn an actor that retracts and re-asserts observers on
## a timed pulse. Requires a timer service on the `root` capability.
spawnActor(turn, "pulse") do (turn: Turn): spawnActor(turn, "pulse") do (turn: Turn):
let grabPeriod = observePattern(!Pulse, { @[%0]: grab() }) let timerDriver = turn.newDataspace()
during(turn, root, ?:PulseArguments) do (ds: Cap): spawnTimerDriver(turn, timerDriver)
during(turn, ds, grabPeriod) do (lit: Literal[float]): let resolvePat = Resolve?:{ 0: PulseStep.grabWithin, 1: grab() }
if lit.value < 0.000_1: during(turn, relay, resolvePat) do (detail: PulseDetail; observer: Cap):
stderr.writeLine("pulse period is too small: ", lit.value, "s") var pulse: PulseEntity
else: if detail.period < 0.000_0001 or
let proxyCap = newCap(turn, newProxyEntity(turn, root, ds, lit.value)) detail.interval < detail.period or
var pulse = Pulse(periodSec: lit.value, proxy: embed proxyCap) detail.interval < detail.dither:
discard publish(turn, ds, pulse) var r = Resolved(orKind: ResolvedKind.Rejected)
r.rejected.detail = "invalid pulse parameters".toPreserves
discard publish(turn, observer, r)
else:
randomize()
pulse = turn.newPulseEntity(detail, timerDriver)
discard publish(turn, observer, ResolvedAccepted(
responderSession: pulse.proxy.self))
pulse.scheduleFlipOn(turn)
do:
if not pulse.isNil:
pulse.stop(turn)
when isMainModule: when isMainModule:
import syndicate/relays import syndicate/relays
runActor("main") do (turn: Turn): runActor("main") do (turn: Turn):
resolveEnvironment(turn) do (turn: Turn; ds: Cap): resolveEnvironment(turn) do (turn: Turn; relay: Cap):
discard spawnPulseActor(turn, ds) spawnPulseActor(turn, relay)