syndicate_utils/src/syndesizer/json_socket_translator.nim

78 lines
2.5 KiB
Nim
Raw Normal View History

2023-05-06 19:08:38 +00:00
# SPDX-FileCopyrightText: ☭ Emery Hemingway
2022-06-09 18:15:13 +00:00
# SPDX-License-Identifier: Unlicense
2024-04-05 13:18:47 +00:00
import std/[json, options]
import pkg/sys/[ioqueue, sockets]
import preserves, preserves/jsonhooks, syndicate
2022-06-09 18:15:13 +00:00
2024-04-02 19:18:11 +00:00
import ../schema/[config, json_messages]
2022-06-09 18:15:13 +00:00
template translateSocketBody {.dirty.} =
# Template workaround for CPS and parameterized types.
2024-04-05 13:18:47 +00:00
var
guard = initGuard(facet)
dec = newBufferedDecoder(0)
buf = new string #TODO: get a pointer into the decoder
alive = true
2024-04-30 11:56:48 +00:00
proc kill(turn: Turn) =
2024-04-05 13:18:47 +00:00
alive = false
2024-04-30 11:56:48 +00:00
proc setup(turn: Turn) =
2024-04-05 13:18:47 +00:00
# Closure, not CPS.
onMessage(turn, ds, ?:SendJson) do (data: JsonNode):
if alive:
discard trampoline:
whelp write(socket[], $data & "\n")
else:
stderr.writeLine "dropped send of ", data
discard publish(turn, ds, initRecord("connected", sa.toPreserves))
2024-04-05 13:18:47 +00:00
onStop(facet, kill)
run(facet, setup)
while alive:
# TODO: parse buffer
buf[].setLen(0x4000)
let n = read(socket[], buf)
if n < 1:
stderr.writeLine "socket read returned ", n
else:
buf[].setLen(n)
dec.feed(buf[])
var data = dec.parse()
if data.isSome:
2024-04-30 11:56:48 +00:00
proc send(turn: Turn) =
2024-04-05 13:18:47 +00:00
# Closure, not CPS.
message(turn, ds, initRecord("recv", data.get))
run(facet, send)
stderr.writeLine "close socket ", sa
2024-04-05 13:18:47 +00:00
close(socket[])
proc translateSocket(facet: Facet; ds: Cap; sa: TcpAddress) {.asyncio.} =
var
socket = new AsyncConn[Protocol.Tcp]
conn = connectTcpAsync(sa.host, Port sa.port)
socket[] = conn
translateSocketBody()
proc translateSocket(facet: Facet; ds: Cap; sa: UnixAddress) {.asyncio.} =
var
socket = new AsyncConn[Protocol.Unix]
conn = connectUnixAsync(sa.path)
socket[] = conn
translateSocketBody()
2024-04-30 11:56:48 +00:00
proc spawnJsonSocketTranslator*(turn: Turn; root: Cap): Actor {.discardable.} =
spawnActor(turn, "json-socket-translator") do (turn: Turn):
during(turn, root, ?:JsonSocketTranslatorArguments) do (ds: Cap, sa: TcpAddress):
2024-04-30 11:56:48 +00:00
linkActor(turn, "json-socket-translator") do (turn: Turn):
discard trampoline:
whelp translateSocket(turn.facet, ds, sa)
during(turn, root, ?:JsonSocketTranslatorArguments) do (ds: Cap, sa: UnixAddress):
2024-04-30 11:56:48 +00:00
linkActor(turn, "json-socket-translator") do (turn: Turn):
2024-04-05 13:18:47 +00:00
discard trampoline:
whelp translateSocket(turn.facet, ds, sa)
2023-12-25 23:11:54 +00:00
when isMainModule:
2024-04-05 13:18:47 +00:00
import syndicate/relays
2024-04-30 11:56:48 +00:00
runActor("main") do (turn: Turn):
resolveEnvironment(turn) do (turn: Turn; ds: Cap):
2024-04-05 13:18:47 +00:00
spawnJsonSocketTranslator(turn, ds)