syndesizer: absorb sqlite_actor
This commit is contained in:
parent
d8965a398a
commit
2268b75096
26
README.md
26
README.md
|
@ -106,6 +106,32 @@ let ?ds = dataspace
|
||||||
]
|
]
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### SQLite
|
||||||
|
|
||||||
|
Readonly access to SQLite databases. Asserts rows as records in response to SQL query assertions. Dynamic updates are not implemented.
|
||||||
|
|
||||||
|
Can be disabled by passing `--define:withSqlite=no` to the Nim compiler.
|
||||||
|
|
||||||
|
```
|
||||||
|
# Configuration example
|
||||||
|
<require-service <daemon syndesizer>>
|
||||||
|
|
||||||
|
let ?sqlspace = dataspace
|
||||||
|
|
||||||
|
? <service-object <daemon syndesizer> ?cap> [
|
||||||
|
$cap <sqlite {
|
||||||
|
dataspace: $sqlspace
|
||||||
|
database: "/var/db/example.db"
|
||||||
|
}>
|
||||||
|
]
|
||||||
|
|
||||||
|
$sqlspace <query example-row "SELECT id, name FROM stuff">
|
||||||
|
|
||||||
|
$sqlspace ? <example-row ?id ?name> [
|
||||||
|
$log ! <log "-" { row: <example-row $id $name> }>
|
||||||
|
]
|
||||||
|
```
|
||||||
|
|
||||||
### Webhooks
|
||||||
|
|
||||||
Listens for webhook requests and sends request data to a dataspace as messages.
|
Listens for webhook requests and sends request data to a dataspace as messages.
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
include ../syndicate-nim/depends.tup
|
include ../syndicate-nim/depends.tup
|
||||||
|
NIM = $(DIRENV) $(NIM)
|
||||||
NIM_FLAGS += --path:$(TUP_CWD)/../syndicate-nim/src
|
NIM_FLAGS += --path:$(TUP_CWD)/../syndicate-nim/src
|
||||||
NIM_GROUPS += $(TUP_CWD)/<lock>
|
NIM_GROUPS += $(TUP_CWD)/<lock>
|
||||||
|
|
|
@ -18,6 +18,11 @@ JsonSocketTranslatorArguments = <json-socket-translator {
|
||||||
socket: string
|
socket: string
|
||||||
}>.
|
}>.
|
||||||
|
|
||||||
|
SqliteArguments = <sqlite {
|
||||||
|
database: string
|
||||||
|
dataspace: #!any
|
||||||
|
}>.
|
||||||
|
|
||||||
WebhooksArguments = <webhooks {
|
WebhooksArguments = <webhooks {
|
||||||
endpoints: {[string ...]: #!any ...:...}
|
endpoints: {[string ...]: #!any ...:...}
|
||||||
listen: Tcp
|
listen: Tcp
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
{ pkgs ? import <nixpkgs> { } }:
|
{ pkgs ? import <nixpkgs> { } }:
|
||||||
|
|
||||||
pkgs.buildNimPackage {
|
pkgs.buildNimPackage {
|
||||||
name = "dummy";
|
name = "dummy";
|
||||||
lockFile = ./lock.json;
|
propagatedNativeBuildInputs = [ pkgs.pkg-config ];
|
||||||
|
propagatedBuildInputs = [ pkgs.sqlite ];
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,6 @@
|
||||||
|
version 1 .
|
||||||
|
|
||||||
|
# When asserted the actor responds with
|
||||||
|
# rows as records of the given label and
|
||||||
|
# row columns as record fields.
|
||||||
|
Query = <query @label any @statement string> .
|
|
@ -34,6 +34,13 @@ type
|
||||||
WebhooksArguments* {.preservesRecord: "webhooks".} = object
|
WebhooksArguments* {.preservesRecord: "webhooks".} = object
|
||||||
`field0`*: WebhooksArgumentsField0
|
`field0`*: WebhooksArgumentsField0
|
||||||
|
|
||||||
|
SqliteArgumentsField0* {.preservesDictionary.} = object
|
||||||
|
`database`*: string
|
||||||
|
`dataspace`* {.preservesEmbedded.}: EmbeddedRef
|
||||||
|
|
||||||
|
SqliteArguments* {.preservesRecord: "sqlite".} = object
|
||||||
|
`field0`*: SqliteArgumentsField0
|
||||||
|
|
||||||
CacheArgumentsField0* {.preservesDictionary.} = object
|
CacheArgumentsField0* {.preservesDictionary.} = object
|
||||||
`dataspace`* {.preservesEmbedded.}: EmbeddedRef
|
`dataspace`* {.preservesEmbedded.}: EmbeddedRef
|
||||||
`lifetime`*: float32
|
`lifetime`*: float32
|
||||||
|
@ -49,6 +56,7 @@ proc `$`*(x: WebsocketArguments | JsonTranslatorArguments |
|
||||||
JsonTranslatorConnected |
|
JsonTranslatorConnected |
|
||||||
JsonSocketTranslatorArguments |
|
JsonSocketTranslatorArguments |
|
||||||
WebhooksArguments |
|
WebhooksArguments |
|
||||||
|
SqliteArguments |
|
||||||
CacheArguments |
|
CacheArguments |
|
||||||
Tcp): string =
|
Tcp): string =
|
||||||
`$`(toPreserves(x))
|
`$`(toPreserves(x))
|
||||||
|
@ -57,6 +65,7 @@ proc encode*(x: WebsocketArguments | JsonTranslatorArguments |
|
||||||
JsonTranslatorConnected |
|
JsonTranslatorConnected |
|
||||||
JsonSocketTranslatorArguments |
|
JsonSocketTranslatorArguments |
|
||||||
WebhooksArguments |
|
WebhooksArguments |
|
||||||
|
SqliteArguments |
|
||||||
CacheArguments |
|
CacheArguments |
|
||||||
Tcp): seq[byte] =
|
Tcp): seq[byte] =
|
||||||
encode(toPreserves(x))
|
encode(toPreserves(x))
|
||||||
|
|
|
@ -0,0 +1,14 @@
|
||||||
|
|
||||||
|
## Schema module for the sqlite actor, generated from ``sql.prs``:
## ``Query = <query @label any @statement string>``.
import
  preserves

type
  Query* {.preservesRecord: "query".} = object
    ## Assertion requesting that rows produced by `statement`
    ## be published as records labeled `label`.
    `label`*: Value
    `statement`*: string

proc `$`*(x: Query): string =
  ## Render `x` as Preserves text syntax.
  `$`(toPreserves(x))

proc encode*(x: Query): seq[byte] =
  ## Encode `x` as binary Preserves.
  encode(toPreserves(x))
|
|
@ -6,7 +6,12 @@
|
||||||
import syndicate, syndicate/relays, syndicate/actors/timers
|
import syndicate, syndicate/relays, syndicate/actors/timers
|
||||||
|
|
||||||
import ./syndesizer/[
|
import ./syndesizer/[
|
||||||
cache_actor, json_socket_translator, json_translator, webhooks, websockets]
|
cache_actor,
|
||||||
|
json_socket_translator,
|
||||||
|
json_translator,
|
||||||
|
sqlite_actor,
|
||||||
|
webhooks,
|
||||||
|
websockets]
|
||||||
|
|
||||||
runActor("syndesizer") do (turn: var Turn; root: Cap):
|
runActor("syndesizer") do (turn: var Turn; root: Cap):
|
||||||
connectStdio(turn, root)
|
connectStdio(turn, root)
|
||||||
|
@ -16,3 +21,5 @@ runActor("syndesizer") do (turn: var Turn; root: Cap):
|
||||||
discard spawnJsonStdioTranslator(turn, root)
|
discard spawnJsonStdioTranslator(turn, root)
|
||||||
discard spawnWebhookActor(turn, root)
|
discard spawnWebhookActor(turn, root)
|
||||||
discard spawnWebsocketActor(turn, root)
|
discard spawnWebsocketActor(turn, root)
|
||||||
|
when withSqlite:
|
||||||
|
discard spawnSqliteActor(turn, root)
|
||||||
|
|
|
@ -0,0 +1,114 @@
|
||||||
|
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense

## Read-only SQLite access for the syndesizer.
## Compile this capability out with ``--define:withSqlite=no``.
const withSqlite* {.booldefine.}: bool = true

when withSqlite:
  import preserves, syndicate, syndicate/relays
  import ../schema/[config, sql]

  # Avoid Sqlite3 from the standard library because it is
  # only held together by wishful thinking and dlload.
  {.passC: staticExec("pkg-config --cflags sqlite3").}
  {.passL: staticExec("pkg-config --libs sqlite3").}

  {.pragma: sqlite3h, header: "sqlite3.h".}

  var
    SQLITE_VERSION_NUMBER {.importc, sqlite3h.}: cint
    SQLITE_OK {.importc, sqlite3h.}: cint
    SQLITE_ROW {.importc, sqlite3h.}: cint
    SQLITE_DONE {.importc, sqlite3h.}: cint
    SQLITE_OPEN_READONLY {.importc, sqlite3h.}: cint

  const
    # Fundamental datatype codes — https://sqlite.org/c3ref/c_blob.html
    SQLITE_INTEGER = 1
    SQLITE_FLOAT = 2
    SQLITE_TEXT = 3
    SQLITE_BLOB = 4
    # SQLITE_NULL = 5

  type
    Sqlite3 {.importc: "sqlite3", sqlite3h.} = distinct pointer
    Stmt {.importc: "sqlite3_stmt", sqlite3h.} = distinct pointer

  {.pragma: importSqlite3, importc: "sqlite3_$1", sqlite3h.}

  proc libversion_number: cint {.importSqlite3.}

  proc open_v2(filename: cstring; ppDb: ptr Sqlite3; flags: cint; zVfs: cstring): cint {.importSqlite3.}
  # `cint` for consistency with the other bindings (was `int32`);
  # the handle parameter is a connection, not a dataspace, so `db`.
  proc close(db: Sqlite3): cint {.discardable, importSqlite3.}

  proc errmsg(db: Sqlite3): cstring {.importSqlite3.}

  proc prepare_v2(db: Sqlite3; zSql: cstring; nByte: cint; ppStmt: ptr Stmt; pzTail: ptr cstring): cint {.importSqlite3.}

  proc step(stmt: Stmt): cint {.importSqlite3.}

  proc column_count(stmt: Stmt): cint {.importSqlite3.}
  proc column_blob(stmt: Stmt; col: cint): pointer {.importSqlite3.}
  proc column_bytes(stmt: Stmt; col: cint): cint {.importSqlite3.}
  proc column_double(stmt: Stmt; col: cint): float64 {.importSqlite3.}
  proc column_int64(stmt: Stmt; col: cint): int64 {.importSqlite3.}
  proc column_text(stmt: Stmt; col: cint): cstring {.importSqlite3.}
  proc column_type(stmt: Stmt; col: cint): cint {.importSqlite3.}
  proc finalize(stmt: Stmt): cint {.importSqlite3.}

  # Guard against a mismatch between the sqlite3.h used at compile
  # time and the library actually linked.
  doAssert libversion_number() == SQLITE_VERSION_NUMBER
|
||||||
|
|
||||||
|
proc logError(db: Sqlite3; context: string) =
  ## Report the current SQLite error message for `db` on stderr,
  ## suffixed with `context` (a path or SQL statement).
  stderr.writeLine(errmsg(db), ": ", context)
|
||||||
|
|
||||||
|
proc extractValue(stmt: Stmt; col: cint): Value =
  ## Convert column `col` of the current result row of `stmt` to a
  ## Preserves `Value`. NULL (and any unrecognized type code) maps
  ## to the record `<null>`.
  case column_type(stmt, col)
  of SQLITE_INTEGER:
    # `toPreserves` for consistency with the rest of this codebase
    # (`toPreserve` is the legacy spelling).
    result = toPreserves(column_int64(stmt, col))
  of SQLITE_FLOAT:
    result = toPreserves(column_double(stmt, col))
  of SQLITE_TEXT:
    # SQLite documents the safe call order as column_text FOLLOWED BY
    # column_bytes; the reverse can invalidate the earlier result.
    let text = column_text(stmt, col)
    let n = column_bytes(stmt, col)
    result = Value(kind: pkString, string: newString(n))
    if n > 0:
      copyMem(addr result.string[0], text, n)
  of SQLITE_BLOB:
    # Likewise column_blob before column_bytes.
    let blob = column_blob(stmt, col)
    let n = column_bytes(stmt, col)
    result = Value(kind: pkByteString, bytes: newSeq[byte](n))
    if n > 0:
      copyMem(addr result.bytes[0], blob, n)
  else:
    result = initRecord("null")
|
||||||
|
|
||||||
|
proc extractRecord(stmt: Stmt; label: Value, arity: cint): Value =
  ## Build a record labeled `label` whose `arity` fields are the
  ## columns of the current result row of `stmt`.
  result = initRecord(label, arity)
  var col: cint = 0
  while col < arity:
    result.record[col] = extractValue(stmt, col)
    inc col
|
||||||
|
|
||||||
|
proc spawnSqliteActor*(turn: var Turn; root: Cap): Actor {.discardable.} =
  ## Spawn the actor providing read-only SQLite access.
  ## For each `SqliteArguments` assertion observed at `root` a database
  ## is opened (and closed again when the assertion is retracted); for
  ## each `Query` assertion at the configured dataspace the rows
  ## produced by the SQL statement are published there as records.
  spawn("sqlite-actor", turn) do (turn: var Turn):
    during(turn, root, ?:SqliteArguments) do (path: string, ds: Cap):
      var db: Sqlite3
      if open_v2(path, addr db, SQLITE_OPEN_READONLY, nil) != SQLITE_OK:
        logError(db, path)
      else:
        during(turn, ds, ?:Query) do (label: Value, statement: string):
          var stmt: Stmt
          if prepare_v2(db, statement, statement.len.cint, addr stmt, nil) != SQLITE_OK:
            logError(db, statement)
          else:
            try:
              let arity = column_count(stmt)
              var res = step(stmt)
              while res == SQLITE_ROW:
                var rec = extractRecord(stmt, label, arity)
                discard publish(turn, ds, rec)
                res = step(stmt)
              # The loop only exits on a non-row code; use the named
              # constant instead of the magic literal 100.
              assert res != SQLITE_ROW
              # Any terminal code other than SQLITE_DONE is an error.
              if res != SQLITE_DONE:
                logError(db, statement)
            finally:
              if finalize(stmt) != SQLITE_OK: logError(db, statement)
    do:
      # Retraction handler: release the connection.
      close(db)
|
||||||
|
|
||||||
|
when isMainModule:
  # Standalone entry point: serve SQLite queries over a stdio relay.
  runActor("main") do (turn: var Turn; root: Cap):
    connectStdio(turn, root)
    discard spawnSqliteActor(turn, root)
|
Loading…
Reference in New Issue