syndesizer: add actor for reading files

This commit is contained in:
Emery Hemingway 2024-06-27 17:00:18 +03:00
parent 553e5cb7a9
commit bbf7cecbd3
7 changed files with 147 additions and 1 deletions

View File

@ -10,6 +10,12 @@ CacheArguments = <cache {
lifetime: float
}>.
FileSystemStep = <file-system @detail FileSystemDetail> .
FileSystemDetail = {
# iounit: int
root: string
} .
FileSystemUsageArguments = <file-system-usage {
dataspace: #:any
}>.

4
file_system.prs Normal file
View File

@ -0,0 +1,4 @@
version 1 .
embeddedType EntityRef.Cap .
Read = <read @path string @offset int @count int @sink #:bytes> .

View File

@ -7,7 +7,7 @@
"bom-ref": "pkg:nim/syndicate_utils",
"name": "syndicate_utils",
"description": "Utilites for Syndicated Actors and Synit",
"version": "20240625",
"version": "20240627",
"authors": [
{
"name": "Emery Hemingway"

View File

@ -51,6 +51,9 @@ type
HttpClientStepDetail* {.preservesDictionary.} = object
`response-content-type-override`*: string
FileSystemDetail* {.preservesDictionary.} = object
`root`*: string
JsonSocketTranslatorStepField0* {.preservesDictionary.} = object
`socket`*: SocketAddress
@ -108,6 +111,9 @@ type
`host`*: string
`port`*: BiggestInt
FileSystemStep* {.preservesRecord: "file-system".} = object
`detail`*: FileSystemDetail
UnixAddress* {.preservesRecord: "unix".} = object
`path`*: string
@ -121,6 +127,7 @@ proc `$`*(x: PulseStep | JsonTranslatorArguments | SocketAddress | PulseDetail |
SqliteStep |
XsltArguments |
HttpClientStepDetail |
FileSystemDetail |
JsonSocketTranslatorStep |
FileSystemUsageArguments |
HttpClientStep |
@ -132,6 +139,7 @@ proc `$`*(x: PulseStep | JsonTranslatorArguments | SocketAddress | PulseDetail |
PostgreConnectionParameter |
PulseArguments |
Tcp |
FileSystemStep |
UnixAddress |
PrinterStep): string =
`$`(toPreserves(x))
@ -142,6 +150,7 @@ proc encode*(x: PulseStep | JsonTranslatorArguments | SocketAddress |
SqliteStep |
XsltArguments |
HttpClientStepDetail |
FileSystemDetail |
JsonSocketTranslatorStep |
FileSystemUsageArguments |
HttpClientStep |
@ -153,6 +162,7 @@ proc encode*(x: PulseStep | JsonTranslatorArguments | SocketAddress |
PostgreConnectionParameter |
PulseArguments |
Tcp |
FileSystemStep |
UnixAddress |
PrinterStep): seq[byte] =
encode(toPreserves(x))

View File

@ -0,0 +1,23 @@
import
preserves
type
Read* {.preservesRecord: "read".} = object
`path`*: string
`offset`*: BiggestInt
`count`*: BiggestInt
`sink`* {.preservesEmbedded.}: EmbeddedRef
Write* {.preservesRecord: "Write".} = object
`path`*: string
`offset`*: BiggestInt
`count`*: BiggestInt
`data`*: seq[byte]
`written`* {.preservesEmbedded.}: EmbeddedRef
proc `$`*(x: Read | Write): string =
`$`(toPreserves(x))
proc encode*(x: Read | Write): seq[byte] =
encode(toPreserves(x))

View File

@ -8,6 +8,7 @@ import syndicate, syndicate/relays, syndicate/drivers/timers
import ./syndesizer/[
base64_decoder,
cache_actor,
file_systems,
file_system_usage,
http_driver,
json_socket_translator,
@ -20,6 +21,7 @@ runActor("syndesizer") do (turn: Turn):
discard spawnTimerDriver(turn, relay)
discard spawnBase64Decoder(turn, relay)
discard spawnCacheActor(turn, relay)
discard spawnFileSystemActor(turn, relay)
discard spawnFileSystemUsageActor(turn, relay)
discard spawnHttpDriver(turn, relay)
discard spawnJsonSocketTranslator(turn, relay)

View File

@ -0,0 +1,101 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import
std/[cmdline, oserrors, posix],
pkg/sys/[files, ioqueue],
pkg/preserves,
pkg/syndicate,
pkg/syndicate/protocols/gatekeeper,
pkg/syndicate/drivers/timers
from std/os import `/`
import ../schema/[config, file_system]
from pkg/sys/handles import FD
proc echo(args: varargs[string, `$`]) {.used.} =
stderr.writeLine(args)
proc stopForOsError(turn: Turn; cap: Cap) =
message(turn, cap, initRecord("error", osLastError().osErrorMsg().toPreserves))
turn.stopFacet()
proc stopAsOkay(turn: Turn; cap: Cap) =
message(turn, cap, initRecord"ok")
turn.stopFacet()
const iounit = 0x1000
type Buffer = ref seq[byte]
proc newBuffer(n: int): Buffer =
new result
if n < 0: result[].setLen iounit
else: result[].setLen min(n, iounit)
proc read(facet: Facet; file: AsyncFile; count: BiggestInt; buf: Buffer; dst: Cap)
proc readAsync(facet: Facet; file: AsyncFile; count: BiggestInt; buf: Buffer; dst: Cap) {.asyncio.} =
# TODO: optimise
assert count != 0
let n = file.read(buf)
proc deliver(turn: Turn) {.closure.} =
case n
of -1:
turn.stopForOsError(dst)
else:
if n < buf[].len:
buf[].setLen(n)
if n > 0:
message(turn, dst, buf[])
turn.stopAsOkay(dst)
else:
message(turn, dst, buf[])
var count = count
if count != -1:
count = count - n
read(facet, file, count, buf, dst)
facet.run(deliver)
proc read(facet: Facet; file: AsyncFile; count: BiggestInt; buf: Buffer; dst: Cap) =
discard trampoline:
whelp readAsync(facet, file, count, buf, dst)
proc read(facet: Facet; file: AsyncFile; count: BiggestInt; dst: Cap) =
## Call read with a reuseable buffer.
read(facet, file, count, newBuffer(count.int), dst)
proc serve(turn: Turn; detail: FileSystemDetail; ds: Cap) =
during(turn, ds, Read.grabType) do (op: Read):
let dst = op.sink.Cap
let fd = posix.open(detail.root / op.path, O_RDONLY or O_NONBLOCK, 0)
if fd < 0:
turn.stopForOsError(dst)
else:
if op.count == 0:
discard close(fd)
message(turn, dst, initRecord"ok")
turn.facet.stop()
elif posix.lseek(fd, op.offset, SEEK_SET) < 0:
discard close(fd)
turn.stopForOsError(dst)
else:
# fd is hopefully automagically closed.
turn.facet.read(fd.FD.newAsyncFile, op.count, dst)
proc spawnFileSystemActor*(turn: Turn; relay: Cap): Actor {.discardable.} =
spawnActor(turn, "file-system") do (turn: Turn):
let resolvePat = Resolve?:{ 0: FileSystemStep.grabWithin, 1: grab() }
during(turn, relay, resolvePat) do (detail: FileSystemDetail; observer: Cap):
let ds = turn.newDataspace()
serve(turn, detail, ds)
discard publish(turn, observer, ResolvedAccepted(responderSession: ds))
when isMainModule:
import syndicate/relays
runActor("main") do (turn: Turn):
resolveEnvironment(turn) do (turn: Turn; relay: Cap):
spawnFileSystemActor(turn, relay)