Compare commits

...

5 Commits

5 changed files with 346 additions and 87 deletions

View File

@ -374,14 +374,3 @@ Example script:
This utility serializes it's process environment to Preserves and prints it to stdout. This utility serializes it's process environment to Preserves and prints it to stdout.
It can be used to feed the environment variables of a nested child of the Syndicate server back to the server. For example, to retreive the environmental variables that a desktop manager passed on to its children. It can be used to feed the environment variables of a nested child of the Syndicate server back to the server. For example, to retreive the environmental variables that a desktop manager passed on to its children.
## syndump
Utility for printing assertions and messages. Parses the command-line arguments as a pattern, connects a dataspace via `$SYNDICATE_ROUTE`, and writes observations to standard-output. Published assertions are prefixed by the `+` character, retractions by `-`, and messages by `!`.
Example
```sh
# Print patterns in use, filter down with AWK to only the published patterns.
$ FS=':' syndump '<Observe ? _>' | awk -F : '/^+/ { print $2 }'
```

4
src/drivers/Tupfile Normal file
View File

@ -0,0 +1,4 @@
include_rules
NIM_FLAGS += --path:$(TUP_CWD)/../../../taps/pkg
: foreach *.nim | $(SYNDICATE_PROTOCOL) ../<schema> |> !nim_bin |> | {bin}
: foreach {bin} |> !assert_built |>

339
src/drivers/http_driver.nim Normal file
View File

@ -0,0 +1,339 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[httpcore, options, parseutils, sets, streams, strutils, tables, times, uri]
import pkg/sys/ioqueue
import pkg/preserves
import pkg/syndicate
import pkg/syndicate/protocols/http
import taps
import ../schema/config
const
SP = { ' ', '\x09', '\x0b', '\x0c', '\x0d' }
SupportedVersion = "HTTP/1.1"
IMF = initTimeFormat"ddd, dd MMM yyyy HH:mm:ss"
proc echo(args: varargs[string, `$`]) =
stderr.writeLine(args)
proc `$`(b: seq[byte]): string = cast[string](b)
# a Date header on responses must be present if a clock is available
# An upgrade header can be used to switch over to native syndicate protocol.
# Check the response encoding matches or otherwise return 415
type HandlerEntity = ref object of Entity
handler: proc (turn: var Turn; req: HttpRequest; cap: Cap)
method publish(e: HandlerEntity; turn: var Turn; a: AssertionRef; h: Handle) =
var ctx = a.value.preservesTo HttpContext
if ctx.isSome:
var res = ctx.get.res.unembed Cap
if res.isSome:
e.handler(turn, ctx.get.req, res.get)
else:
echo "HandlerEntity got a non-Cap ", ctx.get.res
else:
echo "HandlerEntity got a non-HttpContext ", a.value
proc respond404(turn: var Turn; req: HttpRequest; cap: Cap) =
message(turn, cap, HttpResponse(
orKind: HttpResponseKind.status,
status: HttpResponseStatus(
code: 404,
message: "resource not found",
)))
message(turn, cap, HttpResponse(
orKind: HttpResponseKind.header,
header: HttpResponseHeader(
name: Symbol"content-length",
value: "0",
)))
message(turn, cap, HttpResponse(orKind: HttpResponseKind.done))
proc bind404Handler(turn: var Turn; ds: Cap; port: Port) =
stderr.writeLine "bind 404 handler to ", port
var b: HttpBinding
b.host = HostPattern(orKind: HostPatternKind.any)
b.port = BiggestInt port
b.method = MethodPattern(orKind: MethodPatternKind.any)
b.path = @[PathPatternElement(orKind: PathPatternElementKind.rest)]
b.handler = newCap(turn, HandlerEntity(handler: respond404)).toPreserves
discard publish(turn, ds, b)
proc badRequest(conn: Connection; msg: string) =
conn.send(SupportedVersion & " 400 " & msg, endOfMessage = true)
close(conn)
proc extractQuery(s: var string): Table[Symbol, seq[QueryValue]] =
let start = succ skipUntil(s, '?')
if start < s.len:
var query = s[start..s.high]
s.setLen(start)
for key, val in uri.decodeQuery(query):
var list = result.getOrDefault(Symbol key)
list.add QueryValue(orKind: QueryValueKind.string, string: val)
result[Symbol key] = list
proc parseRequest(conn: Connection; text: string): (int, HttpRequest) =
## Parse an `HttpRequest` request out of a `text` from a `Connection`.
var
token: string
off: int
template advanceSp =
let n = skipWhile(text, SP, off)
if n < 1:
badRequest(conn, "invalid request")
return
inc(off, n)
# method
off.inc parseUntil(text, token, SP, off)
result[1].method = token.toLowerAscii.Symbol
advanceSp()
# target
if text[off] == '/': inc(off) #TODO: always a leading slash?
off.inc parseUntil(text, token, SP, off)
advanceSp()
block:
var version: string
off.inc parseUntil(text, version, SP, off)
advanceSp()
if version != SupportedVersion:
badRequest(conn, "version not supported")
return
result[1].query = extractQuery(token)
result[1].path = split(token, '/')
for p in result[1].path.mitems:
# normalize the path
for i, c in p:
if c in {'A'..'Z'}:
p[i] = char c.ord + 0x20
template advanceLine =
inc off, skipWhile(text, {'\x0d'}, off)
if text.high < off or text[off] != '\x0a':
badRequest(conn, "invalid request")
return
inc off, 1
advanceLine()
while off < text.len:
off.inc parseUntil(text, token, {'\x0d', '\x0a'}, off)
if token == "": break
advanceLine()
var
(key, vals) = httpcore.parseHeader(token)
k = key.toLowerAscii.Symbol
v = result[1].headers.getOrDefault(k)
for e in vals.mitems:
e = e.toLowerAscii
if k == Symbol"host":
result[1].host = e
if v == "": v = move e
else:
v.add ", "
v.add e
if k == Symbol"host":
result[1].host = v
result[1].headers[k] = v
result[0] = off
proc send(conn: Connection; chunk: Chunk) =
case chunk.orKind
of ChunkKind.string:
conn.send(chunk.string, endOfMessage = false)
of ChunkKind.bytes:
conn.send(chunk.bytes, endOfMessage = false)
type
Driver = ref object
facet: Facet
ds: Cap
bindings: seq[HttpBinding]
Session = ref object
facet: Facet
driver: Driver
conn: Connection
port: Port
Exchange = ref object of Entity
ses: Session
req: HttpRequest
stream: StringStream
mode: HttpResponseKind
proc match(b: HttpBinding, r: HttpRequest): bool =
## Check if `HttpBinding` `b` matches `HttpRequest` `r`.
result =
(b.host.orKind == HostPatternKind.any or
b.host.host == r.host) and
(b.port == r.port) and
(b.method.orKind == MethodPatternKind.any or
b.method.specific == r.method)
if result:
for i, p in b.path:
if i > r.path.high: return false
case p.orKind
of PathPatternElementKind.wildcard: discard
of PathPatternElementKind.label:
if p.label != r.path[i]: return false
of PathPatternElementKind.rest:
return i == b.path.high
# return false if ... isn't the last element
proc strongerThan(a, b: HttpBinding): bool =
## Check if `a` is a stronger `HttpBinding` than `b`.
result =
(a.host.orKind != b.host.orKind and
a.host.orKind == HostPatternKind.host) or
(a.method.orKind != b.method.orKind and
a.method.orKind == MethodPatternKind.specific)
if not result:
if a.path.len > b.path.len: return true
for i in a.path.low..b.path.high:
if a.path[i].orKind != b.path[i].orKind and
a.path[i].orKind == PathPatternElementKind.label:
return true
proc match(driver: Driver; req: HttpRequest): Option[HttpBinding] =
for b in driver.bindings:
if b.match req:
if result.isNone or b.strongerThan(result.get):
result = some b
else:
echo b, " does not match ", req
method message(e: Exchange; turn: var Turn; a: AssertionRef) =
# Send responses back into a connection.
var res: HttpResponse
if e.mode != HttpResponseKind.done and res.fromPreserves a.value:
case res.orKind
of HttpResponseKind.status:
if e.mode == res.orKind:
e.stream.writeLine(SupportedVersion, " ", res.status.code, " ", res.status.message)
e.stream.writeLine("date: ", now().format(IMF))
# add Date header automatically - RFC 9110 Section 6.6.1.
e.mode = HttpResponseKind.header
of HttpResponseKind.header:
if e.mode == res.orKind:
e.stream.writeLine(res.header.name, ": ", res.header.value)
of HttpResponseKind.chunk:
if e.mode == HttpResponseKind.header:
e.mode = res.orKind
e.stream.writeLine()
e.ses.conn.send(move e.stream.data, endOfMessage = false)
e.ses.conn.send(res.chunk.chunk)
of HttpResponseKind.done:
if e.mode == HttpResponseKind.header:
e.stream.writeLine()
e.ses.conn.send(move e.stream.data, endOfMessage = false)
e.mode = res.orKind
e.ses.conn.send(res.done.chunk)
stop(turn)
# stop the facet scoped to the exchange
# so that the response capability is withdrawn
proc service(turn: var Turn; exch: Exchange) =
## Service an HTTP message exchange.
var binding = exch.ses.driver.match exch.req
if binding.isNone:
echo "no binding for ", exch.req
stop(turn)
else:
echo "driver matched binding ", binding.get
var handler = binding.get.handler.unembed Cap
if handler.isNone:
stop(turn)
else:
publish(turn, handler.get, HttpContext(
req: exch.req,
res: embed newCap(turn, exch),
))
proc service(ses: Session) =
## Service a connection to an HTTP client.
ses.facet.onStop do (turn: var Turn):
close ses.conn
ses.conn.onClosed do ():
stop ses.facet
ses.conn.onReceivedPartial do (data: seq[byte]; ctx: MessageContext; eom: bool):
ses.facet.run do (turn: var Turn):
var (n, req) = parseRequest(ses.conn, cast[string](data))
if n > 0:
req.port = BiggestInt ses.port
inFacet(turn) do (turn: var Turn):
preventInertCheck(turn)
# start a new facet for this message exchange
turn.service Exchange(
facet: turn.facet,
ses: ses,
req: req,
stream: newStringStream(),
mode: HttpResponseKind.status
)
# ses.conn.receive()
ses.conn.receive()
proc newListener(port: Port): Listener =
var lp = newLocalEndpoint()
lp.with port
listen newPreconnection(local=[lp])
proc httpListen(turn: var Turn; driver: Driver; port: Port) =
let facet = turn.facet
var listener = newListener(port)
# TODO: let listener
listener.onListenError do (err: ref Exception):
terminateFacet(facet, err)
facet.onStop do (turn: var Turn):
stop listener
listener.onConnectionReceived do (conn: Connection):
driver.facet.run do (turn: var Turn):
# start a new turn
linkActor(turn, "http-conn") do (turn: var Turn):
preventInertCheck(turn)
# facet is scoped to the lifetime of the connection
service Session(
facet: turn.facet,
driver: driver,
conn: conn,
port: port,
)
proc httpDriver(turn: var Turn; ds: Cap) =
let driver = Driver(facet: turn.facet, ds: ds)
during(turn, ds, HttpBinding?:{
1: grab(),
}) do (port: BiggestInt):
publish(turn, ds, HttpListener(port: port))
during(turn, ds, ?:HttpBinding) do (
ho: HostPattern, po: int, me: MethodPattern, pa: PathPattern, e: Value):
let b = HttpBinding(host: ho, port: po, `method`: me, path: pa, handler: e)
driver.bindings.add b
do:
raiseAssert("need to remove binding " & $b)
during(turn, ds, ?:HttpListener) do (port: uint16):
bind404Handler(turn, ds, Port port)
httpListen(turn, driver, Port port)
proc spawnHttpDriver*(turn: var Turn; ds: Cap) =
during(turn, ds, ?:HttpDriverArguments) do (ds: Cap):
spawnActor("http-driver", turn) do (turn: var Turn):
httpDriver(turn, ds)
when isMainModule:
import syndicate/relays
runActor("main") do (turn: var Turn):
resolveEnvironment(turn, spawnHttpDriver)

View File

@ -1,73 +0,0 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[os, tables]
import preserves, syndicate, syndicate/[durings, relays]
proc parsePattern(pr: Value): Pattern =
let
dropSigil = initRecord("lit", "_".toSymbol)
grabSigil = initRecord("lit", "?".toSymbol)
var pr = grab(pr).toPreserves
apply(pr) do (pr: var Value):
if pr == dropSigil:
pr = initRecord("_")
elif pr == grabSigil:
pr = initRecord("bind", initRecord("_"))
doAssert result.fromPreserves(pr)
proc inputPatterns: seq[Pattern] =
var args = commandLineParams()
result.setLen(args.len)
for i, input in args:
try: result[i] = input.parsePreserves.parsePattern
except ValueError:
quit "failed to parse Preserves argument"
type DumpEntity {.final.} = ref object of Entity
assertions: Table[Handle, seq[Value]]
proc toLine(values: seq[Value]; prefix: char): string =
result = newStringOfCap(1024)
let sep = getEnv("FS", " ")
result.add(prefix)
for v in values:
add(result, sep)
add(result, $v)
add(result, '\n')
method publish(dump: DumpEntity; turn: var Turn; ass: AssertionRef; h: Handle) =
var values = ass.value.sequence
stdout.write(values.toLine('+'))
stdout.flushFile()
dump.assertions[h] = values
method retract(dump: DumpEntity; turn: var Turn; h: Handle) =
var values: seq[Value]
if dump.assertions.pop(h, values):
stdout.write(values.toLine('-'))
stdout.flushFile()
method message*(dump: DumpEntity; turn: var Turn; ass: AssertionRef) =
stdout.write(ass.value.sequence.toLine('!'))
stdout.flushFile()
proc exitProc() {.noconv.} =
stdout.write('\n')
quit()
proc main =
let
route = envRoute()
patterns = inputPatterns()
entity = DumpEntity()
runActor("syndex_card") do (root: Cap; turn: var Turn):
for pat in patterns:
discard observe(turn, root, pat, entity)
spawnRelays(turn, root)
resolve(turn, root, route) do (turn: var Turn; ds: Cap):
for pat in patterns:
discard observe(turn, ds, pat, entity)
setControlCHook(exitProc)
main()

View File

@ -1,13 +1,13 @@
# Package # Package
version = "20240209" version = "20240319"
author = "Emery Hemingway" author = "Emery Hemingway"
description = "Utilites for Syndicated Actors and Synit" description = "Utilites for Syndicated Actors and Synit"
license = "unlicense" license = "unlicense"
srcDir = "src" srcDir = "src"
bin = @["mintsturdyref", "mount_actor", "msg", "net_mapper", "preserve_process_environment", "syndesizer", "syndex_card", "syndump"] bin = @["mintsturdyref", "mount_actor", "msg", "net_mapper", "preserve_process_environment", "syndesizer", "syndex_card"]
# Dependencies # Dependencies
requires "nim >= 2.0.0", "illwill", "syndicate >= 20240208", "ws" requires "syndicate#b209548f5d15f7391c08fcaec3615ed843f8a410", "https://git.sr.ht/~ehmry/nim_taps#6f1252d0d17cd56fd707b831c893758ddca08755"