diff --git a/src/syndicate/drivers/http_driver.nim b/src/syndicate/drivers/http_driver.nim new file mode 100644 index 0000000..1f473b9 --- /dev/null +++ b/src/syndicate/drivers/http_driver.nim @@ -0,0 +1,320 @@ +# SPDX-FileCopyrightText: ☭ Emery Hemingway +# SPDX-License-Identifier: Unlicense + +import std/[httpcore, options, parseutils, sets, streams, strutils, tables, times, uri] +import preserves, ../../syndicate, ../bags +import ../protocols/http +import taps + +const + CRLF = "\x0d\x0a" + SP = { ' ', '\x09', '\x0b', '\x0c', '\x0d' } + SupportedVersion = "HTTP/1.1" + IMF = initTimeFormat"ddd, dd MMM yyyy HH:mm:ss" + +when defined(posix): + proc echo(args: varargs[string, `$`]) {.used.} = + stderr.writeLine(args) + +proc `$`(b: seq[byte]): string = cast[string](b) + +proc badRequest(conn: Connection; msg: string) = + conn.send(SupportedVersion & " 400 " & msg, endOfMessage = true) + +proc extractQuery(s: var string): Table[Symbol, seq[QueryValue]] = + let start = succ skipUntil(s, '?') + if start < s.len: + var query = s[start..s.high] + s.setLen(start) + for key, val in uri.decodeQuery(query): + var list = result.getOrDefault(Symbol key) + list.add QueryValue(orKind: QueryValueKind.string, string: val) + result[Symbol key] = list + +proc parseRequest(conn: Connection; text: string): (int, HttpRequest) = + ## Parse an `HttpRequest` request out of a `text` from a `Connection`. + var + token: string + off: int + + template advanceSp = + let n = skipWhile(text, SP, off) + if n < 1: + badRequest(conn, "invalid request") + return + inc(off, n) + + # method + off.inc parseUntil(text, token, SP, off) + result[1].method = token.toLowerAscii.Symbol + advanceSp() + + # target + if text[off] == '/': inc(off) #TODO: always a leading slash? + off.inc parseUntil(text, token, SP, off) + advanceSp() + + block: + var version: string + off.inc parseUntil(text, version, SP, off) + advanceSp() + if version != SupportedVersion: + badRequest(conn, "version not supported") + return + + result[1].query = extractQuery(token) + + if token != "": + result[1].path = split(token, '/') + for p in result[1].path.mitems: + # normalize the path + for i, c in p: + if c in {'A'..'Z'}: + p[i] = char c.ord + 0x20 + + template advanceLine = + inc off, skipWhile(text, {'\x0d'}, off) + if text.high < off or text[off] != '\x0a': + badRequest(conn, "invalid request") + return + inc off, 1 + + advanceLine() + while off < text.len: + off.inc parseUntil(text, token, {'\x0d', '\x0a'}, off) + if token == "": break + advanceLine() + var + (key, vals) = httpcore.parseHeader(token) + k = key.toLowerAscii.Symbol + v = result[1].headers.getOrDefault(k) + for e in vals.mitems: + e = e.toLowerAscii + if k == Symbol"host": + result[1].host = e + if v == "": v = move e + else: + v.add ", " + v.add e + if k == Symbol"host": + result[1].host = v + result[1].headers[k] = v + + result[0] = off + +proc len(chunk: Chunk): int = + case chunk.orKind + of ChunkKind.string: chunk.string.len + of ChunkKind.bytes: chunk.bytes.len + +proc lenLine(chunk: Chunk): string = + result = chunk.len.toHex.strip(true, false, {'0'}) + result.add CRLF + +type + Driver = ref object + facet: Facet + ds: Cap + bindings: Bag[Value] + # cannot make a bag of HttpBinding, no `==` operator + Session = ref object + facet: Facet + driver: Driver + conn: Connection + port: Port + Exchange = ref object of Entity + ses: Session + req: HttpRequest + stream: StringStream + mode: HttpResponseKind + +proc send[T: byte|char](ses: Session; data: openarray[T]) = + ses.conn.send(addr data[0], data.len, endOfMessage = false) + +proc send(ses: Session; chunk: Chunk) = + case chunk.orKind + of ChunkKind.string: + ses.send(chunk.string) + of ChunkKind.bytes: + ses.send(chunk.bytes) + +proc match(b: HttpBinding, r: HttpRequest): bool = + ## Check if `HttpBinding` `b` matches `HttpRequest` `r`. + result = + (b.host.orKind == HostPatternKind.any or + b.host.host == r.host) and + (b.port == r.port) and + (b.method.orKind == MethodPatternKind.any or + b.method.specific == r.method) + if result: + for i, p in b.path: + if i > r.path.high: return false + case p.orKind + of PathPatternElementKind.wildcard: discard + of PathPatternElementKind.label: + if p.label != r.path[i]: return false + of PathPatternElementKind.rest: + return i == b.path.high + # return false if ... isn't the last element + +proc strongerThan(a, b: HttpBinding): bool = + ## Check if `a` is a stronger `HttpBinding` than `b`. + result = + (a.host.orKind != b.host.orKind and + a.host.orKind == HostPatternKind.host) or + (a.method.orKind != b.method.orKind and + a.method.orKind == MethodPatternKind.specific) + if not result: + if a.path.len > b.path.len: return true + for i in b.path.low..a.path.high: + if a.path[i].orKind != b.path[i].orKind and + a.path[i].orKind == PathPatternElementKind.label: + return true + +proc match(driver: Driver; req: HttpRequest): Option[HttpBinding] = + var b: HttpBinding + for p in driver.bindings: + if b.fromPreserves(p) and b.match req: + if result.isNone or b.strongerThan(result.get): + result = some b + +method message(e: Exchange; turn: var Turn; a: AssertionRef) = + # Send responses back into a connection. + var res: HttpResponse + if e.mode != HttpResponseKind.done and res.fromPreserves a.value: + case res.orKind + + of HttpResponseKind.status: + if e.mode == res.orKind: + e.ses.conn.startBatch() + e.stream.write( + SupportedVersion, " ", res.status.code, " ", res.status.message, CRLF, + "date: ", now().format(IMF), CRLF) + # add Date header automatically - RFC 9110 Section 6.6.1. + e.mode = HttpResponseKind.header + + of HttpResponseKind.header: + if e.mode == res.orKind: + e.stream.write(res.header.name, ": ", res.header.value, CRLF) + + of HttpResponseKind.chunk: + if e.mode == HttpResponseKind.header: + e.stream.write("transfer-encoding: chunked" & CRLF & CRLF) + e.ses.send(move e.stream.data) + e.mode = res.orKind + if e.mode == res.orKind: + e.ses.send(res.chunk.chunk.lenLine) + e.ses.send(res.chunk.chunk) + e.ses.send(CRLF) + + of HttpResponseKind.done: + if e.mode in {HttpResponseKind.header, HttpResponseKind.chunk}: + if e.mode == HttpResponseKind.header: + e.stream.write("content-length: ", $res.done.chunk.len & CRLF & CRLF) + e.ses.send(move e.stream.data) + e.ses.send(res.done.chunk) + elif e.mode == HttpResponseKind.chunk: + e.ses.send(res.done.chunk.lenLine) + e.ses.send(res.done.chunk) + e.ses.send(CRLF & "0" & CRLF & CRLF) + e.mode = res.orKind + e.ses.conn.endBatch() + if e.req.headers.getOrDefault(Symbol"connection") == "close": + e.ses.conn.close() + stop(turn) + # stop the facet scoped to the exchange + # so that the response capability is withdrawn + +proc service(turn: var Turn; exch: Exchange) = + ## Service an HTTP message exchange. + var binding = exch.ses.driver.match exch.req + if binding.isNone: + stop(turn) + else: + var handler = binding.get.handler.unembed Cap + if handler.isNone: + stop(turn) + else: + publish(turn, handler.get, HttpContext( + req: exch.req, + res: embed newCap(turn, exch), + )) + +proc service(ses: Session) = + ## Service a connection to an HTTP client. + ses.facet.onStop do (turn: var Turn): + close ses.conn + ses.conn.onClosed do (): + stop ses.facet + ses.conn.onReceivedPartial do (data: seq[byte]; ctx: MessageContext; eom: bool): + ses.facet.run do (turn: var Turn): + var (n, req) = parseRequest(ses.conn, cast[string](data)) + if n > 0: + req.port = BiggestInt ses.port + inFacet(turn) do (turn: var Turn): + preventInertCheck(turn) + # start a new facet for this message exchange + turn.service Exchange( + facet: turn.facet, + ses: ses, + req: req, + stream: newStringStream(), + mode: HttpResponseKind.status + ) + ses.conn.receive() + ses.conn.receive() + +proc newListener(port: Port): Listener = + var lp = newLocalEndpoint() + lp.with port + listen newPreconnection(local=[lp]) + +proc httpListen(turn: var Turn; driver: Driver; port: Port): Listener = + let facet = turn.facet + var listener = newListener(port) + preventInertCheck(turn) + listener.onListenError do (err: ref Exception): + terminateFacet(facet, err) + facet.onStop do (turn: var Turn): + stop listener + listener.onConnectionReceived do (conn: Connection): + driver.facet.run do (turn: var Turn): + # start a new turn + linkActor(turn, "http-conn") do (turn: var Turn): + preventInertCheck(turn) + let facet = turn.facet + conn.onConnectionError do (err: ref Exception): + terminateFacet(facet, err) + # terminate this actor on exception + # facet is scoped to the lifetime of the connection + service Session( + facet: turn.facet, + driver: driver, + conn: conn, + port: port, + ) + listener + +proc httpDriver(turn: var Turn; ds: Cap) = + let driver = Driver(facet: turn.facet, ds: ds) + + during(turn, ds, HttpBinding?:{ + 1: grab(), + }) do (port: BiggestInt): + publish(turn, ds, HttpListener(port: port)) + + during(turn, ds, ?:HttpBinding) do ( + ho: HostPattern, po: int, me: MethodPattern, pa: PathPattern, e: Value): + let b = HttpBinding(host: ho, port: po, `method`: me, path: pa, handler: e) + discard driver.bindings.change(b.toPreserves, +1) + do: + discard driver.bindings.change(b.toPreserves, -1) + + during(turn, ds, ?:HttpListener) do (port: uint16): + let l = httpListen(turn, driver, Port port) + do: + stop(l) + +proc spawnHttpDriver*(turn: var Turn; ds: Cap): Actor {.discardable.} = + spawnActor(turn, "http-driver") do (turn: var Turn): + httpDriver(turn, ds) diff --git a/syndicate.nimble b/syndicate.nimble index adf0b58..9920183 100644 --- a/syndicate.nimble +++ b/syndicate.nimble @@ -1,6 +1,6 @@ # Package -version = "20240403" +version = "20240405" author = "Emery Hemingway" description = "Syndicated actors for conversational concurrency" license = "Unlicense" @@ -9,4 +9,4 @@ srcDir = "src" # Dependencies -requires "https://github.com/ehmry/hashlib.git >= 20231130", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240312", "https://github.com/ehmry/nim-sys.git#4ef3b624db86e331ba334e705c1aa235d55b05e1", "https://git.sr.ht/~ehmry/nim_taps >= 20240402" +requires "https://github.com/ehmry/hashlib.git >= 20231130", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240312", "https://github.com/ehmry/nim-sys.git#4ef3b624db86e331ba334e705c1aa235d55b05e1", "https://git.sr.ht/~ehmry/nim_taps >= 20240405"