SQL: decompose SQL statements and assert errors
This commit is contained in:
parent
de7683b467
commit
e954fdefec
|
@ -208,8 +208,13 @@ let ?tuplespace = dataspace
|
|||
|
||||
$sqlspace <query "SELECT id, name FROM stuff" $tuplespace>
|
||||
|
||||
$tuplespace ? [?id ?name] [
|
||||
$log ! <log "-" { row: <example-row $id $name> }>
|
||||
$tuplespace [
|
||||
? [?id ?name] [
|
||||
$log ! <log "-" { row: <example-row $id $name> }>
|
||||
]
|
||||
? <sqlite-error ?msg ?ctx> [
|
||||
$log ! <log "-" { msg: $msg ctx: $ctx }>
|
||||
]
|
||||
]
|
||||
```
|
||||
|
||||
|
|
5
sql.prs
5
sql.prs
|
@ -2,4 +2,7 @@ version 1 .
|
|||
|
||||
# When asserted the actor reponds to @target rows as records
|
||||
# of the given label and row columns as record fields.
|
||||
Query = <query @statement string @target #:any> .
|
||||
Query = <query @statement [any ...] @target #:any> .
|
||||
|
||||
# When a query fails this is asserted instead.
|
||||
SqlError = <sql-error @msg string @context string>.
|
||||
|
|
|
@ -87,6 +87,24 @@ proc splitParams(params: StringPairs): (cstringArray, cstringArray) =
|
|||
for i, _ in params: strings[i] = params[i][1]
|
||||
result[1] = allocCStringArray(strings)
|
||||
|
||||
proc renderSql(tokens: openarray[Value]): string =
|
||||
for token in tokens:
|
||||
if result.len > 0: result.add ' '
|
||||
case token.kind
|
||||
of pkSymbol:
|
||||
result.add token.symbol.string
|
||||
of pkString:
|
||||
result.add '\''
|
||||
result.add token.string
|
||||
result.add '\''
|
||||
of pkFloat, pkRegister, pkBigInt:
|
||||
result.add $token
|
||||
of pkBoolean:
|
||||
if token.bool: result.add '1'
|
||||
else: result.add '0'
|
||||
else:
|
||||
return ""
|
||||
|
||||
proc spawnPostgreActor*(turn: var Turn; root: Cap): Actor {.discardable.} =
|
||||
spawn("postgre", turn) do (turn: var Turn):
|
||||
during(turn, root, ?:PostgreArguments) do (params: StringPairs, ds: Cap):
|
||||
|
@ -102,21 +120,29 @@ proc spawnPostgreActor*(turn: var Turn; root: Cap): Actor {.discardable.} =
|
|||
statusHandle = publish(turn, ds,
|
||||
initRecord("status", toSymbol($status), msg.toPreserves))
|
||||
if status == CONNECTION_OK:
|
||||
during(turn, ds, ?:Query) do (statement: string, target: Cap):
|
||||
var res = PQexec(conn, statement)
|
||||
var st = PQresultStatus(res)
|
||||
discard publish(turn, ds, toRecord(
|
||||
"error", statement, toSymbol($PQresStatus(st)), $PQresultErrorMessage(res)))
|
||||
if st == PGRES_TUPLES_OK or st == PGRES_SINGLE_TUPLE:
|
||||
let tuples = PQntuples(res)
|
||||
let fields = PQnfields(res)
|
||||
if tuples > 0 and fields > 0:
|
||||
for r in 0..<tuples:
|
||||
var tupl = initSequence(fields)
|
||||
for f in 0..<fields:
|
||||
tupl[f] = toPreserves($PQgetvalue(res, r, f))
|
||||
discard publish(turn, target, tupl)
|
||||
PQclear(res)
|
||||
during(turn, ds, ?:Query) do (statement: seq[Value], target: Cap):
|
||||
var text = renderSql statement
|
||||
if text == "":
|
||||
discard publish(turn, ds, SqlError(msg: "invalid statement", context: $statement))
|
||||
else:
|
||||
var
|
||||
res = PQexec(conn, text)
|
||||
st = PQresultStatus(res)
|
||||
if st == PGRES_TUPLES_OK or st == PGRES_SINGLE_TUPLE:
|
||||
let tuples = PQntuples(res)
|
||||
let fields = PQnfields(res)
|
||||
if tuples > 0 and fields > 0:
|
||||
for r in 0..<tuples:
|
||||
var tupl = initSequence(fields)
|
||||
for f in 0..<fields:
|
||||
tupl[f] = toPreserves($PQgetvalue(res, r, f))
|
||||
discard publish(turn, target, tupl)
|
||||
else:
|
||||
discard publish(turn, ds, SqlError(
|
||||
msg: $PQresStatus(st),
|
||||
context: $PQresultErrorMessage(res),
|
||||
))
|
||||
PQclear(res)
|
||||
else:
|
||||
stderr.writeLine "refusing to do anything when status is ", status
|
||||
do:
|
||||
|
|
|
@ -4,11 +4,15 @@ import
|
|||
|
||||
type
|
||||
Query* {.preservesRecord: "query".} = object
|
||||
`statement`*: string
|
||||
`statement`*: seq[Value]
|
||||
`target`* {.preservesEmbedded.}: Value
|
||||
|
||||
proc `$`*(x: Query): string =
|
||||
SqlError* {.preservesRecord: "sql-error".} = object
|
||||
`msg`*: string
|
||||
`context`*: string
|
||||
|
||||
proc `$`*(x: Query | SqlError): string =
|
||||
`$`(toPreserves(x))
|
||||
|
||||
proc encode*(x: Query): seq[byte] =
|
||||
proc encode*(x: Query | SqlError): seq[byte] =
|
||||
encode(toPreserves(x))
|
||||
|
|
|
@ -54,8 +54,19 @@ proc finalize(stmt: Stmt): cint {.importSqlite3.}
|
|||
|
||||
doAssert libversion_number() == SQLITE_VERSION_NUMBER
|
||||
|
||||
proc logError(db: Sqlite3; context: string) =
|
||||
writeLine(stderr, errmsg(db), ": ", context)
|
||||
proc assertError(facet: Facet; cap: Cap; db: Sqlite3; context: string) =
|
||||
run(facet) do (turn: var Turn):
|
||||
publish(turn, cap, SqlError(
|
||||
msg: $errmsg(db),
|
||||
context: context,
|
||||
))
|
||||
|
||||
proc assertError(facet: Facet; cap: Cap; msg, context: string) =
|
||||
run(facet) do (turn: var Turn):
|
||||
publish(turn, cap, SqlError(
|
||||
msg: msg,
|
||||
context: context,
|
||||
))
|
||||
|
||||
proc extractValue(stmt: Stmt; col: cint): Value =
|
||||
case column_type(stmt, col)
|
||||
|
@ -78,34 +89,58 @@ proc extractTuple(stmt: Stmt; arity: cint): Value =
|
|||
result = initSequence(arity)
|
||||
for col in 0..<arity: result[col] = extractValue(stmt, col)
|
||||
|
||||
proc renderSql(tokens: openarray[Value]): string =
|
||||
for token in tokens:
|
||||
if result.len > 0: result.add ' '
|
||||
case token.kind
|
||||
of pkSymbol:
|
||||
result.add token.symbol.string
|
||||
of pkString:
|
||||
result.add '\''
|
||||
result.add token.string
|
||||
result.add '\''
|
||||
of pkFloat, pkRegister, pkBigInt:
|
||||
result.add $token
|
||||
of pkBoolean:
|
||||
if token.bool: result.add '1'
|
||||
else: result.add '0'
|
||||
else:
|
||||
return ""
|
||||
|
||||
proc spawnSqliteActor*(turn: var Turn; root: Cap): Actor {.discardable.} =
|
||||
spawn("sqlite-actor", turn) do (turn: var Turn):
|
||||
during(turn, root, ?:SqliteArguments) do (path: string, ds: Cap):
|
||||
stderr.writeLine("opening SQLite database ", path)
|
||||
var db: Sqlite3
|
||||
if open_v2(path, addr db, SQLITE_OPEN_READONLY, nil) != SQLITE_OK:
|
||||
logError(db, path)
|
||||
else:
|
||||
during(turn, ds, ?:Query) do (statement: string, target: Cap):
|
||||
var stmt: Stmt
|
||||
if prepare_v2(db, statement, statement.len.cint, addr stmt, nil) != SQLITE_OK:
|
||||
logError(db, statement)
|
||||
else:
|
||||
try:
|
||||
let arity = column_count(stmt)
|
||||
var res = step(stmt)
|
||||
while res == SQLITE_ROW:
|
||||
var rec = extractTuple(stmt, arity)
|
||||
discard publish(turn, target, rec)
|
||||
res = step(stmt)
|
||||
assert res != 100
|
||||
if res != SQLITE_DONE:
|
||||
logError(db, statement)
|
||||
finally:
|
||||
if finalize(stmt) != SQLITE_OK: logError(db, statement)
|
||||
do:
|
||||
close(db)
|
||||
stderr.writeLine("closed SQLite database ", path)
|
||||
linkActor(turn, path) do (turn: var Turn):
|
||||
let facet = turn.facet
|
||||
stderr.writeLine("opening SQLite database ", path)
|
||||
var db: Sqlite3
|
||||
if open_v2(path, addr db, SQLITE_OPEN_READONLY, nil) != SQLITE_OK:
|
||||
assertError(facet, ds, db, path)
|
||||
else:
|
||||
turn.onStop do (turn: var Turn):
|
||||
close(db)
|
||||
stderr.writeLine("closed SQLite database ", path)
|
||||
during(turn, ds, ?:Query) do (statement: seq[Value], target: Cap):
|
||||
var
|
||||
stmt: Stmt
|
||||
text = renderSql statement
|
||||
if text == "":
|
||||
assertError(facet, target, "invalid statement", $statement)
|
||||
elif prepare_v2(db, text, text.len.cint, addr stmt, nil) != SQLITE_OK:
|
||||
assertError(facet, target, db, text)
|
||||
else:
|
||||
try:
|
||||
let arity = column_count(stmt)
|
||||
var res = step(stmt)
|
||||
while res == SQLITE_ROW:
|
||||
var rec = extractTuple(stmt, arity)
|
||||
discard publish(turn, target, rec)
|
||||
res = step(stmt)
|
||||
assert res != 100
|
||||
if res != SQLITE_DONE:
|
||||
assertError(facet, target, db, text)
|
||||
finally:
|
||||
if finalize(stmt) != SQLITE_OK: assertError(facet, target, db, text)
|
||||
|
||||
when isMainModule:
|
||||
import syndicate/relays
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
# Package
|
||||
|
||||
version = "20240407"
|
||||
version = "20240408"
|
||||
author = "Emery Hemingway"
|
||||
description = "Utilites for Syndicated Actors and Synit"
|
||||
license = "unlicense"
|
||||
|
|
Loading…
Reference in New Issue