# SPDX-FileCopyrightText: ☭ Emery Hemingway # SPDX-License-Identifier: Unlicense ## An actor for relaying Webhooks. import std/[asyncdispatch, asynchttpserver, net, strutils, tables, uri] import preserves, preserves/jsonhooks import syndicate, syndicate/[bags, relays] import syndicate/protocols/http import ../schema/config type CapBag = Bag[Cap] Endpoints = Table[seq[string], Cap] func splitPath(s: string): seq[string] = s.strip(chars={'/'}).split('/') proc toRecord(req: Request; seqnum: BiggestInt; path: seq[string]): Value = ## Convert a request value from the std/asynchttpserver module ## to a request type from syndicate/protocols/http. var record: HttpRequest record.sequenceNumber = seqnum record.host = req.hostname record.`method` = Symbol($req.reqMethod) record.path = path for key, val in req.headers.pairs: record.headers[Symbol key] = val for key, val in decodeQuery(req.url.query): record.query[Symbol key] = @[QueryValue(orKind: QueryValueKind.string, string: val)] let contentType = req.headers.getOrDefault("content-type") result = toPreserves record if req.body.len > 0: result[7] = case contentType.toString of "application/json": req.body.parsePreserves of "application/octet-stream": cast[seq[byte]](req.body).toPreserves else: req.body.toPreserves proc spawnWebhookActor*(turn: var Turn; root: Cap): Actor = spawn("webhooks", turn) do (turn: var Turn): let pat = grabRecord("webhooks", grabDictionary({ "listen": ?:config.Tcp })) # Grab the details on listening for requests. # Disregard endpoints so the server doesn't restart as those change. during(turn, root, pat) do (host: string; port: Port): let endpointsPat = grabRecord("webhooks", grabDictionary({ "listen": ?config.Tcp(host: host, port: BiggestInt port), "endpoints": grab(), })) # construct a pattern for grabbing endpoints when the server is ready var seqNum: BiggestInt let facet = turn.facet let endpoints = newTable[seq[string], CapBag]() # use a bag so the same capability registered multiple # times with the same path does not get duplicate messages proc cb(req: Request): Future[void] = inc(seqNum) let path = req.url.path.splitPath if not endpoints.hasKey path: result = respond(req, Http404, "no capabilities registered at $1\n" % [req.url.path]) else: result = respond(req, Http200, "") proc act(turn: var Turn) {.gcsafe.} = let rec = req.toRecord(seqNum, path) for cap in endpoints[path]: message(turn, cap, rec) run(facet, act) let server = newAsyncHttpServer() stderr.writeLine("listening for webhooks at ", host, ":", port) if host.isIpAddress: var ip = parseIpAddress host case ip.family of IPv6: asyncCheck(turn, server.serve(port, cb, host, domain = AF_INET6)) of IPv4: asyncCheck(turn, server.serve(port, cb, host, domain = AF_INET)) else: asyncCheck(turn, server.serve(port, cb, host, domain = AF_INET6)) asyncCheck(turn, server.serve(port, cb, host, domain = AF_INET)) during(turn, root, endpointsPat) do (eps: Endpoints): for path, cap in eps: if not endpoints.hasKey path: endpoints[path] = CapBag() discard endpoints[path].change(cap, +1) do: for path, cap in eps: discard endpoints[path].change(cap, -1) do: stderr.writeLine("closing for webhook server at ", host, ":", port) close(server) when isMainModule: runActor("webhooks") do (turn: var Turn; root: Cap): connectStdio(turn, root) discard spawnWebhookActor(turn, root)