From d69af0a90df04a3daee768a81541b29473814adb Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Sun, 16 Oct 2022 15:35:50 -0500 Subject: [PATCH] Use binary Preserves over Unix sockets --- src/syndicate/relays.nim | 27 +++++++++++++-------- src/syndicate/unix/swaybar_status_actor.nim | 1 - 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/src/syndicate/relays.nim b/src/syndicate/relays.nim index a7722ec..2e83c95 100644 --- a/src/syndicate/relays.nim +++ b/src/syndicate/relays.nim @@ -288,14 +288,13 @@ type ConnectProc* = proc (turn: var Turn; ds: Ref) {.gcsafe.} proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: ConnectProc) = - var wireBuf: string var socket = newAsyncSocket( domain = AF_UNIX, sockType = SOCK_STREAM, protocol = cast[Protocol](0), buffered = false) proc socketWriter(packet: sink Packet): Future[void] = - socket.send($packet) + socket.send(cast[string](encode(packet))) const recvSize = 0x2000 var shutdownRef: Ref let reenable = turn.facet.preventInertCheck() @@ -309,6 +308,7 @@ proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: Connec initialOid: 0.Oid.some) let relayFut = spawnRelay("unix", turn, ops) do (turn: var Turn; relay: Relay): let facet = turn.facet + var wireBuf = newStringStream() proc recvCb(pktFut: Future[string]) {.gcsafe.} = if pktFut.failed: run(facet) do (turn: var Turn): stopActor(turn) @@ -317,15 +317,22 @@ proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: Connec if buf.len == 0: run(facet) do (turn: var Turn): stopActor(turn) else: - if wireBuf.len == 0: wireBuf = move buf - else: wireBuf.add(buf) + var decodePos: int + if wireBuf.atEnd: + wireBuf.setPosition(0) + wireBuf.data = move buf + else: + decodePos = wireBuf.getPosition + wireBuf.data.add(buf) try: - var pr = parsePreserves(wireBuf, WireRef) - dispatch(relay, cast[Preserve[WireRef]](pr)) - wireBuf.setLen(0) - except ValueError: discard - socket.recv(recvSize).addCallback(recvCb) - # TODO: should this need be callSoon? + while not wireBuf.atEnd: + decodePos = wireBuf.getPosition + var pr = decodePreserves(wireBuf, WireRef) + dispatch(relay, pr) + except IOError, ValueError: + wireBuf.setPosition(decodePos) + callSoon: + socket.recv(recvSize).addCallback(recvCb) socket.recv(recvSize).addCallback(recvCb) turn.facet.actor.atExit do (turn: var Turn): close(socket) discard publish(turn, connectionClosedRef, true) diff --git a/src/syndicate/unix/swaybar_status_actor.nim b/src/syndicate/unix/swaybar_status_actor.nim index 8858c2b..248f2c5 100644 --- a/src/syndicate/unix/swaybar_status_actor.nim +++ b/src/syndicate/unix/swaybar_status_actor.nim @@ -49,6 +49,5 @@ bootDataspace("main") do (root: Ref; turn: var Turn): sendLine(turn) do: lineElements.excl a - sendLine(turn) runForever()