Add PostgreSQL actor

Emery Hemingway 2024-01-10 10:16:42 +02:00
parent c89a9a333a
commit 29b19c711c
7 changed files with 192 additions and 2 deletions


@@ -123,6 +123,36 @@ let ?ds = dataspace
]
```
### PostgreSQL
Read-only access to PostgreSQL databases. Asserts rows as records in response to SQL query assertions; dynamic updates are not implemented.
Can be disabled by passing `--define:withPostgre=no` to the Nim compiler.
```
# Configuration example
<require-service <daemon syndesizer>>

let ?sqlspace = dataspace

? <service-object <daemon syndesizer> ?cap> [
  $cap <postgre {
    dataspace: $sqlspace
    connection: [
      ["host" "example.com"]
      ["dbname" "foobar"]
      ["user" "hackme"]
    ]
  }>
]

$sqlspace <query example-row "SELECT id, name FROM stuff">

$sqlspace ? <example-row ?id ?name> [
  $log ! <log "-" { row: <example-row $id $name> }>
]
```
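
Besides row records, the actor asserts a `<status @state @message>` record to the configured dataspace carrying the libpq connection state and error message (for example `<status CONNECTION_OK "">`). A minimal sketch of logging it, reusing `$sqlspace` from the example above (the `postgre-status` key is only illustrative):

```
$sqlspace ? <status ?state ?msg> [
  $log ! <log "-" { postgre-status: [ $state $msg ] }>
]
```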
### SQLite
Read-only access to SQLite databases. Asserts rows as records in response to SQL query assertions; dynamic updates are not implemented.


@@ -22,6 +22,12 @@ JsonSocketTranslatorArguments = <json-socket-translator {
  socket: string
}>.

PostgreArguments = <postgre {
  connection: [PostgreConnectionParameter ...]
  dataspace: #!any
}>.
PostgreConnectionParameter = [@key string @val string].

SqliteArguments = <sqlite {
  database: string
  dataspace: #!any


@@ -3,5 +3,5 @@
pkgs.buildNimPackage {
  name = "dummy";
  propagatedNativeBuildInputs = [ pkgs.pkg-config ];
- propagatedBuildInputs = [ pkgs.sqlite ];
+ propagatedBuildInputs = [ pkgs.postgresql pkgs.sqlite ];
}


@@ -54,6 +54,17 @@ type
  CacheArguments* {.preservesRecord: "cache".} = object
    `field0`*: CacheArgumentsField0

  PostgreConnectionParameter* {.preservesTuple.} = object
    `key`*: string
    `val`*: string

  PostgreArgumentsField0* {.preservesDictionary.} = object
    `connection`*: seq[PostgreConnectionParameter]
    `dataspace`* {.preservesEmbedded.}: EmbeddedRef

  PostgreArguments* {.preservesRecord: "postgre".} = object
    `field0`*: PostgreArgumentsField0

  Tcp* {.preservesRecord: "tcp".} = object
    `host`*: string
    `port`*: BiggestInt
@@ -65,6 +76,8 @@ proc `$`*(x: WebsocketArguments | JsonTranslatorArguments |
    FileSystemUsageArguments |
    SqliteArguments |
    CacheArguments |
    PostgreConnectionParameter |
    PostgreArguments |
    Tcp): string =
  `$`(toPreserves(x))
@@ -75,5 +88,7 @@ proc encode*(x: WebsocketArguments | JsonTranslatorArguments |
    FileSystemUsageArguments |
    SqliteArguments |
    CacheArguments |
    PostgreConnectionParameter |
    PostgreArguments |
    Tcp): seq[byte] =
  encode(toPreserves(x))


@@ -5,6 +5,9 @@
import syndicate, syndicate/relays, syndicate/actors/timers

const
  withPostgre* {.booldefine.}: bool = true
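  # per the README, pass --define:withPostgre=no to the Nim compiler to omit this actor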
import ./syndesizer/[
  cache_actor,
  file_system_usage,
@@ -14,6 +17,9 @@ import ./syndesizer/[
  webhooks,
  websockets]

when withPostgre:
  import ./syndesizer/postgre_actor

runActor("syndesizer") do (turn: var Turn; root: Cap):
  connectStdio(turn, root)
  discard spawnTimers(turn, root)
@@ -23,5 +29,7 @@ runActor("syndesizer") do (turn: var Turn; root: Cap):
  discard spawnJsonStdioTranslator(turn, root)
  discard spawnWebhookActor(turn, root)
  discard spawnWebsocketActor(turn, root)
  when withPostgre:
    discard spawnPostgreActor(turn, root)
  when withSqlite:
    discard spawnSqliteActor(turn, root)


@@ -0,0 +1,131 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense

import preserves, syndicate, syndicate/relays
import ../schema/[config, sql]

{.passL: "-lpq".}
{.pragma: libpq, header: "libpq-fe.h", importc.}

type
  Oid = cuint
  PGconn {.libpq.} = ptr object
  PGresult {.libpq.} = ptr object

  ConnStatusType {.libpq.} = enum
    CONNECTION_OK, CONNECTION_BAD,
      ## Non-blocking mode only below here.
      ##
      ## The existence of these should never be relied upon - they should only
      ## be used for user feedback or similar purposes.
    CONNECTION_STARTED,           ## Waiting for connection to be made.
    CONNECTION_MADE,              ## Connection OK; waiting to send.
    CONNECTION_AWAITING_RESPONSE, ## Waiting for a response from the postmaster.
    CONNECTION_AUTH_OK,           ## Received authentication; waiting for backend startup.
    CONNECTION_SETENV,            ## This state is no longer used.
    CONNECTION_SSL_STARTUP,       ## Negotiating SSL.
    CONNECTION_NEEDED,            ## Internal state: connect() needed.
    CONNECTION_CHECK_WRITABLE,    ## Checking if session is read-write.
    CONNECTION_CONSUME,           ## Consuming any extra messages.
    CONNECTION_GSS_STARTUP,       ## Negotiating GSSAPI.
    CONNECTION_CHECK_TARGET,      ## Checking target server properties.
    CONNECTION_CHECK_STANDBY      ## Checking if server is in standby mode.

  ExecStatusType = enum
    PGRES_EMPTY_QUERY = 0,  ## empty query string was executed
    PGRES_COMMAND_OK,       ## a query command that doesn't return anything
                            ## was executed properly by the backend
    PGRES_TUPLES_OK,        ## a query command that returns tuples was executed
                            ## properly by the backend, PGresult contains the
                            ## result tuples
    PGRES_COPY_OUT,         ## Copy Out data transfer in progress
    PGRES_COPY_IN,          ## Copy In data transfer in progress
    PGRES_BAD_RESPONSE,     ## an unexpected response was recv'd from the backend
    PGRES_NONFATAL_ERROR,   ## notice or warning message
    PGRES_FATAL_ERROR,      ## query failed
    PGRES_COPY_BOTH,        ## Copy In/Out data transfer in progress
    PGRES_SINGLE_TUPLE,     ## single tuple from larger resultset
    PGRES_PIPELINE_SYNC,    ## pipeline synchronization point
    PGRES_PIPELINE_ABORTED  ## Command didn't run because of an abort
                            ## earlier in a pipeline

proc PQconnectdbParams(
    keywords: cstringArray; values: cstringArray; expand_dbname: cint): PGconn {.libpq.}
proc PQerrorMessage(conn: PGconn): cstring {.libpq.}
proc PQfinish(conn: PGconn) {.libpq.}
proc PQstatus(conn: PGconn): ConnStatusType {.libpq.}
proc PQexec(conn: PGconn; query: cstring): PGresult {.libpq.}
proc PQresultStatus(res: PGresult): ExecStatusType {.libpq.}
proc PQresStatus(status: ExecStatusType): cstring {.libpq.}
proc PQresultErrorMessage(res: PGresult): cstring {.libpq.}
proc PQclear(res: PGresult) {.libpq.}
proc PQntuples(res: PGresult): cint {.libpq.}
proc PQnfields(res: PGresult): cint {.libpq.}
proc PQgetvalue(res: PGresult; tup_num: cint; field_num: cint): cstring {.libpq.}
proc PQftype(res: PGresult; field_num: cint): Oid {.libpq.}
proc PQfsize(res: PGresult; field_num: cint): cint {.libpq.}
# proc PQsocket(conn: PGconn): cint
# proc PQconnectStartParams(
# keywords: cstringArray; values: cstringArray; expand_dbname: cint): PGconn
# TODO: async
proc checkPointer(p: pointer) =
  if p.isNil: raise newException(OutOfMemDefect, "Postgres returned nil")
type StringPairs = seq[tuple[key: string, val: string]]
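
# Split key/value pairs into the two parallel arrays that
# PQconnectdbParams expects for keywords and values.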
proc splitParams(params: StringPairs): (cstringArray, cstringArray) =
  var strings = newSeq[string](params.len)
  for i, _ in params: strings[i] = params[i][0]
  result[0] = allocCStringArray(strings)
  for i, _ in params: strings[i] = params[i][1]
  result[1] = allocCStringArray(strings)

proc spawnPostgreActor*(turn: var Turn; root: Cap): Actor {.discardable.} =
  spawn("postgre", turn) do (turn: var Turn):
    during(turn, root, ?:PostgreArguments) do (params: StringPairs, ds: Cap):
      var
        conn: PGconn
        statusHandle: Handle
        (keys, vals) = splitParams(params)
      conn = PQconnectdbParams(keys, vals, 0)
      checkPointer(conn)
      let
        status = PQstatus(conn)
        msg = $PQerrorMessage(conn)
      statusHandle = publish(turn, ds,
        initRecord("status", toSymbol($status), msg.toPreserves))
      if status == CONNECTION_OK:
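        # Answer each <query @label @statement> assertion with a
        # result-status record and, when tuples come back, one
        # record per row labeled with @label.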
        during(turn, ds, ?:Query) do (label: Value, statement: string):
          var res = PQexec(conn, statement)
          var st = PQresultStatus(res)
          discard publish(turn, ds, toRecord(
            label, toSymbol($PQresStatus(st)), $PQresultErrorMessage(res)))
          if st == PGRES_TUPLES_OK or st == PGRES_SINGLE_TUPLE:
            let tuples = PQntuples(res)
            let fields = PQnfields(res)
            if tuples > 0 and fields > 0:
              for r in 0..<tuples:
                var rec = initRecord(label, fields)
                for f in 0..<fields:
                  rec[f] = toPreserves($PQgetvalue(res, r, f))
                discard publish(turn, ds, rec)
          PQclear(res)
      else:
        stderr.writeLine "refusing to do anything when status is ", status
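    # Free the connection parameters and close the connection
    # when the <postgre …> assertion is retracted.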
    do:
      deallocCStringArray(keys)
      deallocCStringArray(vals)
      PQfinish(conn)

when isMainModule:
  runActor("main") do (turn: var Turn; root: Cap):
    connectStdio(turn, root)
    spawnPostgreActor(turn, root)


@@ -1,6 +1,6 @@
# Package

- version = "20240109"
+ version = "20240110"
author = "Emery Hemingway"
description = "Utilites for Syndicated Actors and Synit"
license = "unlicense"