From 74254dd45b000d5d75249a538eacfa93750e7f36 Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Sat, 6 May 2023 16:04:20 +0100 Subject: [PATCH] relays: reopen stdin asynchronously This was preventing futures from being completed until the next packet was received on stdin. This should fix a lot of problems and bad behavior. --- src/syndicate/relays.nim | 15 ++++++++------- syndicate.nimble | 2 +- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/syndicate/relays.nim b/src/syndicate/relays.nim index eb4cb9e..396170a 100644 --- a/src/syndicate/relays.nim +++ b/src/syndicate/relays.nim @@ -112,6 +112,8 @@ proc send(r: Relay; pkt: sink Packet): Future[void] = proc send(r: Relay; rOid: protocol.Oid; m: Event) = if r.pendingTurn.len == 0: + # do not send a packet immediately, + # wait until the pending I/O is processed callSoon do (): r.facet.run do (turn: var Turn): var pkt = Packet( @@ -328,8 +330,7 @@ when defined(posix): var (success, pr) = decode(wireBuf) if success: dispatch(relay, pr) - callSoon: - socket.recv(recvSize).addCallback(recvCb) + socket.recv(recvSize).addCallback(recvCb) socket.recv(recvSize).addCallback(recvCb) turn.facet.actor.atExit do (turn: var Turn): close(socket) discard publish(turn, connectionClosedRef, true) @@ -369,11 +370,12 @@ when defined(posix): asyncCheck spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay): let facet = turn.facet - asyncStdin = newAsyncFile(AsyncFD 0) + asyncStdin = openAsync("/dev/stdin") # this is universal now? + close(stdin) facet.actor.atExit do (turn: var Turn): close(asyncStdin) var wireBuf = newBufferedDecoder() - proc recvCb(pktFut: Future[string]) {.gcsafe.} = + proc readCb(pktFut: Future[string]) {.gcsafe.} = if not pktFut.failed: var buf = pktFut.read if buf.len == 0: @@ -383,6 +385,5 @@ when defined(posix): var (success, pr) = decode(wireBuf) if success: dispatch(relay, pr) - callSoon: asyncStdin.read(stdinReadSize).addCallback(recvCb) - # do not process the next line immediately - asyncStdin.read(stdinReadSize).addCallback(recvCb) + asyncStdin.read(stdinReadSize).addCallback(readCb) + asyncStdin.read(stdinReadSize).addCallback(readCb) diff --git a/syndicate.nimble b/syndicate.nimble index cf1b3d1..e47c0d8 100644 --- a/syndicate.nimble +++ b/syndicate.nimble @@ -1,6 +1,6 @@ # Package -version = "20230503" +version = "20230506" author = "Emery Hemingway" description = "Syndicated actors for conversational concurrency" license = "Unlicense"