postgre-actor: convert to gatekeeper protocol
This commit is contained in:
parent
a40cdbce2c
commit
0b4aa89311
49
README.md
49
README.md
|
@ -374,34 +374,37 @@ When called as `assert` (by a symlink or a rename) it will make assertions inste
|
|||
|
||||
## PostgreSQL
|
||||
|
||||
Readonly access to PostgreSQL databases. Asserts rows as records in response to SQL query assertions. Dynamic updates are not implemented.
|
||||
|
||||
Can be disabled by passing `--define:withPostgre=no` to the Nim compiler.
|
||||
Readonly access to PostgreSQL databases.
|
||||
Asserts rows as records in response to SQL query assertions.
|
||||
Dynamic updates are not implemented.
|
||||
|
||||
```
|
||||
# Configuration example
|
||||
<require-service <daemon postgre_actor>>
|
||||
|
||||
let ?sqlspace = dataspace
|
||||
|
||||
? <service-object <daemon postgre_actor> ?cap> [
|
||||
$cap <postgre {
|
||||
dataspace: $sqlspace
|
||||
connection: [
|
||||
["host" "example.com"]
|
||||
["dbname" "foobar"]
|
||||
["user" "hackme"]
|
||||
]
|
||||
}>
|
||||
]
|
||||
let ?postgreStep = <postgre {connection: [["host" "db.example.com"] ["dbname" "example"] ["user" "hackme"]]}>
|
||||
|
||||
let ?tuplespace = dataspace
|
||||
|
||||
$sqlspace <query "SELECT id, name FROM stuff" $tuplespace>
|
||||
|
||||
$tuplespace ? [?id ?name] [
|
||||
$log ! <log "-" { row: <example-row $id $name> }>
|
||||
$tuplespace ? ?row [
|
||||
$log ! <log "-" { line: $row }>
|
||||
]
|
||||
|
||||
let ?resolver = dataspace
|
||||
$resolver ? <accepted ?sqlspace> [
|
||||
$sqlspace ? <sql-error ?msg ?context> [
|
||||
$log ! <log "-" { line: $msg context: $context }>
|
||||
]
|
||||
$sqlspace <query [SELECT firstname FROM users] $tuplespace>
|
||||
]
|
||||
|
||||
<require-service <daemon postgre-actor>>
|
||||
$config ? <service-object <daemon postgre-actor> ?cap> [
|
||||
$cap <resolve $postgreStep $resolver>
|
||||
]
|
||||
|
||||
<daemon postgre-actor {
|
||||
argv: [ "/bin/postgre-actor" ]
|
||||
clearEnv: #t
|
||||
protocol: application/syndicate
|
||||
}>
|
||||
|
||||
```
|
||||
|
||||
## preserve_process_environment
|
||||
|
|
|
@ -36,9 +36,8 @@ JsonSocketTranslatorStep = <json-socket-translator {
|
|||
socket: SocketAddress
|
||||
}>.
|
||||
|
||||
PostgreArguments = <postgre {
|
||||
PostgreStep = <postgre {
|
||||
connection: [PostgreConnectionParameter ...]
|
||||
dataspace: #:any
|
||||
}>.
|
||||
PostgreConnectionParameter = [@key string @val string].
|
||||
|
||||
|
@ -48,9 +47,8 @@ PulseArguments = <pulse {
|
|||
dataspace: #:any
|
||||
}>.
|
||||
|
||||
SqliteArguments = <sqlite {
|
||||
SqliteStep = <sqlite {
|
||||
database: string
|
||||
dataspace: #:any
|
||||
}>.
|
||||
|
||||
WebhooksArguments = <webhooks {
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import preserves, syndicate
|
||||
import ./schema/[config, sql]
|
||||
import
|
||||
pkg/preserves,
|
||||
pkg/syndicate, pkg/syndicate/protocols/[gatekeeper, sturdy],
|
||||
./schema/[config, sql]
|
||||
|
||||
{.passL: "-lpq".}
|
||||
|
||||
|
@ -105,54 +107,57 @@ proc renderSql(tokens: openarray[Value]): string =
|
|||
else:
|
||||
return ""
|
||||
|
||||
proc spawnPostgreActor*(turn: Turn; root: Cap): Actor {.discardable.} =
|
||||
spawn("postgre", turn) do (turn: Turn):
|
||||
during(turn, root, ?:PostgreArguments) do (params: StringPairs, ds: Cap):
|
||||
var
|
||||
conn: PGconn
|
||||
statusHandle: Handle
|
||||
(keys, vals) = splitParams(params)
|
||||
conn = PQconnectdbParams(keys, vals, 0)
|
||||
checkPointer(conn)
|
||||
let
|
||||
status = PQstatus(conn)
|
||||
msg = $PQerrorMessage(conn)
|
||||
statusHandle = publish(turn, ds,
|
||||
initRecord("status", toSymbol($status), msg.toPreserves))
|
||||
if status == CONNECTION_OK:
|
||||
during(turn, ds, ?:Query) do (statement: seq[Value], target: Cap):
|
||||
var text = renderSql statement
|
||||
if text == "":
|
||||
discard publish(turn, ds, SqlError(msg: "invalid statement", context: $statement))
|
||||
else:
|
||||
var
|
||||
res = PQexec(conn, text)
|
||||
st = PQresultStatus(res)
|
||||
if st == PGRES_TUPLES_OK or st == PGRES_SINGLE_TUPLE:
|
||||
let tuples = PQntuples(res)
|
||||
let fields = PQnfields(res)
|
||||
if tuples > 0 and fields > 0:
|
||||
for r in 0..<tuples:
|
||||
var tupl = initSequence(fields)
|
||||
for f in 0..<fields:
|
||||
tupl[f] = toPreserves($PQgetvalue(res, r, f))
|
||||
discard publish(turn, target, tupl)
|
||||
proc spawnPostgreActor*(turn: Turn; relay: Cap): Actor {.discardable.} =
|
||||
result = spawnActor(turn, "postgre") do (turn: Turn):
|
||||
let pat = Resolve?:{ 0: PostgreStep.grabTypeFlat, 1: grab() }
|
||||
during(turn, relay, pat) do (params: StringPairs, observer: Cap):
|
||||
linkActor(turn, path) do (turn: Turn):
|
||||
var
|
||||
(keys, vals) = splitParams(params)
|
||||
conn = PQconnectdbParams(keys, vals, 0)
|
||||
checkPointer(conn)
|
||||
let
|
||||
status = PQstatus(conn)
|
||||
msg = $PQerrorMessage(conn)
|
||||
deallocCStringArray(keys)
|
||||
deallocCStringArray(vals)
|
||||
onStop(turn) do (turn: Turn):
|
||||
PQfinish(conn)
|
||||
if status == CONNECTION_OK:
|
||||
let ds = turn.newDataspace()
|
||||
discard publish(turn, ds, initRecord("status", toSymbol($status), msg.toPreserves))
|
||||
during(turn, ds, ?:Query) do (statement: seq[Value], target: Cap):
|
||||
var text = renderSql statement
|
||||
if text == "":
|
||||
discard publish(turn, ds, SqlError(msg: "invalid statement", context: $statement))
|
||||
else:
|
||||
discard publish(turn, ds, SqlError(
|
||||
msg: $PQresStatus(st),
|
||||
context: $PQresultErrorMessage(res),
|
||||
))
|
||||
PQclear(res)
|
||||
else:
|
||||
stderr.writeLine "refusing to do anything when status is ", status
|
||||
do:
|
||||
deallocCStringArray(keys)
|
||||
deallocCStringArray(vals)
|
||||
PQfinish(conn)
|
||||
var
|
||||
res = PQexec(conn, text)
|
||||
st = PQresultStatus(res)
|
||||
if st == PGRES_TUPLES_OK or st == PGRES_SINGLE_TUPLE:
|
||||
let tuples = PQntuples(res)
|
||||
let fields = PQnfields(res)
|
||||
if tuples > 0 and fields > 0:
|
||||
for r in 0..<tuples:
|
||||
var tupl = initSequence(fields)
|
||||
for f in 0..<fields:
|
||||
tupl[f] = toPreserves($PQgetvalue(res, r, f))
|
||||
discard publish(turn, target, tupl)
|
||||
else:
|
||||
discard publish(turn, ds, SqlError(
|
||||
msg: $PQresStatus(st),
|
||||
context: $PQresultErrorMessage(res),
|
||||
))
|
||||
PQclear(res)
|
||||
discard publish(turn, observer,
|
||||
ResolvedAccepted(responderSession: ds))
|
||||
else:
|
||||
discard publish(turn, observer,
|
||||
Rejected(detail: msg.toPreserves))
|
||||
|
||||
when isMainModule:
|
||||
import syndicate/relays
|
||||
|
||||
runActor("main") do (turn: Turn):
|
||||
resolveEnvironment(turn) do (turn: Turn; ds: Cap):
|
||||
spawnPostgreActor(turn, ds)
|
||||
resolveEnvironment(turn) do (turn: Turn; relay: Cap):
|
||||
spawnPostgreActor(turn, relay)
|
||||
|
|
|
@ -78,6 +78,12 @@ type
|
|||
SqliteArguments* {.preservesRecord: "sqlite".} = object
|
||||
`field0`*: SqliteArgumentsField0
|
||||
|
||||
PostgreStepField0* {.preservesDictionary.} = object
|
||||
`connection`*: seq[PostgreConnectionParameter]
|
||||
|
||||
PostgreStep* {.preservesRecord: "postgre".} = object
|
||||
`field0`*: PostgreStepField0
|
||||
|
||||
TcpAddress* {.preservesRecord: "tcp".} = object
|
||||
`host`*: string
|
||||
`port`*: BiggestInt
|
||||
|
@ -99,19 +105,16 @@ type
|
|||
`key`*: string
|
||||
`val`*: string
|
||||
|
||||
PostgreArgumentsField0* {.preservesDictionary.} = object
|
||||
`connection`*: seq[PostgreConnectionParameter]
|
||||
`dataspace`* {.preservesEmbedded.}: EmbeddedRef
|
||||
|
||||
PostgreArguments* {.preservesRecord: "postgre".} = object
|
||||
`field0`*: PostgreArgumentsField0
|
||||
|
||||
PulseArgumentsField0* {.preservesDictionary.} = object
|
||||
`dataspace`* {.preservesEmbedded.}: EmbeddedRef
|
||||
|
||||
PulseArguments* {.preservesRecord: "pulse".} = object
|
||||
`field0`*: PulseArgumentsField0
|
||||
|
||||
Tcp* {.preservesRecord: "tcp".} = object
|
||||
`host`*: string
|
||||
`port`*: BiggestInt
|
||||
|
||||
UnixAddress* {.preservesRecord: "unix".} = object
|
||||
`path`*: string
|
||||
|
||||
|
@ -120,10 +123,6 @@ type
|
|||
PrinterStep* {.preservesRecord: "printer".} = object
|
||||
`field0`*: PrinterStepField0
|
||||
|
||||
Tcp* {.preservesRecord: "tcp".} = object
|
||||
`host`*: string
|
||||
`port`*: BiggestInt
|
||||
|
||||
proc `$`*(x: WebsocketArguments | HttpClientArguments | JsonTranslatorArguments |
|
||||
SocketAddress |
|
||||
Base64DecoderArguments |
|
||||
|
@ -133,15 +132,15 @@ proc `$`*(x: WebsocketArguments | HttpClientArguments | JsonTranslatorArguments
|
|||
WebhooksArguments |
|
||||
FileSystemUsageArguments |
|
||||
SqliteArguments |
|
||||
PostgreStep |
|
||||
TcpAddress |
|
||||
CacheArguments |
|
||||
XmlTranslatorArguments |
|
||||
PostgreConnectionParameter |
|
||||
PostgreArguments |
|
||||
PulseArguments |
|
||||
Tcp |
|
||||
UnixAddress |
|
||||
PrinterStep |
|
||||
Tcp): string =
|
||||
PrinterStep): string =
|
||||
`$`(toPreserves(x))
|
||||
|
||||
proc encode*(x: WebsocketArguments | HttpClientArguments |
|
||||
|
@ -154,13 +153,13 @@ proc encode*(x: WebsocketArguments | HttpClientArguments |
|
|||
WebhooksArguments |
|
||||
FileSystemUsageArguments |
|
||||
SqliteArguments |
|
||||
PostgreStep |
|
||||
TcpAddress |
|
||||
CacheArguments |
|
||||
XmlTranslatorArguments |
|
||||
PostgreConnectionParameter |
|
||||
PostgreArguments |
|
||||
PulseArguments |
|
||||
Tcp |
|
||||
UnixAddress |
|
||||
PrinterStep |
|
||||
Tcp): seq[byte] =
|
||||
PrinterStep): seq[byte] =
|
||||
encode(toPreserves(x))
|
||||
|
|
|
@ -16,13 +16,13 @@ import ./syndesizer/[
|
|||
xml_translator]
|
||||
|
||||
runActor("syndesizer") do (turn: Turn):
|
||||
resolveEnvironment(turn) do (turn: Turn; ds: Cap):
|
||||
discard spawnTimerDriver(turn, ds)
|
||||
discard spawnBase64Decoder(turn, ds)
|
||||
discard spawnCacheActor(turn, ds)
|
||||
discard spawnFileSystemUsageActor(turn, ds)
|
||||
discard spawnHttpDriver(turn, ds)
|
||||
discard spawnJsonSocketTranslator(turn, ds)
|
||||
discard spawnJsonStdioTranslator(turn, ds)
|
||||
discard spawnPulseActor(turn, ds)
|
||||
discard spawnXmlTranslator(turn, ds)
|
||||
resolveEnvironment(turn) do (turn: Turn; relay: Cap):
|
||||
discard spawnTimerDriver(turn, relay)
|
||||
discard spawnBase64Decoder(turn, relay)
|
||||
discard spawnCacheActor(turn, relay)
|
||||
discard spawnFileSystemUsageActor(turn, relay)
|
||||
discard spawnHttpDriver(turn, relay)
|
||||
discard spawnJsonSocketTranslator(turn, relay)
|
||||
discard spawnJsonStdioTranslator(turn, relay)
|
||||
discard spawnPulseActor(turn, relay)
|
||||
discard spawnXmlTranslator(turn, relay)
|
||||
|
|
Loading…
Reference in New Issue