Compare commits

...

4 Commits

1 changed files with 36 additions and 16 deletions

View File

@ -2,7 +2,7 @@
# SPDX-License-Identifier: Unlicense # SPDX-License-Identifier: Unlicense
import std/[httpcore, options, parseutils, sets, streams, strutils, tables, times, uri] import std/[httpcore, options, parseutils, sets, streams, strutils, tables, times, uri]
import preserves, ../../syndicate, ../bags import preserves, ../../syndicate, ../bags, ./timers
import ../protocols/http import ../protocols/http
import taps import taps
@ -25,7 +25,7 @@ proc extractQuery(s: var string): Table[Symbol, seq[QueryValue]] =
let start = succ skipUntil(s, '?') let start = succ skipUntil(s, '?')
if start < s.len: if start < s.len:
var query = s[start..s.high] var query = s[start..s.high]
s.setLen(start) s.setLen(pred start)
for key, val in uri.decodeQuery(query): for key, val in uri.decodeQuery(query):
var list = result.getOrDefault(Symbol key) var list = result.getOrDefault(Symbol key)
list.add QueryValue(orKind: QueryValueKind.string, string: val) list.add QueryValue(orKind: QueryValueKind.string, string: val)
@ -114,9 +114,10 @@ proc lenLine(chunk: Chunk): string =
type type
Driver = ref object Driver = ref object
facet: Facet facet: Facet
ds: Cap ds, timers: Cap
bindings: Bag[Value] bindings: Bag[Value]
# cannot make a bag of HttpBinding, no `==` operator # cannot make a bag of HttpBinding, no `==` operator
sequenceNumber: BiggestInt
Session = ref object Session = ref object
facet: Facet facet: Facet
driver: Driver driver: Driver
@ -127,6 +128,7 @@ type
req: HttpRequest req: HttpRequest
stream: StringStream stream: StringStream
mode: HttpResponseKind mode: HttpResponseKind
active: bool
proc send[T: byte|char](ses: Session; data: openarray[T]) = proc send[T: byte|char](ses: Session; data: openarray[T]) =
ses.conn.send(addr data[0], data.len, endOfMessage = false) ses.conn.send(addr data[0], data.len, endOfMessage = false)
@ -186,6 +188,7 @@ method message(e: Exchange; turn: var Turn; a: AssertionRef) =
of HttpResponseKind.status: of HttpResponseKind.status:
if e.mode == res.orKind: if e.mode == res.orKind:
e.active = true
e.ses.conn.startBatch() e.ses.conn.startBatch()
e.stream.write( e.stream.write(
SupportedVersion, " ", res.status.code, " ", res.status.message, CRLF, SupportedVersion, " ", res.status.code, " ", res.status.message, CRLF,
@ -198,24 +201,27 @@ method message(e: Exchange; turn: var Turn; a: AssertionRef) =
e.stream.write(res.header.name, ": ", res.header.value, CRLF) e.stream.write(res.header.name, ": ", res.header.value, CRLF)
of HttpResponseKind.chunk: of HttpResponseKind.chunk:
if e.mode == HttpResponseKind.header: if res.chunk.chunk.len > 0:
e.stream.write("transfer-encoding: chunked" & CRLF & CRLF) if e.mode == HttpResponseKind.header:
e.ses.send(move e.stream.data) e.stream.write("transfer-encoding: chunked" & CRLF & CRLF)
e.mode = res.orKind e.ses.send(move e.stream.data)
if e.mode == res.orKind: e.mode = res.orKind
e.ses.send(res.chunk.chunk.lenLine) if e.mode == res.orKind:
e.ses.send(res.chunk.chunk) e.ses.send(res.chunk.chunk.lenLine)
e.ses.send(CRLF) e.ses.send(res.chunk.chunk)
e.ses.send(CRLF)
of HttpResponseKind.done: of HttpResponseKind.done:
if e.mode in {HttpResponseKind.header, HttpResponseKind.chunk}: if e.mode in {HttpResponseKind.header, HttpResponseKind.chunk}:
if e.mode == HttpResponseKind.header: if e.mode == HttpResponseKind.header:
e.stream.write("content-length: ", $res.done.chunk.len & CRLF & CRLF) e.stream.write("content-length: ", $res.done.chunk.len & CRLF & CRLF)
e.ses.send(move e.stream.data) e.ses.send(move e.stream.data)
e.ses.send(res.done.chunk) if res.done.chunk.len > 0:
e.ses.send(res.done.chunk)
elif e.mode == HttpResponseKind.chunk: elif e.mode == HttpResponseKind.chunk:
e.ses.send(res.done.chunk.lenLine) e.ses.send(res.done.chunk.lenLine)
e.ses.send(res.done.chunk) if res.done.chunk.len > 0:
e.ses.send(res.done.chunk)
e.ses.send(CRLF & "0" & CRLF & CRLF) e.ses.send(CRLF & "0" & CRLF & CRLF)
e.mode = res.orKind e.mode = res.orKind
e.ses.conn.endBatch() e.ses.conn.endBatch()
@ -235,10 +241,21 @@ proc service(turn: var Turn; exch: Exchange) =
if handler.isNone: if handler.isNone:
stop(turn) stop(turn)
else: else:
publish(turn, handler.get, HttpContext( let
cap = newCap(turn, exch)
ctx = publish(turn, handler.get, HttpContext(
req: exch.req, req: exch.req,
res: embed newCap(turn, exch), res: embed cap,
)) ))
const timeout = initDuration(seconds = 4)
after(turn, exch.ses.driver.timers, timeout) do (turn: var Turn):
if not exch.active:
var res = HttpResponse(orKind: HttpResponseKind.status)
res.status.code = 504
res.status.message = "Binding timeout"
message(turn, cap, res)
res = HttpResponse(orKind: HttpResponseKind.done)
message(turn, cap, res)
proc service(ses: Session) = proc service(ses: Session) =
## Service a connection to an HTTP client. ## Service a connection to an HTTP client.
@ -250,6 +267,8 @@ proc service(ses: Session) =
ses.facet.run do (turn: var Turn): ses.facet.run do (turn: var Turn):
var (n, req) = parseRequest(ses.conn, cast[string](data)) var (n, req) = parseRequest(ses.conn, cast[string](data))
if n > 0: if n > 0:
inc(ses.driver.sequenceNumber)
req.sequenceNumber = ses.driver.sequenceNumber
req.port = BiggestInt ses.port req.port = BiggestInt ses.port
inFacet(turn) do (turn: var Turn): inFacet(turn) do (turn: var Turn):
preventInertCheck(turn) preventInertCheck(turn)
@ -296,7 +315,8 @@ proc httpListen(turn: var Turn; driver: Driver; port: Port): Listener =
listener listener
proc httpDriver(turn: var Turn; ds: Cap) = proc httpDriver(turn: var Turn; ds: Cap) =
let driver = Driver(facet: turn.facet, ds: ds) let driver = Driver(facet: turn.facet, ds: ds, timers: turn.newDataspace)
spawnTimerDriver(turn, driver.timers)
during(turn, ds, HttpBinding?:{ during(turn, ds, HttpBinding?:{
1: grab(), 1: grab(),