From a4ba81a481ca67e40dadaccd3390f29fc1c5f2d3 Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Tue, 16 Jan 2024 19:59:34 +0200 Subject: [PATCH] Cleanup relays --- src/syndicate/actors.nim | 1 - src/syndicate/relays.nim | 140 ++++++++++++++++++--------------------- syndicate.nimble | 2 +- 3 files changed, 67 insertions(+), 76 deletions(-) diff --git a/src/syndicate/actors.nim b/src/syndicate/actors.nim index 8a10275..4b93ef8 100644 --- a/src/syndicate/actors.nim +++ b/src/syndicate/actors.nim @@ -601,7 +601,6 @@ method message(entity: SyncContinuation; turn: var Turn; v: AssertionRef) = entity.action(turn) proc sync*(turn: var Turn; refer: Cap; act: TurnAction) = - let e = SyncContinuation(action: act) sync(turn, refer, newCap(turn, SyncContinuation(action: act))) proc running*(actor): bool = diff --git a/src/syndicate/relays.nim b/src/syndicate/relays.nim index 7758631..e1a2311 100644 --- a/src/syndicate/relays.nim +++ b/src/syndicate/relays.nim @@ -28,7 +28,7 @@ type Handle = actors.Handle type - PacketWriter = proc (pkt: sink Packet): Future[void] {.gcsafe.} + PacketHandler = proc (buf: seq[byte]) {.gcsafe.} RelaySetup = proc (turn: var Turn; relay: Relay) {.gcsafe.} Relay* = ref object of RootObj @@ -40,7 +40,8 @@ type imported: Membrane nextLocalOid: Oid pendingTurn: protocol.Turn - packetWriter: PacketWriter + packetSender: PacketHandler + wireBuf: BufferedDecoder untrusted: bool SyncPeerEntity = ref object of Entity @@ -117,8 +118,8 @@ proc send(r: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) = orKind: PacketKind.Turn, turn: move r.pendingTurn) trace "C: ", pkt - assert(not r.packetWriter.isNil, "missing packetWriter proc") - asyncCheck(turn, r.packetWriter(pkt)) + assert(not r.packetSender.isNil, "missing packetSender proc") + r.packetSender(encode pkt) r.pendingTurn.add TurnEvent(oid: rOid, event: m) proc send(re: RelayEntity; turn: var Turn; ev: Event) = @@ -197,7 +198,7 @@ proc rewriteIn(relay; facet; v: Value): proc close(r: Relay) = discard -proc dispatch*(relay: Relay; turn: var Turn; cap: Cap; event: Event) {.gcsafe.} = +proc dispatch(relay: Relay; turn: var Turn; cap: Cap; event: Event) {.gcsafe.} = case event.orKind of EventKind.Assert: let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion) @@ -225,7 +226,7 @@ proc dispatch*(relay: Relay; turn: var Turn; cap: Cap; event: Event) {.gcsafe.} for e in imported: relay.imported.del e ]# -proc dispatch*(relay: Relay; v: Value) {.gcsafe.} = +proc dispatch(relay: Relay; v: Value) {.gcsafe.} = trace "S: ", v run(relay.facet) do (t: var Turn): var pkt: Packet @@ -251,9 +252,13 @@ proc dispatch*(relay: Relay; v: Value) {.gcsafe.} = when defined(posix): stderr.writeLine("discarding undecoded packet ", v) +proc dispatch(relay: Relay; buf: seq[byte]) = + feed(relay.wireBuf, buf) + var pr = decode(relay.wireBuf) + if pr.isSome: dispatch(relay, get pr) + type RelayOptions* = object of RootObj - packetWriter*: PacketWriter untrusted*: bool RelayActorOptions* = object of RelayOptions initialOid*: Option[Oid] @@ -263,7 +268,7 @@ type proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay = result = Relay( facet: turn.facet, - packetWriter: opts.packetWriter, + wireBuf: newBufferedDecoder(0), untrusted: opts.untrusted) discard result.facet.preventInertCheck() setup(turn, result) @@ -308,39 +313,63 @@ type ConnectProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.} export Tcp when defined(posix): + import std/asyncfile export Unix - proc connect*(turn: var Turn; ds: Cap; route: Route; addrAss: Assertion; socket: AsyncSocket; step: Value) = - ## Relay a dataspace over an open `AsyncSocket`. - proc socketWriter(packet: sink Packet): Future[void] = - socket.send(cast[string](encode(packet))) + proc newStdioTunnel(facet: Facet; receiver: PacketHandler): PacketHandler = + let asyncStdin = openAsync("/dev/stdin") # this is universal now? + close(stdin) + const readSize = 0x2000 + proc readCb(fut: Future[string]) {.gcsafe.} = + if fut.failed: terminate(facet, fut.error) + else: + receiver(cast[seq[byte]](fut.read)) + asyncStdin.read(readSize).addCallback(readCb) + asyncStdin.read(readSize).addCallback(readCb) + proc sender(buf: seq[byte]) = + try: + if writeBytes(stdout, buf, 0, buf.len) != buf.len: + raise newException(IOError, "failed to write Preserves to stdout") + flushFile(stdout) + except CatchableError as err: + terminate(facet, err) + sender + + proc connectStdio*(turn: var Turn; ds: Cap) = + ## Connect to an external dataspace over stdin and stdout. + var opts = RelayActorOptions( + initialCap: ds, + initialOid: 0.Oid.some) + spawnRelay("stdio", turn, ds, Stdio().toPreserves, opts) do (turn: var Turn; relay: Relay): + proc receiver(buf: seq[byte]) = dispatch(relay, buf) + relay.packetSender = newStdioTunnel(turn.facet, receiver) + + proc newTunnel(facet: Facet; receiver: PacketHandler; socket: AsyncSocket): PacketHandler = const recvSize = 0x2000 + proc recvCb(fut: Future[string]) {.gcsafe.} = + if fut.failed: terminate(facet, fut.error) + else: + receiver(cast[seq[byte]](fut.read)) + if not socket.isClosed: + socket.recv(recvSize).addCallback(recvCb) + socket.recv(recvSize).addCallback(recvCb) + proc sender(buf: seq[byte]) = + asyncCheck(facet, socket.send(cast[string](buf))) + sender + + proc connect*(turn: var Turn; ds: Cap; route: Route; addrAss: Assertion; socket: AsyncSocket; steps: seq[Value]) = + ## Relay a dataspace over an open `AsyncSocket`. var shutdownCap: Cap let reenable = turn.facet.preventInertCheck() connectionClosedCap = newCap(turn, ShutdownEntity()) discard bootActor("socket") do (turn: var Turn): var ops = RelayActorOptions( - packetWriter: socketWriter, initialOid: 0.Oid.some) spawnRelay("socket", turn, ds, addrAss, ops) do (turn: var Turn; relay: Relay): let facet = turn.facet - var wireBuf = newBufferedDecoder(0) - proc recvCb(pktFut: Future[string]) {.gcsafe.} = - if pktFut.failed: - run(facet) do (turn: var Turn): stopActor(turn) - else: - var buf = pktFut.read - if buf.len == 0: - run(facet) do (turn: var Turn): stopActor(turn) - else: - feed(wireBuf, buf) - var (success, pr) = decode(wireBuf) - if success: - dispatch(relay, pr) - if not socket.isClosed: - socket.recv(recvSize).addCallback(recvCb) - socket.recv(recvSize).addCallback(recvCb) + proc receiver(buf: seq[byte]) = dispatch(relay, buf) + relay.packetSender = newTunnel(turn.facet, receiver, socket) turn.facet.actor.atExit do (turn: var Turn): close(socket) discard publish(turn, connectionClosedCap, true) shutdownCap = newCap(turn, ShutdownEntity()) @@ -359,12 +388,12 @@ when defined(posix): stop(turn, facet) result = action var resolve = Resolve( - step: step, + step: steps[0], observer: newCap(turn, during(duringCallback)), ) discard publish(turn, gatekeeper, resolve) - proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Tcp; step: Value) = + proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Tcp; steps: seq[Value]) = ## Relay a dataspace over TCP. let socket = newAsyncSocket( domain = AF_INET, @@ -373,9 +402,9 @@ when defined(posix): buffered = false) let fut = connect(socket, transport.host, Port transport.port) addCallback(fut, turn) do (turn: var Turn): - connect(turn, ds, route, transport.toPreserves, socket, step) + connect(turn, ds, route, transport.toPreserves, socket, steps) - proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Unix; step: Value) = + proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Unix; steps: seq[Value]) = ## Relay a dataspace over a UNIX socket. let socket = newAsyncSocket( domain = AF_UNIX, @@ -384,44 +413,7 @@ when defined(posix): buffered = false) let fut = connectUnix(socket, transport.path) addCallback(fut, turn) do (turn: var Turn): - connect(turn, ds, route, transport.toPreserves, socket, step) - - import std/asyncfile - - const stdinReadSize = 128 - - proc connectStdio*(turn: var Turn; ds: Cap) = - ## Connect to an external dataspace over stdin and stdout. - proc stdoutWriter(packet: sink Packet): Future[void] = - result = newFuture[void]() - var buf = encode(packet) - doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len - flushFile(stdout) - complete result - var opts = RelayActorOptions( - packetWriter: stdoutWriter, - initialCap: ds, - initialOid: 0.Oid.some) - spawnRelay("stdio", turn, ds, Stdio().toPreserves, opts) do (turn: var Turn; relay: Relay): - let - facet = turn.facet - asyncStdin = openAsync("/dev/stdin") # this is universal now? - close(stdin) - facet.actor.atExit do (turn: var Turn): - close(asyncStdin) - var wireBuf = newBufferedDecoder(0) - proc readCb(pktFut: Future[string]) {.gcsafe.} = - if not pktFut.failed: - var buf = pktFut.read - if buf.len == 0: - run(facet) do (turn: var Turn): stopActor(turn) - else: - feed(wireBuf, buf) - var (success, pr) = decode(wireBuf) - if success: - dispatch(relay, pr) - asyncStdin.read(stdinReadSize).addCallback(readCb) - asyncStdin.read(stdinReadSize).addCallback(readCb) + connect(turn, ds, route, transport.toPreserves, socket, steps) type BootProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.} @@ -442,12 +434,12 @@ proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) = tcp: Tcp stdio: Stdio doAssert(route.transports.len == 1, "only a single transport supported for routes") - doAssert(route.pathSteps.len < 2, "multiple path steps not supported for routes") if unix.fromPreserves route.transports[0]: - connect(turn, ds, route, unix, route.pathSteps[0]) + connect(turn, ds, route, unix, route.pathSteps) elif tcp.fromPreserves route.transports[0]: - connect(turn, ds, route, tcp, route.pathSteps[0]) + connect(turn, ds, route, tcp, route.pathSteps) elif stdio.fromPreserves route.transports[0]: + doAssert(route.pathSteps.len == 0, "route steps not available over stdio") connectStdio(turn, ds) bootProc(turn, ds) else: diff --git a/syndicate.nimble b/syndicate.nimble index 501809d..62c0a63 100644 --- a/syndicate.nimble +++ b/syndicate.nimble @@ -9,4 +9,4 @@ srcDir = "src" # Dependencies -requires "https://github.com/ehmry/hashlib.git#f9455d4be988e14e3dc7933eb7cc7d7c4820b7ac", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240108" +requires "https://github.com/ehmry/hashlib.git#f9455d4be988e14e3dc7933eb7cc7d7c4820b7ac", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240116"