diff --git a/src/syndicate/relays.nim b/src/syndicate/relays.nim index 48a2f5b..3d2a03b 100644 --- a/src/syndicate/relays.nim +++ b/src/syndicate/relays.nim @@ -288,7 +288,7 @@ when defined(posix): import std/asyncnet from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol -import protocols/[gatekeeper, sturdy] +import protocols/gatekeeper type ShutdownEntity* = ref object of Entity @@ -303,6 +303,8 @@ when defined(posix): export Unix proc connect*(turn: var Turn; socket: AsyncSocket; step: Preserve[Ref]; bootProc: ConnectProc) = + ## Relay a dataspace over an open `AsyncSocket`. + ## *`bootProc` may be called multiple times for multiple remote gatekeepers.* proc socketWriter(packet: sink Packet): Future[void] = socket.send(cast[string](encode(packet))) const recvSize = 0x2000 @@ -330,7 +332,8 @@ when defined(posix): var (success, pr) = decode(wireBuf) if success: dispatch(relay, pr) - socket.recv(recvSize).addCallback(recvCb) + if not socket.isClosed: + socket.recv(recvSize).addCallback(recvCb) socket.recv(recvSize).addCallback(recvCb) turn.facet.actor.atExit do (turn: var Turn): close(socket) discard publish(turn, connectionClosedRef, true) @@ -362,6 +365,8 @@ when defined(posix): asyncCheck(turn, fut) proc connect*(turn: var Turn; transport: Tcp; step: Preserve[Ref]; bootProc: ConnectProc) = + ## Relay a dataspace over TCP. + ## *`bootProc` may be called multiple times for multiple remote gatekeepers.* let socket = newAsyncSocket( domain = AF_INET, sockType = SOCK_STREAM, @@ -372,6 +377,8 @@ when defined(posix): connect(turn, socket, step, bootProc) proc connect*(turn: var Turn; transport: Unix; step: Preserve[Ref]; bootProc: ConnectProc) = + ## Relay a dataspace over a UNIX socket. + ## *`bootProc` may be called multiple times for multiple remote gatekeepers.* let socket = newAsyncSocket( domain = AF_UNIX, sockType = SOCK_STREAM,