From eb5d4d9a572022e88e959a319a8f87d63d44ed7d Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Mon, 4 Mar 2024 18:20:29 +0000 Subject: [PATCH] Port relays to nim-sys --- lock.json | 16 +-- src/syndicate/relays.nim | 227 ++++++++++++++++++++++----------------- syndicate.nimble | 4 +- 3 files changed, 139 insertions(+), 108 deletions(-) diff --git a/lock.json b/lock.json index 48a452a..42c77c8 100644 --- a/lock.json +++ b/lock.json @@ -16,11 +16,11 @@ "packages": [ "cps" ], - "path": "/nix/store/m9vpcf3dq6z2h1xpi1vlw0ycxp91s5p7-source", - "rev": "2a4d771a715ba45cfba3a82fa625ae7ad6591c8b", - "sha256": "0c62k5wpq9z9mn8cd4rm8jjc4z0xmnak4piyj5dsfbyj6sbdw2bf", + "path": "/nix/store/452hfhasrn3gl6vijfmzs69djl099j0j-source", + "rev": "b7c179f172e3a256a482a9daee3c0815ea423206", + "sha256": "1sn9s7iv83sw1jl5jgi2h7b0xpgsn13f9icp5124jvbp0qkxskx2", "srcDir": "", - "url": "https://github.com/nim-works/cps/archive/2a4d771a715ba45cfba3a82fa625ae7ad6591c8b.tar.gz" + "url": "https://github.com/nim-works/cps/archive/b7c179f172e3a256a482a9daee3c0815ea423206.tar.gz" }, { "method": "fetchzip", @@ -82,11 +82,11 @@ "packages": [ "sys" ], - "path": "/nix/store/ayplzmq7xdzrp3n6ly6dnskf5c5aiihp-source", - "rev": "3b86a5083a4aa178994fe4ffdc046d340aa13b32", - "sha256": "0qz9hag7synp8sx2b6caazm2kidvd0lv2p0h98sslkyzaf4icnal", + "path": "/nix/store/syhxsjlsdqfap0hk4qp3s6kayk8cqknd-source", + "rev": "4ef3b624db86e331ba334e705c1aa235d55b05e1", + "sha256": "1q4qgw4an4mmmcbx48l6xk1jig1vc8p9cq9dbx39kpnb0890j32q", "srcDir": "src", - "url": "https://github.com/alaviss/nim-sys/archive/3b86a5083a4aa178994fe4ffdc046d340aa13b32.tar.gz" + "url": "https://github.com/ehmry/nim-sys/archive/4ef3b624db86e331ba334e705c1aa235d55b05e1.tar.gz" } ] } diff --git a/src/syndicate/relays.nim b/src/syndicate/relays.nim index 370f901..a91d728 100644 --- a/src/syndicate/relays.nim +++ b/src/syndicate/relays.nim @@ -3,6 +3,7 @@ import std/[options, tables] from std/os import getEnv, `/` +import pkg/sys/ioqueue import preserves import ../syndicate, /capabilities, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress] @@ -16,16 +17,15 @@ else: export `$` -type - Oid = sturdy.Oid - export Stdio, Tcp, WebSocket, Unix type Assertion = Value - WireRef = sturdy.WireRef - Turn = syndicate.Turn + Event = protocol.Event Handle = actors.Handle + Oid = sturdy.Oid + Turn = syndicate.Turn + WireRef = sturdy.WireRef PacketWriter = proc (turn: var Turn; buf: seq[byte]) {.closure.} RelaySetup = proc (turn: var Turn; relay: Relay) {.closure.} @@ -38,7 +38,6 @@ type exported: Membrane imported: Membrane nextLocalOid: Oid - pendingTurn: protocol.Turn wireBuf: BufferedDecoder packetWriter: PacketWriter peer: Cap @@ -90,7 +89,7 @@ proc rewriteCapOut(relay: Relay; cap: Cap; exported: var seq[WireSymbol]): WireR mine: WireRefMine(oid: ws.oid)) proc rewriteOut(relay: Relay; v: Assertion): - tuple[rewritten: Value, exported: seq[WireSymbol]] {.closure.} = + tuple[rewritten: Value, exported: seq[WireSymbol]] = var exported: seq[WireSymbol] result.rewritten = mapEmbeds(v) do (pr: Value) -> Value: let o = pr.unembed(Cap); if o.isSome: @@ -108,17 +107,15 @@ proc deregister(relay: Relay; h: Handle) = for e in outbound: releaseCapOut(relay, e) proc send(relay: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) = - if relay.pendingTurn.len == 0: - # If the pending queue is empty then schedule a packet - # to be sent after pending I/O is processed. - callSoon do (): - relay.facet.run do (turn: var Turn): - var pkt = Packet( - orKind: PacketKind.Turn, - turn: move relay.pendingTurn) - trace "C: ", pkt - relay.packetWriter(turn, encode pkt) - relay.pendingTurn.add TurnEvent(oid: rOid, event: m) + # TODO: don't send right away. + var pendingTurn: protocol.Turn + pendingTurn.add TurnEvent(oid: rOid, event: m) + relay.facet.run do (turn: var Turn): + var pkt = Packet( + orKind: PacketKind.Turn, + turn: pendingTurn) + trace "C: ", pkt + relay.packetWriter(turn, encode pkt) proc send(re: RelayEntity; turn: var Turn; ev: Event) = send(re.relay, turn, protocol.Oid re.oid, ev) @@ -250,8 +247,8 @@ proc dispatch(relay: Relay; v: Value) = when defined(posix): stderr.writeLine("discarding undecoded packet ", v) -proc recv(relay: Relay; buf: seq[byte]) = - feed(relay.wireBuf, buf) +proc recv(relay: Relay; buf: openarray[byte]; slice: Slice[int]) = + feed(relay.wireBuf, buf, slice) var pr = decode(relay.wireBuf) if pr.isSome: dispatch(relay, pr.get) @@ -265,7 +262,7 @@ type nextLocalOid*: Option[Oid] proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) = - spawn(name, turn) do (turn: var Turn): + spawnActor(name, turn) do (turn: var Turn): let relay = Relay( facet: turn.facet, packetWriter: opts.packetWriter, @@ -300,16 +297,29 @@ proc accepted(cap: Cap): Resolved = when defined(posix): - import std/asyncfile - export Unix + import std/[oserrors, posix] + import pkg/sys/[files, handles, sockets] + export transportAddress.Unix type StdioControlEntity = ref object of Entity + buf: ref seq[byte] + relay: Relay stdin: AsyncFile method message(entity: StdioControlEntity; turn: var Turn; ass: AssertionRef) = if ass.value.preservesTo(ForceDisconnect).isSome: close(entity.stdin) - close(stdout) + + proc loop(entity: StdioControlEntity) {.asyncio.} = + new entity.buf + entity.buf[].setLen(0x1000) + while true: + let n = read(entity.stdin, entity.buf) + if n == 0: + stderr.writeLine "empty read on stdin, stopping actor" + stopActor(entity.relay.facet) + else: + entity.relay.recv(entity.buf[], 0..= 20231130", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240208", "https://github.com/alaviss/nim-sys.git", "https://github.com/nim-works/cps" +requires "https://github.com/ehmry/hashlib.git >= 20231130", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240208", "https://github.com/ehmry/nim-sys.git#4ef3b624db86e331ba334e705c1aa235d55b05e1", "https://github.com/nim-works/cps"