From 68a742797ce30ffe4b8bc9c6d047515243b888d1 Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Sun, 16 Oct 2022 11:42:07 -0500 Subject: [PATCH] Use binary Preserves over stdio --- src/syndicate/relays.nim | 37 +++++++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/src/syndicate/relays.nim b/src/syndicate/relays.nim index 6901de8..a7722ec 100644 --- a/src/syndicate/relays.nim +++ b/src/syndicate/relays.nim @@ -1,7 +1,7 @@ # SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway # SPDX-License-Identifier: Unlicense -import std/[asyncdispatch, options, tables] +import std/[asyncdispatch, options, streams, tables] import preserves import ./actors, ./durings, ./membranes, ./protocols/[protocol, sturdy] @@ -232,7 +232,7 @@ proc dispatch(relay: Relay; v: Preserve[WireRef]) = of PacketKind.Extension: discard else: - stderr.writeLine "discarding unparsed packet ", v + stderr.writeLine "discarding undecoded packet ", v type RelayOptions = object of RootObj @@ -350,12 +350,13 @@ proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: Connec import std/asyncfile +const stdinReadSize = 128 + proc connectStdio*(ds: Ref; turn: var Turn) = ## Connect to an external dataspace over stdin and stdout. proc stdoutWriter(packet: sink Packet): Future[void] {.async.} = - # var buf = encode(packet) - # doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len - write(stdout, packet) + var buf = encode(packet) + doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len flushFile(stdout) var opts = RelayActorOptions( packetWriter: stdoutWriter, @@ -367,15 +368,27 @@ proc connectStdio*(ds: Ref; turn: var Turn) = asyncStdin = openAsync("/dev/stdin") facet.actor.atExit do (turn: var Turn): close(asyncStdin) + var wireBuf = newStringStream() proc recvCb(pktFut: Future[string]) {.gcsafe.} = - if pktFut.failed: discard - else: - let buf = pktFut.read + if not pktFut.failed: + var buf = pktFut.read if buf.len == 0: run(facet) do (turn: var Turn): stopActor(turn) else: - var v = parsePreserves(buf) - dispatch(relay, cast[Preserve[WireRef]](v)) - callSoon: asyncStdin.readLine().addCallback(recvCb) + var decodePos: int + if wireBuf.atEnd: + wireBuf.setPosition(0) + wireBuf.data = move buf + else: + decodePos = wireBuf.getPosition + wireBuf.data.add(buf) + try: + while not wireBuf.atEnd: + decodePos = wireBuf.getPosition + var pr = decodePreserves(wireBuf, WireRef) + dispatch(relay, pr) + except IOError, ValueError: + wireBuf.setPosition(decodePos) + callSoon: asyncStdin.read(stdinReadSize).addCallback(recvCb) # do not process the next line immediately - asyncStdin.readLine().addCallback(recvCb) + asyncStdin.read(stdinReadSize).addCallback(recvCb)