Add pulse actor

This commit is contained in:
Emery Hemingway 2024-01-14 12:52:17 +02:00
parent 119d89ff1c
commit 48ce4ac7e0
9 changed files with 159 additions and 9 deletions

View File

@ -153,6 +153,27 @@ $sqlspace ? <example-row ?id ?name> [
] ]
``` ```
### Pulse proxy
A proxy actor that passes assertions and messages to a configured capability but only asserts observations on a a periodic pulse.
This can be used to implement polling behavior.
```
# Example config
let ?ds = dataspace
<require-service <daemon syndesizer>>
? <service-object <daemon syndesizer> ?cap> [
$cap <pulse {dataspace: $ds}>
]
$ds ? <pulse 3600.0 ?proxy> [
$proxy ? <assertion-updated-hourly ?value> [
$log ! <log "-" {assertion-updated-hourly: $value}>
]
]
```
### SQLite ### SQLite
Readonly access to SQLite databases. Asserts rows as records in response to SQL query assertions. Dynamic updates are not implemented. Readonly access to SQLite databases. Asserts rows as records in response to SQL query assertions. Dynamic updates are not implemented.

View File

@ -1,3 +1,8 @@
version 1. version 1.
FileSystemUsage = <file-system-usage @path string @size int>. FileSystemUsage = <file-system-usage @path string @size int>.
# This assertion publishes a dataspace that proxies assertions with
# an exception for <Observe …> which is pulsed every periodSec.
# The pulse resolution is no more than one millisecond.
Pulse = <pulse @periodSec double @proxy #!any>.

View File

@ -28,6 +28,10 @@ PostgreArguments = <postgre {
}>. }>.
PostgreConnectionParameter = [@key string @val string]. PostgreConnectionParameter = [@key string @val string].
PulseArguments = <pulse {
dataspace: #!any
}>.
SqliteArguments = <sqlite { SqliteArguments = <sqlite {
database: string database: string
dataspace: #!any dataspace: #!any

View File

@ -76,12 +76,12 @@
"packages": [ "packages": [
"syndicate" "syndicate"
], ],
"path": "/nix/store/hma19sff6k2bi6qj01yscbynz6x2zvxj-source", "path": "/nix/store/43gx5b3y9s9c8cfa1nmhz5dz5z3jsw0a-source",
"ref": "20240108", "ref": "20240114",
"rev": "3e11884a916c0452c90128c29940856e2d347cb7", "rev": "75d1e33bff21f1da511ca0ea7def4bb492fd96fd",
"sha256": "0n1gbwllwwilz9fp5zyp4054vzcq1p7ddzg02sw8d0vqb1wmpsqm", "sha256": "0b51bj3xpw2jbpxh0ry3d7iqbw6ks2wfnc3jpra6rj3ss4hj749n",
"srcDir": "src", "srcDir": "src",
"url": "https://git.syndicate-lang.org/ehmry/syndicate-nim/archive/3e11884a916c0452c90128c29940856e2d347cb7.tar.gz" "url": "https://git.syndicate-lang.org/ehmry/syndicate-nim/archive/75d1e33bff21f1da511ca0ea7def4bb492fd96fd.tar.gz"
}, },
{ {
"method": "fetchzip", "method": "fetchzip",

View File

@ -3,12 +3,16 @@ import
preserves preserves
type type
Pulse* {.preservesRecord: "pulse".} = object
`periodSec`*: float64
`proxy`* {.preservesEmbedded.}: Value
FileSystemUsage* {.preservesRecord: "file-system-usage".} = object FileSystemUsage* {.preservesRecord: "file-system-usage".} = object
`path`*: string `path`*: string
`size`*: BiggestInt `size`*: BiggestInt
proc `$`*(x: FileSystemUsage): string = proc `$`*(x: Pulse | FileSystemUsage): string =
`$`(toPreserves(x)) `$`(toPreserves(x))
proc encode*(x: FileSystemUsage): seq[byte] = proc encode*(x: Pulse | FileSystemUsage): seq[byte] =
encode(toPreserves(x)) encode(toPreserves(x))

View File

@ -65,6 +65,12 @@ type
PostgreArguments* {.preservesRecord: "postgre".} = object PostgreArguments* {.preservesRecord: "postgre".} = object
`field0`*: PostgreArgumentsField0 `field0`*: PostgreArgumentsField0
PulseArgumentsField0* {.preservesDictionary.} = object
`dataspace`* {.preservesEmbedded.}: EmbeddedRef
PulseArguments* {.preservesRecord: "pulse".} = object
`field0`*: PulseArgumentsField0
Tcp* {.preservesRecord: "tcp".} = object Tcp* {.preservesRecord: "tcp".} = object
`host`*: string `host`*: string
`port`*: BiggestInt `port`*: BiggestInt
@ -78,6 +84,7 @@ proc `$`*(x: WebsocketArguments | JsonTranslatorArguments |
CacheArguments | CacheArguments |
PostgreConnectionParameter | PostgreConnectionParameter |
PostgreArguments | PostgreArguments |
PulseArguments |
Tcp): string = Tcp): string =
`$`(toPreserves(x)) `$`(toPreserves(x))
@ -90,5 +97,6 @@ proc encode*(x: WebsocketArguments | JsonTranslatorArguments |
CacheArguments | CacheArguments |
PostgreConnectionParameter | PostgreConnectionParameter |
PostgreArguments | PostgreArguments |
PulseArguments |
Tcp): seq[byte] = Tcp): seq[byte] =
encode(toPreserves(x)) encode(toPreserves(x))

View File

@ -14,6 +14,7 @@ import ./syndesizer/[
file_system_usage, file_system_usage,
json_socket_translator, json_socket_translator,
json_translator, json_translator,
pulses,
webhooks, webhooks,
websockets] websockets]
@ -30,6 +31,7 @@ runActor("syndesizer") do (turn: var Turn; root: Cap):
discard spawnFileSystemUsageActor(turn, root) discard spawnFileSystemUsageActor(turn, root)
discard spawnJsonSocketTranslator(turn, root) discard spawnJsonSocketTranslator(turn, root)
discard spawnJsonStdioTranslator(turn, root) discard spawnJsonStdioTranslator(turn, root)
discard spawnPulseActor(turn, root)
discard spawnWebhookActor(turn, root) discard spawnWebhookActor(turn, root)
discard spawnWebsocketActor(turn, root) discard spawnWebsocketActor(turn, root)
when withPostgre: when withPostgre:

106
src/syndesizer/pulses.nim Normal file
View File

@ -0,0 +1,106 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[options, tables, times]
import preserves, syndicate,
syndicate/relays,
syndicate/actors/timers
import ../schema/[assertions, config]
type PulseEntity {.final.} = ref object of Entity
## An entity that asserts and retracts observers on a pulse.
self, timers: Cap
target: Entity
period: float
timerHandle: Handle
observers: Table[Handle, AssertionRef]
observePattern: Pattern
observing: bool
proc schedule(turn: var Turn; pulse: PulseEntity) =
## Schedule the next pulse.
## The next pulse will be schedule using the current time as
## reference point and not the moment of the previous pulse.
let then = getTime().toUnixFloat()+pulse.period
pulse.timerHandle = publish(turn, pulse.timers, Observe(
pattern: LaterThan ?: { 0: ?then },
observer: pulse.self,
))
method publish(pulse: PulseEntity; turn: var Turn; ass: AssertionRef; h: Handle) =
## Publish observers in reponse to <later-than …> assertions.
pulse.timers.target.retract(turn, pulse.timerHandle)
schedule(turn, pulse)
pulse.observing = true
for h, a in pulse.observers.pairs:
pulse.target.publish(turn, a, h)
pulse.target.sync(turn, pulse.self)
method message(pulse: PulseEntity; turn: var Turn; v: AssertionRef) =
## Retract observers in response to a sync message.
pulse.observing = false
for h in pulse.observers.keys:
pulse.target.retract(turn, h)
type ProxyEntity {.final.} = ref object of Entity
## A proxy `Entity` that diverts observers to a `PulseEntity`.
pulse: PulseEntity
method publish(proxy: ProxyEntity; turn: var Turn; ass: AssertionRef; h: Handle) =
## Proxy assertions that are not observations.
if proxy.pulse.observePattern.matches ass.value:
if proxy.pulse.observers.len == 0:
schedule(turn, proxy.pulse)
proxy.pulse.observers[h] = ass
else:
proxy.pulse.target.publish(turn, ass, h)
method retract(proxy: ProxyEntity; turn: var Turn; h: Handle) =
## Retract proxied assertions.
var obs: AssertionRef
if proxy.pulse.observers.pop(h, obs):
if proxy.pulse.observing:
proxy.pulse.target.retract(turn, h)
if proxy.pulse.observers.len == 0:
proxy.pulse.timers.target.retract(turn, proxy.pulse.timerHandle)
else:
proxy.pulse.target.retract(turn, h)
method message(proxy: ProxyEntity; turn: var Turn; v: AssertionRef) =
## Proxy mesages.
proxy.pulse.target.message(turn, v)
method sync(proxy: ProxyEntity; turn: var Turn; peer: Cap) =
## Proxy sync.
proxy.pulse.target.sync(turn, peer)
proc newProxyEntity(turn: var Turn; timers, ds: Cap; period: float): ProxyEntity =
new result
result.pulse = PulseEntity(
target: ds.target,
timers: timers,
observePattern: ?:Observe,
period: period,
)
result.pulse.self = newCap(turn, result.pulse)
proc spawnPulseActor*(turn: var Turn; root: Cap): Actor =
## Spawn an actor that retracts and re-asserts observers on
## a timed pulse. Requires a timer service on the `root` capability.
spawn("pulse", turn) do (turn: var Turn):
let grabPeriod = ?Observe(pattern: !Pulse) ?? { 0: grab() }
during(turn, root, ?:PulseArguments) do (ds: Cap):
during(turn, ds, grabPeriod) do (lit: Literal[float]):
if lit.value < 0.000_1:
stderr.writeLine("pulse period is too small: ", lit.value, "s")
else:
let proxyCap = newCap(turn, newProxyEntity(turn, root, ds, lit.value))
var pulse = Pulse(periodSec: lit.value, proxy: embed proxyCap)
discard publish(turn, ds, pulse)
when isMainModule:
runActor("main") do (turn: var Turn; root: Cap):
spawnTimers(turn, root)
connectStdio(turn, root)
discard spawnPulseActor(turn, root)

View File

@ -1,6 +1,6 @@
# Package # Package
version = "20240110" version = "20240114"
author = "Emery Hemingway" author = "Emery Hemingway"
description = "Utilites for Syndicated Actors and Synit" description = "Utilites for Syndicated Actors and Synit"
license = "unlicense" license = "unlicense"
@ -10,4 +10,4 @@ bin = @["mintsturdyref", "mount_actor", "msg", "net_mapper", "preserve
# Dependencies # Dependencies
requires "nim >= 2.0.0", "illwill", "syndicate >= 20240108", "ws" requires "nim >= 2.0.0", "illwill", "syndicate >= 20240114", "ws"