Add syndicate to http driver
This commit is contained in:
parent
dbe363052d
commit
3996729824
|
@ -4,6 +4,7 @@
|
|||
import std/[httpcore, options, parseutils, streams, strutils, tables, times, uri]
|
||||
import pkg/sys/ioqueue
|
||||
import pkg/preserves
|
||||
import pkg/syndicate
|
||||
import pkg/syndicate/protocols/http
|
||||
import taps
|
||||
|
||||
|
@ -18,6 +19,38 @@ proc `$`(b: seq[byte]): string = cast[string](b)
|
|||
|
||||
# Check the response encoding matches or otherwise return 415
|
||||
|
||||
type HandlerEntity = ref object of Entity
|
||||
handler: proc (turn: var Turn; req: HttpRequest; cap: Cap)
|
||||
|
||||
proc dateResponse(): HttpResponse
|
||||
result = HttpResponse(orKind: HttpResponseKind.header)
|
||||
result.header.name = "date"
|
||||
result.header.value = now().format(IMF)
|
||||
|
||||
method publish(e: Handler; turn: var Turn; a: AssertionRef; h: Handle) =
|
||||
var ctx = a.value.preservesTo HttpContext
|
||||
if ctx.isSome:
|
||||
var res = ctx.get.res.unembed Cap
|
||||
if res.isSome:
|
||||
e.handler(turn, ctx.get.req, cap)
|
||||
|
||||
proc respond404(turn: var Turn; req: HttpRequest; cap: Cap) =
|
||||
message(turn, cap, HttpResponse(
|
||||
orKind: HttpResponseKind.status,
|
||||
status: HttpResponseStatus(code: "404"),
|
||||
))
|
||||
message(turn, cap, dateResponse())
|
||||
message(turn, cap, HttpResponse(orKind: HttpResponseKind.done))
|
||||
|
||||
proc bind404Handler(turn: var Turn; ds: Cap; port: Port) =
|
||||
var b: HttpBinding
|
||||
b.host = HostPattern(orKind: HostPatternKind.any)
|
||||
b.port = BiggestInt port
|
||||
b.method = MethodPattern(orKind: MethodPatternKind.any)
|
||||
b.path = @[PathPatternElement(orKind: PathPatternElementKind.rest)]
|
||||
p.handler = newCap(turn, HandlerEntity(handler: respond404))
|
||||
discard publish(turn, ds, b)
|
||||
|
||||
const
|
||||
SP = { ' ', '\x09', '\x0b', '\x0c', '\x0d' }
|
||||
SupportedVersion = "HTTP/1.1"
|
||||
|
@ -97,46 +130,111 @@ proc parseRequest(conn: Connection; text: string): (int, HttpRequest) =
|
|||
|
||||
result[0] = off
|
||||
|
||||
proc connectionHandler(conn: Connection) =
|
||||
# linkActor("http-conn") do (turn: var Turn):
|
||||
block:
|
||||
# let facet = turn.facet
|
||||
var seqNum: BiggestInt
|
||||
conn.onClosed do ():
|
||||
# stopActor(facet)
|
||||
echo "connection closing"
|
||||
close(conn)
|
||||
conn.onReceived do (data: seq[byte]; ctx: MessageContext):
|
||||
echo "connection received ", data.len, " bytes"
|
||||
var (n, req) = parseRequest(conn, cast[string](data))
|
||||
if n < 1:
|
||||
close(conn)
|
||||
else:
|
||||
inc seqNum
|
||||
req.sequenceNumber = seqNum
|
||||
var body = $req
|
||||
var stream = newStringStream()
|
||||
stream.writeLine(SupportedVersion, " 200 OK")
|
||||
stream.writeLine("date: ", now().format(IMF))
|
||||
stream.writeLine("content-length: ", body.len)
|
||||
stream.writeLine()
|
||||
stream.write(body)
|
||||
echo "send ", stream.data.len, " bytes"
|
||||
conn.send(stream.data, endOfMessage = true)
|
||||
# TODO: deal with transfer-encoding
|
||||
#[
|
||||
var body = $req
|
||||
var stream = newStringStream()
|
||||
stream.writeLine(SupportedVersion, " 200 OK")
|
||||
stream.writeLine("date: ", now().format(IMF))
|
||||
stream.writeLine("content-length: ", body.len)
|
||||
stream.writeLine()
|
||||
stream.write(body)
|
||||
echo "send ", stream.data.len, " bytes"
|
||||
conn.send(stream.data, endOfMessage = true)
|
||||
]#
|
||||
|
||||
conn.receive()
|
||||
proc runSubfacet(facet: Facet, act: TurnAction) =
|
||||
run(facet) do (t: var Turn): inFacet(t, act)
|
||||
|
||||
type
|
||||
Driver = ref object
|
||||
facet: Facet
|
||||
ds: Cap
|
||||
Session = ref object
|
||||
facet: Facet
|
||||
driver: Driver
|
||||
conn: Connection
|
||||
port: Port
|
||||
Exchange = ref object
|
||||
facet: Facet
|
||||
ses: Session
|
||||
req: HttpRequest
|
||||
handlers: seq[Cap]
|
||||
|
||||
proc listenHttp: Listener =
|
||||
proc service(turn: var Turn; exch: Exchange) =
|
||||
## Service an HTTP message exchange.
|
||||
let pat = HttpService ?:{
|
||||
0: drop(),
|
||||
1: ?ses.port,
|
||||
2: ?req.method,
|
||||
3: grab()
|
||||
4: grab()
|
||||
}
|
||||
onPublish(turn, ses.driver.ds, pat) do (
|
||||
|
||||
proc service(ses: Session) =
|
||||
## Service a connection to an HTTP client.
|
||||
ses.facet.onStop do (turn: var Turn):
|
||||
close ses.conn
|
||||
ses.conn.onClosed do ():
|
||||
stop ses.facet
|
||||
ses.conn.onReceived do (data: seq[byte]; ctx: MessageContext):
|
||||
echo "connection received ", data.len, " bytes"
|
||||
var (n, req) = parseRequest(conn, cast[string](data))
|
||||
echo "parseRequest parsed ", n, " bytes"
|
||||
if n < 1:
|
||||
stop(ses.facet)
|
||||
else:
|
||||
runSubfacet(facet) do (turn: var Turn):
|
||||
service Exchange(
|
||||
facet: turn.facet,
|
||||
ses: session,
|
||||
req: req,
|
||||
)
|
||||
conn.receive()
|
||||
conn.receive()
|
||||
|
||||
proc newListener(port: Port): Listener =
|
||||
var lp = newLocalEndpoint()
|
||||
lp.with Port 80
|
||||
result = listen newPreconnection(local=[lp])
|
||||
lp.with port
|
||||
listen newPreconnection(local=[lp])
|
||||
|
||||
proc main =
|
||||
var listener = listenHttp()
|
||||
listener.onConnectionReceived(connectionHandler)
|
||||
proc httpListen(turn: var Turn; driver: Driver; port: Port) =
|
||||
let listener = newListener(port)
|
||||
turn.facet.onStop do (turn: var Turn):
|
||||
stop listener
|
||||
listener.onConnectionReceived do (conn: Connection):
|
||||
run(facet) do (turn: var Turn):
|
||||
# start a new turn
|
||||
linkActor("http-conn") do (turn: var Turn):
|
||||
# start a new actor
|
||||
service Session(
|
||||
facet: turn.facet,
|
||||
driver: driver,
|
||||
conn: conn,
|
||||
port: port,
|
||||
)
|
||||
|
||||
ioqueue.run()
|
||||
proc httpDriver(turn: var Turn; ds: Cap) =
|
||||
let driver = Driver(facet: turn.facet, ds: ds)
|
||||
|
||||
main()
|
||||
during(turn, ds, ?:HttpListener) do (port: uint16):
|
||||
bind404Handler(turn, ds, Port port)
|
||||
httpListen(turn, driver, Port port)
|
||||
|
||||
during(turn, ds, HttpBinding?:{
|
||||
1: grab(),
|
||||
}) do (port: BiggestInt):
|
||||
publish(turn, ds, HttpListener(port: port))
|
||||
|
||||
publish(turn, ds, HttpListener(port: 80))
|
||||
# TODO: only here for testing
|
||||
|
||||
proc spawnHttpDriver*(turn: var Turn; root: Cap) =
|
||||
during(turn, root, ?HttpDriverArguments) do (ds: Cap):
|
||||
spawnActor("http-driver") do (turn: var Turn):
|
||||
httpDriver(turn, ds)
|
||||
|
||||
when isMainModule:
|
||||
import syndicate/relays
|
||||
runActor("main") do (turn: var Turn):
|
||||
resolveEnvironment(turn, spawnHttpDriver)
|
||||
|
|
Loading…
Reference in New Issue