Refactor for Syndicate API update

This commit is contained in:
Emery Hemingway 2023-12-26 01:11:54 +02:00
parent 0d789361d6
commit bba515cdd8
22 changed files with 151 additions and 134 deletions

1
.envrc
View File

@ -1,2 +1 @@
source_env ..
use nix

View File

@ -1,5 +1,17 @@
{
"depends": [
{
"method": "fetchzip",
"packages": [
"bigints"
],
"path": "/nix/store/jvrm392g8adfsgf36prgwkbyd7vh5jsw-source",
"ref": "20231006",
"rev": "86ea14d31eea9275e1408ca34e6bfe9c99989a96",
"sha256": "15pcpmnk1bnw3k8769rjzcpg00nahyrypwbxs88jnwr4aczp99j4",
"srcDir": "src",
"url": "https://github.com/ehmry/nim-bigints/archive/86ea14d31eea9275e1408ca34e6bfe9c99989a96.tar.gz"
},
{
"method": "fetchzip",
"packages": [
@ -52,24 +64,24 @@
"packages": [
"preserves"
],
"path": "/nix/store/fmb2yckksz7iv3qdkk5gk1j060kppkq9-source",
"ref": "20231102",
"rev": "4faeb766dc3945bcfacaa1a836ef6ab29b20ceb0",
"sha256": "1a3g5bk1l1h250q3p6sqv6r1lpsplp330qqyp48r0i4a5r0jksq3",
"path": "/nix/store/fpkhfxnfbdcri6k7mac21r3byg738bs4-source",
"ref": "20240108",
"rev": "a01ba8c96d65f670862ba074bf82b50cbda6ed99",
"sha256": "0n8pghy2qfywx0psr54yzjvhdhi5av204150jyyzfxhigczd8sr4",
"srcDir": "src",
"url": "https://git.syndicate-lang.org/ehmry/preserves-nim/archive/4faeb766dc3945bcfacaa1a836ef6ab29b20ceb0.tar.gz"
"url": "https://git.syndicate-lang.org/ehmry/preserves-nim/archive/a01ba8c96d65f670862ba074bf82b50cbda6ed99.tar.gz"
},
{
"method": "fetchzip",
"packages": [
"syndicate"
],
"path": "/nix/store/nhpvl223vbzdrlzikw7pgyfxs344w7ma-source",
"ref": "20231108",
"rev": "095418032180e360ea27ec7fcd63193944b68e2c",
"sha256": "09pbml2chzz0v5zpz67fs7raj0mfmg8qrih2vz85xxc51h7ncqvw",
"path": "/nix/store/hma19sff6k2bi6qj01yscbynz6x2zvxj-source",
"ref": "20240108",
"rev": "3e11884a916c0452c90128c29940856e2d347cb7",
"sha256": "0n1gbwllwwilz9fp5zyp4054vzcq1p7ddzg02sw8d0vqb1wmpsqm",
"srcDir": "src",
"url": "https://git.syndicate-lang.org/ehmry/syndicate-nim/archive/095418032180e360ea27ec7fcd63193944b68e2c.tar.gz"
"url": "https://git.syndicate-lang.org/ehmry/syndicate-nim/archive/3e11884a916c0452c90128c29940856e2d347cb7.tar.gz"
}
]
}

View File

@ -1,5 +1,5 @@
version 1.
Mountpoint = <mount @source string @target string @type string @status Status> .
Status = Failure / #t.
Status = Failure / @success #t .
Failure = <failure @msg string> .

View File

@ -37,19 +37,24 @@ proc isObserve(pat: Pattern): bool =
pat.dcompound.orKind == DCompoundKind.rec and
pat.dcompound.rec.label.isSymbol"Observe"
runActor("cache_actor") do (turn: var Turn; root: Cap):
spawnTimers(turn, root)
connectStdio(turn, root)
during(turn, root, ?BootArgs) do (ds: Cap, lifetime: float64):
onPublish(turn, ds, ?Observe) do (pat: Pattern, obs: Cap):
var cache: CacheEntity
if obs.relay != turn.facet and not(pat.isObserve):
# Watch pattern if the observer is not us
# and if the pattern isn't a recursive observe
cache = CacheEntity(
timeouts: root,
target: ds,
pattern: pat,
lifetime: lifetime,
)
discard observe(turn, ds, pat, cache)
proc spawnCacheActor*(turn: var Turn; root: Cap): Actor =
spawn("cache_actor", turn) do (turn: var Turn):
during(turn, root, ?:BootArgs) do (ds: Cap, lifetime: float64):
onPublish(turn, ds, ?:Observe) do (pat: Pattern, obs: Cap):
var cache: CacheEntity
if obs.relay != turn.facet and not(pat.isObserve):
# Watch pattern if the observer is not us
# and if the pattern isn't a recursive observe
cache = CacheEntity(
timeouts: root,
target: ds,
pattern: pat,
lifetime: lifetime,
)
discard observe(turn, ds, pat, cache)
when isMainModule:
runActor("cache_actor") do (turn: var Turn; root: Cap):
spawnTimers(turn, root)
connectStdio(turn, root)
discard spawnCacheActor(turn, root)

View File

@ -7,14 +7,11 @@ import std/[asyncdispatch, asyncfile, tables]
import posix, posix/inotify
import preserves
import syndicate, syndicate/[bags, relays]
from syndicate/protocols/dataspace import Observe
import ./schema/inotify_actor
var IN_NONBLOCK {.importc, nodecl.}: cint
type
Observe = dataspace.Observe[Cap]
# Registry = TableRef[cint, TableRef[Cap, HashSet[string]]]
BootArgs {.preservesDictionary.} = object
dataspace: Cap
@ -72,7 +69,7 @@ runActor("inotify_actor") do (root: Cap; turn: var Turn):
let buf = newSeq[byte](8192)
let eventPattern = ?Observe(pattern: !InotifyMessage) ?? { 0: grabLit(), 1: grabLit() }
connectStdio(turn, root)
during(turn, root, ?BootArgs) do (ds: Cap):
during(turn, root, ?:BootArgs) do (ds: Cap):
let inf = inotify_init1(IN_NONBLOCK)
doAssert inf != -1, $inf & " - " & $strerror(errno)
var

View File

@ -4,7 +4,7 @@
import std/json
import preserves, preserves/jsonhooks
export fromPreserveHook, toPreserveHook
export fromPreservesHook, toPreservesHook
# re-export the hooks so that conversion "just works"
type

View File

@ -7,28 +7,33 @@ import preserves, preserves/jsonhooks, syndicate, syndicate/relays
import ./schema/config, ./json_messages
runActor("main") do (root: Cap; turn: var Turn):
connectStdio(turn, root)
during(turn, root, ?JsonSocketTranslatorArguments) do (ds: Cap, socketPath: string):
let socket = newAsyncSocket(
domain = AF_UNIX,
sockType = SOCK_STREAM,
protocol = cast[Protocol](0),
buffered = false,
)
addCallback(connectUnix(socket, socketPath), turn) do (turn: var Turn):
let a = JsonTranslatorConnected(path: socketPath)
discard publish(turn, ds, a)
proc spawnJsonSocketTranslator*(turn: var Turn; root: Cap): Actor =
spawn("json_socket_translator", turn) do (turn: var Turn):
during(turn, root, ?:JsonSocketTranslatorArguments) do (ds: Cap, socketPath: string):
let socket = newAsyncSocket(
domain = AF_UNIX,
sockType = SOCK_STREAM,
protocol = cast[Protocol](0),
buffered = false,
)
addCallback(connectUnix(socket, socketPath), turn) do (turn: var Turn):
let a = JsonTranslatorConnected(path: socketPath)
discard publish(turn, ds, a)
let socketFacet = turn.facet
proc processOutput(fut: Future[string]) {.gcsafe.} =
run(socketFacet) do (turn: var Turn):
var data = fut.read.parseJson
message(turn, ds, RecvJson(data: data))
let socketFacet = turn.facet
proc processOutput(fut: Future[string]) {.gcsafe.} =
run(socketFacet) do (turn: var Turn):
var data = fut.read.parseJson
message(turn, ds, RecvJson(data: data))
socket.recvLine.addCallback(processOutput)
socket.recvLine.addCallback(processOutput)
socket.recvLine.addCallback(processOutput)
onMessage(turn, ds, ?SendJson) do (data: JsonNode):
asyncCheck(turn, send(socket, $data & "\n"))
do:
close(socket)
onMessage(turn, ds, ?:SendJson) do (data: JsonNode):
asyncCheck(turn, send(socket, $data & "\n"))
do:
close(socket)
when isMainModule:
runActor("json_socket_translator") do (turn: var Turn; root: Cap):
connectStdio(turn, root)
discard spawnJsonSocketTranslator(turn, root)

View File

@ -5,7 +5,7 @@ import std/[json, os, osproc]
import preserves
import syndicate, syndicate/relays
from preserves/jsonhooks import toPreserveHook
from preserves/jsonhooks import toPreservesHook
import ./json_messages

View File

@ -2,7 +2,7 @@
# SPDX-License-Identifier: Unlicense
from os import commandLineParams
import preserves, syndicate/capabilities
import preserves, syndicate/capabilities, syndicate/protocols/sturdy
const usage = """
mintsturdyref OID < SECRET_KEY
@ -19,7 +19,7 @@ See:
"""
proc main =
var oids: seq[Preserve[void]]
var oids: seq[Value]
for p in commandLineParams():
case p
of "-h", "--help", "?":
@ -28,7 +28,7 @@ proc main =
add(oids, parsePreserves p)
if oids.len == 0:
stderr.writeLine """using the "syndicate" OID"""
oids.add(toPreserve "syndicate")
oids.add(toPreserves "syndicate")
var key: array[16, byte]
case readBytes(stdin, key, 0, 16)

View File

@ -25,7 +25,7 @@ runActor("mount_actor") do (turn: var Turn; root: Cap):
targetPat = ?Observe(pattern: !Mountpoint) ?? { 1: grabLit() }
sourcePat = ?Observe(pattern: !Mountpoint) ?? { 0: grabLit(), 2: grabLit() }
connectStdio(turn, root)
during(turn, root, ?BootArgs) do (ds: Cap):
during(turn, root, ?:BootArgs) do (ds: Cap):
during(turn, ds, targetPat) do (target: string):
during(turn, ds, sourcePat) do (source: string, fsType: string):
var mountpoint = Mountpoint(
@ -35,7 +35,7 @@ runActor("mount_actor") do (turn: var Turn; root: Cap):
)
var rc = mount(source, target, fsType, 0, nil)
if rc == 0:
mountpoint.status = Status(orKind: StatusKind.true)
mountpoint.status = Status(orKind: StatusKind.success)
else:
mountpoint.status = Status(orKind: StatusKind.Failure)
mountpoint.status.failure.msg = osErrorMsg(osLastError())

View File

@ -15,9 +15,9 @@ var
IP_TTL {.importc, nodecl, header: "<netinet/in.h>".}: int
]#
proc toPreserveHook(address: IpAddress; E: typedesc): Preserve[E] = toPreserve($address, E)
proc toPreservesHook(address: IpAddress): Value = toPreserves($address)
proc fromPreserveHook[E](address: var IpAddress; pr: Preserve[E]): bool =
proc fromPreservesHook(address: var IpAddress; pr: Value): bool =
try:
if pr.isString:
address = parseIpAddress(pr.string)
@ -27,7 +27,7 @@ proc fromPreserveHook[E](address: var IpAddress; pr: Preserve[E]): bool =
when isMainModule:
# verify that the hook catches
var ip: IpAddress
assert fromPreserveHook(ip, toPreserveHook(ip, void))
assert fromPreservesHook(ip, toPreservesHook(ip))
type
IcmpHeader {.packed.} = object
@ -157,7 +157,7 @@ type Args {.preservesDictionary.} = object
runActor("net_mapper") do (root: Cap; turn: var Turn):
connectStdio(turn, root)
let rttObserver = ?Observe(pattern: !RoundTripTime) ?? {0: grabLit()}
during(turn, root, ?Args) do (ds: Cap):
during(turn, root, ?:Args) do (ds: Cap):
during(turn, ds, rttObserver) do (address: IpAddress):
var ping: Pinger
if address.family == IpAddressFamily.IPv4:

View File

@ -16,6 +16,6 @@ proc main =
info.argv = commandLineParams()
for key, val in envPairs(): info.env[key] = val
info.dir = getCurrentDir()
writeLine(stdout, info.toPreserve)
writeLine(stdout, info.toPreserves)
main()

View File

@ -19,7 +19,7 @@ proc main =
resolve(turn, root, route) do (turn: var Turn; ds: Cap):
case paramCount()
of 0:
let pat = ?Options
let pat = ?:Options
onPublish(turn, ds, pat) do (options: seq[string]):
stdout.writeLine options.join("\n")
quit()

View File

@ -7,12 +7,12 @@ type
`path`*: string
JsonSocketTranslatorArguments* {.preservesDictionary.} = object
`dataspace`* {.preservesEmbedded.}: Preserve[void]
`dataspace`* {.preservesEmbedded.}: Value
`socket`*: string
proc `$`*(x: JsonTranslatorConnected | JsonSocketTranslatorArguments): string =
`$`(toPreserve(x))
`$`(toPreserves(x))
proc encode*(x: JsonTranslatorConnected | JsonSocketTranslatorArguments): seq[
byte] =
encode(toPreserve(x))
encode(toPreserves(x))

View File

@ -10,7 +10,7 @@ type
`name`*: string
proc `$`*(x: InotifyMessage): string =
`$`(toPreserve(x))
`$`(toPreserves(x))
proc encode*(x: InotifyMessage): seq[byte] =
encode(toPreserve(x))
encode(toPreserves(x))

View File

@ -13,18 +13,18 @@ type
`status`*: Status
StatusKind* {.pure.} = enum
`Failure`, `true`
`Failure`, `success`
`Status`* {.preservesOr.} = object
case orKind*: StatusKind
of StatusKind.`Failure`:
`failure`*: Failure
of StatusKind.`true`:
`true`* {.preservesLiteral: "#t".}: bool
of StatusKind.`success`:
`success`* {.preservesLiteral: "#t".}: bool
proc `$`*(x: Failure | Mountpoint | Status): string =
`$`(toPreserve(x))
`$`(toPreserves(x))
proc encode*(x: Failure | Mountpoint | Status): seq[byte] =
encode(toPreserve(x))
encode(toPreserves(x))

View File

@ -10,7 +10,7 @@ type
`maximum`*: float32
proc `$`*(x: RoundTripTime): string =
`$`(toPreserve(x))
`$`(toPreserves(x))
proc encode*(x: RoundTripTime): seq[byte] =
encode(toPreserve(x))
encode(toPreserves(x))

View File

@ -12,7 +12,7 @@ type
`options`*: seq[string]
proc `$`*(x: Environment | Select | Options): string =
`$`(toPreserve(x))
`$`(toPreserves(x))
proc encode*(x: Environment | Select | Options): seq[byte] =
encode(toPreserve(x))
encode(toPreserves(x))

View File

@ -3,13 +3,14 @@
## An actor for relaying Webhooks.
import std/[asyncdispatch, asynchttpserver, json, net, sets, strutils, tables, uri]
import std/[asyncdispatch, asynchttpserver, net, strutils, tables, uri]
import preserves, preserves/jsonhooks
import syndicate, syndicate/[bags, relays]
import syndicate/protocols/[http, transportAddress]
type
CapBag = Bag[Cap]
Endpoints = Table[seq[string], Cap]
WebhookArgs {.preservesDictionary.} = object
endpoints: Assertion
@ -19,36 +20,37 @@ type
func splitPath(s: string): seq[string] = s.strip(chars={'/'}).split('/')
proc toRecord(req: Request; seqnum: BiggestInt; path: seq[string]): HttpRequest =
proc toRecord(req: Request; seqnum: BiggestInt; path: seq[string]): Value =
## Convert a request value from the std/asynchttpserver module
## to a request type from syndicate/protocols/http.
result.sequenceNumber = seqnum
result.host = req.hostname
result.`method` = Symbol($req.reqMethod)
result.path = path
var record: HttpRequest
record.sequenceNumber = seqnum
record.host = req.hostname
record.`method` = Symbol($req.reqMethod)
record.path = path
for key, val in req.headers.pairs:
result.headers[Symbol key] = val
record.headers[Symbol key] = val
for key, val in decodeQuery(req.url.query):
result.query[Symbol key] =
record.query[Symbol key] =
@[QueryValue(orKind: QueryValueKind.string, string: val)]
let contentType = req.headers.getOrDefault("content-type")
result = toPreserves record
if req.body.len > 0:
result.body = RequestBody(orKind: RequestBodyKind.present)
case contentType.toString
of "application/json":
var js = parseJson req.body
result.body.present = js.toPreserve
of "application/octet-stream":
result.body.present = cast[seq[byte]](req.body).toPreserve
else:
result.body.present = req.body.toPreserve
result[7] =
case contentType.toString
of "application/json":
req.body.parsePreserves
of "application/octet-stream":
cast[seq[byte]](req.body).toPreserves
else:
req.body.toPreserves
proc bootWebhookActor*(turn: var Turn; root: Cap): Actor =
spawn("webhooks", turn) do (turn: var Turn):
during(turn, root, inject(!BootArgs, {1: grab(), 2: grab()})) do (host: string; port: Port):
var seqNum: BiggestInt
let facet = turn.facet
let endpoints = newTable[seq[string], Bag[Cap]]()
let endpoints = newTable[seq[string], CapBag]()
# use a bag so the same capability registered multiple
# times with the same path does not get duplicate messages
@ -60,10 +62,11 @@ proc bootWebhookActor*(turn: var Turn; root: Cap): Actor =
"no capabilities registered at $1\n" % [req.url.path])
else:
result = respond(req, Http200, "")
run(facet) do (turn: var Turn):
var rec = req.toRecord(seqNum, path)
for cap in endpoints[rec.path]:
proc act(turn: var Turn) {.gcsafe.} =
let rec = req.toRecord(seqNum, path)
for cap in endpoints[path]:
message(turn, cap, rec)
run(facet, act)
let server = newAsyncHttpServer()
if host.isIpAddress:
@ -80,7 +83,7 @@ proc bootWebhookActor*(turn: var Turn; root: Cap): Actor =
during(turn, root, inject(!BootArgs, {0: grab(), 1: ?host, 2: ?port})) do (eps: Endpoints):
for path, cap in eps:
if not endpoints.hasKey path:
endpoints[path] = Bag[Cap]()
endpoints[path] = CapBag()
discard endpoints[path].change(cap, +1)
do:
for path, cap in eps:
@ -88,5 +91,3 @@ proc bootWebhookActor*(turn: var Turn; root: Cap): Actor =
do:
close(server)
# TODO: JSON payloads

View File

@ -15,17 +15,17 @@ proc exitProc() {.noconv.} =
setControlCHook(exitProc)
proc parsePattern(pr: Assertion): Pattern =
proc parsePattern(pr: Value): Pattern =
let
dropSigil = initRecord("lit", "_".toSymbol(Cap))
grabSigil = initRecord("lit", "?".toSymbol(Cap))
var pr = grab(pr).toPreserve(Cap)
apply(pr) do (pr: var Assertion):
dropSigil = initRecord("lit", "_".toSymbol)
grabSigil = initRecord("lit", "?".toSymbol)
var pr = grab(pr).toPreserves
apply(pr) do (pr: var Value):
if pr == dropSigil:
pr = initRecord[Cap]("_")
pr = initRecord("_")
elif pr == grabSigil:
pr = initRecord("bind", initRecord[Cap]("_"))
doAssert result.fromPreserve(pr)
pr = initRecord("bind", initRecord("_"))
doAssert result.fromPreserves(pr)
proc inputPattern: Pattern =
var args = commandLineParams()
@ -36,10 +36,10 @@ proc inputPattern: Pattern =
if input == "":
quit "expected Preserves Pattern on stdin"
else:
var pr: Assertion
try: pr = decodePreserves(input, Cap)
var pr: Value
try: pr = decodePreserves(input)
except ValueError: discard
try: pr = parsePreserves(input, Cap)
try: pr = parsePreserves(input)
except ValueError: discard
if pr.isFalse:
quit "failed to parse Preserves argument"
@ -47,7 +47,7 @@ proc inputPattern: Pattern =
type TermEntity {.final.} = ref object of Entity
pattern: Pattern
value: Assertion
value: Value
method publish(te: TermEntity; turn: var Turn; v: AssertionRef; h: Handle) =
te.value = v.value

View File

@ -2,34 +2,32 @@
# SPDX-License-Identifier: Unlicense
import std/[os, tables]
import preserves
import syndicate,
syndicate/[durings, relays]
import preserves, syndicate, syndicate/[durings, relays]
proc parsePattern(pr: Assertion): Pattern =
proc parsePattern(pr: Value): Pattern =
let
dropSigil = initRecord("lit", "_".toSymbol(Cap))
grabSigil = initRecord("lit", "?".toSymbol(Cap))
var pr = grab(pr).toPreserve(Cap)
apply(pr) do (pr: var Assertion):
dropSigil = initRecord("lit", "_".toSymbol)
grabSigil = initRecord("lit", "?".toSymbol)
var pr = grab(pr).toPreserves
apply(pr) do (pr: var Value):
if pr == dropSigil:
pr = initRecord[Cap]("_")
pr = initRecord("_")
elif pr == grabSigil:
pr = initRecord("bind", initRecord[Cap]("_"))
doAssert result.fromPreserve(pr)
pr = initRecord("bind", initRecord("_"))
doAssert result.fromPreserves(pr)
proc inputPatterns: seq[Pattern] =
var args = commandLineParams()
result.setLen(args.len)
for i, input in args:
try: result[i] = input.parsePreserves(Cap).parsePattern
try: result[i] = input.parsePreserves.parsePattern
except ValueError:
quit "failed to parse Preserves argument"
type DumpEntity {.final.} = ref object of Entity
assertions: Table[Handle, seq[Assertion]]
assertions: Table[Handle, seq[Value]]
proc toLine(values: seq[Assertion]; prefix: char): string =
proc toLine(values: seq[Value]; prefix: char): string =
result = newStringOfCap(1024)
let sep = getEnv("FS", " ")
result.add(prefix)
@ -45,7 +43,7 @@ method publish(dump: DumpEntity; turn: var Turn; ass: AssertionRef; h: Handle) =
dump.assertions[h] = values
method retract(dump: DumpEntity; turn: var Turn; h: Handle) =
var values: seq[Assertion]
var values: seq[Value]
if dump.assertions.pop(h, values):
stdout.write(values.toLine('-'))
stdout.flushFile()

View File

@ -1,6 +1,6 @@
# Package
version = "20231226"
version = "20240108"
author = "Emery Hemingway"
description = "Utilites for Syndicated Actors and Synit"
license = "unlicense"
@ -10,4 +10,4 @@ bin = @["cache_actor", "json_socket_translator", "json_translator", "m
# Dependencies
requires "nim >= 1.6.6", "illwill", "syndicate >= 20231108"
requires "nim >= 2.0.0", "illwill", "syndicate >= 20240108"