diff --git a/README.md b/README.md index 063bc7f..5070903 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ Example configuration: $cap ] ] ``` + ### File System Usage Summarize the size of file-system directory. Equivalent to `du -s -b`. @@ -204,51 +205,6 @@ $tuplespace ? [?id ?name] [ ] ``` -### Webooks - -Listens for webhook requests and sends request data to a dataspace as messages. -Request data is formated according to the http schema [defined in syndicate-protocols](https://git.syndicate-lang.org/syndicate-lang/syndicate-protocols/src/branch/main/schemas/http.prs), with the exception that messages bodies may be **bytes**, **string**, or **any** for the `content-type`s of `application/octet-stream`, `text/*`, and `application/json` respectively. - -``` -# Configuration example -> -? ?cap> [ - $cap - endpoints: { - - # http://0.0.0.0:1048/my-endpoint - ["my-endpoint"]: $target-dataspace - - # http://0.0.0.0:1048/some/multi-element/path - ["some", "multi-element", "path"]: $target-dataspace - - } - }> -] -``` - -### Websockets - -connects to a websocket endpoint. During the lifetime of the connection a `` assertion is made. Messages received from the server are sent to the dataspace wrapped in `` records and messages observed as `` are sent to the server. - -``` -# Configuration example -> - -let ?websocketspace = dataspace - -? ?cap> [ - $cap -] - -$websocketspace ? [ - $websocketspace #f> -] -``` ### XML translator Translates between Preserves and XML according to the [Conventions for Common Data Types](https://preserves.dev/conventions.html). @@ -345,31 +301,6 @@ Sample Syndicate server script: A utility that sends messages to `$SYNDICATE_ROUTE`. - -## net_mapper - -Publishes ICMP packet round-trip-times. See [net_mapper.prs](./net_mapper.prs) for a protocol description. [Source](./src/net_mapper.nim). - -Example script: -``` -? [ - $machine ? [ - $log ! - ] - - $config [ - > - - ? ?cap> [ - $cap { dataspace: $machine } - ] - ] -] -``` - ## preserve_process_environment This utility serializes it's process environment to Preserves and prints it to stdout. diff --git a/src/inotify_actor.nim b/src/inotify_actor.nim deleted file mode 100644 index 6d1dab1..0000000 --- a/src/inotify_actor.nim +++ /dev/null @@ -1,114 +0,0 @@ -# SPDX-FileCopyrightText: ☭ Emery Hemingway -# SPDX-License-Identifier: Unlicense - -## An actor for filesystem monitoring. - -import std/[asyncdispatch, asyncfile, tables] -import posix, posix/inotify -import preserves -import syndicate, syndicate/[bags, relays] -import ./schema/inotify_actor - -var IN_NONBLOCK {.importc, nodecl.}: cint - -type - BootArgs {.preservesDictionary.} = object - dataspace: Cap - -proc toMask(sym: Symbol): uint32 = - case sym.string - of "IN_ACCESS": IN_ACCESS - of "IN_MODIFY": IN_MODIFY - of "IN_ATTRIB": IN_ATTRIB - of "IN_CLOSE_WRITE": IN_CLOSE_WRITE - of "IN_CLOSE_NOWRITE": IN_CLOSE_NOWRITE - of "IN_CLOSE": IN_CLOSE - of "IN_OPEN": IN_OPEN - of "IN_MOVED_FROM": IN_MOVED_FROM - of "IN_MOVED_TO": IN_MOVED_TO - of "IN_MOVE": IN_MOVE - of "IN_CREATE": IN_CREATE - of "IN_DELETE": IN_DELETE - of "IN_DELETE_SELF": IN_DELETE_SELF - of "IN_MOVE_SELF": IN_MOVE_SELF - else: 0 - -func contains(event, bit: uint32): bool = (event and bit) != 0 - -iterator symbols(event: uint32): Symbol = - if event.contains IN_ACCESS: - yield Symbol"IN_ACCESS" - if event.contains IN_MODIFY: - yield Symbol"IN_MODIFY" - if event.contains IN_ATTRIB: - yield Symbol"IN_ATTRIB" - if event.contains IN_CLOSE_WRITE: - yield Symbol"IN_CLOSE_WRITE" - if event.contains IN_CLOSE_NOWRITE: - yield Symbol"IN_CLOSE_NOWRITE" - if event.contains IN_OPEN: - yield Symbol"IN_OPEN" - if event.contains IN_MOVED_FROM: - yield Symbol"IN_MOVED_FROM" - if event.contains IN_MOVED_TO: - yield Symbol"IN_MOVED_TO" - if event.contains IN_CREATE: - yield Symbol"IN_CREATE" - if event.contains IN_DELETE: - yield Symbol"IN_DELETE" - if event.contains IN_DELETE_SELF: - yield Symbol"IN_DELETE_SELF" - if event.contains IN_MOVE_SELF: - yield Symbol"IN_MOVE_SELF" - if event.contains (IN_CLOSE_WRITE or IN_CLOSE_NOWRITE): - yield Symbol"IN_CLOSE" - if event.contains (IN_MOVED_FROM or IN_MOVED_TO): - yield Symbol"IN_MOVE" - -runActor("inotify_actor") do (root: Cap; turn: var Turn): - let buf = newSeq[byte](8192) - let eventPattern = ?Observe(pattern: !InotifyMessage) ?? { 0: grabLit(), 1: grabLit() } - connectStdio(turn, root) - during(turn, root, ?:BootArgs) do (ds: Cap): - let inf = inotify_init1(IN_NONBLOCK) - doAssert inf != -1, $inf & " - " & $strerror(errno) - var - registry = initTable[cint, string]() - watchBag: Bag[cint] - let - anf = newAsyncFile(AsyncFD inf) - facet = turn.facet - var fut: Future[int] - proc readEvents() {.gcsafe.} = - fut = readBuffer(anf, buf[0].addr, buf.len) - addCallback(fut, facet) do (turn: var Turn): - let n = read(fut) - doAssert n > 0 - for event in inotify_events(buf[0].addr, n): - var msg = InotifyMessage(path: registry[event.wd], cookie: event.cookie.BiggestInt) - if event.len > 0: - let n = event.len - msg.name.setLen(n) - copyMem(msg.name[0].addr, event.name.addr, n) - for i, c in msg.name: - if c == '\0': - msg.name.setLen(i) - break - for sym in event.mask.symbols: - msg.event = sym - message(turn, ds, msg) - readEvents() - readEvents() - - during(turn, ds, eventPattern) do (path: string, kind: Symbol): - let wd = inotify_add_watch(inf, path, kind.toMask or IN_MASK_ADD) - doAssert wd > 0, $strerror(errno) - registry[wd] = path - discard watchBag.change(wd, 1) - - do: - if watchBag.change(wd, -1, clamp = true) == cdPresentToAbsent: - discard close(wd) - registry.del(wd) - do: - close(anf) diff --git a/src/net_mapper.nim b/src/net_mapper.nim deleted file mode 100644 index bbd538a..0000000 --- a/src/net_mapper.nim +++ /dev/null @@ -1,167 +0,0 @@ -# SPDX-FileCopyrightText: ☭ Emery Hemingway -# SPDX-License-Identifier: Unlicense - -## A ping utility for Syndicate. - -import std/[asyncdispatch, asyncnet, monotimes, nativesockets, net, os, strutils, tables, times] -import preserves -import syndicate, syndicate/relays - -import ./schema/net_mapper - -#[ -var - SOL_IP {.importc, nodecl, header: "".}: int - IP_TTL {.importc, nodecl, header: "".}: int -]# - -proc toPreservesHook(address: IpAddress): Value = toPreserves($address) - -proc fromPreservesHook(address: var IpAddress; pr: Value): bool = - try: - if pr.isString: - address = parseIpAddress(pr.string) - result = true - except ValueError: discard - -when isMainModule: - # verify that the hook catches - var ip: IpAddress - assert fromPreservesHook(ip, toPreservesHook(ip)) - -type - IcmpHeader {.packed.} = object - `type`: uint8 - code: uint8 - checksum: uint16 - - IcmpEchoFields {.packed.} = object - header: IcmpHeader - identifier: array[2, byte] - sequenceNumber: uint16 - - IcmpEcho {.union.} = object - fields: IcmpEchoFields - buffer: array[8, uint8] - - IcmpTypes = enum - icmpEchoReply = 0, - icmpEcho = 8, - -proc initIcmpEcho(): IcmpEcho = - result.fields.header.`type` = uint8 icmpEcho - # doAssert urandom(result.fields.identifier) # Linux does this? - -proc updateChecksum(msg: var IcmpEcho) = - var sum: uint32 - msg.fields.header.checksum = 0 - for n in cast[array[4, uint16]](msg.buffer): sum = sum + uint32(n) - while (sum and 0xffff0000'u32) != 0: - sum = (sum and 0xffff) + (sum shr 16) - msg.fields.header.checksum = not uint16(sum) - -proc match(a, b: IcmpEchoFields): bool = - ({a.header.type, b.header.type} == {uint8 icmpEcho, uint8 icmpEchoReply}) and - (a.header.code == b.header.code) and - (a.sequenceNumber == b.sequenceNumber) - -type - Pinger = ref object - facet: Facet - ds: Cap - rtt: RoundTripTime - rttHandle: Handle - sum: Duration - count: int64 - msg: IcmpEcho - socket: AsyncSocket - sad: Sockaddr_storage - sadLen: SockLen - interval: Duration - -proc newPinger(address: IpAddress; facet: Facet; ds: Cap): Pinger = - result = Pinger( - facet: facet, - ds: ds, - rtt: RoundTripTime(address: $address), - msg: initIcmpEcho(), - socket: newAsyncSocket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP, false, true), - interval: initDuration(milliseconds = 500)) - toSockAddr(address, Port 0, result.sad, result.sadLen) - # setSockOptInt(getFd socket, SOL_IP, IP_TTL, _) - -proc close(ping: Pinger) = close(ping.socket) - -proc sqr(dur: Duration): Duration = - let us = dur.inMicroseconds - initDuration(microseconds = us * us) - -proc update(ping: Pinger; dur: Duration) {.inline.} = - let secs = dur.inMicroseconds.float / 1_000_000.0 - if ping.count == 0: (ping.rtt.minimum, ping.rtt.maximum) = (secs, secs) - elif secs < ping.rtt.minimum: ping.rtt.minimum = secs - elif secs > ping.rtt.maximum: ping.rtt.maximum = secs - ping.sum = ping.sum + dur - inc ping.count - ping.rtt.average = inMicroseconds(ping.sum div ping.count).float / 1_000_000.0 - -proc exchangeEcho(ping: Pinger) {.async.} = - inc ping.msg.fields.sequenceNumber - # updateChecksum(ping.msg) # Linux does this? - let - a = getMonoTime() - r = sendto(ping.socket.getFd, - unsafeAddr ping.msg.buffer[0], ping.msg.buffer.len, 0, - cast[ptr SockAddr](unsafeAddr ping.sad), # neckbeard loser API - ping.sadLen) - if r == -1'i32: - let osError = osLastError() - raiseOSError(osError) - while true: - var - (data, address, _) = await recvFrom(ping.socket, 128) - b = getMonoTime() - if address != $ping.rtt.address: - stderr.writeLine "want ICMP from ", ping.rtt.address, " but received from ", address, " instead" - elif data.len >= ping.msg.buffer.len: - let - period = b - a - resp = cast[ptr IcmpEcho](unsafeAddr data[0]) - if match(ping.msg.fields, resp.fields): - update(ping, period) - return - else: - stderr.writeLine "ICMP mismatch" - else: - stderr.writeLine "reply data has a bad length ", data.len - -proc kick(ping: Pinger) {.gcsafe.} = - if not ping.socket.isClosed: - addTimer(ping.interval.inMilliseconds.int, oneshot = true) do (fd: AsyncFD) -> bool: - let fut = exchangeEcho(ping) - fut.addCallback do (): - if fut.failed and ping.rttHandle != Handle(0): - ping.facet.run do (turn: var Turn): - retract(turn, ping.rttHandle) - reset ping.rttHandle - else: - ping.facet.run do (turn: var Turn): - replace(turn, ping.ds, ping.rttHandle, ping.rtt) - if ping.interval < initDuration(seconds = 20): - ping.interval = ping.interval * 2 - kick(ping) - -type Args {.preservesDictionary.} = object - dataspace: Cap - -runActor("net_mapper") do (root: Cap; turn: var Turn): - connectStdio(turn, root) - let rttObserver = ?Observe(pattern: !RoundTripTime) ?? {0: grabLit()} - during(turn, root, ?:Args) do (ds: Cap): - during(turn, ds, rttObserver) do (address: IpAddress): - var ping: Pinger - if address.family == IpAddressFamily.IPv4: - ping = newPinger(address, turn.facet, ds) - kick(ping) - do: - if not ping.isNil: close(ping) diff --git a/src/syndesizer.nim b/src/syndesizer.nim index fcf2f5e..0bf209e 100644 --- a/src/syndesizer.nim +++ b/src/syndesizer.nim @@ -15,8 +15,6 @@ import ./syndesizer/[ json_socket_translator, json_translator, pulses, - webhooks, - websockets, xml_translator, xslt_actor] @@ -34,8 +32,6 @@ runActor("syndesizer") do (turn: var Turn; root: Cap): discard spawnJsonSocketTranslator(turn, root) discard spawnJsonStdioTranslator(turn, root) discard spawnPulseActor(turn, root) - discard spawnWebhookActor(turn, root) - discard spawnWebsocketActor(turn, root) discard spawnXmlTranslator(turn, root) discard spawnXsltActor(turn, root) when withPostgre: diff --git a/src/syndesizer/webhooks.nim b/src/syndesizer/webhooks.nim deleted file mode 100644 index 1914a87..0000000 --- a/src/syndesizer/webhooks.nim +++ /dev/null @@ -1,105 +0,0 @@ -# SPDX-FileCopyrightText: ☭ Emery Hemingway -# SPDX-License-Identifier: Unlicense - -## An actor for relaying Webhooks. - -import std/[asyncdispatch, asynchttpserver, net, strutils, tables, uri] - -import preserves, preserves/jsonhooks -import syndicate, syndicate/[bags, relays] -import syndicate/protocols/http - -import ../schema/config - -type - CapBag = Bag[Cap] - Endpoints = Table[seq[string], Cap] - -func splitPath(s: string): seq[string] = s.strip(chars={'/'}).split('/') - -proc toRecord(req: Request; seqnum: BiggestInt; path: seq[string]): Value = - ## Convert a request value from the std/asynchttpserver module - ## to a request type from syndicate/protocols/http. - var record: HttpRequest - record.sequenceNumber = seqnum - record.host = req.hostname - record.`method` = Symbol($req.reqMethod) - record.path = path - for key, val in req.headers.pairs: - record.headers[Symbol key] = val - for key, val in decodeQuery(req.url.query): - record.query[Symbol key] = - @[QueryValue(orKind: QueryValueKind.string, string: val)] - let contentType = req.headers.getOrDefault("content-type") - result = toPreserves record - if req.body.len > 0: - result[7] = - case contentType.toString - of "application/json": - req.body.parsePreserves - of "application/octet-stream": - cast[seq[byte]](req.body).toPreserves - else: - req.body.toPreserves - -proc spawnWebhookActor*(turn: var Turn; root: Cap): Actor = - spawn("webhooks", turn) do (turn: var Turn): - let pat = grabRecord("webhooks", grabDictionary({ "listen": ?:config.Tcp })) - # Grab the details on listening for requests. - # Disregard endpoints so the server doesn't restart as those change. - during(turn, root, pat) do (host: string; port: Port): - let endpointsPat = grabRecord("webhooks", grabDictionary({ - "listen": ?config.Tcp(host: host, port: BiggestInt port), - "endpoints": grab(), - })) - # construct a pattern for grabbing endpoints when the server is ready - var seqNum: BiggestInt - let facet = turn.facet - let endpoints = newTable[seq[string], CapBag]() - # use a bag so the same capability registered multiple - # times with the same path does not get duplicate messages - - proc cb(req: Request): Future[void] = - inc(seqNum) - let path = req.url.path.splitPath - if not endpoints.hasKey path: - result = respond(req, Http404, - "no capabilities registered at $1\n" % [req.url.path]) - else: - result = respond(req, Http200, "") - proc act(turn: var Turn) {.gcsafe.} = - let rec = req.toRecord(seqNum, path) - for cap in endpoints[path]: - message(turn, cap, rec) - run(facet, act) - - let server = newAsyncHttpServer() - stderr.writeLine("listening for webhooks at ", host, ":", port) - if host.isIpAddress: - var ip = parseIpAddress host - case ip.family - of IPv6: - asyncCheck(turn, server.serve(port, cb, host, domain = AF_INET6)) - of IPv4: - asyncCheck(turn, server.serve(port, cb, host, domain = AF_INET)) - else: - asyncCheck(turn, server.serve(port, cb, host, domain = AF_INET6)) - asyncCheck(turn, server.serve(port, cb, host, domain = AF_INET)) - - during(turn, root, endpointsPat) do (eps: Endpoints): - for path, cap in eps: - if not endpoints.hasKey path: - endpoints[path] = CapBag() - discard endpoints[path].change(cap, +1) - do: - for path, cap in eps: - discard endpoints[path].change(cap, -1) - - do: - stderr.writeLine("closing for webhook server at ", host, ":", port) - close(server) - -when isMainModule: - runActor("webhooks") do (turn: var Turn; root: Cap): - connectStdio(turn, root) - discard spawnWebhookActor(turn, root) diff --git a/src/syndesizer/websockets.nim b/src/syndesizer/websockets.nim deleted file mode 100644 index c666041..0000000 --- a/src/syndesizer/websockets.nim +++ /dev/null @@ -1,55 +0,0 @@ -# SPDX-FileCopyrightText: ☭ Emery Hemingway -# SPDX-License-Identifier: Unlicense - -import std/[asyncdispatch, json] -import preserves -import syndicate, syndicate/relays -import ws - -import ../schema/config, ../json_messages - -type WebSocket = ws.WebSocket - # not the object from the transportAddress schema - -proc spawnWebsocketActor*(turn: var Turn; root: Cap): Actor = - spawn("websocket-actor", turn) do (turn: var Turn): - during(turn, root, ?:WebsocketArguments) do (ds: Cap, url: string): - let facet = turn.facet - var - ws: WebSocket - connectedHandle: Handle - newWebSocket(url).addCallback(turn) do (turn: var Turn; sock: WebSocket): - ws = sock - connectedHandle = publish(turn, ds, initRecord("connected", url.toPreserves)) - var fut: Future[(Opcode, string)] - proc recvMessage() {.gcsafe.} = - fut = receivePacket ws - addCallback(fut, facet) do (turn: var Turn): - let (opcode, data) = read fut - case opcode - of Text: - message(turn, ds, - RecvJson(data: data.parseJson)) - of Binary: - message(turn, ds, - initRecord("recv", cast[seq[byte]](data).toPreserves)) - of Ping: - asyncCheck(turn, ws.send(data, Pong)) - of Pong, Cont: - discard - of Close: - retract(turn, connectedHandle) - stderr.writeLine "closed connection with ", url - stop(turn) - return - recvMessage() - recvMessage() - onMessage(turn, ds, ?:SendJson) do (data: JsonNode): - asyncCheck(turn, ws.send($data, Text)) - do: - close(ws) - -when isMainModule: - runActor("main") do (turn: var Turn; root: Cap): - connectStdio(turn, root) - discard spawnWebsocketActor(turn, root) diff --git a/src/syndex_card.nim b/src/syndex_card.nim deleted file mode 100644 index 3e2dcc9..0000000 --- a/src/syndex_card.nim +++ /dev/null @@ -1,133 +0,0 @@ -# SPDX-FileCopyrightText: ☭ Emery Hemingway -# SPDX-License-Identifier: Unlicense - -## This was all Tony's idea, except for the silly name. - -import std/[asyncdispatch, os, terminal] -import preserves -import syndicate, syndicate/[durings, relays] -import illwill - -proc exitProc() {.noconv.} = - illwillDeinit() - showCursor() - quit QuitSuccess - -setControlCHook(exitProc) - -proc parsePattern(pr: Value): Pattern = - let - dropSigil = initRecord("lit", "_".toSymbol) - grabSigil = initRecord("lit", "?".toSymbol) - var pr = grab(pr).toPreserves - apply(pr) do (pr: var Value): - if pr == dropSigil: - pr = initRecord("_") - elif pr == grabSigil: - pr = initRecord("bind", initRecord("_")) - doAssert result.fromPreserves(pr) - -proc inputPattern: Pattern = - var args = commandLineParams() - if args.len != 1: - quit "expected a single pattern argument" - else: - var input = pop args - if input == "": - quit "expected Preserves Pattern on stdin" - else: - var pr: Value - try: pr = decodePreserves(input) - except ValueError: discard - try: pr = parsePreserves(input) - except ValueError: discard - if pr.isFalse: - quit "failed to parse Preserves argument" - result = parsePattern(pr) - -type TermEntity {.final.} = ref object of Entity - pattern: Pattern - value: Value - -method publish(te: TermEntity; turn: var Turn; v: AssertionRef; h: Handle) = - te.value = v.value - var termBuf = newTerminalBuffer(terminalWidth(), terminalHeight()) - var y = 1 - termBuf.write(1, y, $te.pattern, styleBright) - inc(y) - termBuf.drawHorizLine(0, termBuf.width(), y) - inc(y) - termBuf.write(0, y, $h, styleBright) - for i, e in te.value.sequence: - inc(y) - termBuf.write(1, y, $e) - termBuf.display() - -method retract(te: TermEntity; turn: var Turn; h: Handle) = - var termBuf = newTerminalBuffer(terminalWidth(), terminalHeight()) - var y = 1 - termBuf.write(1, y, $te.pattern, styleDim) - inc y - termBuf.drawHorizLine(0, termBuf.width(), y, true) - inc(y) - termBuf.write(0, y, $h, styleBright) - if te.value.isSequence: - for i, e in te.value.sequence: - inc(y) - termBuf.write(1, y, $e) - else: - inc(y) - termBuf.write(1, y, $te.value) - termBuf.display() - -type DumpEntity {.final.} = ref object of Entity - discard - -method publish(dump: DumpEntity; turn: var Turn; ass: AssertionRef; h: Handle) = - stdout.writeLine($ass.value) - stdout.flushFile() - -method message*(dump: DumpEntity; turn: var Turn; ass: AssertionRef) = - stdout.writeLine($ass.value) - stdout.flushFile() - -proc exit {.noconv.} = - illwillDeinit() - showCursor() - quit() - -setControlCHook(exit) - -proc main = - let - route = envRoute() - pat = inputPattern() - - if stdout.is_a_TTY: - illwillInit() - hideCursor() - - discard bootDataspace("syndex_card") do (turn: var Turn; root: Cap): - resolve(turn, root, route) do (turn: var Turn; ds: Cap): - var termBuf = newTerminalBuffer(terminalWidth(), terminalHeight()) - termBuf.write(1, 1, $pat, styleBright) - termBuf.drawHorizLine(1, termBuf.width(), 2) - termBuf.display() - - discard observe(turn, ds, pat, TermEntity(pattern: pat)) - - while true: - try: poll() - except CatchableError: - illwillDeinit() - showCursor() - quit getCurrentExceptionMsg() - - else: - let entity = DumpEntity() - runActor("syndex_card") do (root: Cap; turn: var Turn): - spawnRelays(turn, root) - resolve(turn, root, route) do (turn: var Turn; ds: Cap): - discard observe(turn, ds, pat, entity) - -main() diff --git a/syndicate_utils.nimble b/syndicate_utils.nimble index bafcb1e..9cfb242 100644 --- a/syndicate_utils.nimble +++ b/syndicate_utils.nimble @@ -5,7 +5,7 @@ author = "Emery Hemingway" description = "Utilites for Syndicated Actors and Synit" license = "unlicense" srcDir = "src" -bin = @["mintsturdyref", "mount_actor", "msg", "net_mapper", "preserve_process_environment", "syndesizer", "syndex_card"] +bin = @["mintsturdyref", "mount_actor", "msg", "preserve_process_environment", "rofi_script_actor", "syndesizer"] # Dependencies