syndicate-nim/src/syndicate/relays.nim

458 lines
16 KiB
Nim
Raw Normal View History

# SPDX-FileCopyrightText: ☭ Emery Hemingway
2021-09-24 19:25:47 +00:00
# SPDX-License-Identifier: Unlicense
2024-01-01 18:29:54 +00:00
import std/[asyncdispatch, options, tables]
from std/os import getEnv, `/`
2022-03-19 00:09:19 +00:00
import preserves
import ../syndicate, /capabilities, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress]
2021-09-24 19:25:47 +00:00
2021-11-03 18:22:42 +00:00
when defined(traceSyndicate):
when defined(posix):
template trace(args: varargs[untyped]): untyped = stderr.writeLine(args)
else:
template trace(args: varargs[untyped]): untyped = echo(args)
2021-11-03 18:22:42 +00:00
else:
template trace(args: varargs[untyped]): untyped = discard
export `$`
type
Oid = sturdy.Oid
export Stdio, Tcp, WebSocket, Unix
2021-09-24 19:25:47 +00:00
type
2023-12-31 17:15:06 +00:00
Assertion = Value
WireRef = sturdy.WireRef
Turn = syndicate.Turn
2024-01-14 10:09:49 +00:00
Handle = actors.Handle
2021-09-24 19:25:47 +00:00
type
2022-03-12 16:12:21 +00:00
PacketWriter = proc (pkt: sink Packet): Future[void] {.gcsafe.}
2021-09-24 19:25:47 +00:00
RelaySetup = proc (turn: var Turn; relay: Relay) {.gcsafe.}
Relay* = ref object of RootObj
2021-09-24 19:25:47 +00:00
facet: Facet
inboundAssertions: Table[Handle,
tuple[localHandle: Handle, imported: seq[WireSymbol]]]
outboundAssertions: Table[Handle, seq[WireSymbol]]
exported: Membrane
imported: Membrane
nextLocalOid: Oid
2022-12-08 08:15:01 +00:00
pendingTurn: protocol.Turn
2021-09-24 19:25:47 +00:00
packetWriter: PacketWriter
untrusted: bool
SyncPeerEntity = ref object of Entity
relay: Relay
2023-07-24 15:13:36 +00:00
peer: Cap
2021-09-24 19:25:47 +00:00
handleMap: Table[Handle, Handle]
e: WireSymbol
RelayEntity = ref object of Entity
2023-05-18 10:20:44 +00:00
## https://synit.org/book/protocol.html#relay-entities
2021-09-24 19:25:47 +00:00
label: string
relay: Relay
2023-07-24 15:13:36 +00:00
proc releaseCapOut(r: Relay; e: WireSymbol) =
2021-09-24 19:25:47 +00:00
r.exported.drop e
method publish(spe: SyncPeerEntity; t: var Turn; a: AssertionRef; h: Handle) =
spe.handleMap[h] = publish(t, spe.peer, a.value)
2021-09-24 19:25:47 +00:00
method retract(se: SyncPeerEntity; t: var Turn; h: Handle) =
2021-09-24 19:25:47 +00:00
var other: Handle
if se.handleMap.pop(h, other):
retract(t, other)
method message(se: SyncPeerEntity; t: var Turn; a: AssertionRef) =
2021-09-24 19:25:47 +00:00
if not se.e.isNil:
2023-07-24 15:13:36 +00:00
se.relay.releaseCapOut(se.e)
message(t, se.peer, a.value)
2021-09-24 19:25:47 +00:00
2023-07-24 15:13:36 +00:00
method sync(se: SyncPeerEntity; t: var Turn; peer: Cap) =
2021-09-24 19:25:47 +00:00
sync(t, se.peer, peer)
2023-07-24 15:13:36 +00:00
proc newSyncPeerEntity(r: Relay; p: Cap): SyncPeerEntity =
SyncPeerEntity(relay: r, peer: p)
2021-09-24 19:25:47 +00:00
2023-07-24 15:13:36 +00:00
proc rewriteCapOut(relay: Relay; cap: Cap; exported: var seq[WireSymbol]): WireRef =
if cap.target of RelayEntity and cap.target.RelayEntity.relay == relay and cap.attenuation.len == 0:
2024-01-06 12:31:17 +00:00
result = WireRef(orKind: WireRefKind.yours, yours: WireRefYours(oid: cap.target.oid))
2022-03-07 19:15:32 +00:00
else:
2023-07-24 15:13:36 +00:00
var ws = grab(relay.exported, cap)
2022-03-07 19:15:32 +00:00
if ws.isNil:
2023-07-24 15:13:36 +00:00
ws = newWireSymbol(relay.exported, relay.nextLocalOid, cap)
2022-03-16 15:34:29 +00:00
inc relay.nextLocalOid
2022-03-07 19:15:32 +00:00
exported.add ws
2024-01-06 12:31:17 +00:00
result = WireRef(
2022-03-16 15:34:29 +00:00
orKind: WireRefKind.mine,
mine: WireRefMine(oid: ws.oid))
2021-09-24 19:25:47 +00:00
2023-05-18 10:20:44 +00:00
proc rewriteOut(relay: Relay; v: Assertion):
tuple[rewritten: Value, exported: seq[WireSymbol]] {.gcsafe.} =
2021-09-24 19:25:47 +00:00
var exported: seq[WireSymbol]
2024-01-07 22:11:59 +00:00
result.rewritten = mapEmbeds(v) do (pr: Value) -> Value:
let o = pr.unembed(Cap); if o.isSome:
rewriteCapOut(relay, o.get, exported).toPreserves
else: pr
2022-03-16 15:34:29 +00:00
result.exported = exported
2023-05-18 10:20:44 +00:00
proc register(relay: Relay; v: Assertion; h: Handle): tuple[rewritten: Value, exported: seq[WireSymbol]] =
result = rewriteOut(relay, v)
relay.outboundAssertions[h] = result.exported
2021-09-24 19:25:47 +00:00
proc deregister(relay: Relay; h: Handle) =
var outbound: seq[WireSymbol]
if relay.outboundAssertions.pop(h, outbound):
2023-07-24 15:13:36 +00:00
for e in outbound: releaseCapOut(relay, e)
2021-09-24 19:25:47 +00:00
proc send(r: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
2021-09-24 19:25:47 +00:00
if r.pendingTurn.len == 0:
# If the pending queue is empty then schedule a packet
# to be sent after pending I/O is processed.
callSoon do ():
2021-09-24 19:25:47 +00:00
r.facet.run do (turn: var Turn):
2022-03-12 16:12:21 +00:00
var pkt = Packet(
2021-09-24 19:25:47 +00:00
orKind: PacketKind.Turn,
turn: move r.pendingTurn)
2021-11-03 18:22:42 +00:00
trace "C: ", pkt
assert(not r.packetWriter.isNil, "missing packetWriter proc")
asyncCheck(turn, r.packetWriter(pkt))
2021-10-28 16:57:09 +00:00
r.pendingTurn.add TurnEvent(oid: rOid, event: m)
2021-09-24 19:25:47 +00:00
proc send(re: RelayEntity; turn: var Turn; ev: Event) =
send(re.relay, turn, protocol.Oid re.oid, ev)
2021-09-24 19:25:47 +00:00
method publish(re: RelayEntity; t: var Turn; a: AssertionRef; h: Handle) {.gcsafe.} =
re.send(t, Event(
2021-10-28 16:57:09 +00:00
orKind: EventKind.Assert,
2022-12-08 08:15:01 +00:00
`assert`: protocol.Assert(
2023-05-18 10:20:44 +00:00
assertion: re.relay.register(a.value, h).rewritten,
handle: h)))
2021-10-28 16:57:09 +00:00
method retract(re: RelayEntity; t: var Turn; h: Handle) {.gcsafe.} =
2021-09-24 19:25:47 +00:00
re.relay.deregister h
re.send(t, Event(
2021-09-24 19:25:47 +00:00
orKind: EventKind.Retract,
retract: Retract(handle: h)))
2021-09-24 19:25:47 +00:00
method message(re: RelayEntity; turn: var Turn; msg: AssertionRef) {.gcsafe.} =
2023-05-18 10:20:44 +00:00
var (value, exported) = rewriteOut(re.relay, msg.value)
assert(len(exported) == 0, "cannot send a reference in a message")
if len(exported) == 0:
re.send(turn, Event(orKind: EventKind.Message, message: Message(body: value)))
2021-09-24 19:25:47 +00:00
2023-07-24 15:13:36 +00:00
method sync(re: RelayEntity; turn: var Turn; peer: Cap) {.gcsafe.} =
2021-09-24 19:25:47 +00:00
var
2021-10-28 16:57:09 +00:00
peerEntity = newSyncPeerEntity(re.relay, peer)
2021-09-24 19:25:47 +00:00
exported: seq[WireSymbol]
2024-01-14 10:09:49 +00:00
wr = rewriteCapOut(re.relay, turn.newCap(peerEntity), exported)
2021-09-24 19:25:47 +00:00
peerEntity.e = exported[0]
2024-01-14 10:09:49 +00:00
var ev = Event(orKind: EventKind.Sync)
ev.sync.peer = wr.toPreserves.embed
re.send(turn, ev)
2021-09-24 19:25:47 +00:00
2021-10-28 16:57:09 +00:00
proc newRelayEntity(label: string; r: Relay; o: Oid): RelayEntity =
RelayEntity(label: label, relay: r, oid: o)
2021-10-28 16:57:09 +00:00
2021-09-24 19:25:47 +00:00
using
relay: Relay
facet: Facet
2023-07-24 15:13:36 +00:00
proc lookupLocal(relay; oid: Oid): Cap =
2022-03-07 19:15:32 +00:00
let sym = relay.exported.grab oid
2023-07-24 15:13:36 +00:00
if sym.isNil: newInertCap()
else: sym.cap
2021-09-24 19:25:47 +00:00
2023-07-24 15:13:36 +00:00
proc isInert(r: Cap): bool =
2021-09-24 19:25:47 +00:00
r.target.isNil
2023-07-24 15:13:36 +00:00
proc rewriteCapIn(relay; facet; n: WireRef, imported: var seq[WireSymbol]): Cap =
2021-09-24 19:25:47 +00:00
case n.orKind
of WireRefKind.mine:
2022-03-07 19:15:32 +00:00
var e = relay.imported.grab(n.mine.oid)
2024-01-14 10:09:49 +00:00
if e.isNil:
e = newWireSymbol(
relay.imported,
n.mine.oid,
newCap(facet, newRelayEntity("rewriteCapIn", relay, n.mine.oid)),
)
2021-09-24 19:25:47 +00:00
imported.add e
2023-07-24 15:13:36 +00:00
result = e.cap
2021-09-24 19:25:47 +00:00
of WireRefKind.yours:
let r = relay.lookupLocal(n.yours.oid)
if n.yours.attenuation.len == 0 or r.isInert: result = r
else: raiseAssert "attenuation not implemented"
2021-09-24 19:25:47 +00:00
2022-12-08 08:15:01 +00:00
proc rewriteIn(relay; facet; v: Value):
tuple[rewritten: Assertion; imported: seq[WireSymbol]] {.gcsafe.} =
var imported: seq[WireSymbol]
2024-01-06 12:31:17 +00:00
result.rewritten = mapEmbeds(v) do (pr: Value) -> Value:
2024-01-07 22:11:59 +00:00
let wr = pr.preservesTo WireRef; if wr.isSome:
result = rewriteCapIn(relay, facet, wr.get, imported).embed
else:
result = pr
result.imported = imported
2021-09-24 19:25:47 +00:00
proc close(r: Relay) = discard
2023-07-24 15:13:36 +00:00
proc dispatch*(relay: Relay; turn: var Turn; cap: Cap; event: Event) {.gcsafe.} =
2021-09-24 19:25:47 +00:00
case event.orKind
of EventKind.Assert:
let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion)
2023-07-24 15:13:36 +00:00
relay.inboundAssertions[event.assert.handle] = (publish(turn, cap, a), imported,)
2021-09-24 19:25:47 +00:00
of EventKind.Retract:
let remoteHandle = event.retract.handle
var outbound: tuple[localHandle: Handle, imported: seq[WireSymbol]]
if relay.inboundAssertions.pop(remoteHandle, outbound):
for e in outbound.imported: relay.imported.drop e
turn.retract(outbound.localHandle)
of EventKind.Message:
let (a, imported) = rewriteIn(relay, turn.facet, event.message.body)
2021-09-24 19:25:47 +00:00
assert imported.len == 0, "Cannot receive transient reference"
2023-07-24 15:13:36 +00:00
turn.message(cap, a)
2021-09-24 19:25:47 +00:00
of EventKind.Sync:
discard # TODO
#[
var imported: seq[WireSymbol]
2023-07-24 15:13:36 +00:00
let k = relay.rewriteCapIn(turn, evenr.sync.peer, imported)
turn.sync(cap) do (turn: var Turn):
2021-09-24 19:25:47 +00:00
turn.message(k, true)
for e in imported: relay.imported.del e
]#
proc dispatch*(relay: Relay; v: Value) {.gcsafe.} =
2022-03-12 16:12:21 +00:00
trace "S: ", v
2021-09-24 19:25:47 +00:00
run(relay.facet) do (t: var Turn):
var pkt: Packet
2023-12-31 17:15:06 +00:00
if pkt.fromPreserves(v):
2021-09-24 19:25:47 +00:00
case pkt.orKind
of PacketKind.Turn:
2023-05-18 10:20:44 +00:00
# https://synit.org/book/protocol.html#turn-packets
2021-09-24 19:25:47 +00:00
for te in pkt.turn:
2023-05-18 10:20:44 +00:00
let r = lookupLocal(relay, te.oid.Oid)
if not r.isInert:
dispatch(relay, t, r, te.event)
else:
2023-07-24 15:13:36 +00:00
stderr.writeLine("discarding event for unknown Cap; ", te.event)
2021-09-24 19:25:47 +00:00
of PacketKind.Error:
2023-05-18 10:20:44 +00:00
# https://synit.org/book/protocol.html#error-packets
2022-12-08 08:15:01 +00:00
when defined(posix):
stderr.writeLine("Error from server: ", pkt.error.message, " (detail: ", pkt.error.detail, ")")
2021-09-24 19:25:47 +00:00
close relay
2022-03-14 15:09:29 +00:00
of PacketKind.Extension:
2023-05-18 10:20:44 +00:00
# https://synit.org/book/protocol.html#extension-packets
2022-03-14 15:09:29 +00:00
discard
else:
2022-12-08 08:15:01 +00:00
when defined(posix):
stderr.writeLine("discarding undecoded packet ", v)
2021-09-24 19:25:47 +00:00
type
RelayOptions* = object of RootObj
packetWriter*: PacketWriter
untrusted*: bool
RelayActorOptions* = object of RelayOptions
initialOid*: Option[Oid]
2023-07-24 15:13:36 +00:00
initialCap*: Cap
nextLocalOid*: Option[Oid]
2021-09-24 19:25:47 +00:00
2022-03-12 16:15:07 +00:00
proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay =
2021-09-24 19:25:47 +00:00
result = Relay(
facet: turn.facet,
2021-09-24 19:25:47 +00:00
packetWriter: opts.packetWriter,
untrusted: opts.untrusted)
discard result.facet.preventInertCheck()
2022-03-12 16:15:07 +00:00
setup(turn, result)
2021-09-24 19:25:47 +00:00
2023-12-31 17:15:06 +00:00
proc transportConnectionResolve(addrAss: Assertion; ds: Cap): gatekeeper.TransportConnection =
result.`addr` = addrAss
2023-12-31 17:15:06 +00:00
result.resolved = Resolved(orKind: ResolvedKind.accepted)
2024-01-07 22:11:59 +00:00
result.resolved.accepted.responderSession = ds
proc spawnRelay*(name: string; turn: var Turn; ds: Cap; addrAss: Assertion; opts: RelayActorOptions; setup: RelaySetup) =
2023-07-22 10:32:52 +00:00
discard spawn(name, turn) do (turn: var Turn):
2022-03-12 16:15:07 +00:00
let relay = newRelay(turn, opts, setup)
2023-07-24 15:13:36 +00:00
if not opts.initialCap.isNil:
2021-09-24 19:25:47 +00:00
var exported: seq[WireSymbol]
2023-07-24 15:13:36 +00:00
discard rewriteCapOut(relay, opts.initialCap, exported)
2021-09-24 19:25:47 +00:00
opts.nextLocalOid.map do (oid: Oid):
relay.nextLocalOid =
if oid == 0.Oid: 1.Oid
else: oid
if opts.initialOid.isSome:
var
imported: seq[WireSymbol]
wr = WireRef(
orKind: WireRefKind.mine,
mine: WireRefMine(oid: opts.initialOid.get))
res = rewriteCapIn(relay, turn.facet, wr, imported)
discard publish(turn, ds, transportConnectionResolve(addrAss, res))
else:
discard publish(turn, ds, transportConnectionResolve(addrAss, ds))
2021-09-24 19:25:47 +00:00
when defined(posix):
import std/asyncnet
2023-05-18 10:20:44 +00:00
from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol
2021-09-24 19:25:47 +00:00
type ShutdownEntity* = ref object of Entity
2021-09-24 19:25:47 +00:00
2022-03-14 15:09:29 +00:00
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
2021-09-24 19:25:47 +00:00
stopActor(turn)
2023-07-24 15:13:36 +00:00
type ConnectProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
2023-05-18 10:20:44 +00:00
export Tcp
2021-09-24 19:25:47 +00:00
when defined(posix):
2023-05-18 10:20:44 +00:00
export Unix
2023-12-31 17:15:06 +00:00
proc connect*(turn: var Turn; ds: Cap; route: Route; addrAss: Assertion; socket: AsyncSocket; step: Value) =
2023-07-20 17:36:05 +00:00
## Relay a dataspace over an open `AsyncSocket`.
proc socketWriter(packet: sink Packet): Future[void] =
socket.send(cast[string](encode(packet)))
const recvSize = 0x2000
2023-07-24 15:13:36 +00:00
var shutdownCap: Cap
2023-05-18 10:20:44 +00:00
let
reenable = turn.facet.preventInertCheck()
2023-07-24 15:13:36 +00:00
connectionClosedCap = newCap(turn, ShutdownEntity())
2023-05-18 10:20:44 +00:00
discard bootActor("socket") do (turn: var Turn):
var ops = RelayActorOptions(
packetWriter: socketWriter,
initialOid: 0.Oid.some)
spawnRelay("socket", turn, ds, addrAss, ops) do (turn: var Turn; relay: Relay):
2023-05-18 10:20:44 +00:00
let facet = turn.facet
var wireBuf = newBufferedDecoder(0)
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
if pktFut.failed:
run(facet) do (turn: var Turn): stopActor(turn)
else:
var buf = pktFut.read
if buf.len == 0:
2022-03-14 15:09:29 +00:00
run(facet) do (turn: var Turn): stopActor(turn)
else:
2023-05-18 10:20:44 +00:00
feed(wireBuf, buf)
var (success, pr) = decode(wireBuf)
if success:
dispatch(relay, pr)
2023-07-20 17:36:05 +00:00
if not socket.isClosed:
socket.recv(recvSize).addCallback(recvCb)
2023-05-18 10:20:44 +00:00
socket.recv(recvSize).addCallback(recvCb)
turn.facet.actor.atExit do (turn: var Turn): close(socket)
2023-07-24 15:13:36 +00:00
discard publish(turn, connectionClosedCap, true)
shutdownCap = newCap(turn, ShutdownEntity())
2023-12-31 17:15:06 +00:00
onPublish(turn, ds, TransportConnection ?: {0: ?addrAss, 2: ?:Rejected}) do (detail: Value):
raise newException(IOError, $detail)
2023-12-31 17:15:06 +00:00
onPublish(turn, ds, TransportConnection ?: {0: ?addrAss, 2: ?:ResolvedAccepted}) do (gatekeeper: Cap):
2023-05-18 10:20:44 +00:00
run(gatekeeper.relay) do (turn: var Turn):
reenable()
2023-07-24 15:13:36 +00:00
discard publish(turn, shutdownCap, true)
proc duringCallback(turn: var Turn; ass: Assertion; h: Handle): TurnAction =
2023-05-18 10:20:44 +00:00
let facet = inFacet(turn) do (turn: var Turn):
2024-01-07 22:11:59 +00:00
let o = ass.preservesTo Resolved; if o.isSome:
discard publish(turn, ds, ResolvePath(
route: route, `addr`: addrAss, resolved: o.get))
2023-05-18 10:20:44 +00:00
proc action(turn: var Turn) =
stop(turn, facet)
result = action
2024-01-01 18:29:54 +00:00
var resolve = Resolve(
2023-05-18 10:20:44 +00:00
step: step,
2024-01-07 22:11:59 +00:00
observer: newCap(turn, during(duringCallback)),
2024-01-01 18:29:54 +00:00
)
discard publish(turn, gatekeeper, resolve)
2023-12-31 17:15:06 +00:00
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Tcp; step: Value) =
2023-07-20 17:36:05 +00:00
## Relay a dataspace over TCP.
2023-05-18 10:20:44 +00:00
let socket = newAsyncSocket(
domain = AF_INET,
sockType = SOCK_STREAM,
protocol = IPPROTO_TCP,
buffered = false)
let fut = connect(socket, transport.host, Port transport.port)
addCallback(fut, turn) do (turn: var Turn):
2023-12-31 17:15:06 +00:00
connect(turn, ds, route, transport.toPreserves, socket, step)
2023-05-18 10:20:44 +00:00
2023-12-31 17:15:06 +00:00
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Unix; step: Value) =
2023-07-20 17:36:05 +00:00
## Relay a dataspace over a UNIX socket.
2023-05-18 10:20:44 +00:00
let socket = newAsyncSocket(
domain = AF_UNIX,
sockType = SOCK_STREAM,
protocol = cast[Protocol](0),
buffered = false)
let fut = connectUnix(socket, transport.path)
addCallback(fut, turn) do (turn: var Turn):
2023-12-31 17:15:06 +00:00
connect(turn, ds, route, transport.toPreserves, socket, step)
2023-05-18 10:20:44 +00:00
import std/asyncfile
const stdinReadSize = 128
proc connectStdio*(turn: var Turn; ds: Cap) =
## Connect to an external dataspace over stdin and stdout.
proc stdoutWriter(packet: sink Packet): Future[void] =
result = newFuture[void]()
var buf = encode(packet)
doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len
flushFile(stdout)
complete result
var opts = RelayActorOptions(
packetWriter: stdoutWriter,
2023-07-24 15:13:36 +00:00
initialCap: ds,
initialOid: 0.Oid.some)
2023-12-31 17:15:06 +00:00
spawnRelay("stdio", turn, ds, Stdio().toPreserves, opts) do (turn: var Turn; relay: Relay):
let
facet = turn.facet
asyncStdin = openAsync("/dev/stdin") # this is universal now?
close(stdin)
facet.actor.atExit do (turn: var Turn):
close(asyncStdin)
var wireBuf = newBufferedDecoder(0)
proc readCb(pktFut: Future[string]) {.gcsafe.} =
if not pktFut.failed:
var buf = pktFut.read
if buf.len == 0:
run(facet) do (turn: var Turn): stopActor(turn)
else:
feed(wireBuf, buf)
var (success, pr) = decode(wireBuf)
if success:
dispatch(relay, pr)
asyncStdin.read(stdinReadSize).addCallback(readCb)
asyncStdin.read(stdinReadSize).addCallback(readCb)
2023-10-13 22:36:34 +00:00
type BootProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
2023-12-31 17:15:06 +00:00
proc envRoute*: Route =
var text = getEnv("SYNDICATE_ROUTE")
if text == "":
2023-12-31 17:15:06 +00:00
var tx = (getEnv("XDG_RUNTIME_DIR", "/run/user/1000") / "dataspace").toPreserves
result.transports = @[initRecord("unix", tx)]
2023-12-31 17:15:06 +00:00
result.pathSteps = @[capabilities.mint().toPreserves]
else:
2024-01-01 18:29:54 +00:00
var pr = parsePreserves(text)
2023-12-31 17:15:06 +00:00
if not result.fromPreserves(pr):
raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr)
proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
2023-10-13 22:36:34 +00:00
var
unix: Unix
tcp: Tcp
stdio: Stdio
doAssert(route.transports.len == 1, "only a single transport supported for routes")
doAssert(route.pathSteps.len < 2, "multiple path steps not supported for routes")
2023-12-31 17:15:06 +00:00
if unix.fromPreserves route.transports[0]:
connect(turn, ds, route, unix, route.pathSteps[0])
2023-12-31 17:15:06 +00:00
elif tcp.fromPreserves route.transports[0]:
connect(turn, ds, route, tcp, route.pathSteps[0])
2023-12-31 17:15:06 +00:00
elif stdio.fromPreserves route.transports[0]:
connectStdio(turn, ds)
bootProc(turn, ds)
2023-10-13 22:36:34 +00:00
else:
raise newException(ValueError, "unsupported route")
2023-12-31 17:15:06 +00:00
during(turn, ds, ResolvePath ?: { 0: ?route, 3: ?:ResolvedAccepted}) do (dest: Cap):
bootProc(turn, dest)