json_socket_translator: TCP socket support

This commit is contained in:
Emery Hemingway 2024-04-05 14:16:03 +01:00
parent bcc9b7f841
commit 5400e9a79a
4 changed files with 58 additions and 14 deletions

View File

@ -75,7 +75,7 @@ let ?mpvSpace = dataspace
? <service-object <daemon syndesizer> ?cap> [ ? <service-object <daemon syndesizer> ?cap> [
$cap <json-socket-translator { $cap <json-socket-translator {
dataspace: $mpvSpace dataspace: $mpvSpace
socket: "/run/user/1000/mpv.sock" socket: <unix "/run/user/1000/mpv.sock">
}> }>
] ]
] ]

View File

@ -19,11 +19,16 @@ JsonTranslatorArguments = <json-stdio-translator {
dataspace: #:any dataspace: #:any
}>. }>.
JsonTranslatorConnected = <connected @path string>. JsonTranslatorConnected = <connected @address SocketAddress>.
TcpAddress = <tcp @host string @port int>.
UnixAddress = <unix @path string>.
SocketAddress = TcpAddress / UnixAddress .
JsonSocketTranslatorArguments = <json-socket-translator { JsonSocketTranslatorArguments = <json-socket-translator {
dataspace: #:any dataspace: #:any
socket: string socket: SocketAddress
}>. }>.
PostgreArguments = <postgre { PostgreArguments = <postgre {

View File

@ -17,6 +17,17 @@ type
JsonTranslatorArguments* {.preservesRecord: "json-stdio-translator".} = object JsonTranslatorArguments* {.preservesRecord: "json-stdio-translator".} = object
`field0`*: JsonTranslatorArgumentsField0 `field0`*: JsonTranslatorArgumentsField0
SocketAddressKind* {.pure.} = enum
`TcpAddress`, `UnixAddress`
`SocketAddress`* {.preservesOr.} = object
case orKind*: SocketAddressKind
of SocketAddressKind.`TcpAddress`:
`tcpaddress`*: TcpAddress
of SocketAddressKind.`UnixAddress`:
`unixaddress`*: UnixAddress
Base64DecoderArgumentsField0* {.preservesDictionary.} = object Base64DecoderArgumentsField0* {.preservesDictionary.} = object
`dataspace`* {.preservesEmbedded.}: EmbeddedRef `dataspace`* {.preservesEmbedded.}: EmbeddedRef
@ -24,11 +35,11 @@ type
`field0`*: Base64DecoderArgumentsField0 `field0`*: Base64DecoderArgumentsField0
JsonTranslatorConnected* {.preservesRecord: "connected".} = object JsonTranslatorConnected* {.preservesRecord: "connected".} = object
`path`*: string `address`*: SocketAddress
JsonSocketTranslatorArgumentsField0* {.preservesDictionary.} = object JsonSocketTranslatorArgumentsField0* {.preservesDictionary.} = object
`dataspace`* {.preservesEmbedded.}: EmbeddedRef `dataspace`* {.preservesEmbedded.}: EmbeddedRef
`socket`*: string `socket`*: SocketAddress
JsonSocketTranslatorArguments* {.preservesRecord: "json-socket-translator".} = object JsonSocketTranslatorArguments* {.preservesRecord: "json-socket-translator".} = object
`field0`*: JsonSocketTranslatorArgumentsField0 `field0`*: JsonSocketTranslatorArgumentsField0
@ -59,6 +70,10 @@ type
SqliteArguments* {.preservesRecord: "sqlite".} = object SqliteArguments* {.preservesRecord: "sqlite".} = object
`field0`*: SqliteArgumentsField0 `field0`*: SqliteArgumentsField0
TcpAddress* {.preservesRecord: "tcp".} = object
`host`*: string
`port`*: BiggestInt
CacheArgumentsField0* {.preservesDictionary.} = object CacheArgumentsField0* {.preservesDictionary.} = object
`dataspace`* {.preservesEmbedded.}: EmbeddedRef `dataspace`* {.preservesEmbedded.}: EmbeddedRef
`lifetime`*: float `lifetime`*: float
@ -89,11 +104,14 @@ type
PulseArguments* {.preservesRecord: "pulse".} = object PulseArguments* {.preservesRecord: "pulse".} = object
`field0`*: PulseArgumentsField0 `field0`*: PulseArgumentsField0
UnixAddress* {.preservesRecord: "unix".} = object
`path`*: string
Tcp* {.preservesRecord: "tcp".} = object Tcp* {.preservesRecord: "tcp".} = object
`host`*: string `host`*: string
`port`*: BiggestInt `port`*: BiggestInt
proc `$`*(x: WebsocketArguments | JsonTranslatorArguments | proc `$`*(x: WebsocketArguments | JsonTranslatorArguments | SocketAddress |
Base64DecoderArguments | Base64DecoderArguments |
JsonTranslatorConnected | JsonTranslatorConnected |
JsonSocketTranslatorArguments | JsonSocketTranslatorArguments |
@ -101,15 +119,17 @@ proc `$`*(x: WebsocketArguments | JsonTranslatorArguments |
WebhooksArguments | WebhooksArguments |
FileSystemUsageArguments | FileSystemUsageArguments |
SqliteArguments | SqliteArguments |
TcpAddress |
CacheArguments | CacheArguments |
XmlTranslatorArguments | XmlTranslatorArguments |
PostgreConnectionParameter | PostgreConnectionParameter |
PostgreArguments | PostgreArguments |
PulseArguments | PulseArguments |
UnixAddress |
Tcp): string = Tcp): string =
`$`(toPreserves(x)) `$`(toPreserves(x))
proc encode*(x: WebsocketArguments | JsonTranslatorArguments | proc encode*(x: WebsocketArguments | JsonTranslatorArguments | SocketAddress |
Base64DecoderArguments | Base64DecoderArguments |
JsonTranslatorConnected | JsonTranslatorConnected |
JsonSocketTranslatorArguments | JsonSocketTranslatorArguments |
@ -117,10 +137,12 @@ proc encode*(x: WebsocketArguments | JsonTranslatorArguments |
WebhooksArguments | WebhooksArguments |
FileSystemUsageArguments | FileSystemUsageArguments |
SqliteArguments | SqliteArguments |
TcpAddress |
CacheArguments | CacheArguments |
XmlTranslatorArguments | XmlTranslatorArguments |
PostgreConnectionParameter | PostgreConnectionParameter |
PostgreArguments | PostgreArguments |
PulseArguments | PulseArguments |
UnixAddress |
Tcp): seq[byte] = Tcp): seq[byte] =
encode(toPreserves(x)) encode(toPreserves(x))

View File

@ -7,10 +7,9 @@ import preserves, preserves/jsonhooks, syndicate
import ../schema/[config, json_messages] import ../schema/[config, json_messages]
proc translateSocket(facet: Facet; ds: Cap; path: string) {.asyncio.} = template translateSocketBody {.dirty.} =
# Template workaround for CPS and parameterized types.
var var
socket = new AsyncConn[Protocol.Unix]
conn = connectUnixAsync(path)
guard = initGuard(facet) guard = initGuard(facet)
dec = newBufferedDecoder(0) dec = newBufferedDecoder(0)
buf = new string #TODO: get a pointer into the decoder buf = new string #TODO: get a pointer into the decoder
@ -25,7 +24,7 @@ proc translateSocket(facet: Facet; ds: Cap; path: string) {.asyncio.} =
whelp write(socket[], $data & "\n") whelp write(socket[], $data & "\n")
else: else:
stderr.writeLine "dropped send of ", data stderr.writeLine "dropped send of ", data
discard publish(turn, ds, initRecord("connected", path.toPreserves)) discard publish(turn, ds, initRecord("connected", sa.toPreserves))
onStop(facet, kill) onStop(facet, kill)
run(facet, setup) run(facet, setup)
while alive: while alive:
@ -43,15 +42,33 @@ proc translateSocket(facet: Facet; ds: Cap; path: string) {.asyncio.} =
# Closure, not CPS. # Closure, not CPS.
message(turn, ds, initRecord("recv", data.get)) message(turn, ds, initRecord("recv", data.get))
run(facet, send) run(facet, send)
stderr.writeLine "close socket ", path stderr.writeLine "close socket ", sa
close(socket[]) close(socket[])
proc translateSocket(facet: Facet; ds: Cap; sa: TcpAddress) {.asyncio.} =
var
socket = new AsyncConn[Protocol.Tcp]
conn = connectTcpAsync(sa.host, Port sa.port)
socket[] = conn
translateSocketBody()
proc translateSocket(facet: Facet; ds: Cap; sa: UnixAddress) {.asyncio.} =
var
socket = new AsyncConn[Protocol.Unix]
conn = connectUnixAsync(sa.path)
socket[] = conn
translateSocketBody()
proc spawnJsonSocketTranslator*(turn: var Turn; root: Cap): Actor {.discardable.} = proc spawnJsonSocketTranslator*(turn: var Turn; root: Cap): Actor {.discardable.} =
spawnActor(turn, "json-socket-translator") do (turn: var Turn): spawnActor(turn, "json-socket-translator") do (turn: var Turn):
during(turn, root, ?:JsonSocketTranslatorArguments) do (ds: Cap, socketPath: string): during(turn, root, ?:JsonSocketTranslatorArguments) do (ds: Cap, sa: TcpAddress):
linkActor(turn, "json-socket-translator") do (turn: var Turn): linkActor(turn, "json-socket-translator") do (turn: var Turn):
discard trampoline: discard trampoline:
whelp translateSocket(turn.facet, ds, socketPath) whelp translateSocket(turn.facet, ds, sa)
during(turn, root, ?:JsonSocketTranslatorArguments) do (ds: Cap, sa: UnixAddress):
linkActor(turn, "json-socket-translator") do (turn: var Turn):
discard trampoline:
whelp translateSocket(turn.facet, ds, sa)
when isMainModule: when isMainModule:
import syndicate/relays import syndicate/relays