Cleanup and document syndesizer/webhooks

This commit is contained in:
Emery Hemingway 2024-01-08 20:48:04 +02:00
parent da8d7ea345
commit ad3b160e56
5 changed files with 79 additions and 15 deletions

View File

@ -1,5 +1,34 @@
# Syndicate utils
## Syndesizer
A Syndicate multitool. Includes a number of different actors that become active via configuration.
Whether you use a single instance for many protocols or many specialized instances is up to you.
### Webooks
Listens for webhook requests and sends request data to a dataspace as messages.
Request data is formated according to the http schema [defined in syndicate-protocols](https://git.syndicate-lang.org/syndicate-lang/syndicate-protocols/src/branch/main/schemas/http.prs), with the exception that messages bodies may be **bytes**, **string**, or **any** for the `content-type`s of `application/octet-stream`, `text/*`, and `application/json` respectively.
```
# Configuration example
<require-service <daemon syndesizer>>
? <service-object <daemon syndesizer> ?cap> [
$cap <webhooks {
listen: <tcp "0.0.0.0" 1048>
endpoints: {
# http://0.0.0.0:1048/my-endpoint
["my-endpoint"]: $target-dataspace
# http://0.0.0.0:1048/some/multi-element/path
["some", "multi-element", "path"]: $target-dataspace
}
}>
]
```
## cache_actor
An actor that observes patterns and reässerts the values they capture for a given lifetime. Takes the arguments `{ dataspace: #!any lifetime: double }`. The lifetime of a cache counts down from moment a value is asserted.

View File

@ -1,4 +1,5 @@
version 1 .
embeddedType EntityRef.Cap .
JsonTranslatorConnected = <connected @path string>.
@ -6,3 +7,11 @@ JsonSocketTranslatorArguments = {
dataspace: #!any
socket: string
}.
WebhooksArguments = <webhooks {
endpoints: {[string ...]: #!any ...:...}
listen: Tcp
}>.
# Reused from syndicate-protocols/transportAddress
Tcp = <tcp @host string @port int>.

View File

@ -1,18 +1,32 @@
import
preserves
preserves, std/tables
type
JsonTranslatorConnected* {.preservesRecord: "connected".} = object
`path`*: string
JsonSocketTranslatorArguments* {.preservesDictionary.} = object
`dataspace`* {.preservesEmbedded.}: Value
`dataspace`* {.preservesEmbedded.}: EmbeddedRef
`socket`*: string
proc `$`*(x: JsonTranslatorConnected | JsonSocketTranslatorArguments): string =
WebhooksArgumentsField0* {.preservesDictionary.} = object
`endpoints`*: Table[seq[string], EmbeddedRef]
`listen`*: Tcp
WebhooksArguments* {.preservesRecord: "webhooks".} = object
`field0`*: WebhooksArgumentsField0
Tcp* {.preservesRecord: "tcp".} = object
`host`*: string
`port`*: BiggestInt
proc `$`*(x: JsonTranslatorConnected | JsonSocketTranslatorArguments |
WebhooksArguments |
Tcp): string =
`$`(toPreserves(x))
proc encode*(x: JsonTranslatorConnected | JsonSocketTranslatorArguments): seq[
byte] =
proc encode*(x: JsonTranslatorConnected | JsonSocketTranslatorArguments |
WebhooksArguments |
Tcp): seq[byte] =
encode(toPreserves(x))

View File

@ -10,4 +10,4 @@ import ./syndesizer/webhooks
runActor("syndesizer") do (turn: var Turn; root: Cap):
connectStdio(turn, root)
discard spawnTimers(turn, root)
discard bootWebhookActor(turn, root)
discard spawnWebhookActor(turn, root)

View File

@ -7,16 +7,13 @@ import std/[asyncdispatch, asynchttpserver, net, strutils, tables, uri]
import preserves, preserves/jsonhooks
import syndicate, syndicate/[bags, relays]
import syndicate/protocols/[http, transportAddress]
import syndicate/protocols/http
import ../schema/config
type
CapBag = Bag[Cap]
Endpoints = Table[seq[string], Cap]
WebhookArgs {.preservesDictionary.} = object
endpoints: Assertion
listen: Tcp
BootArgs {.preservesDictionary.} = object
webhook: WebhookArgs
func splitPath(s: string): seq[string] = s.strip(chars={'/'}).split('/')
@ -45,9 +42,17 @@ proc toRecord(req: Request; seqnum: BiggestInt; path: seq[string]): Value =
else:
req.body.toPreserves
proc bootWebhookActor*(turn: var Turn; root: Cap): Actor =
proc spawnWebhookActor*(turn: var Turn; root: Cap): Actor =
spawn("webhooks", turn) do (turn: var Turn):
during(turn, root, inject(!BootArgs, {1: grab(), 2: grab()})) do (host: string; port: Port):
let pat = grabRecord("webhooks", grabDictionary({ "listen": ?:config.Tcp }))
# Grab the details on listening for requests.
# Disregard endpoints so the server doesn't restart as those change.
during(turn, root, pat) do (host: string; port: Port):
let endpointsPat = grabRecord("webhooks", grabDictionary({
"listen": ?config.Tcp(host: host, port: BiggestInt port),
"endpoints": grab(),
}))
# construct a pattern for grabbing endpoints when the server is ready
var seqNum: BiggestInt
let facet = turn.facet
let endpoints = newTable[seq[string], CapBag]()
@ -69,6 +74,7 @@ proc bootWebhookActor*(turn: var Turn; root: Cap): Actor =
run(facet, act)
let server = newAsyncHttpServer()
stderr.writeLine("listening for webhooks at ", host, ":", port)
if host.isIpAddress:
var ip = parseIpAddress host
case ip.family
@ -80,7 +86,7 @@ proc bootWebhookActor*(turn: var Turn; root: Cap): Actor =
asyncCheck(turn, server.serve(port, cb, host, domain = AF_INET6))
asyncCheck(turn, server.serve(port, cb, host, domain = AF_INET))
during(turn, root, inject(!BootArgs, {0: grab(), 1: ?host, 2: ?port})) do (eps: Endpoints):
during(turn, root, endpointsPat) do (eps: Endpoints):
for path, cap in eps:
if not endpoints.hasKey path:
endpoints[path] = CapBag()
@ -90,4 +96,10 @@ proc bootWebhookActor*(turn: var Turn; root: Cap): Actor =
discard endpoints[path].change(cap, -1)
do:
stderr.writeLine("closing for webhook server at ", host, ":", port)
close(server)
when isMainModule:
runActor("webhooks") do (turn: var Turn; root: Cap):
connectStdio(turn, root)
discard spawnWebhookActor(turn, root)