# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense

import std/[options, tables]
from std/os import getEnv, `/`
import pkg/sys/ioqueue
import preserves
import ../syndicate, /capabilities, ./durings, ./membranes,
  ./protocols/[gatekeeper, protocol, sturdy, transportAddress]

# `trace` writes protocol traffic when built with -d:traceSyndicate and
# expands to nothing otherwise.
when defined(traceSyndicate):
  when defined(posix):
    template trace(args: varargs[untyped]): untyped = stderr.writeLine(args)
  else:
    template trace(args: varargs[untyped]): untyped = echo(args)
else:
  template trace(args: varargs[untyped]): untyped = discard

export `$`

export Stdio, Tcp, WebSocket, Unix

type
  Assertion = Value
  Event = protocol.Event
  Handle = actors.Handle
  Oid = sturdy.Oid
  Turn = syndicate.Turn
  WireRef = sturdy.WireRef

  PacketWriter = proc (turn: var Turn; buf: seq[byte]) {.closure.}
    ## Callback that is handed each encoded outbound packet.
  RelaySetup = proc (turn: var Turn; relay: Relay) {.closure.}
    ## Callback invoked by `spawnRelay` once the relay is constructed.

  Relay* = ref object
    facet: Facet
    inboundAssertions: Table[Handle,
      tuple[localHandle: Handle, imported: seq[WireSymbol]]]
      ## Keyed by the handle carried in a received Assert event; holds the
      ## handle returned by the local `publish` and the wire symbols that
      ## were imported while rewriting that assertion.
    outboundAssertions: Table[Handle, seq[WireSymbol]]
      ## Wire symbols exported for each locally published handle.
    exported: Membrane
    imported: Membrane
    nextLocalOid: Oid
      ## Oid assigned to the next capability that is newly exported.
    wireBuf: BufferedDecoder
      ## Accumulates received bytes until a value can be decoded.
    packetWriter: PacketWriter
    peer: Cap
      ## Capability built by `spawnRelay` from the configured initial oid.

  SyncPeerEntity = ref object of Entity
    relay: Relay
    peer: Cap
    handleMap: Table[Handle, Handle]
      ## Maps handles published on this entity to the handles obtained by
      ## publishing to `peer`.
    e: WireSymbol
      ## Wire symbol exported for this entity by `RelayEntity.sync`.

  RelayEntity = ref object of Entity
    ## https://synit.org/book/protocol.html#relay-entities
    label: string
    relay: Relay

proc releaseCapOut(r: Relay; e: WireSymbol) =
  ## Drops `e` from the relay's `exported` membrane.
  r.exported.drop e

method publish(spe: SyncPeerEntity; t: var Turn; a: AssertionRef; h: Handle) =
  ## Forwards the assertion to `peer`, remembering the resulting handle
  ## under `h` so that it can be retracted later.
  spe.handleMap[h] = publish(t, spe.peer, a.value)

method retract(se: SyncPeerEntity; t: var Turn; h: Handle) =
  ## Retracts the peer-side handle recorded for `h`, if there is one.
  var other: Handle
  if se.handleMap.pop(h, other):
    retract(t, other)

method message(se: SyncPeerEntity; t: var Turn; a: AssertionRef) =
  ## Releases the wire symbol exported for this entity (when set) and
  ## forwards the message body to `peer`.
  if not se.e.isNil:
    se.relay.releaseCapOut(se.e)
  message(t, se.peer, a.value)

method sync(se: SyncPeerEntity; t: var Turn; peer: Cap) =
  ## Forwards the sync request to the wrapped peer.
  sync(t, se.peer, peer)

proc newSyncPeerEntity(r: Relay; p: Cap): SyncPeerEntity =
  ## Creates a `SyncPeerEntity` for relay `r` wrapping peer `p`.
  SyncPeerEntity(relay: r, peer: p)

proc rewriteCapOut(relay: Relay; cap: Cap;
    exported: var seq[WireSymbol]): WireRef =
  ## Converts a local capability into its wire representation.
  ##
  ## An unattenuated capability whose target is a `RelayEntity` of this very
  ## relay becomes a `yours` reference carrying the target's oid. Any other
  ## capability is looked up in (or newly added to) the `exported` membrane,
  ## its wire symbol is appended to `exported`, and a `mine` reference is
  ## returned.
  if cap.target of RelayEntity and
      cap.target.RelayEntity.relay == relay and
      cap.attenuation.len == 0:
    result = WireRef(
      orKind: WireRefKind.yours,
      yours: WireRefYours(oid: cap.target.oid))
  else:
    var ws = grab(relay.exported, cap)
    if ws.isNil:
      # First export of this capability: allocate a fresh local oid.
      ws = newWireSymbol(relay.exported, relay.nextLocalOid, cap)
      inc relay.nextLocalOid
    exported.add ws
    result = WireRef(
      orKind: WireRefKind.mine,
      mine: WireRefMine(oid: ws.oid))

proc rewriteOut(relay: Relay; v: Assertion):
    tuple[rewritten: Value, exported: seq[WireSymbol]] =
  ## Rewrites every embedded `Cap` in `v` to a wire reference and returns
  ## the rewritten value along with the wire symbols gathered on the way.
  var exported: seq[WireSymbol]
  result.rewritten = mapEmbeds(v) do (pr: Value) -> Value:
    let o = pr.unembed(Cap)
    if o.isSome:
      rewriteCapOut(relay, o.get, exported).toPreserves
    else:
      pr
  result.exported = exported

proc register(relay: Relay; v: Assertion; h: Handle):
    tuple[rewritten: Value, exported: seq[WireSymbol]] =
  ## Rewrites `v` for the wire and records the exported wire symbols under
  ## handle `h` so that `deregister` can release them.
  result = rewriteOut(relay, v)
  relay.outboundAssertions[h] = result.exported

proc deregister(relay: Relay; h: Handle) =
  ## Releases every wire symbol recorded for handle `h`, if any.
  var outbound: seq[WireSymbol]
  if relay.outboundAssertions.pop(h, outbound):
    for e in outbound:
      releaseCapOut(relay, e)

proc send(relay: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
  ## Wraps `m` in a turn packet holding a single event addressed to `rOid`
  ## and hands the encoded packet to `relay.packetWriter` from a turn run on
  ## the relay's facet.
  # TODO: don't send right away.
  var pendingTurn: protocol.Turn
  pendingTurn.add TurnEvent(oid: rOid, event: m)
  relay.facet.run do (turn: var Turn):
    var pkt = Packet(
      orKind: PacketKind.Turn,
      turn: pendingTurn)
    trace "C: ", pkt
    relay.packetWriter(turn, encode pkt)

proc send(re: RelayEntity; turn: var Turn; ev: Event) =
  ## Sends `ev` through the entity's relay, addressed to the entity's oid.
  send(re.relay, turn, protocol.Oid re.oid, ev)

method publish(re: RelayEntity; t: var Turn; a: AssertionRef; h: Handle) =
  ## Registers the assertion with the relay and sends it as an Assert event.
  re.send(t, Event(
    orKind: EventKind.Assert,
    `assert`: protocol.Assert(
      assertion: re.relay.register(a.value, h).rewritten,
      handle: h)))

method retract(re: RelayEntity; t: var Turn; h: Handle) =
  ## Releases the wire symbols held for `h` and sends a Retract event.
  re.relay.deregister h
  re.send(t, Event(
    orKind: EventKind.Retract,
    retract: Retract(handle: h)))

method message(re: RelayEntity; turn: var Turn; msg: AssertionRef) =
  ## Sends the message body as a Message event.
  ##
  ## Rewriting the body must not export any wire symbol: that is asserted,
  ## and when assertions are compiled out such a message is not sent.
  var (value, exported) = rewriteOut(re.relay, msg.value)
  assert(len(exported) == 0, "cannot send a reference in a message")
  if len(exported) == 0:
    re.send(turn, Event(orKind: EventKind.Message, message: Message(body: value)))

method sync(re: RelayEntity; turn: var Turn; peer: Cap) =
  ## Exports a fresh `SyncPeerEntity` wrapping `peer` and sends a Sync event
  ## that references it.
  var
    peerEntity = newSyncPeerEntity(re.relay, peer)
    exported: seq[WireSymbol]
    wr = rewriteCapOut(re.relay, turn.newCap(peerEntity), exported)
  # Keep the wire symbol on the entity so that its `message` method can
  # release it.
  peerEntity.e = exported[0]
  var ev = Event(orKind: EventKind.Sync)
  ev.sync.peer = wr.toPreserves.embed
  re.send(turn, ev)

proc newRelayEntity(label: string; r: Relay; o: Oid): RelayEntity =
  ## Creates a `RelayEntity` with the given label, relay and oid.
  RelayEntity(label: label, relay: r, oid: o)

using
  relay: Relay
  facet: Facet

proc lookupLocal(relay; oid: Oid): Cap =
  ## Returns the capability exported under `oid`; `result` is left at its
  ## default when the `exported` membrane has no such symbol.
  let sym = relay.exported.grab oid
  if not sym.isNil:
    result = sym.cap

proc rewriteCapIn(relay; facet; n: WireRef,
    imported: var seq[WireSymbol]): Cap =
  ## Converts a received wire reference into a local capability.
  ##
  ## A `mine` reference is resolved through the `imported` membrane, with a
  ## new `RelayEntity` proxy created on first sight; its wire symbol is
  ## appended to `imported`. A `yours` reference is resolved against the
  ## capabilities this relay exported, falling back to an inert capability
  ## when the oid is unknown, and is attenuated when the reference carries
  ## an attenuation.
  case n.orKind
  of WireRefKind.mine:
    var e = relay.imported.grab(n.mine.oid)
    if e.isNil:
      e = newWireSymbol(
        relay.imported,
        n.mine.oid,
        newCap(facet, newRelayEntity("rewriteCapIn", relay, n.mine.oid)),
      )
    imported.add e
    result = e.cap
  of WireRefKind.yours:
    result = relay.lookupLocal(n.yours.oid)
    if result.isNil:
      result = newInertCap()
    elif n.yours.attenuation.len > 0:
      result = attenuate(result, n.yours.attenuation)

proc rewriteIn(relay; facet; v: Value):
    tuple[rewritten: Assertion; imported: seq[WireSymbol]] =
  ## Rewrites every embedded value of `v` that converts to a `WireRef` into
  ## an embedded local capability and returns the rewritten value along
  ## with the wire symbols gathered on the way.
  var imported: seq[WireSymbol]
  result.rewritten = mapEmbeds(v) do (pr: Value) -> Value:
    let wr = pr.preservesTo WireRef
    if wr.isSome:
      result = rewriteCapIn(relay, facet, wr.get, imported).embed
    else:
      result = pr
  result.imported = imported

proc close(r: Relay) =
  ## Currently does nothing.
  discard

proc dispatch(relay: Relay; turn: var Turn; cap: Cap; event: Event) =
  ## Applies one received event to the local capability `cap`.
  case event.orKind
  of EventKind.Assert:
    let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion)
    # Remember the local handle and the imported symbols under the handle
    # carried by the event so that a later Retract can undo both.
    relay.inboundAssertions[event.assert.handle] = (publish(turn, cap, a), imported,)
  of EventKind.Retract:
    let remoteHandle = event.retract.handle
    var outbound: tuple[localHandle: Handle, imported: seq[WireSymbol]]
    if relay.inboundAssertions.pop(remoteHandle, outbound):
      for e in outbound.imported:
        relay.imported.drop e
      turn.retract(outbound.localHandle)
  of EventKind.Message:
    let (a, imported) = rewriteIn(relay, turn.facet, event.message.body)
    assert imported.len == 0, "Cannot receive transient reference"
    turn.message(cap, a)
  of EventKind.Sync:
    turn.sync(cap) do (turn: var Turn):
      var
        (v, imported) = rewriteIn(relay, turn.facet, event.sync.peer)
        peer = unembed(v, Cap)
      if peer.isSome:
        turn.message(get peer, true)
      # The symbols imported for the sync peer are dropped once the reply
      # has been issued.
      for e in imported:
        relay.imported.drop e

proc dispatch(relay: Relay; v: Value) =
  ## Interprets `v` as a protocol packet and handles it within a turn run
  ## on the relay's facet. Values that do not convert to a `Packet` are
  ## discarded (and reported on stderr on POSIX).
  trace "S: ", v
  run(relay.facet) do (t: var Turn):
    var pkt: Packet
    if pkt.fromPreserves(v):
      case pkt.orKind
      of PacketKind.Turn:
        # https://synit.org/book/protocol.html#turn-packets
        for te in pkt.turn:
          let r = lookupLocal(relay, te.oid.Oid)
          # Events addressed to an oid that is not exported are skipped.
          if not r.isNil:
            dispatch(relay, t, r, te.event)
      of PacketKind.Error:
        # https://synit.org/book/protocol.html#error-packets
        when defined(posix):
          stderr.writeLine("Error from server: ", pkt.error.message,
            " (detail: ", pkt.error.detail, ")")
        close relay
      of PacketKind.Extension:
        # https://synit.org/book/protocol.html#extension-packets
        discard
    else:
      when defined(posix):
        stderr.writeLine("discarding undecoded packet ", v)

proc recv(relay: Relay; buf: openarray[byte]; slice: Slice[int]) =
  ## Feeds `buf[slice]` into the relay's wire buffer and dispatches every
  ## value that can be decoded from it.
  feed(relay.wireBuf, buf, slice)
  # A single read may carry several complete packets. Drain the decoder so
  # that none of them is left waiting in the buffer until the next read.
  var pr = decode(relay.wireBuf)
  while pr.isSome:
    dispatch(relay, pr.get)
    pr = decode(relay.wireBuf)

type
  RelayOptions* = object of RootObj
    packetWriter*: PacketWriter

  RelayActorOptions* = object of RelayOptions
    initialOid*: Option[Oid]
    initialCap*: Cap
    nextLocalOid*: Option[Oid]

proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions;
    setup: RelaySetup) =
  ## Starts a linked actor called `name` that owns a new `Relay` configured
  ## from `opts`, then calls `setup` with that relay.
  ##
  ## `opts.initialCap`, when set, is exported first. `opts.nextLocalOid`,
  ## when set, seeds the relay's oid counter, with oid 0 replaced by 1.
  ## `opts.initialOid` is expected to be set; it is imported as the relay's
  ## `peer` capability.
  linkActor(turn, name) do (turn: var Turn):
    turn.preventInertCheck()
    let relay = Relay(
      facet: turn.facet,
      packetWriter: opts.packetWriter,
      wireBuf: newBufferedDecoder(0),
    )
    if not opts.initialCap.isNil:
      var exported: seq[WireSymbol]
      discard rewriteCapOut(relay, opts.initialCap, exported)
    opts.nextLocalOid.map do (oid: Oid):
      relay.nextLocalOid =
        if oid == 0.Oid: 1.Oid
        else: oid
    assert opts.initialOid.isSome
    if opts.initialOid.isSome:
      var
        imported: seq[WireSymbol]
        wr = WireRef(
          orKind: WireRefKind.mine,
          mine: WireRefMine(oid: opts.initialOid.get))
      relay.peer = rewriteCapIn(relay, turn.facet, wr, imported)
      assert not relay.peer.isNil
    setup(turn, relay)

proc rejected(detail: Value): Resolved =
  ## Builds a `Resolved` of the `Rejected` kind carrying `detail`.
  result = Resolved(orKind: ResolvedKind.Rejected)
  result.rejected.detail = detail

proc accepted(cap: Cap): Resolved =
  ## Builds a `Resolved` of the `accepted` kind whose responder session is
  ## `cap`.
  result = Resolved(orKind: ResolvedKind.accepted)
  result.accepted.responderSession = cap

when defined(posix):

  import std/[oserrors, posix]
  import pkg/sys/[files, handles, sockets]
  export transportAddress.Unix

  type StdioEntity = ref object of Entity
    relay: Relay
    stdin: AsyncFile
    alive: bool

  method message(entity: StdioEntity; turn: var Turn; ass: AssertionRef) =
    ## Clears `alive` when the message body converts to `ForceDisconnect`.
    if ass.value.preservesTo(ForceDisconnect).isSome:
      entity.alive = false

  proc loop(entity: StdioEntity) {.asyncio.} =
    let buf = new seq[byte]
    entity.alive = true
    while entity.alive:
      buf[].setLen(0x1000)
      let n = read(entity.stdin, buf)
      if n == 0:
        stopActor(entity.facet)
      else:
        entity.relay.recv(buf[], 0..