Use binary Preserves over Unix sockets

This commit is contained in:
Emery Hemingway 2022-10-16 15:35:50 -05:00
parent 68a742797c
commit d69af0a90d
2 changed files with 17 additions and 11 deletions

View File

@ -288,14 +288,13 @@ type
ConnectProc* = proc (turn: var Turn; ds: Ref) {.gcsafe.} ConnectProc* = proc (turn: var Turn; ds: Ref) {.gcsafe.}
proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: ConnectProc) = proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: ConnectProc) =
var wireBuf: string
var socket = newAsyncSocket( var socket = newAsyncSocket(
domain = AF_UNIX, domain = AF_UNIX,
sockType = SOCK_STREAM, sockType = SOCK_STREAM,
protocol = cast[Protocol](0), protocol = cast[Protocol](0),
buffered = false) buffered = false)
proc socketWriter(packet: sink Packet): Future[void] = proc socketWriter(packet: sink Packet): Future[void] =
socket.send($packet) socket.send(cast[string](encode(packet)))
const recvSize = 0x2000 const recvSize = 0x2000
var shutdownRef: Ref var shutdownRef: Ref
let reenable = turn.facet.preventInertCheck() let reenable = turn.facet.preventInertCheck()
@ -309,6 +308,7 @@ proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: Connec
initialOid: 0.Oid.some) initialOid: 0.Oid.some)
let relayFut = spawnRelay("unix", turn, ops) do (turn: var Turn; relay: Relay): let relayFut = spawnRelay("unix", turn, ops) do (turn: var Turn; relay: Relay):
let facet = turn.facet let facet = turn.facet
var wireBuf = newStringStream()
proc recvCb(pktFut: Future[string]) {.gcsafe.} = proc recvCb(pktFut: Future[string]) {.gcsafe.} =
if pktFut.failed: if pktFut.failed:
run(facet) do (turn: var Turn): stopActor(turn) run(facet) do (turn: var Turn): stopActor(turn)
@ -317,15 +317,22 @@ proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: Connec
if buf.len == 0: if buf.len == 0:
run(facet) do (turn: var Turn): stopActor(turn) run(facet) do (turn: var Turn): stopActor(turn)
else: else:
if wireBuf.len == 0: wireBuf = move buf var decodePos: int
else: wireBuf.add(buf) if wireBuf.atEnd:
wireBuf.setPosition(0)
wireBuf.data = move buf
else:
decodePos = wireBuf.getPosition
wireBuf.data.add(buf)
try: try:
var pr = parsePreserves(wireBuf, WireRef) while not wireBuf.atEnd:
dispatch(relay, cast[Preserve[WireRef]](pr)) decodePos = wireBuf.getPosition
wireBuf.setLen(0) var pr = decodePreserves(wireBuf, WireRef)
except ValueError: discard dispatch(relay, pr)
socket.recv(recvSize).addCallback(recvCb) except IOError, ValueError:
# TODO: should this need be callSoon? wireBuf.setPosition(decodePos)
callSoon:
socket.recv(recvSize).addCallback(recvCb)
socket.recv(recvSize).addCallback(recvCb) socket.recv(recvSize).addCallback(recvCb)
turn.facet.actor.atExit do (turn: var Turn): close(socket) turn.facet.actor.atExit do (turn: var Turn): close(socket)
discard publish(turn, connectionClosedRef, true) discard publish(turn, connectionClosedRef, true)

View File

@ -49,6 +49,5 @@ bootDataspace("main") do (root: Ref; turn: var Turn):
sendLine(turn) sendLine(turn)
do: do:
lineElements.excl a lineElements.excl a
sendLine(turn)
runForever() runForever()