Buffer socket reads so long messages come through
This commit is contained in:
parent
b0f5ff98e2
commit
0742665288
|
@ -5,8 +5,6 @@ import std/[asyncdispatch, options, tables]
|
||||||
import preserves
|
import preserves
|
||||||
import ./actors, ./durings, ./membranes, ./protocols/[protocol, sturdy]
|
import ./actors, ./durings, ./membranes, ./protocols/[protocol, sturdy]
|
||||||
|
|
||||||
from ./patterns import grab
|
|
||||||
|
|
||||||
when defined(traceSyndicate):
|
when defined(traceSyndicate):
|
||||||
template trace(args: varargs[untyped]): untyped = stderr.writeLine(args)
|
template trace(args: varargs[untyped]): untyped = stderr.writeLine(args)
|
||||||
else:
|
else:
|
||||||
|
@ -290,6 +288,7 @@ type
|
||||||
ConnectProc* = proc (turn: var Turn; ds: Ref) {.gcsafe.}
|
ConnectProc* = proc (turn: var Turn; ds: Ref) {.gcsafe.}
|
||||||
|
|
||||||
proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: ConnectProc) =
|
proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: ConnectProc) =
|
||||||
|
var wireBuf: string
|
||||||
var socket = newAsyncSocket(
|
var socket = newAsyncSocket(
|
||||||
domain = AF_UNIX,
|
domain = AF_UNIX,
|
||||||
sockType = SOCK_STREAM,
|
sockType = SOCK_STREAM,
|
||||||
|
@ -297,11 +296,7 @@ proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: Connec
|
||||||
buffered = false)
|
buffered = false)
|
||||||
proc socketWriter(packet: sink Packet): Future[void] =
|
proc socketWriter(packet: sink Packet): Future[void] =
|
||||||
socket.send($packet)
|
socket.send($packet)
|
||||||
const recvSize = 1 shl 18
|
const recvSize = 0x2000
|
||||||
# this is an excessive buffer size but the PEG parser
|
|
||||||
# can only read complete documents
|
|
||||||
# TODO: use a binary protocol and improve that
|
|
||||||
# parser to stream data in chunks
|
|
||||||
var shutdownRef: Ref
|
var shutdownRef: Ref
|
||||||
let reenable = turn.facet.preventInertCheck()
|
let reenable = turn.facet.preventInertCheck()
|
||||||
let connectionClosedRef = newRef(turn, ShutdownEntity())
|
let connectionClosedRef = newRef(turn, ShutdownEntity())
|
||||||
|
@ -318,12 +313,17 @@ proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: Connec
|
||||||
if pktFut.failed:
|
if pktFut.failed:
|
||||||
run(facet) do (turn: var Turn): stopActor(turn)
|
run(facet) do (turn: var Turn): stopActor(turn)
|
||||||
else:
|
else:
|
||||||
let buf = pktFut.read
|
var buf = pktFut.read
|
||||||
if buf.len == 0:
|
if buf.len == 0:
|
||||||
run(facet) do (turn: var Turn): stopActor(turn)
|
run(facet) do (turn: var Turn): stopActor(turn)
|
||||||
else:
|
else:
|
||||||
var pr = parsePreserves(buf, WireRef)
|
if wireBuf.len == 0: wireBuf = move buf
|
||||||
dispatch(relay, cast[Preserve[WireRef]](pr))
|
else: wireBuf.add(buf)
|
||||||
|
try:
|
||||||
|
var pr = parsePreserves(wireBuf, WireRef)
|
||||||
|
dispatch(relay, cast[Preserve[WireRef]](pr))
|
||||||
|
wireBuf.setLen(0)
|
||||||
|
except ValueError: discard
|
||||||
socket.recv(recvSize).addCallback(recvCb)
|
socket.recv(recvSize).addCallback(recvCb)
|
||||||
# TODO: should this need be callSoon?
|
# TODO: should this need be callSoon?
|
||||||
socket.recv(recvSize).addCallback(recvCb)
|
socket.recv(recvSize).addCallback(recvCb)
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
# Package
|
# Package
|
||||||
|
|
||||||
version = "1.3.2"
|
version = "20220829"
|
||||||
author = "Emery Hemingway"
|
author = "Emery Hemingway"
|
||||||
description = "Syndicated actors for conversational concurrency"
|
description = "Syndicated actors for conversational concurrency"
|
||||||
license = "Unlicense"
|
license = "Unlicense"
|
||||||
|
@ -9,4 +9,4 @@ srcDir = "src"
|
||||||
|
|
||||||
# Dependencies
|
# Dependencies
|
||||||
|
|
||||||
requires "nim >= 1.4.8", "nimSHA2 >= 0.1.1", "preserves > 3.2.0"
|
requires "nim >= 1.4.8", "nimSHA2 >= 0.1.1", "preserves >= 20220709"
|
||||||
|
|
Loading…
Reference in New Issue