relays: reopen stdin asynchronously

This was preventing futures from being completed until the next
packet was received on stdin. This should fix a lot of problems
and bad behavior.
This commit is contained in:
Emery Hemingway 2023-05-06 16:04:20 +01:00
parent 572e3b76ab
commit 74254dd45b
2 changed files with 9 additions and 8 deletions

View File

@ -112,6 +112,8 @@ proc send(r: Relay; pkt: sink Packet): Future[void] =
proc send(r: Relay; rOid: protocol.Oid; m: Event) =
if r.pendingTurn.len == 0:
# do not send a packet immediately,
# wait until the pending I/O is processed
callSoon do ():
r.facet.run do (turn: var Turn):
var pkt = Packet(
@ -328,8 +330,7 @@ when defined(posix):
var (success, pr) = decode(wireBuf)
if success:
dispatch(relay, pr)
callSoon:
socket.recv(recvSize).addCallback(recvCb)
socket.recv(recvSize).addCallback(recvCb)
socket.recv(recvSize).addCallback(recvCb)
turn.facet.actor.atExit do (turn: var Turn): close(socket)
discard publish(turn, connectionClosedRef, true)
@ -369,11 +370,12 @@ when defined(posix):
asyncCheck spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay):
let
facet = turn.facet
asyncStdin = newAsyncFile(AsyncFD 0)
asyncStdin = openAsync("/dev/stdin") # this is universal now?
close(stdin)
facet.actor.atExit do (turn: var Turn):
close(asyncStdin)
var wireBuf = newBufferedDecoder()
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
proc readCb(pktFut: Future[string]) {.gcsafe.} =
if not pktFut.failed:
var buf = pktFut.read
if buf.len == 0:
@ -383,6 +385,5 @@ when defined(posix):
var (success, pr) = decode(wireBuf)
if success:
dispatch(relay, pr)
callSoon: asyncStdin.read(stdinReadSize).addCallback(recvCb)
# do not process the next line immediately
asyncStdin.read(stdinReadSize).addCallback(recvCb)
asyncStdin.read(stdinReadSize).addCallback(readCb)
asyncStdin.read(stdinReadSize).addCallback(readCb)

View File

@ -1,6 +1,6 @@
# Package
version = "20230503"
version = "20230506"
author = "Emery Hemingway"
description = "Syndicated actors for conversational concurrency"
license = "Unlicense"