Compare commits
5 Commits
trunk
...
http_drive
Author | SHA1 | Date |
---|---|---|
Emery Hemingway | 16926a789e | |
Emery Hemingway | 128df6dc03 | |
Emery Hemingway | 3996729824 | |
Emery Hemingway | dbe363052d | |
Emery Hemingway | 0dc419ebab |
11
README.md
11
README.md
|
@ -374,14 +374,3 @@ Example script:
|
||||||
|
|
||||||
This utility serializes it's process environment to Preserves and prints it to stdout.
|
This utility serializes it's process environment to Preserves and prints it to stdout.
|
||||||
It can be used to feed the environment variables of a nested child of the Syndicate server back to the server. For example, to retreive the environmental variables that a desktop manager passed on to its children.
|
It can be used to feed the environment variables of a nested child of the Syndicate server back to the server. For example, to retreive the environmental variables that a desktop manager passed on to its children.
|
||||||
|
|
||||||
|
|
||||||
## syndump
|
|
||||||
|
|
||||||
Utility for printing assertions and messages. Parses the command-line arguments as a pattern, connects a dataspace via `$SYNDICATE_ROUTE`, and writes observations to standard-output. Published assertions are prefixed by the `+` character, retractions by `-`, and messages by `!`.
|
|
||||||
|
|
||||||
Example
|
|
||||||
```sh
|
|
||||||
# Print patterns in use, filter down with AWK to only the published patterns.
|
|
||||||
$ FS=':' syndump '<Observe ? _>' | awk -F : '/^+/ { print $2 }'
|
|
||||||
```
|
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
include_rules
|
||||||
|
NIM_FLAGS += --path:$(TUP_CWD)/../../../taps/pkg
|
||||||
|
: foreach *.nim | $(SYNDICATE_PROTOCOL) ../<schema> |> !nim_bin |> | {bin}
|
||||||
|
: foreach {bin} |> !assert_built |>
|
|
@ -0,0 +1,339 @@
|
||||||
|
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||||
|
# SPDX-License-Identifier: Unlicense
|
||||||
|
|
||||||
|
import std/[httpcore, options, parseutils, sets, streams, strutils, tables, times, uri]
|
||||||
|
import pkg/sys/ioqueue
|
||||||
|
import pkg/preserves
|
||||||
|
import pkg/syndicate
|
||||||
|
import pkg/syndicate/protocols/http
|
||||||
|
import taps
|
||||||
|
import ../schema/config
|
||||||
|
|
||||||
|
const
|
||||||
|
SP = { ' ', '\x09', '\x0b', '\x0c', '\x0d' }
|
||||||
|
SupportedVersion = "HTTP/1.1"
|
||||||
|
IMF = initTimeFormat"ddd, dd MMM yyyy HH:mm:ss"
|
||||||
|
|
||||||
|
proc echo(args: varargs[string, `$`]) =
|
||||||
|
stderr.writeLine(args)
|
||||||
|
|
||||||
|
proc `$`(b: seq[byte]): string = cast[string](b)
|
||||||
|
|
||||||
|
# a Date header on responses must be present if a clock is available
|
||||||
|
|
||||||
|
# An upgrade header can be used to switch over to native syndicate protocol.
|
||||||
|
|
||||||
|
# Check the response encoding matches or otherwise return 415
|
||||||
|
|
||||||
|
type HandlerEntity = ref object of Entity
|
||||||
|
handler: proc (turn: var Turn; req: HttpRequest; cap: Cap)
|
||||||
|
|
||||||
|
method publish(e: HandlerEntity; turn: var Turn; a: AssertionRef; h: Handle) =
|
||||||
|
var ctx = a.value.preservesTo HttpContext
|
||||||
|
if ctx.isSome:
|
||||||
|
var res = ctx.get.res.unembed Cap
|
||||||
|
if res.isSome:
|
||||||
|
e.handler(turn, ctx.get.req, res.get)
|
||||||
|
else:
|
||||||
|
echo "HandlerEntity got a non-Cap ", ctx.get.res
|
||||||
|
else:
|
||||||
|
echo "HandlerEntity got a non-HttpContext ", a.value
|
||||||
|
|
||||||
|
proc respond404(turn: var Turn; req: HttpRequest; cap: Cap) =
|
||||||
|
message(turn, cap, HttpResponse(
|
||||||
|
orKind: HttpResponseKind.status,
|
||||||
|
status: HttpResponseStatus(
|
||||||
|
code: 404,
|
||||||
|
message: "resource not found",
|
||||||
|
)))
|
||||||
|
message(turn, cap, HttpResponse(
|
||||||
|
orKind: HttpResponseKind.header,
|
||||||
|
header: HttpResponseHeader(
|
||||||
|
name: Symbol"content-length",
|
||||||
|
value: "0",
|
||||||
|
)))
|
||||||
|
message(turn, cap, HttpResponse(orKind: HttpResponseKind.done))
|
||||||
|
|
||||||
|
proc bind404Handler(turn: var Turn; ds: Cap; port: Port) =
|
||||||
|
stderr.writeLine "bind 404 handler to ", port
|
||||||
|
var b: HttpBinding
|
||||||
|
b.host = HostPattern(orKind: HostPatternKind.any)
|
||||||
|
b.port = BiggestInt port
|
||||||
|
b.method = MethodPattern(orKind: MethodPatternKind.any)
|
||||||
|
b.path = @[PathPatternElement(orKind: PathPatternElementKind.rest)]
|
||||||
|
b.handler = newCap(turn, HandlerEntity(handler: respond404)).toPreserves
|
||||||
|
discard publish(turn, ds, b)
|
||||||
|
|
||||||
|
proc badRequest(conn: Connection; msg: string) =
|
||||||
|
conn.send(SupportedVersion & " 400 " & msg, endOfMessage = true)
|
||||||
|
close(conn)
|
||||||
|
|
||||||
|
proc extractQuery(s: var string): Table[Symbol, seq[QueryValue]] =
|
||||||
|
let start = succ skipUntil(s, '?')
|
||||||
|
if start < s.len:
|
||||||
|
var query = s[start..s.high]
|
||||||
|
s.setLen(start)
|
||||||
|
for key, val in uri.decodeQuery(query):
|
||||||
|
var list = result.getOrDefault(Symbol key)
|
||||||
|
list.add QueryValue(orKind: QueryValueKind.string, string: val)
|
||||||
|
result[Symbol key] = list
|
||||||
|
|
||||||
|
proc parseRequest(conn: Connection; text: string): (int, HttpRequest) =
|
||||||
|
## Parse an `HttpRequest` request out of a `text` from a `Connection`.
|
||||||
|
var
|
||||||
|
token: string
|
||||||
|
off: int
|
||||||
|
|
||||||
|
template advanceSp =
|
||||||
|
let n = skipWhile(text, SP, off)
|
||||||
|
if n < 1:
|
||||||
|
badRequest(conn, "invalid request")
|
||||||
|
return
|
||||||
|
inc(off, n)
|
||||||
|
|
||||||
|
# method
|
||||||
|
off.inc parseUntil(text, token, SP, off)
|
||||||
|
result[1].method = token.toLowerAscii.Symbol
|
||||||
|
advanceSp()
|
||||||
|
|
||||||
|
# target
|
||||||
|
if text[off] == '/': inc(off) #TODO: always a leading slash?
|
||||||
|
off.inc parseUntil(text, token, SP, off)
|
||||||
|
advanceSp()
|
||||||
|
|
||||||
|
block:
|
||||||
|
var version: string
|
||||||
|
off.inc parseUntil(text, version, SP, off)
|
||||||
|
advanceSp()
|
||||||
|
if version != SupportedVersion:
|
||||||
|
badRequest(conn, "version not supported")
|
||||||
|
return
|
||||||
|
|
||||||
|
result[1].query = extractQuery(token)
|
||||||
|
|
||||||
|
result[1].path = split(token, '/')
|
||||||
|
for p in result[1].path.mitems:
|
||||||
|
# normalize the path
|
||||||
|
for i, c in p:
|
||||||
|
if c in {'A'..'Z'}:
|
||||||
|
p[i] = char c.ord + 0x20
|
||||||
|
|
||||||
|
template advanceLine =
|
||||||
|
inc off, skipWhile(text, {'\x0d'}, off)
|
||||||
|
if text.high < off or text[off] != '\x0a':
|
||||||
|
badRequest(conn, "invalid request")
|
||||||
|
return
|
||||||
|
inc off, 1
|
||||||
|
|
||||||
|
advanceLine()
|
||||||
|
while off < text.len:
|
||||||
|
off.inc parseUntil(text, token, {'\x0d', '\x0a'}, off)
|
||||||
|
if token == "": break
|
||||||
|
advanceLine()
|
||||||
|
var
|
||||||
|
(key, vals) = httpcore.parseHeader(token)
|
||||||
|
k = key.toLowerAscii.Symbol
|
||||||
|
v = result[1].headers.getOrDefault(k)
|
||||||
|
for e in vals.mitems:
|
||||||
|
e = e.toLowerAscii
|
||||||
|
if k == Symbol"host":
|
||||||
|
result[1].host = e
|
||||||
|
if v == "": v = move e
|
||||||
|
else:
|
||||||
|
v.add ", "
|
||||||
|
v.add e
|
||||||
|
if k == Symbol"host":
|
||||||
|
result[1].host = v
|
||||||
|
result[1].headers[k] = v
|
||||||
|
|
||||||
|
result[0] = off
|
||||||
|
|
||||||
|
proc send(conn: Connection; chunk: Chunk) =
|
||||||
|
case chunk.orKind
|
||||||
|
of ChunkKind.string:
|
||||||
|
conn.send(chunk.string, endOfMessage = false)
|
||||||
|
of ChunkKind.bytes:
|
||||||
|
conn.send(chunk.bytes, endOfMessage = false)
|
||||||
|
|
||||||
|
type
|
||||||
|
Driver = ref object
|
||||||
|
facet: Facet
|
||||||
|
ds: Cap
|
||||||
|
bindings: seq[HttpBinding]
|
||||||
|
Session = ref object
|
||||||
|
facet: Facet
|
||||||
|
driver: Driver
|
||||||
|
conn: Connection
|
||||||
|
port: Port
|
||||||
|
Exchange = ref object of Entity
|
||||||
|
ses: Session
|
||||||
|
req: HttpRequest
|
||||||
|
stream: StringStream
|
||||||
|
mode: HttpResponseKind
|
||||||
|
|
||||||
|
proc match(b: HttpBinding, r: HttpRequest): bool =
|
||||||
|
## Check if `HttpBinding` `b` matches `HttpRequest` `r`.
|
||||||
|
result =
|
||||||
|
(b.host.orKind == HostPatternKind.any or
|
||||||
|
b.host.host == r.host) and
|
||||||
|
(b.port == r.port) and
|
||||||
|
(b.method.orKind == MethodPatternKind.any or
|
||||||
|
b.method.specific == r.method)
|
||||||
|
if result:
|
||||||
|
for i, p in b.path:
|
||||||
|
if i > r.path.high: return false
|
||||||
|
case p.orKind
|
||||||
|
of PathPatternElementKind.wildcard: discard
|
||||||
|
of PathPatternElementKind.label:
|
||||||
|
if p.label != r.path[i]: return false
|
||||||
|
of PathPatternElementKind.rest:
|
||||||
|
return i == b.path.high
|
||||||
|
# return false if ... isn't the last element
|
||||||
|
|
||||||
|
proc strongerThan(a, b: HttpBinding): bool =
|
||||||
|
## Check if `a` is a stronger `HttpBinding` than `b`.
|
||||||
|
result =
|
||||||
|
(a.host.orKind != b.host.orKind and
|
||||||
|
a.host.orKind == HostPatternKind.host) or
|
||||||
|
(a.method.orKind != b.method.orKind and
|
||||||
|
a.method.orKind == MethodPatternKind.specific)
|
||||||
|
if not result:
|
||||||
|
if a.path.len > b.path.len: return true
|
||||||
|
for i in a.path.low..b.path.high:
|
||||||
|
if a.path[i].orKind != b.path[i].orKind and
|
||||||
|
a.path[i].orKind == PathPatternElementKind.label:
|
||||||
|
return true
|
||||||
|
|
||||||
|
proc match(driver: Driver; req: HttpRequest): Option[HttpBinding] =
|
||||||
|
for b in driver.bindings:
|
||||||
|
if b.match req:
|
||||||
|
if result.isNone or b.strongerThan(result.get):
|
||||||
|
result = some b
|
||||||
|
else:
|
||||||
|
echo b, " does not match ", req
|
||||||
|
|
||||||
|
method message(e: Exchange; turn: var Turn; a: AssertionRef) =
|
||||||
|
# Send responses back into a connection.
|
||||||
|
var res: HttpResponse
|
||||||
|
if e.mode != HttpResponseKind.done and res.fromPreserves a.value:
|
||||||
|
case res.orKind
|
||||||
|
of HttpResponseKind.status:
|
||||||
|
if e.mode == res.orKind:
|
||||||
|
e.stream.writeLine(SupportedVersion, " ", res.status.code, " ", res.status.message)
|
||||||
|
e.stream.writeLine("date: ", now().format(IMF))
|
||||||
|
# add Date header automatically - RFC 9110 Section 6.6.1.
|
||||||
|
e.mode = HttpResponseKind.header
|
||||||
|
of HttpResponseKind.header:
|
||||||
|
if e.mode == res.orKind:
|
||||||
|
e.stream.writeLine(res.header.name, ": ", res.header.value)
|
||||||
|
of HttpResponseKind.chunk:
|
||||||
|
if e.mode == HttpResponseKind.header:
|
||||||
|
e.mode = res.orKind
|
||||||
|
e.stream.writeLine()
|
||||||
|
e.ses.conn.send(move e.stream.data, endOfMessage = false)
|
||||||
|
e.ses.conn.send(res.chunk.chunk)
|
||||||
|
of HttpResponseKind.done:
|
||||||
|
if e.mode == HttpResponseKind.header:
|
||||||
|
e.stream.writeLine()
|
||||||
|
e.ses.conn.send(move e.stream.data, endOfMessage = false)
|
||||||
|
e.mode = res.orKind
|
||||||
|
e.ses.conn.send(res.done.chunk)
|
||||||
|
stop(turn)
|
||||||
|
# stop the facet scoped to the exchange
|
||||||
|
# so that the response capability is withdrawn
|
||||||
|
|
||||||
|
proc service(turn: var Turn; exch: Exchange) =
|
||||||
|
## Service an HTTP message exchange.
|
||||||
|
var binding = exch.ses.driver.match exch.req
|
||||||
|
if binding.isNone:
|
||||||
|
echo "no binding for ", exch.req
|
||||||
|
stop(turn)
|
||||||
|
else:
|
||||||
|
echo "driver matched binding ", binding.get
|
||||||
|
var handler = binding.get.handler.unembed Cap
|
||||||
|
if handler.isNone:
|
||||||
|
stop(turn)
|
||||||
|
else:
|
||||||
|
publish(turn, handler.get, HttpContext(
|
||||||
|
req: exch.req,
|
||||||
|
res: embed newCap(turn, exch),
|
||||||
|
))
|
||||||
|
|
||||||
|
proc service(ses: Session) =
|
||||||
|
## Service a connection to an HTTP client.
|
||||||
|
ses.facet.onStop do (turn: var Turn):
|
||||||
|
close ses.conn
|
||||||
|
ses.conn.onClosed do ():
|
||||||
|
stop ses.facet
|
||||||
|
ses.conn.onReceivedPartial do (data: seq[byte]; ctx: MessageContext; eom: bool):
|
||||||
|
ses.facet.run do (turn: var Turn):
|
||||||
|
var (n, req) = parseRequest(ses.conn, cast[string](data))
|
||||||
|
if n > 0:
|
||||||
|
req.port = BiggestInt ses.port
|
||||||
|
inFacet(turn) do (turn: var Turn):
|
||||||
|
preventInertCheck(turn)
|
||||||
|
# start a new facet for this message exchange
|
||||||
|
turn.service Exchange(
|
||||||
|
facet: turn.facet,
|
||||||
|
ses: ses,
|
||||||
|
req: req,
|
||||||
|
stream: newStringStream(),
|
||||||
|
mode: HttpResponseKind.status
|
||||||
|
)
|
||||||
|
# ses.conn.receive()
|
||||||
|
ses.conn.receive()
|
||||||
|
|
||||||
|
proc newListener(port: Port): Listener =
|
||||||
|
var lp = newLocalEndpoint()
|
||||||
|
lp.with port
|
||||||
|
listen newPreconnection(local=[lp])
|
||||||
|
|
||||||
|
proc httpListen(turn: var Turn; driver: Driver; port: Port) =
|
||||||
|
let facet = turn.facet
|
||||||
|
var listener = newListener(port)
|
||||||
|
# TODO: let listener
|
||||||
|
listener.onListenError do (err: ref Exception):
|
||||||
|
terminateFacet(facet, err)
|
||||||
|
facet.onStop do (turn: var Turn):
|
||||||
|
stop listener
|
||||||
|
listener.onConnectionReceived do (conn: Connection):
|
||||||
|
driver.facet.run do (turn: var Turn):
|
||||||
|
# start a new turn
|
||||||
|
linkActor(turn, "http-conn") do (turn: var Turn):
|
||||||
|
preventInertCheck(turn)
|
||||||
|
# facet is scoped to the lifetime of the connection
|
||||||
|
service Session(
|
||||||
|
facet: turn.facet,
|
||||||
|
driver: driver,
|
||||||
|
conn: conn,
|
||||||
|
port: port,
|
||||||
|
)
|
||||||
|
|
||||||
|
proc httpDriver(turn: var Turn; ds: Cap) =
|
||||||
|
let driver = Driver(facet: turn.facet, ds: ds)
|
||||||
|
|
||||||
|
during(turn, ds, HttpBinding?:{
|
||||||
|
1: grab(),
|
||||||
|
}) do (port: BiggestInt):
|
||||||
|
publish(turn, ds, HttpListener(port: port))
|
||||||
|
|
||||||
|
during(turn, ds, ?:HttpBinding) do (
|
||||||
|
ho: HostPattern, po: int, me: MethodPattern, pa: PathPattern, e: Value):
|
||||||
|
let b = HttpBinding(host: ho, port: po, `method`: me, path: pa, handler: e)
|
||||||
|
driver.bindings.add b
|
||||||
|
do:
|
||||||
|
raiseAssert("need to remove binding " & $b)
|
||||||
|
|
||||||
|
during(turn, ds, ?:HttpListener) do (port: uint16):
|
||||||
|
bind404Handler(turn, ds, Port port)
|
||||||
|
httpListen(turn, driver, Port port)
|
||||||
|
|
||||||
|
proc spawnHttpDriver*(turn: var Turn; ds: Cap) =
|
||||||
|
during(turn, ds, ?:HttpDriverArguments) do (ds: Cap):
|
||||||
|
spawnActor("http-driver", turn) do (turn: var Turn):
|
||||||
|
httpDriver(turn, ds)
|
||||||
|
|
||||||
|
when isMainModule:
|
||||||
|
import syndicate/relays
|
||||||
|
runActor("main") do (turn: var Turn):
|
||||||
|
resolveEnvironment(turn, spawnHttpDriver)
|
|
@ -1,73 +0,0 @@
|
||||||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
|
||||||
# SPDX-License-Identifier: Unlicense
|
|
||||||
|
|
||||||
import std/[os, tables]
|
|
||||||
import preserves, syndicate, syndicate/[durings, relays]
|
|
||||||
|
|
||||||
proc parsePattern(pr: Value): Pattern =
|
|
||||||
let
|
|
||||||
dropSigil = initRecord("lit", "_".toSymbol)
|
|
||||||
grabSigil = initRecord("lit", "?".toSymbol)
|
|
||||||
var pr = grab(pr).toPreserves
|
|
||||||
apply(pr) do (pr: var Value):
|
|
||||||
if pr == dropSigil:
|
|
||||||
pr = initRecord("_")
|
|
||||||
elif pr == grabSigil:
|
|
||||||
pr = initRecord("bind", initRecord("_"))
|
|
||||||
doAssert result.fromPreserves(pr)
|
|
||||||
|
|
||||||
proc inputPatterns: seq[Pattern] =
|
|
||||||
var args = commandLineParams()
|
|
||||||
result.setLen(args.len)
|
|
||||||
for i, input in args:
|
|
||||||
try: result[i] = input.parsePreserves.parsePattern
|
|
||||||
except ValueError:
|
|
||||||
quit "failed to parse Preserves argument"
|
|
||||||
|
|
||||||
type DumpEntity {.final.} = ref object of Entity
|
|
||||||
assertions: Table[Handle, seq[Value]]
|
|
||||||
|
|
||||||
proc toLine(values: seq[Value]; prefix: char): string =
|
|
||||||
result = newStringOfCap(1024)
|
|
||||||
let sep = getEnv("FS", " ")
|
|
||||||
result.add(prefix)
|
|
||||||
for v in values:
|
|
||||||
add(result, sep)
|
|
||||||
add(result, $v)
|
|
||||||
add(result, '\n')
|
|
||||||
|
|
||||||
method publish(dump: DumpEntity; turn: var Turn; ass: AssertionRef; h: Handle) =
|
|
||||||
var values = ass.value.sequence
|
|
||||||
stdout.write(values.toLine('+'))
|
|
||||||
stdout.flushFile()
|
|
||||||
dump.assertions[h] = values
|
|
||||||
|
|
||||||
method retract(dump: DumpEntity; turn: var Turn; h: Handle) =
|
|
||||||
var values: seq[Value]
|
|
||||||
if dump.assertions.pop(h, values):
|
|
||||||
stdout.write(values.toLine('-'))
|
|
||||||
stdout.flushFile()
|
|
||||||
|
|
||||||
method message*(dump: DumpEntity; turn: var Turn; ass: AssertionRef) =
|
|
||||||
stdout.write(ass.value.sequence.toLine('!'))
|
|
||||||
stdout.flushFile()
|
|
||||||
|
|
||||||
proc exitProc() {.noconv.} =
|
|
||||||
stdout.write('\n')
|
|
||||||
quit()
|
|
||||||
|
|
||||||
proc main =
|
|
||||||
let
|
|
||||||
route = envRoute()
|
|
||||||
patterns = inputPatterns()
|
|
||||||
entity = DumpEntity()
|
|
||||||
runActor("syndex_card") do (root: Cap; turn: var Turn):
|
|
||||||
for pat in patterns:
|
|
||||||
discard observe(turn, root, pat, entity)
|
|
||||||
spawnRelays(turn, root)
|
|
||||||
resolve(turn, root, route) do (turn: var Turn; ds: Cap):
|
|
||||||
for pat in patterns:
|
|
||||||
discard observe(turn, ds, pat, entity)
|
|
||||||
|
|
||||||
setControlCHook(exitProc)
|
|
||||||
main()
|
|
|
@ -1,13 +1,13 @@
|
||||||
# Package
|
# Package
|
||||||
|
|
||||||
version = "20240209"
|
version = "20240319"
|
||||||
author = "Emery Hemingway"
|
author = "Emery Hemingway"
|
||||||
description = "Utilites for Syndicated Actors and Synit"
|
description = "Utilites for Syndicated Actors and Synit"
|
||||||
license = "unlicense"
|
license = "unlicense"
|
||||||
srcDir = "src"
|
srcDir = "src"
|
||||||
bin = @["mintsturdyref", "mount_actor", "msg", "net_mapper", "preserve_process_environment", "syndesizer", "syndex_card", "syndump"]
|
bin = @["mintsturdyref", "mount_actor", "msg", "net_mapper", "preserve_process_environment", "syndesizer", "syndex_card"]
|
||||||
|
|
||||||
|
|
||||||
# Dependencies
|
# Dependencies
|
||||||
|
|
||||||
requires "nim >= 2.0.0", "illwill", "syndicate >= 20240208", "ws"
|
requires "syndicate#b209548f5d15f7391c08fcaec3615ed843f8a410", "https://git.sr.ht/~ehmry/nim_taps#6f1252d0d17cd56fd707b831c893758ddca08755"
|
||||||
|
|
Loading…
Reference in New Issue