Use binary Preserves over stdio
This commit is contained in:
parent
fa5a4a9cbc
commit
68a742797c
|
@ -1,7 +1,7 @@
|
||||||
# SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway
|
# SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway
|
||||||
# SPDX-License-Identifier: Unlicense
|
# SPDX-License-Identifier: Unlicense
|
||||||
|
|
||||||
import std/[asyncdispatch, options, tables]
|
import std/[asyncdispatch, options, streams, tables]
|
||||||
import preserves
|
import preserves
|
||||||
import ./actors, ./durings, ./membranes, ./protocols/[protocol, sturdy]
|
import ./actors, ./durings, ./membranes, ./protocols/[protocol, sturdy]
|
||||||
|
|
||||||
|
@ -232,7 +232,7 @@ proc dispatch(relay: Relay; v: Preserve[WireRef]) =
|
||||||
of PacketKind.Extension:
|
of PacketKind.Extension:
|
||||||
discard
|
discard
|
||||||
else:
|
else:
|
||||||
stderr.writeLine "discarding unparsed packet ", v
|
stderr.writeLine "discarding undecoded packet ", v
|
||||||
|
|
||||||
type
|
type
|
||||||
RelayOptions = object of RootObj
|
RelayOptions = object of RootObj
|
||||||
|
@ -350,12 +350,13 @@ proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: Connec
|
||||||
|
|
||||||
import std/asyncfile
|
import std/asyncfile
|
||||||
|
|
||||||
|
const stdinReadSize = 128
|
||||||
|
|
||||||
proc connectStdio*(ds: Ref; turn: var Turn) =
|
proc connectStdio*(ds: Ref; turn: var Turn) =
|
||||||
## Connect to an external dataspace over stdin and stdout.
|
## Connect to an external dataspace over stdin and stdout.
|
||||||
proc stdoutWriter(packet: sink Packet): Future[void] {.async.} =
|
proc stdoutWriter(packet: sink Packet): Future[void] {.async.} =
|
||||||
# var buf = encode(packet)
|
var buf = encode(packet)
|
||||||
# doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len
|
doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len
|
||||||
write(stdout, packet)
|
|
||||||
flushFile(stdout)
|
flushFile(stdout)
|
||||||
var opts = RelayActorOptions(
|
var opts = RelayActorOptions(
|
||||||
packetWriter: stdoutWriter,
|
packetWriter: stdoutWriter,
|
||||||
|
@ -367,15 +368,27 @@ proc connectStdio*(ds: Ref; turn: var Turn) =
|
||||||
asyncStdin = openAsync("/dev/stdin")
|
asyncStdin = openAsync("/dev/stdin")
|
||||||
facet.actor.atExit do (turn: var Turn):
|
facet.actor.atExit do (turn: var Turn):
|
||||||
close(asyncStdin)
|
close(asyncStdin)
|
||||||
|
var wireBuf = newStringStream()
|
||||||
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
|
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
|
||||||
if pktFut.failed: discard
|
if not pktFut.failed:
|
||||||
else:
|
var buf = pktFut.read
|
||||||
let buf = pktFut.read
|
|
||||||
if buf.len == 0:
|
if buf.len == 0:
|
||||||
run(facet) do (turn: var Turn): stopActor(turn)
|
run(facet) do (turn: var Turn): stopActor(turn)
|
||||||
else:
|
else:
|
||||||
var v = parsePreserves(buf)
|
var decodePos: int
|
||||||
dispatch(relay, cast[Preserve[WireRef]](v))
|
if wireBuf.atEnd:
|
||||||
callSoon: asyncStdin.readLine().addCallback(recvCb)
|
wireBuf.setPosition(0)
|
||||||
|
wireBuf.data = move buf
|
||||||
|
else:
|
||||||
|
decodePos = wireBuf.getPosition
|
||||||
|
wireBuf.data.add(buf)
|
||||||
|
try:
|
||||||
|
while not wireBuf.atEnd:
|
||||||
|
decodePos = wireBuf.getPosition
|
||||||
|
var pr = decodePreserves(wireBuf, WireRef)
|
||||||
|
dispatch(relay, pr)
|
||||||
|
except IOError, ValueError:
|
||||||
|
wireBuf.setPosition(decodePos)
|
||||||
|
callSoon: asyncStdin.read(stdinReadSize).addCallback(recvCb)
|
||||||
# do not process the next line immediately
|
# do not process the next line immediately
|
||||||
asyncStdin.readLine().addCallback(recvCb)
|
asyncStdin.read(stdinReadSize).addCallback(recvCb)
|
||||||
|
|
Loading…
Reference in New Issue