Relays: use BufferedDecoder

This commit is contained in:
Emery Hemingway 2022-10-22 18:44:17 -05:00
parent ec60d9c64a
commit d9a3570d6f
1 changed files with 10 additions and 30 deletions

View File

@ -308,7 +308,7 @@ proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: Connec
initialOid: 0.Oid.some) initialOid: 0.Oid.some)
let relayFut = spawnRelay("unix", turn, ops) do (turn: var Turn; relay: Relay): let relayFut = spawnRelay("unix", turn, ops) do (turn: var Turn; relay: Relay):
let facet = turn.facet let facet = turn.facet
var wireBuf = newStringStream() var wireBuf = newBufferedDecoder()
proc recvCb(pktFut: Future[string]) {.gcsafe.} = proc recvCb(pktFut: Future[string]) {.gcsafe.} =
if pktFut.failed: if pktFut.failed:
run(facet) do (turn: var Turn): stopActor(turn) run(facet) do (turn: var Turn): stopActor(turn)
@ -317,20 +317,10 @@ proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: Connec
if buf.len == 0: if buf.len == 0:
run(facet) do (turn: var Turn): stopActor(turn) run(facet) do (turn: var Turn): stopActor(turn)
else: else:
var decodePos: int feed(wireBuf, buf)
if wireBuf.atEnd: var (success, pr) = decode(wireBuf, WireRef)
wireBuf.setPosition(0) if success:
wireBuf.data = move buf dispatch(relay, pr)
else:
decodePos = wireBuf.getPosition
wireBuf.data.add(buf)
try:
while not wireBuf.atEnd:
decodePos = wireBuf.getPosition
var pr = decodePreserves(wireBuf, WireRef)
dispatch(relay, pr)
except IOError, ValueError:
wireBuf.setPosition(decodePos)
callSoon: callSoon:
socket.recv(recvSize).addCallback(recvCb) socket.recv(recvSize).addCallback(recvCb)
socket.recv(recvSize).addCallback(recvCb) socket.recv(recvSize).addCallback(recvCb)
@ -375,27 +365,17 @@ proc connectStdio*(ds: Ref; turn: var Turn) =
asyncStdin = openAsync("/dev/stdin") asyncStdin = openAsync("/dev/stdin")
facet.actor.atExit do (turn: var Turn): facet.actor.atExit do (turn: var Turn):
close(asyncStdin) close(asyncStdin)
var wireBuf = newStringStream() var wireBuf = newBufferedDecoder()
proc recvCb(pktFut: Future[string]) {.gcsafe.} = proc recvCb(pktFut: Future[string]) {.gcsafe.} =
if not pktFut.failed: if not pktFut.failed:
var buf = pktFut.read var buf = pktFut.read
if buf.len == 0: if buf.len == 0:
run(facet) do (turn: var Turn): stopActor(turn) run(facet) do (turn: var Turn): stopActor(turn)
else: else:
var decodePos: int feed(wireBuf, buf)
if wireBuf.atEnd: var (success, pr) = decode(wireBuf, WireRef)
wireBuf.setPosition(0) if success:
wireBuf.data = move buf dispatch(relay, pr)
else:
decodePos = wireBuf.getPosition
wireBuf.data.add(buf)
try:
while not wireBuf.atEnd:
decodePos = wireBuf.getPosition
var pr = decodePreserves(wireBuf, WireRef)
dispatch(relay, pr)
except IOError, ValueError:
wireBuf.setPosition(decodePos)
callSoon: asyncStdin.read(stdinReadSize).addCallback(recvCb) callSoon: asyncStdin.read(stdinReadSize).addCallback(recvCb)
# do not process the next line immediately # do not process the next line immediately
asyncStdin.read(stdinReadSize).addCallback(recvCb) asyncStdin.read(stdinReadSize).addCallback(recvCb)