# SPDX-FileCopyrightText: ☭ Emery Hemingway # SPDX-License-Identifier: Unlicense import std/[asyncdispatch, options, tables] from std/os import getEnv, `/` import preserves import ../syndicate, /capabilities, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress] when defined(traceSyndicate): when defined(posix): template trace(args: varargs[untyped]): untyped = stderr.writeLine(args) else: template trace(args: varargs[untyped]): untyped = echo(args) else: template trace(args: varargs[untyped]): untyped = discard export `$` type Oid = sturdy.Oid export Stdio, Tcp, WebSocket, Unix type Assertion = Value WireRef = sturdy.WireRef Turn = syndicate.Turn Handle = actors.Handle PacketWriter = proc (turn: var Turn; buf: seq[byte]) {.closure, gcsafe.} RelaySetup = proc (turn: var Turn; relay: Relay) {.closure, gcsafe.} Relay* = ref object facet: Facet inboundAssertions: Table[Handle, tuple[localHandle: Handle, imported: seq[WireSymbol]]] outboundAssertions: Table[Handle, seq[WireSymbol]] exported: Membrane imported: Membrane nextLocalOid: Oid pendingTurn: protocol.Turn wireBuf: BufferedDecoder packetWriter: PacketWriter peer: Cap SyncPeerEntity = ref object of Entity relay: Relay peer: Cap handleMap: Table[Handle, Handle] e: WireSymbol RelayEntity = ref object of Entity ## https://synit.org/book/protocol.html#relay-entities label: string relay: Relay proc releaseCapOut(r: Relay; e: WireSymbol) = r.exported.drop e method publish(spe: SyncPeerEntity; t: var Turn; a: AssertionRef; h: Handle) = spe.handleMap[h] = publish(t, spe.peer, a.value) method retract(se: SyncPeerEntity; t: var Turn; h: Handle) = var other: Handle if se.handleMap.pop(h, other): retract(t, other) method message(se: SyncPeerEntity; t: var Turn; a: AssertionRef) = if not se.e.isNil: se.relay.releaseCapOut(se.e) message(t, se.peer, a.value) method sync(se: SyncPeerEntity; t: var Turn; peer: Cap) = sync(t, se.peer, peer) proc newSyncPeerEntity(r: Relay; p: Cap): SyncPeerEntity = SyncPeerEntity(relay: r, peer: p) proc rewriteCapOut(relay: Relay; cap: Cap; exported: var seq[WireSymbol]): WireRef = if cap.target of RelayEntity and cap.target.RelayEntity.relay == relay and cap.attenuation.len == 0: result = WireRef(orKind: WireRefKind.yours, yours: WireRefYours(oid: cap.target.oid)) else: var ws = grab(relay.exported, cap) if ws.isNil: ws = newWireSymbol(relay.exported, relay.nextLocalOid, cap) inc relay.nextLocalOid exported.add ws result = WireRef( orKind: WireRefKind.mine, mine: WireRefMine(oid: ws.oid)) proc rewriteOut(relay: Relay; v: Assertion): tuple[rewritten: Value, exported: seq[WireSymbol]] {.gcsafe.} = var exported: seq[WireSymbol] result.rewritten = mapEmbeds(v) do (pr: Value) -> Value: let o = pr.unembed(Cap); if o.isSome: rewriteCapOut(relay, o.get, exported).toPreserves else: pr result.exported = exported proc register(relay: Relay; v: Assertion; h: Handle): tuple[rewritten: Value, exported: seq[WireSymbol]] = result = rewriteOut(relay, v) relay.outboundAssertions[h] = result.exported proc deregister(relay: Relay; h: Handle) = var outbound: seq[WireSymbol] if relay.outboundAssertions.pop(h, outbound): for e in outbound: releaseCapOut(relay, e) proc send(relay: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) = if relay.pendingTurn.len == 0: # If the pending queue is empty then schedule a packet # to be sent after pending I/O is processed. callSoon do (): relay.facet.run do (turn: var Turn): var pkt = Packet( orKind: PacketKind.Turn, turn: move relay.pendingTurn) trace "C: ", pkt relay.packetWriter(turn, encode pkt) relay.pendingTurn.add TurnEvent(oid: rOid, event: m) proc send(re: RelayEntity; turn: var Turn; ev: Event) = send(re.relay, turn, protocol.Oid re.oid, ev) method publish(re: RelayEntity; t: var Turn; a: AssertionRef; h: Handle) {.gcsafe.} = re.send(t, Event( orKind: EventKind.Assert, `assert`: protocol.Assert( assertion: re.relay.register(a.value, h).rewritten, handle: h))) method retract(re: RelayEntity; t: var Turn; h: Handle) {.gcsafe.} = re.relay.deregister h re.send(t, Event( orKind: EventKind.Retract, retract: Retract(handle: h))) method message(re: RelayEntity; turn: var Turn; msg: AssertionRef) {.gcsafe.} = var (value, exported) = rewriteOut(re.relay, msg.value) assert(len(exported) == 0, "cannot send a reference in a message") if len(exported) == 0: re.send(turn, Event(orKind: EventKind.Message, message: Message(body: value))) method sync(re: RelayEntity; turn: var Turn; peer: Cap) {.gcsafe.} = var peerEntity = newSyncPeerEntity(re.relay, peer) exported: seq[WireSymbol] wr = rewriteCapOut(re.relay, turn.newCap(peerEntity), exported) peerEntity.e = exported[0] var ev = Event(orKind: EventKind.Sync) ev.sync.peer = wr.toPreserves.embed re.send(turn, ev) proc newRelayEntity(label: string; r: Relay; o: Oid): RelayEntity = RelayEntity(label: label, relay: r, oid: o) using relay: Relay facet: Facet proc lookupLocal(relay; oid: Oid): Cap = let sym = relay.exported.grab oid if sym.isNil: newInertCap() else: sym.cap proc isInert(r: Cap): bool = r.target.isNil proc rewriteCapIn(relay; facet; n: WireRef, imported: var seq[WireSymbol]): Cap = case n.orKind of WireRefKind.mine: var e = relay.imported.grab(n.mine.oid) if e.isNil: e = newWireSymbol( relay.imported, n.mine.oid, newCap(facet, newRelayEntity("rewriteCapIn", relay, n.mine.oid)), ) imported.add e result = e.cap of WireRefKind.yours: let r = relay.lookupLocal(n.yours.oid) if n.yours.attenuation.len == 0 or r.isInert: result = r else: raiseAssert "attenuation not implemented" proc rewriteIn(relay; facet; v: Value): tuple[rewritten: Assertion; imported: seq[WireSymbol]] {.gcsafe.} = var imported: seq[WireSymbol] result.rewritten = mapEmbeds(v) do (pr: Value) -> Value: let wr = pr.preservesTo WireRef; if wr.isSome: result = rewriteCapIn(relay, facet, wr.get, imported).embed else: result = pr result.imported = imported proc close(r: Relay) = discard proc dispatch(relay: Relay; turn: var Turn; cap: Cap; event: Event) {.gcsafe.} = case event.orKind of EventKind.Assert: let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion) relay.inboundAssertions[event.assert.handle] = (publish(turn, cap, a), imported,) of EventKind.Retract: let remoteHandle = event.retract.handle var outbound: tuple[localHandle: Handle, imported: seq[WireSymbol]] if relay.inboundAssertions.pop(remoteHandle, outbound): for e in outbound.imported: relay.imported.drop e turn.retract(outbound.localHandle) of EventKind.Message: let (a, imported) = rewriteIn(relay, turn.facet, event.message.body) assert imported.len == 0, "Cannot receive transient reference" turn.message(cap, a) of EventKind.Sync: discard # TODO #[ var imported: seq[WireSymbol] let k = relay.rewriteCapIn(turn, evenr.sync.peer, imported) turn.sync(cap) do (turn: var Turn): turn.message(k, true) for e in imported: relay.imported.del e ]# proc dispatch(relay: Relay; v: Value) {.gcsafe.} = trace "S: ", v run(relay.facet) do (t: var Turn): var pkt: Packet if pkt.fromPreserves(v): case pkt.orKind of PacketKind.Turn: # https://synit.org/book/protocol.html#turn-packets for te in pkt.turn: let r = lookupLocal(relay, te.oid.Oid) if not r.isInert: dispatch(relay, t, r, te.event) else: stderr.writeLine("discarding event for unknown Cap; ", te.event) of PacketKind.Error: # https://synit.org/book/protocol.html#error-packets when defined(posix): stderr.writeLine("Error from server: ", pkt.error.message, " (detail: ", pkt.error.detail, ")") close relay of PacketKind.Extension: # https://synit.org/book/protocol.html#extension-packets discard else: when defined(posix): stderr.writeLine("discarding undecoded packet ", v) proc recv(relay: Relay; buf: seq[byte]) = feed(relay.wireBuf, buf) var pr = decode(relay.wireBuf) if pr.isSome: dispatch(relay, pr.get) type RelayOptions* = object of RootObj packetWriter*: PacketWriter RelayActorOptions* = object of RelayOptions initialOid*: Option[Oid] initialCap*: Cap nextLocalOid*: Option[Oid] proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) = spawn(name, turn) do (turn: var Turn): let relay = Relay( facet: turn.facet, packetWriter: opts.packetWriter, wireBuf: newBufferedDecoder(0), ) discard relay.facet.preventInertCheck() if not opts.initialCap.isNil: var exported: seq[WireSymbol] discard rewriteCapOut(relay, opts.initialCap, exported) opts.nextLocalOid.map do (oid: Oid): relay.nextLocalOid = if oid == 0.Oid: 1.Oid else: oid assert opts.initialOid.isSome if opts.initialOid.isSome: var imported: seq[WireSymbol] wr = WireRef( orKind: WireRefKind.mine, mine: WireRefMine(oid: opts.initialOid.get)) relay.peer = rewriteCapIn(relay, turn.facet, wr, imported) assert not relay.peer.isNil setup(turn, relay) proc rejected(detail: Value): Resolved = result = Resolved(orKind: ResolvedKind.Rejected) result.rejected.detail = detail proc accepted(cap: Cap): Resolved = result = Resolved(orKind: ResolvedKind.accepted) result.accepted.responderSession = cap when defined(posix): import std/asyncfile export Unix type StdioControlEntity = ref object of Entity stdin: AsyncFile method message(entity: StdioControlEntity; turn: var Turn; ass: AssertionRef) = if ass.value.preservesTo(ForceDisconnect).isSome: close(entity.stdin) close(stdout) proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Stdio) = ## Connect to an external dataspace over stdio. proc stdoutWriter(turn: var Turn; buf: seq[byte]) = ## Blocking write to stdout. let n = writeBytes(stdout, buf, 0, buf.len) flushFile(stdout) if n != buf.len: stopActor(turn) var opts = RelayActorOptions( packetWriter: stdoutWriter, initialCap: ds, initialOid: 0.Oid.some, ) spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay): let facet = turn.facet asyncStdin = openAsync("/dev/stdin") # this is universal now? publish(turn, ds, TransportConnection( `addr`: ta.toPreserves, control: StdioControlEntity(stdin: asyncStdin).newCap(turn), resolved: relay.peer.accepted, )) const stdinReadSize = 0x2000 proc readCb(pktFut: Future[string]) {.gcsafe.} = if not pktFut.failed: var buf = pktFut.read if buf.len == 0: run(facet) do (turn: var Turn): stopActor(turn) else: relay.recv(cast[seq[byte]](buf)) asyncStdin.read(stdinReadSize).addCallback(readCb) asyncStdin.read(stdinReadSize).addCallback(readCb) proc connectStdio*(turn: var Turn; ds: Cap) = ## Connect to an external dataspace over stdin and stdout. connectTransport(turn, ds, transportAddress.Stdio()) import std/asyncnet from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol type SocketControlEntity = ref object of Entity socket: AsyncSocket method message(entity: SocketControlEntity; turn: var Turn; ass: AssertionRef) = if ass.value.preservesTo(ForceDisconnect).isSome: close(entity.socket) type ShutdownEntity* = ref object of Entity method retract(e: ShutdownEntity; turn: var Turn; h: Handle) = stopActor(turn) proc connect(turn: var Turn; ds: Cap; transAddr: Value; socket: AsyncSocket) = proc socketWriter(turn: var Turn; buf: seq[byte]) = asyncCheck(turn, socket.send(cast[string](buf))) var ops = RelayActorOptions( packetWriter: socketWriter, initialOid: 0.Oid.some, ) spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay): let facet = turn.facet facet.actor.atExit do (turn: var Turn): close(socket) publish(turn, ds, TransportConnection( `addr`: transAddr, control: SocketControlEntity(socket: socket).newCap(turn), resolved: relay.peer.accepted, )) const recvSize = 0x4000 proc recvCb(pktFut: Future[string]) {.gcsafe.} = if pktFut.failed or pktFut.read.len == 0: run(facet) do (turn: var Turn): stopActor(turn) else: relay.recv(cast[seq[byte]](pktFut.read)) if not socket.isClosed: socket.recv(recvSize).addCallback(recvCb) socket.recv(recvSize).addCallback(recvCb) proc connect(turn: var Turn; ds: Cap; ta: Value; socket: AsyncSocket; fut: Future[void]) = let facet = turn.facet fut.addCallback do (): run(facet) do (turn: var Turn): if fut.failed: var ass = TransportConnection( `addr`: ta, resolved: Resolved(orKind: ResolvedKind.Rejected), ) ass.resolved.rejected.detail = embed fut.error publish(turn, ds, ass) else: connect(turn, ds, ta, socket) proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Tcp) = let facet = turn.facet socket = newAsyncSocket( domain = AF_INET, sockType = SOCK_STREAM, protocol = IPPROTO_TCP, buffered = false, ) connect(turn, ds, ta.toPreserves, socket, connect(socket, ta.host, Port ta.port)) proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Unix) = ## Relay a dataspace over a UNIX socket. let socket = newAsyncSocket( domain = AF_UNIX, sockType = SOCK_STREAM, protocol = cast[Protocol](0), buffered = false) connect(turn, ds, ta.toPreserves, socket, connectUnix(socket, ta.path)) proc walk(turn: var Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) {.gcsafe.} = if stepOff < route.pathSteps.len: let step = route.pathSteps[stepOff] rejectPat = ResolvedPathStep?:{ 0: ?(origin.embed), 1: ?step, 2: ?:Rejected} acceptPat = ResolvedPathStep?:{ 0: ?(origin.embed), 1: ?step, 2: ?:ResolvedAccepted} onPublish(turn, ds, rejectPat) do (detail: Value): publish(turn, ds, ResolvePath( route: route, `addr`: route.transports[transOff], resolved: detail.rejected, )) during(turn, ds, acceptPat) do (next: Cap): walk(turn, ds, next, route, transOff, stepOff.succ) else: publish(turn, ds, ResolvePath( route: route, `addr`: route.transports[transOff], resolved: origin.accepted, )) proc connectRoute(turn: var Turn; ds: Cap; route: Route; transOff: int) = let rejectPat = TransportConnection ?: { 0: ?route.transports[transOff], 2: ?:Rejected, } during(turn, ds, rejectPat) do (detail: Value): publish(turn, ds, ResolvePath( route: route, `addr`: route.transports[transOff], resolved: detail.rejected, )) let acceptPat = TransportConnection?:{ 0: ?route.transports[transOff], 2: ?:ResolvedAccepted, } onPublish(turn, ds, acceptPat) do (origin: Cap): walk(turn, ds, origin, route, transOff, 0) type StepCallback = proc (turn: var Turn; step: Value; origin, next: Cap) {.gcsafe.} proc spawnStepResolver(turn: var Turn; ds: Cap; stepType: Value; cb: StepCallback) = spawn($stepType & "-step", turn) do (turn: var Turn): let stepPat = grabRecord(stepType, grab()) let pat = ?Observe(pattern: ResolvedPathStep?:{1: stepPat}) ?? {0: grabLit(), 1: grab()} during(turn, ds, pat) do (origin: Cap; stepDetail: Literal[Value]): let step = toRecord(stepType, stepDetail.value) proc duringCallback(turn: var Turn; ass: Value; h: Handle): TurnAction = var res = ass.preservesTo Resolved if res.isSome: if res.get.orKind == ResolvedKind.accepted and res.get.accepted.responderSession of Cap: cb(turn, step, origin, res.get.accepted.responderSession.Cap) else: publish(turn, ds, ResolvedPathStep( origin: origin, pathStep: step, resolved: res.get)) proc action(turn: var Turn) = stop(turn) result = action publish(turn, origin, Resolve( step: step, observer: newCap(turn, during(duringCallback)))) proc spawnRelays*(turn: var Turn; ds: Cap) = ## Spawn actors that manage routes and appeasing gatekeepers. spawn("transport-connector", turn) do (turn: var Turn): let pat = ?Observe(pattern: !TransportConnection) ?? { 0: grab() } # Use a generic pattern and type matching # in the during handler because it is easy. let stdioPat = ?Observe(pattern: TransportConnection?:{0: ?:Stdio}) during(turn, ds, stdioPat) do: connectTransport(turn, ds, Stdio()) # TODO: tcp pattern during(turn, ds, pat) do (ta: Literal[transportAddress.Tcp]): connectTransport(turn, ds, ta.value) # TODO: unix pattern during(turn, ds, pat) do (ta: Literal[transportAddress.Unix]): connectTransport(turn, ds, ta.value) spawn("path-resolver", turn) do (turn: var Turn): let pat = ?Observe(pattern: !ResolvePath) ?? {0: grab()} during(turn, ds, pat) do (route: Literal[Route]): for i, transAddr in route.value.transports: connectRoute(turn, ds, route.value, i) spawnStepResolver(turn, ds, "ref".toSymbol) do ( turn: var Turn, step: Value, origin: Cap, next: Cap): publish(turn, ds, ResolvedPathStep( origin: origin, pathStep: step, resolved: next.accepted)) type BootProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.} proc envRoute*: Route = var text = getEnv("SYNDICATE_ROUTE") if text == "": var tx = (getEnv("XDG_RUNTIME_DIR", "/run/user/1000") / "dataspace").toPreserves result.transports = @[initRecord("unix", tx)] result.pathSteps = @[capabilities.mint().toPreserves] else: var pr = parsePreserves(text) if not result.fromPreserves(pr): raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr) proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) = during(turn, ds, ResolvePath ?: {0: ?route, 3: ?:ResolvedAccepted}) do (dst: Cap): bootProc(turn, dst) # TODO: define a runActor that comes preloaded with relaying