syndicate_utils/src/postgre_actor.nim

159 lines
6.4 KiB
Nim

# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import preserves, syndicate
import ./schema/[config, sql]
{.passL: "-lpq".}
{.pragma: libpq, header: "libpq-fe.h", importc.}
type
Oid = cuint
PGconn {.libpq.} = ptr object
PGresult {.libpq.} = ptr object
ConnStatusType {.libpq.} = enum
CONNECTION_OK, CONNECTION_BAD, ## Non-blocking mode only below here
##
## The existence of these should never be relied upon - they should only
## be used for user feedback or similar purposes.
##
CONNECTION_STARTED, ## Waiting for connection to be made.
CONNECTION_MADE, ## Connection OK; waiting to send.
CONNECTION_AWAITING_RESPONSE, ## Waiting for a response from the
## postmaster.
CONNECTION_AUTH_OK, ## Received authentication; waiting for
## backend startup.
CONNECTION_SETENV, ## This state is no longer used.
CONNECTION_SSL_STARTUP, ## Negotiating SSL.
CONNECTION_NEEDED, ## Internal state: connect() needed
CONNECTION_CHECK_WRITABLE, ## Checking if session is read-write.
CONNECTION_CONSUME, ## Consuming any extra messages.
CONNECTION_GSS_STARTUP, ## Negotiating GSSAPI.
CONNECTION_CHECK_TARGET, ## Checking target server properties.
CONNECTION_CHECK_STANDBY ## Checking if server is in standby mode.
ExecStatusType = enum
PGRES_EMPTY_QUERY = 0, ## empty query string was executed
PGRES_COMMAND_OK, ## a query command that doesn't return
## anything was executed properly by the
## backend
PGRES_TUPLES_OK, ## a query command that returns tuples was
## executed properly by the backend, PGresult
## contains the result tuples
PGRES_COPY_OUT, ## Copy Out data transfer in progress
PGRES_COPY_IN, ## Copy In data transfer in progress
PGRES_BAD_RESPONSE, ## an unexpected response was recv'd from the
## backend
PGRES_NONFATAL_ERROR, ## notice or warning message
PGRES_FATAL_ERROR, ## query failed
PGRES_COPY_BOTH, ## Copy In/Out data transfer in progress
PGRES_SINGLE_TUPLE, ## single tuple from larger resultset
PGRES_PIPELINE_SYNC, ## pipeline synchronization point
PGRES_PIPELINE_ABORTED ## Command didn't run because of an abort
## earlier in a pipeline
proc PQconnectdbParams(
keywords: cstringArray; values: cstringArray; expand_dbname: cint): PGconn {.libpq.}
proc PQerrorMessage(conn: PGconn): cstring {.libpq.}
proc PQfinish(conn: PGconn) {.libpq.}
proc PQstatus(conn: PGconn): ConnStatusType {.libpq.}
proc PQexec(conn: PGconn; query: cstring): PGresult {.libpq.}
proc PQresultStatus(res: PGresult): ExecStatusType {.libpq.}
proc PQresStatus (status: ExecStatusType): cstring {.libpq.}
proc PQresultErrorMessage(res: PGresult): cstring {.libpq.}
proc PQclear(res: PGresult) {.libpq.}
proc PQntuples(res: PGresult): cint {.libpq.}
proc PQnfields(res: PGresult): cint {.libpq.}
proc PQgetvalue(res: PGresult; tup_num: cint; field_num: cint): cstring {.libpq.}
proc PQftype(res: PGresult; field_num: cint): Oid {.libpq.}
proc PQfsize(res: PGresult; field_num: cint): cint {.libpq.}
# proc PQsocket(conn: PGconn): cint
# proc PQconnectStartParams(
# keywords: cstringArray; values: cstringArray; expand_dbname: cint): PGconn
# TODO: async
proc checkPointer(p: pointer) =
if p.isNil: raise newException(OutOfMemDefect, "Postgres returned nil")
type StringPairs = seq[tuple[key: string, val: string]]
proc splitParams(params: StringPairs): (cstringArray, cstringArray) =
var strings = newSeq[string](params.len)
for i, _ in params: strings[i] = params[i][0]
result[0] = allocCStringArray(strings)
for i, _ in params: strings[i] = params[i][1]
result[1] = allocCStringArray(strings)
proc renderSql(tokens: openarray[Value]): string =
for token in tokens:
if result.len > 0: result.add ' '
case token.kind
of pkSymbol:
result.add token.symbol.string
of pkString:
result.add '\''
result.add token.string
result.add '\''
of pkFloat, pkRegister, pkBigInt:
result.add $token
of pkBoolean:
if token.bool: result.add '1'
else: result.add '0'
else:
return ""
proc spawnPostgreActor*(turn: var Turn; root: Cap): Actor {.discardable.} =
spawn("postgre", turn) do (turn: var Turn):
during(turn, root, ?:PostgreArguments) do (params: StringPairs, ds: Cap):
var
conn: PGconn
statusHandle: Handle
(keys, vals) = splitParams(params)
conn = PQconnectdbParams(keys, vals, 0)
checkPointer(conn)
let
status = PQstatus(conn)
msg = $PQerrorMessage(conn)
statusHandle = publish(turn, ds,
initRecord("status", toSymbol($status), msg.toPreserves))
if status == CONNECTION_OK:
during(turn, ds, ?:Query) do (statement: seq[Value], target: Cap):
var text = renderSql statement
if text == "":
discard publish(turn, ds, SqlError(msg: "invalid statement", context: $statement))
else:
var
res = PQexec(conn, text)
st = PQresultStatus(res)
if st == PGRES_TUPLES_OK or st == PGRES_SINGLE_TUPLE:
let tuples = PQntuples(res)
let fields = PQnfields(res)
if tuples > 0 and fields > 0:
for r in 0..<tuples:
var tupl = initSequence(fields)
for f in 0..<fields:
tupl[f] = toPreserves($PQgetvalue(res, r, f))
discard publish(turn, target, tupl)
else:
discard publish(turn, ds, SqlError(
msg: $PQresStatus(st),
context: $PQresultErrorMessage(res),
))
PQclear(res)
else:
stderr.writeLine "refusing to do anything when status is ", status
do:
deallocCStringArray(keys)
deallocCStringArray(vals)
PQfinish(conn)
when isMainModule:
import syndicate/relays
runActor("main") do (turn: var Turn):
resolveEnvironment(turn) do (turn: var Turn; ds: Cap):
spawnPostgreActor(turn, ds)