Remove actors broken by CPS migration
This commit is contained in:
parent
aa8ff4c364
commit
242bda24e5
71
README.md
71
README.md
|
@ -28,6 +28,7 @@ Example configuration:
|
|||
$cap <cache { dataspace: $nixspace lifetime: 3600.0 }> ]
|
||||
]
|
||||
```
|
||||
|
||||
### File System Usage
|
||||
|
||||
Summarize the size of file-system directory. Equivalent to `du -s -b`.
|
||||
|
@ -204,51 +205,6 @@ $tuplespace ? [?id ?name] [
|
|||
]
|
||||
```
|
||||
|
||||
### Webooks
|
||||
|
||||
Listens for webhook requests and sends request data to a dataspace as messages.
|
||||
Request data is formated according to the http schema [defined in syndicate-protocols](https://git.syndicate-lang.org/syndicate-lang/syndicate-protocols/src/branch/main/schemas/http.prs), with the exception that messages bodies may be **bytes**, **string**, or **any** for the `content-type`s of `application/octet-stream`, `text/*`, and `application/json` respectively.
|
||||
|
||||
```
|
||||
# Configuration example
|
||||
<require-service <daemon syndesizer>>
|
||||
? <service-object <daemon syndesizer> ?cap> [
|
||||
$cap <webhooks {
|
||||
listen: <tcp "0.0.0.0" 1048>
|
||||
endpoints: {
|
||||
|
||||
# http://0.0.0.0:1048/my-endpoint
|
||||
["my-endpoint"]: $target-dataspace
|
||||
|
||||
# http://0.0.0.0:1048/some/multi-element/path
|
||||
["some", "multi-element", "path"]: $target-dataspace
|
||||
|
||||
}
|
||||
}>
|
||||
]
|
||||
```
|
||||
|
||||
### Websockets
|
||||
|
||||
connects to a websocket endpoint. During the lifetime of the connection a `<connected $URL>` assertion is made. Messages received from the server are sent to the dataspace wrapped in `<recv …>` records and messages observed as `<send …>` are sent to the server.
|
||||
|
||||
```
|
||||
# Configuration example
|
||||
<require-service <daemon syndesizer>>
|
||||
|
||||
let ?websocketspace = dataspace
|
||||
|
||||
? <service-object <daemon syndesizer> ?cap> [
|
||||
$cap <websocket {
|
||||
dataspace: $websocketspace
|
||||
url: "ws://127.0.0.1:5225/"
|
||||
}>
|
||||
]
|
||||
|
||||
$websocketspace ? <connected $websocketUrl> [
|
||||
<bind <ref { oid: "websocket" key: #x"" }> $websocketspace #f>
|
||||
]
|
||||
```
|
||||
### XML translator
|
||||
|
||||
Translates between Preserves and XML according to the [Conventions for Common Data Types](https://preserves.dev/conventions.html).
|
||||
|
@ -345,31 +301,6 @@ Sample Syndicate server script:
|
|||
|
||||
A utility that sends messages to `$SYNDICATE_ROUTE`.
|
||||
|
||||
|
||||
## net_mapper
|
||||
|
||||
Publishes ICMP packet round-trip-times. See [net_mapper.prs](./net_mapper.prs) for a protocol description. [Source](./src/net_mapper.nim).
|
||||
|
||||
Example script:
|
||||
```
|
||||
? <machine-dataspace ?machine> [
|
||||
$machine ? <rtt "10.0.33.136" ?min ?avg ?max> [
|
||||
$log ! <log "-" { ping: { min: $min avg: $avg max: $max } }>
|
||||
]
|
||||
|
||||
$config [
|
||||
<require-service <daemon net_mapper>>
|
||||
<daemon net_mapper {
|
||||
argv: ["/bin/net_mapper"]
|
||||
protocol: application/syndicate
|
||||
}>
|
||||
? <service-object <daemon net_mapper> ?cap> [
|
||||
$cap { dataspace: $machine }
|
||||
]
|
||||
]
|
||||
]
|
||||
```
|
||||
|
||||
## preserve_process_environment
|
||||
|
||||
This utility serializes it's process environment to Preserves and prints it to stdout.
|
||||
|
|
|
@ -1,114 +0,0 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
## An actor for filesystem monitoring.
|
||||
|
||||
import std/[asyncdispatch, asyncfile, tables]
|
||||
import posix, posix/inotify
|
||||
import preserves
|
||||
import syndicate, syndicate/[bags, relays]
|
||||
import ./schema/inotify_actor
|
||||
|
||||
var IN_NONBLOCK {.importc, nodecl.}: cint
|
||||
|
||||
type
|
||||
BootArgs {.preservesDictionary.} = object
|
||||
dataspace: Cap
|
||||
|
||||
proc toMask(sym: Symbol): uint32 =
|
||||
case sym.string
|
||||
of "IN_ACCESS": IN_ACCESS
|
||||
of "IN_MODIFY": IN_MODIFY
|
||||
of "IN_ATTRIB": IN_ATTRIB
|
||||
of "IN_CLOSE_WRITE": IN_CLOSE_WRITE
|
||||
of "IN_CLOSE_NOWRITE": IN_CLOSE_NOWRITE
|
||||
of "IN_CLOSE": IN_CLOSE
|
||||
of "IN_OPEN": IN_OPEN
|
||||
of "IN_MOVED_FROM": IN_MOVED_FROM
|
||||
of "IN_MOVED_TO": IN_MOVED_TO
|
||||
of "IN_MOVE": IN_MOVE
|
||||
of "IN_CREATE": IN_CREATE
|
||||
of "IN_DELETE": IN_DELETE
|
||||
of "IN_DELETE_SELF": IN_DELETE_SELF
|
||||
of "IN_MOVE_SELF": IN_MOVE_SELF
|
||||
else: 0
|
||||
|
||||
func contains(event, bit: uint32): bool = (event and bit) != 0
|
||||
|
||||
iterator symbols(event: uint32): Symbol =
|
||||
if event.contains IN_ACCESS:
|
||||
yield Symbol"IN_ACCESS"
|
||||
if event.contains IN_MODIFY:
|
||||
yield Symbol"IN_MODIFY"
|
||||
if event.contains IN_ATTRIB:
|
||||
yield Symbol"IN_ATTRIB"
|
||||
if event.contains IN_CLOSE_WRITE:
|
||||
yield Symbol"IN_CLOSE_WRITE"
|
||||
if event.contains IN_CLOSE_NOWRITE:
|
||||
yield Symbol"IN_CLOSE_NOWRITE"
|
||||
if event.contains IN_OPEN:
|
||||
yield Symbol"IN_OPEN"
|
||||
if event.contains IN_MOVED_FROM:
|
||||
yield Symbol"IN_MOVED_FROM"
|
||||
if event.contains IN_MOVED_TO:
|
||||
yield Symbol"IN_MOVED_TO"
|
||||
if event.contains IN_CREATE:
|
||||
yield Symbol"IN_CREATE"
|
||||
if event.contains IN_DELETE:
|
||||
yield Symbol"IN_DELETE"
|
||||
if event.contains IN_DELETE_SELF:
|
||||
yield Symbol"IN_DELETE_SELF"
|
||||
if event.contains IN_MOVE_SELF:
|
||||
yield Symbol"IN_MOVE_SELF"
|
||||
if event.contains (IN_CLOSE_WRITE or IN_CLOSE_NOWRITE):
|
||||
yield Symbol"IN_CLOSE"
|
||||
if event.contains (IN_MOVED_FROM or IN_MOVED_TO):
|
||||
yield Symbol"IN_MOVE"
|
||||
|
||||
runActor("inotify_actor") do (root: Cap; turn: var Turn):
|
||||
let buf = newSeq[byte](8192)
|
||||
let eventPattern = ?Observe(pattern: !InotifyMessage) ?? { 0: grabLit(), 1: grabLit() }
|
||||
connectStdio(turn, root)
|
||||
during(turn, root, ?:BootArgs) do (ds: Cap):
|
||||
let inf = inotify_init1(IN_NONBLOCK)
|
||||
doAssert inf != -1, $inf & " - " & $strerror(errno)
|
||||
var
|
||||
registry = initTable[cint, string]()
|
||||
watchBag: Bag[cint]
|
||||
let
|
||||
anf = newAsyncFile(AsyncFD inf)
|
||||
facet = turn.facet
|
||||
var fut: Future[int]
|
||||
proc readEvents() {.gcsafe.} =
|
||||
fut = readBuffer(anf, buf[0].addr, buf.len)
|
||||
addCallback(fut, facet) do (turn: var Turn):
|
||||
let n = read(fut)
|
||||
doAssert n > 0
|
||||
for event in inotify_events(buf[0].addr, n):
|
||||
var msg = InotifyMessage(path: registry[event.wd], cookie: event.cookie.BiggestInt)
|
||||
if event.len > 0:
|
||||
let n = event.len
|
||||
msg.name.setLen(n)
|
||||
copyMem(msg.name[0].addr, event.name.addr, n)
|
||||
for i, c in msg.name:
|
||||
if c == '\0':
|
||||
msg.name.setLen(i)
|
||||
break
|
||||
for sym in event.mask.symbols:
|
||||
msg.event = sym
|
||||
message(turn, ds, msg)
|
||||
readEvents()
|
||||
readEvents()
|
||||
|
||||
during(turn, ds, eventPattern) do (path: string, kind: Symbol):
|
||||
let wd = inotify_add_watch(inf, path, kind.toMask or IN_MASK_ADD)
|
||||
doAssert wd > 0, $strerror(errno)
|
||||
registry[wd] = path
|
||||
discard watchBag.change(wd, 1)
|
||||
|
||||
do:
|
||||
if watchBag.change(wd, -1, clamp = true) == cdPresentToAbsent:
|
||||
discard close(wd)
|
||||
registry.del(wd)
|
||||
do:
|
||||
close(anf)
|
|
@ -1,167 +0,0 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
## A ping utility for Syndicate.
|
||||
|
||||
import std/[asyncdispatch, asyncnet, monotimes, nativesockets, net, os, strutils, tables, times]
|
||||
import preserves
|
||||
import syndicate, syndicate/relays
|
||||
|
||||
import ./schema/net_mapper
|
||||
|
||||
#[
|
||||
var
|
||||
SOL_IP {.importc, nodecl, header: "<sys/socket.h>".}: int
|
||||
IP_TTL {.importc, nodecl, header: "<netinet/in.h>".}: int
|
||||
]#
|
||||
|
||||
proc toPreservesHook(address: IpAddress): Value = toPreserves($address)
|
||||
|
||||
proc fromPreservesHook(address: var IpAddress; pr: Value): bool =
|
||||
try:
|
||||
if pr.isString:
|
||||
address = parseIpAddress(pr.string)
|
||||
result = true
|
||||
except ValueError: discard
|
||||
|
||||
when isMainModule:
|
||||
# verify that the hook catches
|
||||
var ip: IpAddress
|
||||
assert fromPreservesHook(ip, toPreservesHook(ip))
|
||||
|
||||
type
|
||||
IcmpHeader {.packed.} = object
|
||||
`type`: uint8
|
||||
code: uint8
|
||||
checksum: uint16
|
||||
|
||||
IcmpEchoFields {.packed.} = object
|
||||
header: IcmpHeader
|
||||
identifier: array[2, byte]
|
||||
sequenceNumber: uint16
|
||||
|
||||
IcmpEcho {.union.} = object
|
||||
fields: IcmpEchoFields
|
||||
buffer: array[8, uint8]
|
||||
|
||||
IcmpTypes = enum
|
||||
icmpEchoReply = 0,
|
||||
icmpEcho = 8,
|
||||
|
||||
proc initIcmpEcho(): IcmpEcho =
|
||||
result.fields.header.`type` = uint8 icmpEcho
|
||||
# doAssert urandom(result.fields.identifier) # Linux does this?
|
||||
|
||||
proc updateChecksum(msg: var IcmpEcho) =
|
||||
var sum: uint32
|
||||
msg.fields.header.checksum = 0
|
||||
for n in cast[array[4, uint16]](msg.buffer): sum = sum + uint32(n)
|
||||
while (sum and 0xffff0000'u32) != 0:
|
||||
sum = (sum and 0xffff) + (sum shr 16)
|
||||
msg.fields.header.checksum = not uint16(sum)
|
||||
|
||||
proc match(a, b: IcmpEchoFields): bool =
|
||||
({a.header.type, b.header.type} == {uint8 icmpEcho, uint8 icmpEchoReply}) and
|
||||
(a.header.code == b.header.code) and
|
||||
(a.sequenceNumber == b.sequenceNumber)
|
||||
|
||||
type
|
||||
Pinger = ref object
|
||||
facet: Facet
|
||||
ds: Cap
|
||||
rtt: RoundTripTime
|
||||
rttHandle: Handle
|
||||
sum: Duration
|
||||
count: int64
|
||||
msg: IcmpEcho
|
||||
socket: AsyncSocket
|
||||
sad: Sockaddr_storage
|
||||
sadLen: SockLen
|
||||
interval: Duration
|
||||
|
||||
proc newPinger(address: IpAddress; facet: Facet; ds: Cap): Pinger =
|
||||
result = Pinger(
|
||||
facet: facet,
|
||||
ds: ds,
|
||||
rtt: RoundTripTime(address: $address),
|
||||
msg: initIcmpEcho(),
|
||||
socket: newAsyncSocket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP, false, true),
|
||||
interval: initDuration(milliseconds = 500))
|
||||
toSockAddr(address, Port 0, result.sad, result.sadLen)
|
||||
# setSockOptInt(getFd socket, SOL_IP, IP_TTL, _)
|
||||
|
||||
proc close(ping: Pinger) = close(ping.socket)
|
||||
|
||||
proc sqr(dur: Duration): Duration =
|
||||
let us = dur.inMicroseconds
|
||||
initDuration(microseconds = us * us)
|
||||
|
||||
proc update(ping: Pinger; dur: Duration) {.inline.} =
|
||||
let secs = dur.inMicroseconds.float / 1_000_000.0
|
||||
if ping.count == 0: (ping.rtt.minimum, ping.rtt.maximum) = (secs, secs)
|
||||
elif secs < ping.rtt.minimum: ping.rtt.minimum = secs
|
||||
elif secs > ping.rtt.maximum: ping.rtt.maximum = secs
|
||||
ping.sum = ping.sum + dur
|
||||
inc ping.count
|
||||
ping.rtt.average = inMicroseconds(ping.sum div ping.count).float / 1_000_000.0
|
||||
|
||||
proc exchangeEcho(ping: Pinger) {.async.} =
|
||||
inc ping.msg.fields.sequenceNumber
|
||||
# updateChecksum(ping.msg) # Linux does this?
|
||||
let
|
||||
a = getMonoTime()
|
||||
r = sendto(ping.socket.getFd,
|
||||
unsafeAddr ping.msg.buffer[0], ping.msg.buffer.len, 0,
|
||||
cast[ptr SockAddr](unsafeAddr ping.sad), # neckbeard loser API
|
||||
ping.sadLen)
|
||||
if r == -1'i32:
|
||||
let osError = osLastError()
|
||||
raiseOSError(osError)
|
||||
while true:
|
||||
var
|
||||
(data, address, _) = await recvFrom(ping.socket, 128)
|
||||
b = getMonoTime()
|
||||
if address != $ping.rtt.address:
|
||||
stderr.writeLine "want ICMP from ", ping.rtt.address, " but received from ", address, " instead"
|
||||
elif data.len >= ping.msg.buffer.len:
|
||||
let
|
||||
period = b - a
|
||||
resp = cast[ptr IcmpEcho](unsafeAddr data[0])
|
||||
if match(ping.msg.fields, resp.fields):
|
||||
update(ping, period)
|
||||
return
|
||||
else:
|
||||
stderr.writeLine "ICMP mismatch"
|
||||
else:
|
||||
stderr.writeLine "reply data has a bad length ", data.len
|
||||
|
||||
proc kick(ping: Pinger) {.gcsafe.} =
|
||||
if not ping.socket.isClosed:
|
||||
addTimer(ping.interval.inMilliseconds.int, oneshot = true) do (fd: AsyncFD) -> bool:
|
||||
let fut = exchangeEcho(ping)
|
||||
fut.addCallback do ():
|
||||
if fut.failed and ping.rttHandle != Handle(0):
|
||||
ping.facet.run do (turn: var Turn):
|
||||
retract(turn, ping.rttHandle)
|
||||
reset ping.rttHandle
|
||||
else:
|
||||
ping.facet.run do (turn: var Turn):
|
||||
replace(turn, ping.ds, ping.rttHandle, ping.rtt)
|
||||
if ping.interval < initDuration(seconds = 20):
|
||||
ping.interval = ping.interval * 2
|
||||
kick(ping)
|
||||
|
||||
type Args {.preservesDictionary.} = object
|
||||
dataspace: Cap
|
||||
|
||||
runActor("net_mapper") do (root: Cap; turn: var Turn):
|
||||
connectStdio(turn, root)
|
||||
let rttObserver = ?Observe(pattern: !RoundTripTime) ?? {0: grabLit()}
|
||||
during(turn, root, ?:Args) do (ds: Cap):
|
||||
during(turn, ds, rttObserver) do (address: IpAddress):
|
||||
var ping: Pinger
|
||||
if address.family == IpAddressFamily.IPv4:
|
||||
ping = newPinger(address, turn.facet, ds)
|
||||
kick(ping)
|
||||
do:
|
||||
if not ping.isNil: close(ping)
|
|
@ -15,8 +15,6 @@ import ./syndesizer/[
|
|||
json_socket_translator,
|
||||
json_translator,
|
||||
pulses,
|
||||
webhooks,
|
||||
websockets,
|
||||
xml_translator,
|
||||
xslt_actor]
|
||||
|
||||
|
@ -34,8 +32,6 @@ runActor("syndesizer") do (turn: var Turn; root: Cap):
|
|||
discard spawnJsonSocketTranslator(turn, root)
|
||||
discard spawnJsonStdioTranslator(turn, root)
|
||||
discard spawnPulseActor(turn, root)
|
||||
discard spawnWebhookActor(turn, root)
|
||||
discard spawnWebsocketActor(turn, root)
|
||||
discard spawnXmlTranslator(turn, root)
|
||||
discard spawnXsltActor(turn, root)
|
||||
when withPostgre:
|
||||
|
|
|
@ -1,105 +0,0 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
## An actor for relaying Webhooks.
|
||||
|
||||
import std/[asyncdispatch, asynchttpserver, net, strutils, tables, uri]
|
||||
|
||||
import preserves, preserves/jsonhooks
|
||||
import syndicate, syndicate/[bags, relays]
|
||||
import syndicate/protocols/http
|
||||
|
||||
import ../schema/config
|
||||
|
||||
type
|
||||
CapBag = Bag[Cap]
|
||||
Endpoints = Table[seq[string], Cap]
|
||||
|
||||
func splitPath(s: string): seq[string] = s.strip(chars={'/'}).split('/')
|
||||
|
||||
proc toRecord(req: Request; seqnum: BiggestInt; path: seq[string]): Value =
|
||||
## Convert a request value from the std/asynchttpserver module
|
||||
## to a request type from syndicate/protocols/http.
|
||||
var record: HttpRequest
|
||||
record.sequenceNumber = seqnum
|
||||
record.host = req.hostname
|
||||
record.`method` = Symbol($req.reqMethod)
|
||||
record.path = path
|
||||
for key, val in req.headers.pairs:
|
||||
record.headers[Symbol key] = val
|
||||
for key, val in decodeQuery(req.url.query):
|
||||
record.query[Symbol key] =
|
||||
@[QueryValue(orKind: QueryValueKind.string, string: val)]
|
||||
let contentType = req.headers.getOrDefault("content-type")
|
||||
result = toPreserves record
|
||||
if req.body.len > 0:
|
||||
result[7] =
|
||||
case contentType.toString
|
||||
of "application/json":
|
||||
req.body.parsePreserves
|
||||
of "application/octet-stream":
|
||||
cast[seq[byte]](req.body).toPreserves
|
||||
else:
|
||||
req.body.toPreserves
|
||||
|
||||
proc spawnWebhookActor*(turn: var Turn; root: Cap): Actor =
|
||||
spawn("webhooks", turn) do (turn: var Turn):
|
||||
let pat = grabRecord("webhooks", grabDictionary({ "listen": ?:config.Tcp }))
|
||||
# Grab the details on listening for requests.
|
||||
# Disregard endpoints so the server doesn't restart as those change.
|
||||
during(turn, root, pat) do (host: string; port: Port):
|
||||
let endpointsPat = grabRecord("webhooks", grabDictionary({
|
||||
"listen": ?config.Tcp(host: host, port: BiggestInt port),
|
||||
"endpoints": grab(),
|
||||
}))
|
||||
# construct a pattern for grabbing endpoints when the server is ready
|
||||
var seqNum: BiggestInt
|
||||
let facet = turn.facet
|
||||
let endpoints = newTable[seq[string], CapBag]()
|
||||
# use a bag so the same capability registered multiple
|
||||
# times with the same path does not get duplicate messages
|
||||
|
||||
proc cb(req: Request): Future[void] =
|
||||
inc(seqNum)
|
||||
let path = req.url.path.splitPath
|
||||
if not endpoints.hasKey path:
|
||||
result = respond(req, Http404,
|
||||
"no capabilities registered at $1\n" % [req.url.path])
|
||||
else:
|
||||
result = respond(req, Http200, "")
|
||||
proc act(turn: var Turn) {.gcsafe.} =
|
||||
let rec = req.toRecord(seqNum, path)
|
||||
for cap in endpoints[path]:
|
||||
message(turn, cap, rec)
|
||||
run(facet, act)
|
||||
|
||||
let server = newAsyncHttpServer()
|
||||
stderr.writeLine("listening for webhooks at ", host, ":", port)
|
||||
if host.isIpAddress:
|
||||
var ip = parseIpAddress host
|
||||
case ip.family
|
||||
of IPv6:
|
||||
asyncCheck(turn, server.serve(port, cb, host, domain = AF_INET6))
|
||||
of IPv4:
|
||||
asyncCheck(turn, server.serve(port, cb, host, domain = AF_INET))
|
||||
else:
|
||||
asyncCheck(turn, server.serve(port, cb, host, domain = AF_INET6))
|
||||
asyncCheck(turn, server.serve(port, cb, host, domain = AF_INET))
|
||||
|
||||
during(turn, root, endpointsPat) do (eps: Endpoints):
|
||||
for path, cap in eps:
|
||||
if not endpoints.hasKey path:
|
||||
endpoints[path] = CapBag()
|
||||
discard endpoints[path].change(cap, +1)
|
||||
do:
|
||||
for path, cap in eps:
|
||||
discard endpoints[path].change(cap, -1)
|
||||
|
||||
do:
|
||||
stderr.writeLine("closing for webhook server at ", host, ":", port)
|
||||
close(server)
|
||||
|
||||
when isMainModule:
|
||||
runActor("webhooks") do (turn: var Turn; root: Cap):
|
||||
connectStdio(turn, root)
|
||||
discard spawnWebhookActor(turn, root)
|
|
@ -1,55 +0,0 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
import std/[asyncdispatch, json]
|
||||
import preserves
|
||||
import syndicate, syndicate/relays
|
||||
import ws
|
||||
|
||||
import ../schema/config, ../json_messages
|
||||
|
||||
type WebSocket = ws.WebSocket
|
||||
# not the object from the transportAddress schema
|
||||
|
||||
proc spawnWebsocketActor*(turn: var Turn; root: Cap): Actor =
|
||||
spawn("websocket-actor", turn) do (turn: var Turn):
|
||||
during(turn, root, ?:WebsocketArguments) do (ds: Cap, url: string):
|
||||
let facet = turn.facet
|
||||
var
|
||||
ws: WebSocket
|
||||
connectedHandle: Handle
|
||||
newWebSocket(url).addCallback(turn) do (turn: var Turn; sock: WebSocket):
|
||||
ws = sock
|
||||
connectedHandle = publish(turn, ds, initRecord("connected", url.toPreserves))
|
||||
var fut: Future[(Opcode, string)]
|
||||
proc recvMessage() {.gcsafe.} =
|
||||
fut = receivePacket ws
|
||||
addCallback(fut, facet) do (turn: var Turn):
|
||||
let (opcode, data) = read fut
|
||||
case opcode
|
||||
of Text:
|
||||
message(turn, ds,
|
||||
RecvJson(data: data.parseJson))
|
||||
of Binary:
|
||||
message(turn, ds,
|
||||
initRecord("recv", cast[seq[byte]](data).toPreserves))
|
||||
of Ping:
|
||||
asyncCheck(turn, ws.send(data, Pong))
|
||||
of Pong, Cont:
|
||||
discard
|
||||
of Close:
|
||||
retract(turn, connectedHandle)
|
||||
stderr.writeLine "closed connection with ", url
|
||||
stop(turn)
|
||||
return
|
||||
recvMessage()
|
||||
recvMessage()
|
||||
onMessage(turn, ds, ?:SendJson) do (data: JsonNode):
|
||||
asyncCheck(turn, ws.send($data, Text))
|
||||
do:
|
||||
close(ws)
|
||||
|
||||
when isMainModule:
|
||||
runActor("main") do (turn: var Turn; root: Cap):
|
||||
connectStdio(turn, root)
|
||||
discard spawnWebsocketActor(turn, root)
|
|
@ -1,133 +0,0 @@
|
|||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||
# SPDX-License-Identifier: Unlicense
|
||||
|
||||
## This was all Tony's idea, except for the silly name.
|
||||
|
||||
import std/[asyncdispatch, os, terminal]
|
||||
import preserves
|
||||
import syndicate, syndicate/[durings, relays]
|
||||
import illwill
|
||||
|
||||
proc exitProc() {.noconv.} =
|
||||
illwillDeinit()
|
||||
showCursor()
|
||||
quit QuitSuccess
|
||||
|
||||
setControlCHook(exitProc)
|
||||
|
||||
proc parsePattern(pr: Value): Pattern =
|
||||
let
|
||||
dropSigil = initRecord("lit", "_".toSymbol)
|
||||
grabSigil = initRecord("lit", "?".toSymbol)
|
||||
var pr = grab(pr).toPreserves
|
||||
apply(pr) do (pr: var Value):
|
||||
if pr == dropSigil:
|
||||
pr = initRecord("_")
|
||||
elif pr == grabSigil:
|
||||
pr = initRecord("bind", initRecord("_"))
|
||||
doAssert result.fromPreserves(pr)
|
||||
|
||||
proc inputPattern: Pattern =
|
||||
var args = commandLineParams()
|
||||
if args.len != 1:
|
||||
quit "expected a single pattern argument"
|
||||
else:
|
||||
var input = pop args
|
||||
if input == "":
|
||||
quit "expected Preserves Pattern on stdin"
|
||||
else:
|
||||
var pr: Value
|
||||
try: pr = decodePreserves(input)
|
||||
except ValueError: discard
|
||||
try: pr = parsePreserves(input)
|
||||
except ValueError: discard
|
||||
if pr.isFalse:
|
||||
quit "failed to parse Preserves argument"
|
||||
result = parsePattern(pr)
|
||||
|
||||
type TermEntity {.final.} = ref object of Entity
|
||||
pattern: Pattern
|
||||
value: Value
|
||||
|
||||
method publish(te: TermEntity; turn: var Turn; v: AssertionRef; h: Handle) =
|
||||
te.value = v.value
|
||||
var termBuf = newTerminalBuffer(terminalWidth(), terminalHeight())
|
||||
var y = 1
|
||||
termBuf.write(1, y, $te.pattern, styleBright)
|
||||
inc(y)
|
||||
termBuf.drawHorizLine(0, termBuf.width(), y)
|
||||
inc(y)
|
||||
termBuf.write(0, y, $h, styleBright)
|
||||
for i, e in te.value.sequence:
|
||||
inc(y)
|
||||
termBuf.write(1, y, $e)
|
||||
termBuf.display()
|
||||
|
||||
method retract(te: TermEntity; turn: var Turn; h: Handle) =
|
||||
var termBuf = newTerminalBuffer(terminalWidth(), terminalHeight())
|
||||
var y = 1
|
||||
termBuf.write(1, y, $te.pattern, styleDim)
|
||||
inc y
|
||||
termBuf.drawHorizLine(0, termBuf.width(), y, true)
|
||||
inc(y)
|
||||
termBuf.write(0, y, $h, styleBright)
|
||||
if te.value.isSequence:
|
||||
for i, e in te.value.sequence:
|
||||
inc(y)
|
||||
termBuf.write(1, y, $e)
|
||||
else:
|
||||
inc(y)
|
||||
termBuf.write(1, y, $te.value)
|
||||
termBuf.display()
|
||||
|
||||
type DumpEntity {.final.} = ref object of Entity
|
||||
discard
|
||||
|
||||
method publish(dump: DumpEntity; turn: var Turn; ass: AssertionRef; h: Handle) =
|
||||
stdout.writeLine($ass.value)
|
||||
stdout.flushFile()
|
||||
|
||||
method message*(dump: DumpEntity; turn: var Turn; ass: AssertionRef) =
|
||||
stdout.writeLine($ass.value)
|
||||
stdout.flushFile()
|
||||
|
||||
proc exit {.noconv.} =
|
||||
illwillDeinit()
|
||||
showCursor()
|
||||
quit()
|
||||
|
||||
setControlCHook(exit)
|
||||
|
||||
proc main =
|
||||
let
|
||||
route = envRoute()
|
||||
pat = inputPattern()
|
||||
|
||||
if stdout.is_a_TTY:
|
||||
illwillInit()
|
||||
hideCursor()
|
||||
|
||||
discard bootDataspace("syndex_card") do (turn: var Turn; root: Cap):
|
||||
resolve(turn, root, route) do (turn: var Turn; ds: Cap):
|
||||
var termBuf = newTerminalBuffer(terminalWidth(), terminalHeight())
|
||||
termBuf.write(1, 1, $pat, styleBright)
|
||||
termBuf.drawHorizLine(1, termBuf.width(), 2)
|
||||
termBuf.display()
|
||||
|
||||
discard observe(turn, ds, pat, TermEntity(pattern: pat))
|
||||
|
||||
while true:
|
||||
try: poll()
|
||||
except CatchableError:
|
||||
illwillDeinit()
|
||||
showCursor()
|
||||
quit getCurrentExceptionMsg()
|
||||
|
||||
else:
|
||||
let entity = DumpEntity()
|
||||
runActor("syndex_card") do (root: Cap; turn: var Turn):
|
||||
spawnRelays(turn, root)
|
||||
resolve(turn, root, route) do (turn: var Turn; ds: Cap):
|
||||
discard observe(turn, ds, pat, entity)
|
||||
|
||||
main()
|
|
@ -5,9 +5,9 @@ author = "Emery Hemingway"
|
|||
description = "Utilites for Syndicated Actors and Synit"
|
||||
license = "unlicense"
|
||||
srcDir = "src"
|
||||
bin = @["mintsturdyref", "mount_actor", "msg", "net_mapper", "preserve_process_environment", "syndesizer", "syndex_card", "syndump"]
|
||||
bin = @["mintsturdyref", "mount_actor", "msg", "preserve_process_environment", "rofi_script_actor", "syndesizer", "syndump"]
|
||||
|
||||
|
||||
# Dependencies
|
||||
|
||||
requires "nim >= 2.0.0", "illwill", "syndicate >= 20240208", "ws"
|
||||
requires "nim >= 2.0.0", "syndicate >= 20240208"
|
||||
|
|
Loading…
Reference in New Issue