Compare commits

...

2 Commits

Author SHA1 Message Date
Emery Hemingway 7f481f9ff6 WiP! Postgre actor 2024-01-10 10:16:42 +02:00
Emery Hemingway 651efc1e6b syndesizer: add file-system-usage 2024-01-09 20:49:54 +02:00
8 changed files with 174 additions and 1 deletions

View File

@ -28,6 +28,23 @@ Example configuration:
$cap <cache { dataspace: $nixspace lifetime: 3600.0 }> ] $cap <cache { dataspace: $nixspace lifetime: 3600.0 }> ]
] ]
``` ```
### File System Usage
Summarize the size of file-system directory. Equivalent to `du -s -b`.
Query the size of a directory in bytes by observing `<file-system-usage "/SOME/PATH" ?size>`.
```
# Configuration example
? <exposed-dataspace ?ds> [
<require-service <daemon syndesizer>>
? <service-object <daemon syndesizer> ?cap> [
$cap <file-system-usage { dataspace: $ds }>
]
]
```
### JSON Socket Translator ### JSON Socket Translator

3
assertions.prs Normal file
View File

@ -0,0 +1,3 @@
version 1.
FileSystemUsage = <file-system-usage @path string @size int>.

View File

@ -6,6 +6,10 @@ CacheArguments = <cache {
lifetime: float lifetime: float
}>. }>.
FileSystemUsageArguments = <file-system-usage {
dataspace: #!any
}>.
JsonTranslatorArguments = <json-stdio-translator { JsonTranslatorArguments = <json-stdio-translator {
argv: [string ...] argv: [string ...]
dataspace: #!any dataspace: #!any
@ -18,6 +22,12 @@ JsonSocketTranslatorArguments = <json-socket-translator {
socket: string socket: string
}>. }>.
PostgreArguments = <postgre {
connection: [PostgreConnectionParameter ...]
dataspace: #!any
}>.
PostgreConnectionParameter = [@key string @val string].
SqliteArguments = <sqlite { SqliteArguments = <sqlite {
database: string database: string
dataspace: #!any dataspace: #!any

View File

@ -3,5 +3,5 @@
pkgs.buildNimPackage { pkgs.buildNimPackage {
name = "dummy"; name = "dummy";
propagatedNativeBuildInputs = [ pkgs.pkg-config ]; propagatedNativeBuildInputs = [ pkgs.pkg-config ];
propagatedBuildInputs = [ pkgs.sqlite ]; propagatedBuildInputs = [ pkgs.postgresql pkgs.sqlite ];
} }

View File

@ -34,6 +34,12 @@ type
WebhooksArguments* {.preservesRecord: "webhooks".} = object WebhooksArguments* {.preservesRecord: "webhooks".} = object
`field0`*: WebhooksArgumentsField0 `field0`*: WebhooksArgumentsField0
FileSystemUsageArgumentsField0* {.preservesDictionary.} = object
`dataspace`* {.preservesEmbedded.}: EmbeddedRef
FileSystemUsageArguments* {.preservesRecord: "file-system-usage".} = object
`field0`*: FileSystemUsageArgumentsField0
SqliteArgumentsField0* {.preservesDictionary.} = object SqliteArgumentsField0* {.preservesDictionary.} = object
`database`*: string `database`*: string
`dataspace`* {.preservesEmbedded.}: EmbeddedRef `dataspace`* {.preservesEmbedded.}: EmbeddedRef
@ -48,6 +54,17 @@ type
CacheArguments* {.preservesRecord: "cache".} = object CacheArguments* {.preservesRecord: "cache".} = object
`field0`*: CacheArgumentsField0 `field0`*: CacheArgumentsField0
PostgreConnectionParameter* {.preservesTuple.} = object
`key`*: string
`val`*: string
PostgreArgumentsField0* {.preservesDictionary.} = object
`connection`*: seq[PostgreConnectionParameter]
`dataspace`* {.preservesEmbedded.}: EmbeddedRef
PostgreArguments* {.preservesRecord: "postgre".} = object
`field0`*: PostgreArgumentsField0
Tcp* {.preservesRecord: "tcp".} = object Tcp* {.preservesRecord: "tcp".} = object
`host`*: string `host`*: string
`port`*: BiggestInt `port`*: BiggestInt
@ -56,8 +73,11 @@ proc `$`*(x: WebsocketArguments | JsonTranslatorArguments |
JsonTranslatorConnected | JsonTranslatorConnected |
JsonSocketTranslatorArguments | JsonSocketTranslatorArguments |
WebhooksArguments | WebhooksArguments |
FileSystemUsageArguments |
SqliteArguments | SqliteArguments |
CacheArguments | CacheArguments |
PostgreConnectionParameter |
PostgreArguments |
Tcp): string = Tcp): string =
`$`(toPreserves(x)) `$`(toPreserves(x))
@ -65,7 +85,10 @@ proc encode*(x: WebsocketArguments | JsonTranslatorArguments |
JsonTranslatorConnected | JsonTranslatorConnected |
JsonSocketTranslatorArguments | JsonSocketTranslatorArguments |
WebhooksArguments | WebhooksArguments |
FileSystemUsageArguments |
SqliteArguments | SqliteArguments |
CacheArguments | CacheArguments |
PostgreConnectionParameter |
PostgreArguments |
Tcp): seq[byte] = Tcp): seq[byte] =
encode(toPreserves(x)) encode(toPreserves(x))

View File

@ -5,21 +5,31 @@
import syndicate, syndicate/relays, syndicate/actors/timers import syndicate, syndicate/relays, syndicate/actors/timers
const
withPostgre* {.booldefine.}: bool = true
import ./syndesizer/[ import ./syndesizer/[
cache_actor, cache_actor,
file_system_usage,
json_socket_translator, json_socket_translator,
json_translator, json_translator,
sqlite_actor, sqlite_actor,
webhooks, webhooks,
websockets] websockets]
when withPostgre:
import ./syndesizer/postgre_actor
runActor("syndesizer") do (turn: var Turn; root: Cap): runActor("syndesizer") do (turn: var Turn; root: Cap):
connectStdio(turn, root) connectStdio(turn, root)
discard spawnTimers(turn, root) discard spawnTimers(turn, root)
discard spawnCacheActor(turn, root) discard spawnCacheActor(turn, root)
discard spawnFileSystemUsageActor(turn, root)
discard spawnJsonSocketTranslator(turn, root) discard spawnJsonSocketTranslator(turn, root)
discard spawnJsonStdioTranslator(turn, root) discard spawnJsonStdioTranslator(turn, root)
discard spawnWebhookActor(turn, root) discard spawnWebhookActor(turn, root)
discard spawnWebsocketActor(turn, root) discard spawnWebsocketActor(turn, root)
when withPostgre:
discard spawnPostgreActor(turn, root)
when withSqlite: when withSqlite:
discard spawnSqliteActor(turn, root) discard spawnSqliteActor(turn, root)

View File

@ -0,0 +1,27 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[dirs, os, paths]
import preserves
import syndicate, syndicate/relays
import ../schema/[assertions, config]
proc spawnFileSystemUsageActor*(turn: var Turn; root: Cap): Actor {.discardable.} =
spawn("file-system-usage", turn) do (turn: var Turn):
during(turn, root, ?:FileSystemUsageArguments) do (ds: Cap):
var pat = ?Observe(pattern: !FileSystemUsage) ?? { 0: grab() }
during(turn, ds, pat) do (lit: Literal[string]):
var ass = FileSystemUsage(path: lit.value)
if fileExists(ass.path): ass.size = getFileSize(ass.path)
else:
for fp in walkDirRec(paths.Path(lit.value), yieldFilter={pcFile}):
var fs = getFileSize(string fp)
inc(ass.size, fs)
discard publish(turn, ds, ass)
# TODO: updates?
when isMainModule:
runActor("main") do (turn: var Turn; root: Cap):
connectStdio(turn, root)
discard spawnFileSystemUsageActor(turn, root)

View File

@ -0,0 +1,83 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import preserves, syndicate, syndicate/relays
import ../schema/[config, sql]
{.passL: "-lpq".}
{.pragma: libpq, header: "libpq-fe.h", importc.}
type
PGconn* {.libpq.} = ptr object
ConnStatusType* {.libpq.} = enum
CONNECTION_OK, CONNECTION_BAD, ## Non-blocking mode only below here
##
## The existence of these should never be relied upon - they should only
## be used for user feedback or similar purposes.
##
CONNECTION_STARTED, ## Waiting for connection to be made.
CONNECTION_MADE, ## Connection OK; waiting to send.
CONNECTION_AWAITING_RESPONSE, ## Waiting for a response from the
## postmaster.
CONNECTION_AUTH_OK, ## Received authentication; waiting for
## backend startup.
CONNECTION_SETENV, ## This state is no longer used.
CONNECTION_SSL_STARTUP, ## Negotiating SSL.
CONNECTION_NEEDED, ## Internal state: connect() needed
CONNECTION_CHECK_WRITABLE, ## Checking if session is read-write.
CONNECTION_CONSUME, ## Consuming any extra messages.
CONNECTION_GSS_STARTUP, ## Negotiating GSSAPI.
CONNECTION_CHECK_TARGET, ## Checking target server properties.
CONNECTION_CHECK_STANDBY ## Checking if server is in standby mode.
proc PQconnectdbParams(
keywords: cstringArray; values: cstringArray; expand_dbname: cint): PGconn {.libpq.}
proc PQerrorMessage(conn: PGconn): cstring {.libpq.}
proc PQfinish(conn: PGconn) {.libpq.}
proc PQstatus(conn: PGconn): ConnStatusType {.libpq.}
# proc PQsocket(conn: PGconn): cint
# proc PQconnectStartParams(
# keywords: cstringArray; values: cstringArray; expand_dbname: cint): PGconn
# TODO: async
type StringPairs = seq[tuple[key: string, val: string]]
proc splitParams(params: StringPairs): (cstringArray, cstringArray) =
var strings = newSeq[string](params.len)
for i, _ in params: strings[i] = params[i][0]
result[0] = allocCStringArray(strings)
for i, _ in params: strings[i] = params[i][1]
result[1] = allocCStringArray(strings)
proc spawnPostgreActor*(turn: var Turn; root: Cap): Actor {.discardable.} =
spawn("postgre", turn) do (turn: var Turn):
during(turn, root, ?:PostgreArguments) do (params: StringPairs, ds: Cap):
var
conn: PGconn
statusHandle: Handle
block:
var (keys, vals) = splitParams(params)
conn = PQconnectdbParams(keys, vals, 0)
deallocCStringArray(keys)
deallocCStringArray(vals)
if conn.isNil: raise newException(OutOfMemDefect, "failed to alloc PQ connection")
let
status = PQstatus(conn)
msg = $PQerrorMessage(conn)
statusHandle = publish(turn, ds,
initRecord("status", toSymbol($status), msg.toPreserves))
if status == CONNECTION_OK:
discard
do:
PQfinish(conn)
when isMainModule:
runActor("main") do (turn: var Turn; root: Cap):
connectStdio(turn, root)
spawnPostgreActor(turn, root)