syndicate_utils/src/syndesizer/webhooks.nim

106 lines
3.8 KiB
Nim

# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
## An actor for relaying Webhooks.
import std/[asyncdispatch, asynchttpserver, net, strutils, tables, uri]
import preserves, preserves/jsonhooks
import syndicate, syndicate/[bags, relays]
import syndicate/protocols/http
import ../schema/config
type
CapBag = Bag[Cap]
Endpoints = Table[seq[string], Cap]
func splitPath(s: string): seq[string] = s.strip(chars={'/'}).split('/')
proc toRecord(req: Request; seqnum: BiggestInt; path: seq[string]): Value =
## Convert a request value from the std/asynchttpserver module
## to a request type from syndicate/protocols/http.
var record: HttpRequest
record.sequenceNumber = seqnum
record.host = req.hostname
record.`method` = Symbol($req.reqMethod)
record.path = path
for key, val in req.headers.pairs:
record.headers[Symbol key] = val
for key, val in decodeQuery(req.url.query):
record.query[Symbol key] =
@[QueryValue(orKind: QueryValueKind.string, string: val)]
let contentType = req.headers.getOrDefault("content-type")
result = toPreserves record
if req.body.len > 0:
result[7] =
case contentType.toString
of "application/json":
req.body.parsePreserves
of "application/octet-stream":
cast[seq[byte]](req.body).toPreserves
else:
req.body.toPreserves
proc spawnWebhookActor*(turn: var Turn; root: Cap): Actor =
spawn("webhooks", turn) do (turn: var Turn):
let pat = grabRecord("webhooks", grabDictionary({ "listen": ?:config.Tcp }))
# Grab the details on listening for requests.
# Disregard endpoints so the server doesn't restart as those change.
during(turn, root, pat) do (host: string; port: Port):
let endpointsPat = grabRecord("webhooks", grabDictionary({
"listen": ?config.Tcp(host: host, port: BiggestInt port),
"endpoints": grab(),
}))
# construct a pattern for grabbing endpoints when the server is ready
var seqNum: BiggestInt
let facet = turn.facet
let endpoints = newTable[seq[string], CapBag]()
# use a bag so the same capability registered multiple
# times with the same path does not get duplicate messages
proc cb(req: Request): Future[void] =
inc(seqNum)
let path = req.url.path.splitPath
if not endpoints.hasKey path:
result = respond(req, Http404,
"no capabilities registered at $1\n" % [req.url.path])
else:
result = respond(req, Http200, "")
proc act(turn: var Turn) {.gcsafe.} =
let rec = req.toRecord(seqNum, path)
for cap in endpoints[path]:
message(turn, cap, rec)
run(facet, act)
let server = newAsyncHttpServer()
stderr.writeLine("listening for webhooks at ", host, ":", port)
if host.isIpAddress:
var ip = parseIpAddress host
case ip.family
of IPv6:
asyncCheck(turn, server.serve(port, cb, host, domain = AF_INET6))
of IPv4:
asyncCheck(turn, server.serve(port, cb, host, domain = AF_INET))
else:
asyncCheck(turn, server.serve(port, cb, host, domain = AF_INET6))
asyncCheck(turn, server.serve(port, cb, host, domain = AF_INET))
during(turn, root, endpointsPat) do (eps: Endpoints):
for path, cap in eps:
if not endpoints.hasKey path:
endpoints[path] = CapBag()
discard endpoints[path].change(cap, +1)
do:
for path, cap in eps:
discard endpoints[path].change(cap, -1)
do:
stderr.writeLine("closing for webhook server at ", host, ":", port)
close(server)
when isMainModule:
runActor("webhooks") do (turn: var Turn; root: Cap):
connectStdio(turn, root)
discard spawnWebhookActor(turn, root)