106 lines
3.8 KiB
Nim
106 lines
3.8 KiB
Nim
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
|
# SPDX-License-Identifier: Unlicense
|
|
|
|
## An actor for relaying Webhooks.
|
|
|
|
import std/[asyncdispatch, asynchttpserver, net, strutils, tables, uri]
|
|
|
|
import preserves, preserves/jsonhooks
|
|
import syndicate, syndicate/[bags, relays]
|
|
import syndicate/protocols/http
|
|
|
|
import ../schema/config
|
|
|
|
type
  # Shape of the "endpoints" value captured from a webhooks record:
  # a request path, already split into segments, mapped to a capability.
  Endpoints = Table[seq[string], Cap]
  # A bag of capabilities, used to collect the capabilities
  # registered at a single path.
  CapBag = Bag[Cap]
|
|
|
|
func splitPath(s: string): seq[string] =
  ## Break a URL path into its '/'-separated segments after discarding
  ## any leading and trailing slashes.
  ##
  ## Interior empty segments are kept, and a path made only of slashes
  ## (or an empty string) yields a single empty segment.
  let trimmed = strip(s, chars = {'/'})
  result = split(trimmed, '/')
|
|
|
|
proc toRecord(req: Request; seqnum: BiggestInt; path: seq[string]): Value =
  ## Convert a request value from the std/asynchttpserver module
  ## to a request type from syndicate/protocols/http.
  ##
  ## `seqnum` is stored as the record's sequence number and `path` as its
  ## path. Request headers and query parameters are copied across keyed by
  ## symbol. A non-empty body is decoded according to the request's media
  ## type: `application/json` is parsed, `application/octet-stream` becomes
  ## a byte sequence, and anything else is kept as a string.
  var record: HttpRequest
  record.sequenceNumber = seqnum
  record.host = req.hostname
  record.`method` = Symbol($req.reqMethod)
  record.path = path
  for key, val in req.headers.pairs:
    record.headers[Symbol key] = val
  for key, val in decodeQuery(req.url.query):
    record.query[Symbol key] =
      @[QueryValue(orKind: QueryValueKind.string, string: val)]
  result = toPreserves record
  if req.body.len > 0:
    # Media types are case-insensitive and may carry parameters
    # (e.g. "application/json; charset=utf-8"), so dispatch only on the
    # normalised type/subtype rather than on the raw header value.
    let contentType = req.headers.getOrDefault("content-type").toString
    let mediaType = contentType.split(';', maxsplit = 1)[0].strip.toLowerAscii
    # Slot 7 of the preserved record receives the decoded body.
    # NOTE(review): presumably the `body` field of HttpRequest; confirm
    # the index against the syndicate/protocols/http schema.
    result[7] =
      case mediaType
      of "application/json":
        req.body.parsePreserves
      of "application/octet-stream":
        cast[seq[byte]](req.body).toPreserves
      else:
        req.body.toPreserves
|
|
|
|
proc spawnWebhookActor*(turn: var Turn; root: Cap): Actor =
  ## Spawn an actor named "webhooks" that relays HTTP requests as messages.
  ##
  ## While `root` holds a "webhooks" record whose dictionary has a "listen"
  ## entry matching `config.Tcp`, an HTTP server is served on that host and
  ## port. A request whose split path has registered capabilities is
  ## answered with 200 and forwarded, converted by `toRecord`, as a message
  ## to each of those capabilities; any other request is answered with 404.
  spawn("webhooks", turn) do (turn: var Turn):
    # Grab the details on listening for requests.
    # Disregard endpoints so the server doesn't restart as those change.
    let pat = grabRecord("webhooks", grabDictionary({ "listen": ?:config.Tcp }))
    during(turn, root, pat) do (host: string; port: Port):
      # construct a pattern for grabbing endpoints when the server is ready
      let endpointsPat = grabRecord("webhooks", grabDictionary({
          "listen": ?config.Tcp(host: host, port: BiggestInt port),
          "endpoints": grab(),
        }))
      var seqNum: BiggestInt
      let facet = turn.facet
      # use a bag so the same capability registered multiple
      # times with the same path does not get duplicate messages
      let endpoints = newTable[seq[string], CapBag]()

      proc cb(req: Request): Future[void] =
        # Every request consumes a sequence number, including those
        # that end up being rejected with 404.
        inc(seqNum)
        let path = req.url.path.splitPath
        if not endpoints.hasKey path:
          result = respond(req, Http404,
            "no capabilities registered at $1\n" % [req.url.path])
        else:
          # The response is started before the message is relayed.
          result = respond(req, Http200, "")
          proc act(turn: var Turn) {.gcsafe.} =
            let rec = req.toRecord(seqNum, path)
            for cap in endpoints[path]:
              message(turn, cap, rec)
          run(facet, act)

      let server = newAsyncHttpServer()
      stderr.writeLine("listening for webhooks at ", host, ":", port)
      if host.isIpAddress:
        # A literal address determines the socket domain.
        var ip = parseIpAddress host
        case ip.family
        of IPv6:
          asyncCheck(turn, server.serve(port, cb, host, domain = AF_INET6))
        of IPv4:
          asyncCheck(turn, server.serve(port, cb, host, domain = AF_INET))
      else:
        # NOTE(review): `serve` is started twice on the same server object
        # for a non-literal host; confirm that both listeners stay usable
        # and that `close(server)` releases both.
        asyncCheck(turn, server.serve(port, cb, host, domain = AF_INET6))
        asyncCheck(turn, server.serve(port, cb, host, domain = AF_INET))

      during(turn, root, endpointsPat) do (eps: Endpoints):
        for path, cap in eps:
          if not endpoints.hasKey path:
            endpoints[path] = CapBag()
          discard endpoints[path].change(cap, +1)
      do:
        for path, cap in eps:
          discard endpoints[path].change(cap, -1)
          # Forget paths that have no capabilities left so that requests
          # to them are answered with 404 again instead of 200.
          # NOTE(review): assumes Bag exposes `len` and that `change`
          # removes entries whose count reaches zero; confirm against
          # syndicate/bags.
          if endpoints[path].len == 0:
            endpoints.del path

    do:
      stderr.writeLine("closing for webhook server at ", host, ":", port)
      close(server)
|
|
|
|
when isMainModule:
  # Standalone entry point: run a "webhooks" actor, connect `root` to
  # standard I/O and start the webhook relay on it.
  runActor("webhooks") do (turn: var Turn; root: Cap):
    turn.connectStdio(root)
    # The returned Actor handle is not needed here.
    discard turn.spawnWebhookActor(root)
|