relays: connectStdio
This commit is contained in:
parent
56431ee37b
commit
67fa320db6
|
@ -5,6 +5,8 @@ import std/[asyncdispatch, options, tables]
|
||||||
import preserves, preserves/parse
|
import preserves, preserves/parse
|
||||||
import ./actors, ./durings, ./membranes, ./protocols/[protocol, sturdy]
|
import ./actors, ./durings, ./membranes, ./protocols/[protocol, sturdy]
|
||||||
|
|
||||||
|
from ./patterns import grab
|
||||||
|
|
||||||
when defined(traceSyndicate):
|
when defined(traceSyndicate):
|
||||||
template trace(args: varargs[untyped]): untyped = stderr.writeLine(args)
|
template trace(args: varargs[untyped]): untyped = stderr.writeLine(args)
|
||||||
else:
|
else:
|
||||||
|
@ -72,7 +74,7 @@ proc newSyncPeerEntity(r: Relay; p: Ref): SyncPeerEntity =
|
||||||
|
|
||||||
proc rewriteRefOut(relay: Relay; `ref`: Ref; transient: bool; exported: var seq[WireSymbol]): WireRef =
|
proc rewriteRefOut(relay: Relay; `ref`: Ref; transient: bool; exported: var seq[WireSymbol]): WireRef =
|
||||||
if `ref`.target of RelayEntity and `ref`.target.RelayEntity.relay == relay:
|
if `ref`.target of RelayEntity and `ref`.target.RelayEntity.relay == relay:
|
||||||
stderr.writeLine "do the rewriteRefOut that wasn't being done before"
|
trace "do the rewriteRefOut that wasn't being done before"
|
||||||
result = WireRef(
|
result = WireRef(
|
||||||
orKind: WirerefKind.yours,
|
orKind: WirerefKind.yours,
|
||||||
yours: WireRefYours[Ref](oid: `ref`.target.oid))
|
yours: WireRefYours[Ref](oid: `ref`.target.oid))
|
||||||
|
@ -233,18 +235,12 @@ proc dispatch(relay: Relay; v: Preserve[WireRef]) =
|
||||||
for te in pkt.turn:
|
for te in pkt.turn:
|
||||||
dispatch(relay, t, lookupLocal(relay, te.oid.Oid), te.event)
|
dispatch(relay, t, lookupLocal(relay, te.oid.Oid), te.event)
|
||||||
of PacketKind.Error:
|
of PacketKind.Error:
|
||||||
relay.facet.log("Error from server: ", pkt.error.message, " (detail: ", pkt.error.detail, ")")
|
stderr.writeLine ("Error from server: ", pkt.error.message, " (detail: ", pkt.error.detail, ")")
|
||||||
close relay
|
close relay
|
||||||
|
of PacketKind.Extension:
|
||||||
|
discard
|
||||||
proc recv(relay: Relay; buf: string) =
|
else:
|
||||||
# var pkt = decodePreserves(buf, WireRef)
|
stderr.writeLine "discarding unparsed packet ", v
|
||||||
trace "S: ", buf
|
|
||||||
var pkt = cast[Preserve[WireRef]](
|
|
||||||
parsePreserves(buf, sturdy.WireRef[void]))
|
|
||||||
# the compiler cannot convert `Preserve[void]` to `Preserve[WireRef[Ref]]`
|
|
||||||
# so convert to `Preserve[WireRef[void]]` and cast.
|
|
||||||
dispatch(relay, pkt)
|
|
||||||
|
|
||||||
type
|
type
|
||||||
RelayOptions = object of RootObj
|
RelayOptions = object of RootObj
|
||||||
|
@ -291,13 +287,9 @@ import protocols/gatekeeper
|
||||||
|
|
||||||
type ShutdownEntity = ref object of Entity
|
type ShutdownEntity = ref object of Entity
|
||||||
|
|
||||||
proc shutdownRetract(e: Entity; turn: var Turn; h: Handle) =
|
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
|
||||||
stopActor(turn)
|
stopActor(turn)
|
||||||
|
|
||||||
proc newShutdownEntity(): ShutdownEntity =
|
|
||||||
new result
|
|
||||||
result.setProcs(retract = shutdownRetract)
|
|
||||||
|
|
||||||
type
|
type
|
||||||
SturdyRef = sturdy.SturdyRef[Ref]
|
SturdyRef = sturdy.SturdyRef[Ref]
|
||||||
Resolve = gatekeeper.Resolve[Ref]
|
Resolve = gatekeeper.Resolve[Ref]
|
||||||
|
@ -308,41 +300,41 @@ proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: During
|
||||||
sockType = SOCK_STREAM,
|
sockType = SOCK_STREAM,
|
||||||
protocol = cast[Protocol](0),
|
protocol = cast[Protocol](0),
|
||||||
buffered = false)
|
buffered = false)
|
||||||
proc socketWriter(packet: seq[byte]): Future[void] =
|
proc socketWriter(packet: sink Packet): Future[void] =
|
||||||
socket.send cast[string](packet)
|
socket.send($packet)
|
||||||
const recvSize = 1 shl 18
|
const recvSize = 1 shl 18
|
||||||
# this is an excessive buffer size but the PEG parser
|
# this is an excessive buffer size but the PEG parser
|
||||||
# can only read complete documents
|
# can only read complete documents
|
||||||
# TODO: use a binary protocol and improve that
|
# TODO: use a binary protocol and improve that
|
||||||
# parser to stream data in chunks
|
# parser to stream data in chunks
|
||||||
var shutdownRef: Ref
|
var shutdownRef: Ref
|
||||||
let reenable = turn.activeFacet.preventInertCheck()
|
let reenable = turn.facet.preventInertCheck()
|
||||||
let connectionClosedRef = newRef(turn, newShutdownEntity())
|
let connectionClosedRef = newRef(turn, ShutdownEntity())
|
||||||
proc setup(turn: var Turn; relay: Relay) =
|
|
||||||
let facet = turn.activeFacet
|
|
||||||
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
|
|
||||||
if pktFut.failed:
|
|
||||||
run(facet) do (turn: var Turn): stopActor(turn)
|
|
||||||
else:
|
|
||||||
let buf = pktFut.read
|
|
||||||
if buf.len == 0:
|
|
||||||
run(facet) do (turn: var Turn): stopActor(turn)
|
|
||||||
else:
|
|
||||||
relay.recv(buf)
|
|
||||||
socket.recv(recvSize).addCallback(recvCb)
|
|
||||||
# TODO: should this need be callSoon?
|
|
||||||
socket.recv(recvSize).addCallback(recvCb)
|
|
||||||
turn.activeFacet.actor.atExit do (turn: var Turn): close(socket)
|
|
||||||
discard publish(turn, connectionClosedRef, true)
|
|
||||||
shutdownRef = newRef(turn, newShutdownEntity())
|
|
||||||
var fut = newFuture[void]"connectUnix"
|
var fut = newFuture[void]"connectUnix"
|
||||||
connectUnix(socket, path).addCallback do (f: Future[void]):
|
connectUnix(socket, path).addCallback do (f: Future[void]):
|
||||||
read f
|
read f
|
||||||
discard newActor("unix") do (turn: var Turn):
|
discard bootActor("unix") do (turn: var Turn):
|
||||||
let relayFut = spawnRelay("unix", turn, RelayActorOptions(
|
var ops = RelayActorOptions(
|
||||||
packetWriter: socketWriter,
|
packetWriter: socketWriter,
|
||||||
setup: setup,
|
initialOid: 0.Oid.some)
|
||||||
initialOid: 0.Oid.some))
|
let relayFut = spawnRelay("unix", turn, ops) do (turn: var Turn; relay: Relay):
|
||||||
|
let facet = turn.facet
|
||||||
|
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
|
||||||
|
if pktFut.failed:
|
||||||
|
run(facet) do (turn: var Turn): stopActor(turn)
|
||||||
|
else:
|
||||||
|
let buf = pktFut.read
|
||||||
|
if buf.len == 0:
|
||||||
|
run(facet) do (turn: var Turn): stopActor(turn)
|
||||||
|
else:
|
||||||
|
var pr = parsePreserves(buf, sturdy.WireRef[void])
|
||||||
|
dispatch(relay, cast[Preserve[WireRef]](pr))
|
||||||
|
socket.recv(recvSize).addCallback(recvCb)
|
||||||
|
# TODO: should this need be callSoon?
|
||||||
|
socket.recv(recvSize).addCallback(recvCb)
|
||||||
|
turn.facet.actor.atExit do (turn: var Turn): close(socket)
|
||||||
|
discard publish(turn, connectionClosedRef, true)
|
||||||
|
shutdownRef = newRef(turn, ShutdownEntity())
|
||||||
relayFut.addCallback do (refFut: Future[Ref]):
|
relayFut.addCallback do (refFut: Future[Ref]):
|
||||||
let gatekeeper = read refFut
|
let gatekeeper = read refFut
|
||||||
run(gatekeeper.relay) do (turn: var Turn):
|
run(gatekeeper.relay) do (turn: var Turn):
|
||||||
|
@ -360,3 +352,39 @@ proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: During
|
||||||
discard publish(turn, gatekeeper, res)
|
discard publish(turn, gatekeeper, res)
|
||||||
fut.complete()
|
fut.complete()
|
||||||
asyncCheck(turn, fut)
|
asyncCheck(turn, fut)
|
||||||
|
|
||||||
|
import std/asyncfile
|
||||||
|
|
||||||
|
proc connectStdio*(ds: Ref; turn: var Turn) =
|
||||||
|
## Connect to an external dataspace over stdin and stdout.
|
||||||
|
proc stdoutWriter(packet: sink Packet): Future[void] {.async.} =
|
||||||
|
# var buf = encode(packet)
|
||||||
|
# doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len
|
||||||
|
write(stdout, packet)
|
||||||
|
flushFile(stdout)
|
||||||
|
var opts = RelayActorOptions(
|
||||||
|
packetWriter: stdoutWriter,
|
||||||
|
initialRef: ds,
|
||||||
|
initialOid: 0.Oid.some)
|
||||||
|
asyncCheck spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay):
|
||||||
|
let
|
||||||
|
facet = turn.facet
|
||||||
|
asyncStdin = openAsync("/dev/stdin")
|
||||||
|
observer = observe(turn, ds, grab(), newRelayEntity("stdio", relay, Oid 1))
|
||||||
|
# publish an observe in the local dataspace
|
||||||
|
# so that everything is relayed to stdout
|
||||||
|
facet.actor.atExit do (turn: var Turn):
|
||||||
|
retract(turn, observer)
|
||||||
|
close(asyncStdin)
|
||||||
|
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
|
||||||
|
if pktFut.failed: quit()
|
||||||
|
else:
|
||||||
|
let buf = pktFut.read
|
||||||
|
if buf.len == 0:
|
||||||
|
run(facet) do (turn: var Turn): stopActor(turn)
|
||||||
|
else:
|
||||||
|
var v = parsePreserves(buf)
|
||||||
|
dispatch(relay, cast[Preserve[WireRef]](v))
|
||||||
|
callSoon: asyncStdin.readLine().addCallback(recvCb)
|
||||||
|
# do not process the next line immediately
|
||||||
|
asyncStdin.readLine().addCallback(recvCb)
|
||||||
|
|
Loading…
Reference in New Issue