json_socket_translator: TCP socket support

This commit is contained in:
Emery Hemingway 2024-04-05 14:16:03 +01:00
parent fc9762eb87
commit 920cd28c89
4 changed files with 58 additions and 14 deletions

View File

@ -76,7 +76,7 @@ let ?mpvSpace = dataspace
? <service-object <daemon syndesizer> ?cap> [
$cap <json-socket-translator {
dataspace: $mpvSpace
socket: "/run/user/1000/mpv.sock"
socket: <unix "/run/user/1000/mpv.sock">
}>
]
]

View File

@ -19,11 +19,16 @@ JsonTranslatorArguments = <json-stdio-translator {
dataspace: #:any
}>.
JsonTranslatorConnected = <connected @path string>.
JsonTranslatorConnected = <connected @address SocketAddress>.
TcpAddress = <tcp @host string @port int>.
UnixAddress = <unix @path string>.
SocketAddress = TcpAddress / UnixAddress .
JsonSocketTranslatorArguments = <json-socket-translator {
dataspace: #:any
socket: string
socket: SocketAddress
}>.
PostgreArguments = <postgre {

View File

@ -17,6 +17,17 @@ type
JsonTranslatorArguments* {.preservesRecord: "json-stdio-translator".} = object
`field0`*: JsonTranslatorArgumentsField0
SocketAddressKind* {.pure.} = enum
`TcpAddress`, `UnixAddress`
`SocketAddress`* {.preservesOr.} = object
case orKind*: SocketAddressKind
of SocketAddressKind.`TcpAddress`:
`tcpaddress`*: TcpAddress
of SocketAddressKind.`UnixAddress`:
`unixaddress`*: UnixAddress
Base64DecoderArgumentsField0* {.preservesDictionary.} = object
`dataspace`* {.preservesEmbedded.}: EmbeddedRef
@ -24,11 +35,11 @@ type
`field0`*: Base64DecoderArgumentsField0
JsonTranslatorConnected* {.preservesRecord: "connected".} = object
`path`*: string
`address`*: SocketAddress
JsonSocketTranslatorArgumentsField0* {.preservesDictionary.} = object
`dataspace`* {.preservesEmbedded.}: EmbeddedRef
`socket`*: string
`socket`*: SocketAddress
JsonSocketTranslatorArguments* {.preservesRecord: "json-socket-translator".} = object
`field0`*: JsonSocketTranslatorArgumentsField0
@ -59,6 +70,10 @@ type
SqliteArguments* {.preservesRecord: "sqlite".} = object
`field0`*: SqliteArgumentsField0
TcpAddress* {.preservesRecord: "tcp".} = object
`host`*: string
`port`*: BiggestInt
CacheArgumentsField0* {.preservesDictionary.} = object
`dataspace`* {.preservesEmbedded.}: EmbeddedRef
`lifetime`*: float
@ -89,11 +104,14 @@ type
PulseArguments* {.preservesRecord: "pulse".} = object
`field0`*: PulseArgumentsField0
UnixAddress* {.preservesRecord: "unix".} = object
`path`*: string
Tcp* {.preservesRecord: "tcp".} = object
`host`*: string
`port`*: BiggestInt
proc `$`*(x: WebsocketArguments | JsonTranslatorArguments |
proc `$`*(x: WebsocketArguments | JsonTranslatorArguments | SocketAddress |
Base64DecoderArguments |
JsonTranslatorConnected |
JsonSocketTranslatorArguments |
@ -101,15 +119,17 @@ proc `$`*(x: WebsocketArguments | JsonTranslatorArguments |
WebhooksArguments |
FileSystemUsageArguments |
SqliteArguments |
TcpAddress |
CacheArguments |
XmlTranslatorArguments |
PostgreConnectionParameter |
PostgreArguments |
PulseArguments |
UnixAddress |
Tcp): string =
`$`(toPreserves(x))
proc encode*(x: WebsocketArguments | JsonTranslatorArguments |
proc encode*(x: WebsocketArguments | JsonTranslatorArguments | SocketAddress |
Base64DecoderArguments |
JsonTranslatorConnected |
JsonSocketTranslatorArguments |
@ -117,10 +137,12 @@ proc encode*(x: WebsocketArguments | JsonTranslatorArguments |
WebhooksArguments |
FileSystemUsageArguments |
SqliteArguments |
TcpAddress |
CacheArguments |
XmlTranslatorArguments |
PostgreConnectionParameter |
PostgreArguments |
PulseArguments |
UnixAddress |
Tcp): seq[byte] =
encode(toPreserves(x))

View File

@ -7,10 +7,9 @@ import preserves, preserves/jsonhooks, syndicate
import ../schema/[config, json_messages]
proc translateSocket(facet: Facet; ds: Cap; path: string) {.asyncio.} =
template translateSocketBody {.dirty.} =
# Template workaround for CPS and parameterized types.
var
socket = new AsyncConn[Protocol.Unix]
conn = connectUnixAsync(path)
guard = initGuard(facet)
dec = newBufferedDecoder(0)
buf = new string #TODO: get a pointer into the decoder
@ -25,7 +24,7 @@ proc translateSocket(facet: Facet; ds: Cap; path: string) {.asyncio.} =
whelp write(socket[], $data & "\n")
else:
stderr.writeLine "dropped send of ", data
discard publish(turn, ds, initRecord("connected", path.toPreserves))
discard publish(turn, ds, initRecord("connected", sa.toPreserves))
onStop(facet, kill)
run(facet, setup)
while alive:
@ -43,15 +42,33 @@ proc translateSocket(facet: Facet; ds: Cap; path: string) {.asyncio.} =
# Closure, not CPS.
message(turn, ds, initRecord("recv", data.get))
run(facet, send)
stderr.writeLine "close socket ", path
stderr.writeLine "close socket ", sa
close(socket[])
proc translateSocket(facet: Facet; ds: Cap; sa: TcpAddress) {.asyncio.} =
var
socket = new AsyncConn[Protocol.Tcp]
conn = connectTcpAsync(sa.host, Port sa.port)
socket[] = conn
translateSocketBody()
proc translateSocket(facet: Facet; ds: Cap; sa: UnixAddress) {.asyncio.} =
var
socket = new AsyncConn[Protocol.Unix]
conn = connectUnixAsync(sa.path)
socket[] = conn
translateSocketBody()
proc spawnJsonSocketTranslator*(turn: var Turn; root: Cap): Actor {.discardable.} =
spawnActor(turn, "json-socket-translator") do (turn: var Turn):
during(turn, root, ?:JsonSocketTranslatorArguments) do (ds: Cap, socketPath: string):
during(turn, root, ?:JsonSocketTranslatorArguments) do (ds: Cap, sa: TcpAddress):
linkActor(turn, "json-socket-translator") do (turn: var Turn):
discard trampoline:
whelp translateSocket(turn.facet, ds, socketPath)
whelp translateSocket(turn.facet, ds, sa)
during(turn, root, ?:JsonSocketTranslatorArguments) do (ds: Cap, sa: UnixAddress):
linkActor(turn, "json-socket-translator") do (turn: var Turn):
discard trampoline:
whelp translateSocket(turn.facet, ds, sa)
when isMainModule:
import syndicate/relays