diff --git a/src/syndicate/relays.nim b/src/syndicate/relays.nim index cef4b05..6901de8 100644 --- a/src/syndicate/relays.nim +++ b/src/syndicate/relays.nim @@ -5,8 +5,6 @@ import std/[asyncdispatch, options, tables] import preserves import ./actors, ./durings, ./membranes, ./protocols/[protocol, sturdy] -from ./patterns import grab - when defined(traceSyndicate): template trace(args: varargs[untyped]): untyped = stderr.writeLine(args) else: @@ -290,6 +288,7 @@ type ConnectProc* = proc (turn: var Turn; ds: Ref) {.gcsafe.} proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: ConnectProc) = + var wireBuf: string var socket = newAsyncSocket( domain = AF_UNIX, sockType = SOCK_STREAM, @@ -297,11 +296,7 @@ proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: Connec buffered = false) proc socketWriter(packet: sink Packet): Future[void] = socket.send($packet) - const recvSize = 1 shl 18 - # this is an excessive buffer size but the PEG parser - # can only read complete documents - # TODO: use a binary protocol and improve that - # parser to stream data in chunks + const recvSize = 0x2000 var shutdownRef: Ref let reenable = turn.facet.preventInertCheck() let connectionClosedRef = newRef(turn, ShutdownEntity()) @@ -318,12 +313,17 @@ proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: Connec if pktFut.failed: run(facet) do (turn: var Turn): stopActor(turn) else: - let buf = pktFut.read + var buf = pktFut.read if buf.len == 0: run(facet) do (turn: var Turn): stopActor(turn) else: - var pr = parsePreserves(buf, WireRef) - dispatch(relay, cast[Preserve[WireRef]](pr)) + if wireBuf.len == 0: wireBuf = move buf + else: wireBuf.add(buf) + try: + var pr = parsePreserves(wireBuf, WireRef) + dispatch(relay, cast[Preserve[WireRef]](pr)) + wireBuf.setLen(0) + except ValueError: discard socket.recv(recvSize).addCallback(recvCb) # TODO: should this need be callSoon? socket.recv(recvSize).addCallback(recvCb) diff --git a/syndicate.nimble b/syndicate.nimble index e02504f..86b25ad 100644 --- a/syndicate.nimble +++ b/syndicate.nimble @@ -1,6 +1,6 @@ # Package -version = "1.3.2" +version = "20220829" author = "Emery Hemingway" description = "Syndicated actors for conversational concurrency" license = "Unlicense" @@ -9,4 +9,4 @@ srcDir = "src" # Dependencies -requires "nim >= 1.4.8", "nimSHA2 >= 0.1.1", "preserves > 3.2.0" +requires "nim >= 1.4.8", "nimSHA2 >= 0.1.1", "preserves >= 20220709"