Compare commits

...

2 Commits

Author SHA1 Message Date
Emery Hemingway 7f481f9ff6 WiP! Postgre actor 2024-01-10 10:16:42 +02:00
Emery Hemingway 651efc1e6b syndesizer: add file-system-usage 2024-01-09 20:49:54 +02:00
8 changed files with 174 additions and 1 deletions

View File

@ -28,6 +28,23 @@ Example configuration:
$cap <cache { dataspace: $nixspace lifetime: 3600.0 }> ]
]
```
### File System Usage
Summarize the size of file-system directory. Equivalent to `du -s -b`.
Query the size of a directory in bytes by observing `<file-system-usage "/SOME/PATH" ?size>`.
```
# Configuration example
? <exposed-dataspace ?ds> [
<require-service <daemon syndesizer>>
? <service-object <daemon syndesizer> ?cap> [
$cap <file-system-usage { dataspace: $ds }>
]
]
```
### JSON Socket Translator

3
assertions.prs Normal file
View File

@ -0,0 +1,3 @@
version 1.
FileSystemUsage = <file-system-usage @path string @size int>.

View File

@ -6,6 +6,10 @@ CacheArguments = <cache {
lifetime: float
}>.
FileSystemUsageArguments = <file-system-usage {
dataspace: #!any
}>.
JsonTranslatorArguments = <json-stdio-translator {
argv: [string ...]
dataspace: #!any
@ -18,6 +22,12 @@ JsonSocketTranslatorArguments = <json-socket-translator {
socket: string
}>.
PostgreArguments = <postgre {
connection: [PostgreConnectionParameter ...]
dataspace: #!any
}>.
PostgreConnectionParameter = [@key string @val string].
SqliteArguments = <sqlite {
database: string
dataspace: #!any

View File

@ -3,5 +3,5 @@
pkgs.buildNimPackage {
name = "dummy";
propagatedNativeBuildInputs = [ pkgs.pkg-config ];
propagatedBuildInputs = [ pkgs.sqlite ];
propagatedBuildInputs = [ pkgs.postgresql pkgs.sqlite ];
}

View File

@ -34,6 +34,12 @@ type
WebhooksArguments* {.preservesRecord: "webhooks".} = object
`field0`*: WebhooksArgumentsField0
FileSystemUsageArgumentsField0* {.preservesDictionary.} = object
`dataspace`* {.preservesEmbedded.}: EmbeddedRef
FileSystemUsageArguments* {.preservesRecord: "file-system-usage".} = object
`field0`*: FileSystemUsageArgumentsField0
SqliteArgumentsField0* {.preservesDictionary.} = object
`database`*: string
`dataspace`* {.preservesEmbedded.}: EmbeddedRef
@ -48,6 +54,17 @@ type
CacheArguments* {.preservesRecord: "cache".} = object
`field0`*: CacheArgumentsField0
PostgreConnectionParameter* {.preservesTuple.} = object
`key`*: string
`val`*: string
PostgreArgumentsField0* {.preservesDictionary.} = object
`connection`*: seq[PostgreConnectionParameter]
`dataspace`* {.preservesEmbedded.}: EmbeddedRef
PostgreArguments* {.preservesRecord: "postgre".} = object
`field0`*: PostgreArgumentsField0
Tcp* {.preservesRecord: "tcp".} = object
`host`*: string
`port`*: BiggestInt
@ -56,8 +73,11 @@ proc `$`*(x: WebsocketArguments | JsonTranslatorArguments |
JsonTranslatorConnected |
JsonSocketTranslatorArguments |
WebhooksArguments |
FileSystemUsageArguments |
SqliteArguments |
CacheArguments |
PostgreConnectionParameter |
PostgreArguments |
Tcp): string =
`$`(toPreserves(x))
@ -65,7 +85,10 @@ proc encode*(x: WebsocketArguments | JsonTranslatorArguments |
JsonTranslatorConnected |
JsonSocketTranslatorArguments |
WebhooksArguments |
FileSystemUsageArguments |
SqliteArguments |
CacheArguments |
PostgreConnectionParameter |
PostgreArguments |
Tcp): seq[byte] =
encode(toPreserves(x))

View File

@ -5,21 +5,31 @@
import syndicate, syndicate/relays, syndicate/actors/timers
const
withPostgre* {.booldefine.}: bool = true
import ./syndesizer/[
cache_actor,
file_system_usage,
json_socket_translator,
json_translator,
sqlite_actor,
webhooks,
websockets]
when withPostgre:
import ./syndesizer/postgre_actor
runActor("syndesizer") do (turn: var Turn; root: Cap):
connectStdio(turn, root)
discard spawnTimers(turn, root)
discard spawnCacheActor(turn, root)
discard spawnFileSystemUsageActor(turn, root)
discard spawnJsonSocketTranslator(turn, root)
discard spawnJsonStdioTranslator(turn, root)
discard spawnWebhookActor(turn, root)
discard spawnWebsocketActor(turn, root)
when withPostgre:
discard spawnPostgreActor(turn, root)
when withSqlite:
discard spawnSqliteActor(turn, root)

View File

@ -0,0 +1,27 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[dirs, os, paths]
import preserves
import syndicate, syndicate/relays
import ../schema/[assertions, config]
proc spawnFileSystemUsageActor*(turn: var Turn; root: Cap): Actor {.discardable.} =
spawn("file-system-usage", turn) do (turn: var Turn):
during(turn, root, ?:FileSystemUsageArguments) do (ds: Cap):
var pat = ?Observe(pattern: !FileSystemUsage) ?? { 0: grab() }
during(turn, ds, pat) do (lit: Literal[string]):
var ass = FileSystemUsage(path: lit.value)
if fileExists(ass.path): ass.size = getFileSize(ass.path)
else:
for fp in walkDirRec(paths.Path(lit.value), yieldFilter={pcFile}):
var fs = getFileSize(string fp)
inc(ass.size, fs)
discard publish(turn, ds, ass)
# TODO: updates?
when isMainModule:
runActor("main") do (turn: var Turn; root: Cap):
connectStdio(turn, root)
discard spawnFileSystemUsageActor(turn, root)

View File

@ -0,0 +1,83 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import preserves, syndicate, syndicate/relays
import ../schema/[config, sql]
{.passL: "-lpq".}
{.pragma: libpq, header: "libpq-fe.h", importc.}
type
PGconn* {.libpq.} = ptr object
ConnStatusType* {.libpq.} = enum
CONNECTION_OK, CONNECTION_BAD, ## Non-blocking mode only below here
##
## The existence of these should never be relied upon - they should only
## be used for user feedback or similar purposes.
##
CONNECTION_STARTED, ## Waiting for connection to be made.
CONNECTION_MADE, ## Connection OK; waiting to send.
CONNECTION_AWAITING_RESPONSE, ## Waiting for a response from the
## postmaster.
CONNECTION_AUTH_OK, ## Received authentication; waiting for
## backend startup.
CONNECTION_SETENV, ## This state is no longer used.
CONNECTION_SSL_STARTUP, ## Negotiating SSL.
CONNECTION_NEEDED, ## Internal state: connect() needed
CONNECTION_CHECK_WRITABLE, ## Checking if session is read-write.
CONNECTION_CONSUME, ## Consuming any extra messages.
CONNECTION_GSS_STARTUP, ## Negotiating GSSAPI.
CONNECTION_CHECK_TARGET, ## Checking target server properties.
CONNECTION_CHECK_STANDBY ## Checking if server is in standby mode.
proc PQconnectdbParams(
keywords: cstringArray; values: cstringArray; expand_dbname: cint): PGconn {.libpq.}
proc PQerrorMessage(conn: PGconn): cstring {.libpq.}
proc PQfinish(conn: PGconn) {.libpq.}
proc PQstatus(conn: PGconn): ConnStatusType {.libpq.}
# proc PQsocket(conn: PGconn): cint
# proc PQconnectStartParams(
# keywords: cstringArray; values: cstringArray; expand_dbname: cint): PGconn
# TODO: async
type StringPairs = seq[tuple[key: string, val: string]]
proc splitParams(params: StringPairs): (cstringArray, cstringArray) =
var strings = newSeq[string](params.len)
for i, _ in params: strings[i] = params[i][0]
result[0] = allocCStringArray(strings)
for i, _ in params: strings[i] = params[i][1]
result[1] = allocCStringArray(strings)
proc spawnPostgreActor*(turn: var Turn; root: Cap): Actor {.discardable.} =
spawn("postgre", turn) do (turn: var Turn):
during(turn, root, ?:PostgreArguments) do (params: StringPairs, ds: Cap):
var
conn: PGconn
statusHandle: Handle
block:
var (keys, vals) = splitParams(params)
conn = PQconnectdbParams(keys, vals, 0)
deallocCStringArray(keys)
deallocCStringArray(vals)
if conn.isNil: raise newException(OutOfMemDefect, "failed to alloc PQ connection")
let
status = PQstatus(conn)
msg = $PQerrorMessage(conn)
statusHandle = publish(turn, ds,
initRecord("status", toSymbol($status), msg.toPreserves))
if status == CONNECTION_OK:
discard
do:
PQfinish(conn)
when isMainModule:
runActor("main") do (turn: var Turn; root: Cap):
connectStdio(turn, root)
spawnPostgreActor(turn, root)