diff --git a/src/syndicate/relays.nim b/src/syndicate/relays.nim index 2e83c95..46c7bd2 100644 --- a/src/syndicate/relays.nim +++ b/src/syndicate/relays.nim @@ -308,7 +308,7 @@ proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: Connec initialOid: 0.Oid.some) let relayFut = spawnRelay("unix", turn, ops) do (turn: var Turn; relay: Relay): let facet = turn.facet - var wireBuf = newStringStream() + var wireBuf = newBufferedDecoder() proc recvCb(pktFut: Future[string]) {.gcsafe.} = if pktFut.failed: run(facet) do (turn: var Turn): stopActor(turn) @@ -317,20 +317,10 @@ proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: Connec if buf.len == 0: run(facet) do (turn: var Turn): stopActor(turn) else: - var decodePos: int - if wireBuf.atEnd: - wireBuf.setPosition(0) - wireBuf.data = move buf - else: - decodePos = wireBuf.getPosition - wireBuf.data.add(buf) - try: - while not wireBuf.atEnd: - decodePos = wireBuf.getPosition - var pr = decodePreserves(wireBuf, WireRef) - dispatch(relay, pr) - except IOError, ValueError: - wireBuf.setPosition(decodePos) + feed(wireBuf, buf) + var (success, pr) = decode(wireBuf, WireRef) + if success: + dispatch(relay, pr) callSoon: socket.recv(recvSize).addCallback(recvCb) socket.recv(recvSize).addCallback(recvCb) @@ -375,27 +365,17 @@ proc connectStdio*(ds: Ref; turn: var Turn) = asyncStdin = openAsync("/dev/stdin") facet.actor.atExit do (turn: var Turn): close(asyncStdin) - var wireBuf = newStringStream() + var wireBuf = newBufferedDecoder() proc recvCb(pktFut: Future[string]) {.gcsafe.} = if not pktFut.failed: var buf = pktFut.read if buf.len == 0: run(facet) do (turn: var Turn): stopActor(turn) else: - var decodePos: int - if wireBuf.atEnd: - wireBuf.setPosition(0) - wireBuf.data = move buf - else: - decodePos = wireBuf.getPosition - wireBuf.data.add(buf) - try: - while not wireBuf.atEnd: - decodePos = wireBuf.getPosition - var pr = decodePreserves(wireBuf, WireRef) - dispatch(relay, pr) - except IOError, ValueError: - wireBuf.setPosition(decodePos) + feed(wireBuf, buf) + var (success, pr) = decode(wireBuf, WireRef) + if success: + dispatch(relay, pr) callSoon: asyncStdin.read(stdinReadSize).addCallback(recvCb) # do not process the next line immediately asyncStdin.read(stdinReadSize).addCallback(recvCb)