Cleanup relays
This commit is contained in:
parent
75d1e33bff
commit
a4ba81a481
|
@ -601,7 +601,6 @@ method message(entity: SyncContinuation; turn: var Turn; v: AssertionRef) =
|
|||
entity.action(turn)
|
||||
|
||||
proc sync*(turn: var Turn; refer: Cap; act: TurnAction) =
|
||||
let e = SyncContinuation(action: act)
|
||||
sync(turn, refer, newCap(turn, SyncContinuation(action: act)))
|
||||
|
||||
proc running*(actor): bool =
|
||||
|
|
|
@ -28,7 +28,7 @@ type
|
|||
Handle = actors.Handle
|
||||
|
||||
type
|
||||
PacketWriter = proc (pkt: sink Packet): Future[void] {.gcsafe.}
|
||||
PacketHandler = proc (buf: seq[byte]) {.gcsafe.}
|
||||
RelaySetup = proc (turn: var Turn; relay: Relay) {.gcsafe.}
|
||||
|
||||
Relay* = ref object of RootObj
|
||||
|
@ -40,7 +40,8 @@ type
|
|||
imported: Membrane
|
||||
nextLocalOid: Oid
|
||||
pendingTurn: protocol.Turn
|
||||
packetWriter: PacketWriter
|
||||
packetSender: PacketHandler
|
||||
wireBuf: BufferedDecoder
|
||||
untrusted: bool
|
||||
|
||||
SyncPeerEntity = ref object of Entity
|
||||
|
@ -117,8 +118,8 @@ proc send(r: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
|
|||
orKind: PacketKind.Turn,
|
||||
turn: move r.pendingTurn)
|
||||
trace "C: ", pkt
|
||||
assert(not r.packetWriter.isNil, "missing packetWriter proc")
|
||||
asyncCheck(turn, r.packetWriter(pkt))
|
||||
assert(not r.packetSender.isNil, "missing packetSender proc")
|
||||
r.packetSender(encode pkt)
|
||||
r.pendingTurn.add TurnEvent(oid: rOid, event: m)
|
||||
|
||||
proc send(re: RelayEntity; turn: var Turn; ev: Event) =
|
||||
|
@ -197,7 +198,7 @@ proc rewriteIn(relay; facet; v: Value):
|
|||
|
||||
proc close(r: Relay) = discard
|
||||
|
||||
proc dispatch*(relay: Relay; turn: var Turn; cap: Cap; event: Event) {.gcsafe.} =
|
||||
proc dispatch(relay: Relay; turn: var Turn; cap: Cap; event: Event) {.gcsafe.} =
|
||||
case event.orKind
|
||||
of EventKind.Assert:
|
||||
let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion)
|
||||
|
@ -225,7 +226,7 @@ proc dispatch*(relay: Relay; turn: var Turn; cap: Cap; event: Event) {.gcsafe.}
|
|||
for e in imported: relay.imported.del e
|
||||
]#
|
||||
|
||||
proc dispatch*(relay: Relay; v: Value) {.gcsafe.} =
|
||||
proc dispatch(relay: Relay; v: Value) {.gcsafe.} =
|
||||
trace "S: ", v
|
||||
run(relay.facet) do (t: var Turn):
|
||||
var pkt: Packet
|
||||
|
@ -251,9 +252,13 @@ proc dispatch*(relay: Relay; v: Value) {.gcsafe.} =
|
|||
when defined(posix):
|
||||
stderr.writeLine("discarding undecoded packet ", v)
|
||||
|
||||
proc dispatch(relay: Relay; buf: seq[byte]) =
|
||||
feed(relay.wireBuf, buf)
|
||||
var pr = decode(relay.wireBuf)
|
||||
if pr.isSome: dispatch(relay, get pr)
|
||||
|
||||
type
|
||||
RelayOptions* = object of RootObj
|
||||
packetWriter*: PacketWriter
|
||||
untrusted*: bool
|
||||
RelayActorOptions* = object of RelayOptions
|
||||
initialOid*: Option[Oid]
|
||||
|
@ -263,7 +268,7 @@ type
|
|||
proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay =
|
||||
result = Relay(
|
||||
facet: turn.facet,
|
||||
packetWriter: opts.packetWriter,
|
||||
wireBuf: newBufferedDecoder(0),
|
||||
untrusted: opts.untrusted)
|
||||
discard result.facet.preventInertCheck()
|
||||
setup(turn, result)
|
||||
|
@ -308,39 +313,63 @@ type ConnectProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
|
|||
export Tcp
|
||||
|
||||
when defined(posix):
|
||||
import std/asyncfile
|
||||
export Unix
|
||||
|
||||
proc connect*(turn: var Turn; ds: Cap; route: Route; addrAss: Assertion; socket: AsyncSocket; step: Value) =
|
||||
## Relay a dataspace over an open `AsyncSocket`.
|
||||
proc socketWriter(packet: sink Packet): Future[void] =
|
||||
socket.send(cast[string](encode(packet)))
|
||||
proc newStdioTunnel(facet: Facet; receiver: PacketHandler): PacketHandler =
|
||||
let asyncStdin = openAsync("/dev/stdin") # this is universal now?
|
||||
close(stdin)
|
||||
const readSize = 0x2000
|
||||
proc readCb(fut: Future[string]) {.gcsafe.} =
|
||||
if fut.failed: terminate(facet, fut.error)
|
||||
else:
|
||||
receiver(cast[seq[byte]](fut.read))
|
||||
asyncStdin.read(readSize).addCallback(readCb)
|
||||
asyncStdin.read(readSize).addCallback(readCb)
|
||||
proc sender(buf: seq[byte]) =
|
||||
try:
|
||||
if writeBytes(stdout, buf, 0, buf.len) != buf.len:
|
||||
raise newException(IOError, "failed to write Preserves to stdout")
|
||||
flushFile(stdout)
|
||||
except CatchableError as err:
|
||||
terminate(facet, err)
|
||||
sender
|
||||
|
||||
proc connectStdio*(turn: var Turn; ds: Cap) =
|
||||
## Connect to an external dataspace over stdin and stdout.
|
||||
var opts = RelayActorOptions(
|
||||
initialCap: ds,
|
||||
initialOid: 0.Oid.some)
|
||||
spawnRelay("stdio", turn, ds, Stdio().toPreserves, opts) do (turn: var Turn; relay: Relay):
|
||||
proc receiver(buf: seq[byte]) = dispatch(relay, buf)
|
||||
relay.packetSender = newStdioTunnel(turn.facet, receiver)
|
||||
|
||||
proc newTunnel(facet: Facet; receiver: PacketHandler; socket: AsyncSocket): PacketHandler =
|
||||
const recvSize = 0x2000
|
||||
proc recvCb(fut: Future[string]) {.gcsafe.} =
|
||||
if fut.failed: terminate(facet, fut.error)
|
||||
else:
|
||||
receiver(cast[seq[byte]](fut.read))
|
||||
if not socket.isClosed:
|
||||
socket.recv(recvSize).addCallback(recvCb)
|
||||
socket.recv(recvSize).addCallback(recvCb)
|
||||
proc sender(buf: seq[byte]) =
|
||||
asyncCheck(facet, socket.send(cast[string](buf)))
|
||||
sender
|
||||
|
||||
proc connect*(turn: var Turn; ds: Cap; route: Route; addrAss: Assertion; socket: AsyncSocket; steps: seq[Value]) =
|
||||
## Relay a dataspace over an open `AsyncSocket`.
|
||||
var shutdownCap: Cap
|
||||
let
|
||||
reenable = turn.facet.preventInertCheck()
|
||||
connectionClosedCap = newCap(turn, ShutdownEntity())
|
||||
discard bootActor("socket") do (turn: var Turn):
|
||||
var ops = RelayActorOptions(
|
||||
packetWriter: socketWriter,
|
||||
initialOid: 0.Oid.some)
|
||||
spawnRelay("socket", turn, ds, addrAss, ops) do (turn: var Turn; relay: Relay):
|
||||
let facet = turn.facet
|
||||
var wireBuf = newBufferedDecoder(0)
|
||||
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
|
||||
if pktFut.failed:
|
||||
run(facet) do (turn: var Turn): stopActor(turn)
|
||||
else:
|
||||
var buf = pktFut.read
|
||||
if buf.len == 0:
|
||||
run(facet) do (turn: var Turn): stopActor(turn)
|
||||
else:
|
||||
feed(wireBuf, buf)
|
||||
var (success, pr) = decode(wireBuf)
|
||||
if success:
|
||||
dispatch(relay, pr)
|
||||
if not socket.isClosed:
|
||||
socket.recv(recvSize).addCallback(recvCb)
|
||||
socket.recv(recvSize).addCallback(recvCb)
|
||||
proc receiver(buf: seq[byte]) = dispatch(relay, buf)
|
||||
relay.packetSender = newTunnel(turn.facet, receiver, socket)
|
||||
turn.facet.actor.atExit do (turn: var Turn): close(socket)
|
||||
discard publish(turn, connectionClosedCap, true)
|
||||
shutdownCap = newCap(turn, ShutdownEntity())
|
||||
|
@ -359,12 +388,12 @@ when defined(posix):
|
|||
stop(turn, facet)
|
||||
result = action
|
||||
var resolve = Resolve(
|
||||
step: step,
|
||||
step: steps[0],
|
||||
observer: newCap(turn, during(duringCallback)),
|
||||
)
|
||||
discard publish(turn, gatekeeper, resolve)
|
||||
|
||||
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Tcp; step: Value) =
|
||||
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Tcp; steps: seq[Value]) =
|
||||
## Relay a dataspace over TCP.
|
||||
let socket = newAsyncSocket(
|
||||
domain = AF_INET,
|
||||
|
@ -373,9 +402,9 @@ when defined(posix):
|
|||
buffered = false)
|
||||
let fut = connect(socket, transport.host, Port transport.port)
|
||||
addCallback(fut, turn) do (turn: var Turn):
|
||||
connect(turn, ds, route, transport.toPreserves, socket, step)
|
||||
connect(turn, ds, route, transport.toPreserves, socket, steps)
|
||||
|
||||
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Unix; step: Value) =
|
||||
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Unix; steps: seq[Value]) =
|
||||
## Relay a dataspace over a UNIX socket.
|
||||
let socket = newAsyncSocket(
|
||||
domain = AF_UNIX,
|
||||
|
@ -384,44 +413,7 @@ when defined(posix):
|
|||
buffered = false)
|
||||
let fut = connectUnix(socket, transport.path)
|
||||
addCallback(fut, turn) do (turn: var Turn):
|
||||
connect(turn, ds, route, transport.toPreserves, socket, step)
|
||||
|
||||
import std/asyncfile
|
||||
|
||||
const stdinReadSize = 128
|
||||
|
||||
proc connectStdio*(turn: var Turn; ds: Cap) =
|
||||
## Connect to an external dataspace over stdin and stdout.
|
||||
proc stdoutWriter(packet: sink Packet): Future[void] =
|
||||
result = newFuture[void]()
|
||||
var buf = encode(packet)
|
||||
doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len
|
||||
flushFile(stdout)
|
||||
complete result
|
||||
var opts = RelayActorOptions(
|
||||
packetWriter: stdoutWriter,
|
||||
initialCap: ds,
|
||||
initialOid: 0.Oid.some)
|
||||
spawnRelay("stdio", turn, ds, Stdio().toPreserves, opts) do (turn: var Turn; relay: Relay):
|
||||
let
|
||||
facet = turn.facet
|
||||
asyncStdin = openAsync("/dev/stdin") # this is universal now?
|
||||
close(stdin)
|
||||
facet.actor.atExit do (turn: var Turn):
|
||||
close(asyncStdin)
|
||||
var wireBuf = newBufferedDecoder(0)
|
||||
proc readCb(pktFut: Future[string]) {.gcsafe.} =
|
||||
if not pktFut.failed:
|
||||
var buf = pktFut.read
|
||||
if buf.len == 0:
|
||||
run(facet) do (turn: var Turn): stopActor(turn)
|
||||
else:
|
||||
feed(wireBuf, buf)
|
||||
var (success, pr) = decode(wireBuf)
|
||||
if success:
|
||||
dispatch(relay, pr)
|
||||
asyncStdin.read(stdinReadSize).addCallback(readCb)
|
||||
asyncStdin.read(stdinReadSize).addCallback(readCb)
|
||||
connect(turn, ds, route, transport.toPreserves, socket, steps)
|
||||
|
||||
type BootProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
|
||||
|
||||
|
@ -442,12 +434,12 @@ proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
|
|||
tcp: Tcp
|
||||
stdio: Stdio
|
||||
doAssert(route.transports.len == 1, "only a single transport supported for routes")
|
||||
doAssert(route.pathSteps.len < 2, "multiple path steps not supported for routes")
|
||||
if unix.fromPreserves route.transports[0]:
|
||||
connect(turn, ds, route, unix, route.pathSteps[0])
|
||||
connect(turn, ds, route, unix, route.pathSteps)
|
||||
elif tcp.fromPreserves route.transports[0]:
|
||||
connect(turn, ds, route, tcp, route.pathSteps[0])
|
||||
connect(turn, ds, route, tcp, route.pathSteps)
|
||||
elif stdio.fromPreserves route.transports[0]:
|
||||
doAssert(route.pathSteps.len == 0, "route steps not available over stdio")
|
||||
connectStdio(turn, ds)
|
||||
bootProc(turn, ds)
|
||||
else:
|
||||
|
|
|
@ -9,4 +9,4 @@ srcDir = "src"
|
|||
|
||||
# Dependencies
|
||||
|
||||
requires "https://github.com/ehmry/hashlib.git#f9455d4be988e14e3dc7933eb7cc7d7c4820b7ac", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240108"
|
||||
requires "https://github.com/ehmry/hashlib.git#f9455d4be988e14e3dc7933eb7cc7d7c4820b7ac", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20240116"
|
||||
|
|
Loading…
Reference in New Issue