syndesizer: absorb websockets actor

This commit is contained in:
Emery Hemingway 2024-01-09 11:35:19 +02:00
parent eeaa80db88
commit d8965a398a
7 changed files with 113 additions and 6 deletions

View File

@ -2,7 +2,10 @@
## Syndesizer
A Syndicate multitool. Includes a number of different actors that become active via configuration.
A Syndicate multitool that includes a number of different actors that become active via configuration.
Think of it as a Busybox for Syndicate, if Busybox was created before POSIX.
Whether you use a single instance for many protocols or many specialized instances is up to you.
### Cache
@ -127,6 +130,28 @@ Request data is formated according to the http schema [defined in syndicate-prot
]
```
### Websockets
connects to a websocket endpoint. During the lifetime of the connection a `<connected $URL>` assertion is made. Messages received from the server are sent to the dataspace wrapped in `<recv …>` records and messages observed as `<send …>` are sent to the server.
```
# Configuration example
<require-service <daemon syndesizer>>
let ?websocketspace = dataspace
? <service-object <daemon syndesizer> ?cap> [
$cap <websocket {
dataspace: $websocketspace
url: "ws://127.0.0.1:5225/"
}>
]
$websocketspace ? <connected $websocketUrl> [
<bind <ref { oid: "websocket" key: #x"" }> $websocketspace #f>
]
```
---
## mintsturdyref

View File

@ -23,5 +23,10 @@ WebhooksArguments = <webhooks {
listen: Tcp
}>.
WebsocketArguments = <websocket {
dataspace: #!any
url: string
}>.
# Reused from syndicate-protocols/transportAddress
Tcp = <tcp @host string @port int>.

View File

@ -82,6 +82,18 @@
"sha256": "0n1gbwllwwilz9fp5zyp4054vzcq1p7ddzg02sw8d0vqb1wmpsqm",
"srcDir": "src",
"url": "https://git.syndicate-lang.org/ehmry/syndicate-nim/archive/3e11884a916c0452c90128c29940856e2d347cb7.tar.gz"
},
{
"method": "fetchzip",
"packages": [
"ws"
],
"path": "/nix/store/zd51j4dphs6h1hyhdbzdv840c8813ai8-source",
"ref": "0.5.0",
"rev": "9536bf99ddf5948db221ccb7bb3663aa238a8e21",
"sha256": "0j8z9jlvzb1h60v7rryvh2wx6vg99lra6i62whf3fknc53l641fz",
"srcDir": "src",
"url": "https://github.com/treeform/ws/archive/9536bf99ddf5948db221ccb7bb3663aa238a8e21.tar.gz"
}
]
}

View File

@ -3,6 +3,13 @@ import
preserves, std/tables
type
WebsocketArgumentsField0* {.preservesDictionary.} = object
`dataspace`* {.preservesEmbedded.}: EmbeddedRef
`url`*: string
WebsocketArguments* {.preservesRecord: "websocket".} = object
`field0`*: WebsocketArgumentsField0
JsonTranslatorArgumentsField0* {.preservesDictionary.} = object
`argv`*: seq[string]
`dataspace`* {.preservesEmbedded.}: EmbeddedRef
@ -38,14 +45,16 @@ type
`host`*: string
`port`*: BiggestInt
proc `$`*(x: JsonTranslatorArguments | JsonTranslatorConnected |
proc `$`*(x: WebsocketArguments | JsonTranslatorArguments |
JsonTranslatorConnected |
JsonSocketTranslatorArguments |
WebhooksArguments |
CacheArguments |
Tcp): string =
`$`(toPreserves(x))
proc encode*(x: JsonTranslatorArguments | JsonTranslatorConnected |
proc encode*(x: WebsocketArguments | JsonTranslatorArguments |
JsonTranslatorConnected |
JsonSocketTranslatorArguments |
WebhooksArguments |
CacheArguments |

View File

@ -6,7 +6,7 @@
import syndicate, syndicate/relays, syndicate/actors/timers
import ./syndesizer/[
cache_actor, json_socket_translator, json_translator, webhooks]
cache_actor, json_socket_translator, json_translator, webhooks, websockets]
runActor("syndesizer") do (turn: var Turn; root: Cap):
connectStdio(turn, root)
@ -15,3 +15,4 @@ runActor("syndesizer") do (turn: var Turn; root: Cap):
discard spawnJsonSocketTranslator(turn, root)
discard spawnJsonStdioTranslator(turn, root)
discard spawnWebhookActor(turn, root)
discard spawnWebsocketActor(turn, root)

View File

@ -0,0 +1,55 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[asyncdispatch, json]
import preserves, preserves/jsonhooks
import syndicate, syndicate/relays
import ws
import ../schema/config, ../json_messages
type WebSocket = ws.WebSocket
# not the object from the transportAddress schema
proc spawnWebsocketActor*(turn: var Turn; root: Cap): Actor =
spawn("websocket-actor", turn) do (turn: var Turn):
during(turn, root, ?:WebsocketArguments) do (ds: Cap, url: string):
let facet = turn.facet
var
ws: WebSocket
connectedHandle: Handle
newWebSocket(url).addCallback(turn) do (turn: var Turn; sock: WebSocket):
ws = sock
let connectedHandle = publish(turn, ds, initRecord("connected", url.toPreserves))
var fut: Future[(Opcode, string)]
proc recvMessage() {.gcsafe.} =
fut = receivePacket ws
addCallback(fut, facet) do (turn: var Turn):
let (opcode, data) = read fut
case opcode
of Text:
message(turn, ds,
RecvJson(data: data.parseJson))
of Binary:
message(turn, ds,
initRecord("recv", cast[seq[byte]](data).toPreserves))
of Ping:
asyncCheck(turn, ws.send(data, Pong))
of Pong, Cont:
discard
of Close:
retract(turn, connectedHandle)
stderr.writeLine "closed connection with ", url
stop(turn)
return
recvMessage()
recvMessage()
onMessage(turn, ds, ?:SendJson) do (data: JsonNode):
asyncCheck(turn, ws.send($data, Text))
do:
close(ws)
when isMainModule:
runActor("main") do (turn: var Turn; root: Cap):
connectStdio(turn, root)
discard spawnWebsocketActor(turn, root)

View File

@ -1,6 +1,6 @@
# Package
version = "20240108"
version = "20240109"
author = "Emery Hemingway"
description = "Utilites for Syndicated Actors and Synit"
license = "unlicense"
@ -10,4 +10,4 @@ bin = @["mintsturdyref", "mount_actor", "msg", "net_mapper", "preserve
# Dependencies
requires "nim >= 2.0.0", "illwill", "syndicate >= 20240108"
requires "nim >= 2.0.0", "illwill", "syndicate >= 20240108", "ws"