From 67fa320db6d7cecda5b61ed216bfd45ec3865a88 Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Mon, 14 Mar 2022 10:09:29 -0500 Subject: [PATCH] relays: connectStdio --- src/syndicate/relays.nim | 112 ++++++++++++++++++++++++--------------- 1 file changed, 70 insertions(+), 42 deletions(-) diff --git a/src/syndicate/relays.nim b/src/syndicate/relays.nim index a42417a..98304d0 100644 --- a/src/syndicate/relays.nim +++ b/src/syndicate/relays.nim @@ -5,6 +5,8 @@ import std/[asyncdispatch, options, tables] import preserves, preserves/parse import ./actors, ./durings, ./membranes, ./protocols/[protocol, sturdy] +from ./patterns import grab + when defined(traceSyndicate): template trace(args: varargs[untyped]): untyped = stderr.writeLine(args) else: @@ -72,7 +74,7 @@ proc newSyncPeerEntity(r: Relay; p: Ref): SyncPeerEntity = proc rewriteRefOut(relay: Relay; `ref`: Ref; transient: bool; exported: var seq[WireSymbol]): WireRef = if `ref`.target of RelayEntity and `ref`.target.RelayEntity.relay == relay: - stderr.writeLine "do the rewriteRefOut that wasn't being done before" + trace "do the rewriteRefOut that wasn't being done before" result = WireRef( orKind: WirerefKind.yours, yours: WireRefYours[Ref](oid: `ref`.target.oid)) @@ -233,18 +235,12 @@ proc dispatch(relay: Relay; v: Preserve[WireRef]) = for te in pkt.turn: dispatch(relay, t, lookupLocal(relay, te.oid.Oid), te.event) of PacketKind.Error: - relay.facet.log("Error from server: ", pkt.error.message, " (detail: ", pkt.error.detail, ")") + stderr.writeLine ("Error from server: ", pkt.error.message, " (detail: ", pkt.error.detail, ")") close relay - - -proc recv(relay: Relay; buf: string) = - # var pkt = decodePreserves(buf, WireRef) - trace "S: ", buf - var pkt = cast[Preserve[WireRef]]( - parsePreserves(buf, sturdy.WireRef[void])) - # the compiler cannot convert `Preserve[void]` to `Preserve[WireRef[Ref]]` - # so convert to `Preserve[WireRef[void]]` and cast. - dispatch(relay, pkt) + of PacketKind.Extension: + discard + else: + stderr.writeLine "discarding unparsed packet ", v type RelayOptions = object of RootObj @@ -291,13 +287,9 @@ import protocols/gatekeeper type ShutdownEntity = ref object of Entity -proc shutdownRetract(e: Entity; turn: var Turn; h: Handle) = +method retract(e: ShutdownEntity; turn: var Turn; h: Handle) = stopActor(turn) -proc newShutdownEntity(): ShutdownEntity = - new result - result.setProcs(retract = shutdownRetract) - type SturdyRef = sturdy.SturdyRef[Ref] Resolve = gatekeeper.Resolve[Ref] @@ -308,41 +300,41 @@ proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: During sockType = SOCK_STREAM, protocol = cast[Protocol](0), buffered = false) - proc socketWriter(packet: seq[byte]): Future[void] = - socket.send cast[string](packet) + proc socketWriter(packet: sink Packet): Future[void] = + socket.send($packet) const recvSize = 1 shl 18 # this is an excessive buffer size but the PEG parser # can only read complete documents # TODO: use a binary protocol and improve that # parser to stream data in chunks var shutdownRef: Ref - let reenable = turn.activeFacet.preventInertCheck() - let connectionClosedRef = newRef(turn, newShutdownEntity()) - proc setup(turn: var Turn; relay: Relay) = - let facet = turn.activeFacet - proc recvCb(pktFut: Future[string]) {.gcsafe.} = - if pktFut.failed: - run(facet) do (turn: var Turn): stopActor(turn) - else: - let buf = pktFut.read - if buf.len == 0: - run(facet) do (turn: var Turn): stopActor(turn) - else: - relay.recv(buf) - socket.recv(recvSize).addCallback(recvCb) - # TODO: should this need be callSoon? - socket.recv(recvSize).addCallback(recvCb) - turn.activeFacet.actor.atExit do (turn: var Turn): close(socket) - discard publish(turn, connectionClosedRef, true) - shutdownRef = newRef(turn, newShutdownEntity()) + let reenable = turn.facet.preventInertCheck() + let connectionClosedRef = newRef(turn, ShutdownEntity()) var fut = newFuture[void]"connectUnix" connectUnix(socket, path).addCallback do (f: Future[void]): read f - discard newActor("unix") do (turn: var Turn): - let relayFut = spawnRelay("unix", turn, RelayActorOptions( + discard bootActor("unix") do (turn: var Turn): + var ops = RelayActorOptions( packetWriter: socketWriter, - setup: setup, - initialOid: 0.Oid.some)) + initialOid: 0.Oid.some) + let relayFut = spawnRelay("unix", turn, ops) do (turn: var Turn; relay: Relay): + let facet = turn.facet + proc recvCb(pktFut: Future[string]) {.gcsafe.} = + if pktFut.failed: + run(facet) do (turn: var Turn): stopActor(turn) + else: + let buf = pktFut.read + if buf.len == 0: + run(facet) do (turn: var Turn): stopActor(turn) + else: + var pr = parsePreserves(buf, sturdy.WireRef[void]) + dispatch(relay, cast[Preserve[WireRef]](pr)) + socket.recv(recvSize).addCallback(recvCb) + # TODO: should this need be callSoon? + socket.recv(recvSize).addCallback(recvCb) + turn.facet.actor.atExit do (turn: var Turn): close(socket) + discard publish(turn, connectionClosedRef, true) + shutdownRef = newRef(turn, ShutdownEntity()) relayFut.addCallback do (refFut: Future[Ref]): let gatekeeper = read refFut run(gatekeeper.relay) do (turn: var Turn): @@ -360,3 +352,39 @@ proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: During discard publish(turn, gatekeeper, res) fut.complete() asyncCheck(turn, fut) + +import std/asyncfile + +proc connectStdio*(ds: Ref; turn: var Turn) = + ## Connect to an external dataspace over stdin and stdout. + proc stdoutWriter(packet: sink Packet): Future[void] {.async.} = + # var buf = encode(packet) + # doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len + write(stdout, packet) + flushFile(stdout) + var opts = RelayActorOptions( + packetWriter: stdoutWriter, + initialRef: ds, + initialOid: 0.Oid.some) + asyncCheck spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay): + let + facet = turn.facet + asyncStdin = openAsync("/dev/stdin") + observer = observe(turn, ds, grab(), newRelayEntity("stdio", relay, Oid 1)) + # publish an observe in the local dataspace + # so that everything is relayed to stdout + facet.actor.atExit do (turn: var Turn): + retract(turn, observer) + close(asyncStdin) + proc recvCb(pktFut: Future[string]) {.gcsafe.} = + if pktFut.failed: quit() + else: + let buf = pktFut.read + if buf.len == 0: + run(facet) do (turn: var Turn): stopActor(turn) + else: + var v = parsePreserves(buf) + dispatch(relay, cast[Preserve[WireRef]](v)) + callSoon: asyncStdin.readLine().addCallback(recvCb) + # do not process the next line immediately + asyncStdin.readLine().addCallback(recvCb)