# SPDX-FileCopyrightText: ☭ Emery Hemingway # SPDX-License-Identifier: Unlicense import std/[asyncdispatch, options, streams, tables] import preserves import ./actors, ./durings, ./membranes, ./protocols/[protocol, sturdy, transportAddress] when defined(traceSyndicate): when defined(posix): template trace(args: varargs[untyped]): untyped = stderr.writeLine(args) else: template trace(args: varargs[untyped]): untyped = echo(args) else: template trace(args: varargs[untyped]): untyped = discard export `$` type Oid = sturdy.Oid type Value = Preserve[void] Assertion = Preserve[Ref] WireRef = sturdy.WireRef[void] Turn = actors.Turn type PacketWriter = proc (pkt: sink Packet): Future[void] {.gcsafe.} RelaySetup = proc (turn: var Turn; relay: Relay) {.gcsafe.} Relay* = ref object of RootObj facet: Facet inboundAssertions: Table[Handle, tuple[localHandle: Handle, imported: seq[WireSymbol]]] outboundAssertions: Table[Handle, seq[WireSymbol]] exported: Membrane imported: Membrane nextLocalOid: Oid pendingTurn: protocol.Turn packetWriter: PacketWriter untrusted: bool SyncPeerEntity = ref object of Entity relay: Relay peer: Ref handleMap: Table[Handle, Handle] e: WireSymbol RelayEntity = ref object of Entity ## https://synit.org/book/protocol.html#relay-entities label: string relay: Relay proc releaseRefOut(r: Relay; e: WireSymbol) = r.exported.drop e method publish(spe: SyncPeerEntity; t: var Turn; a: AssertionRef; h: Handle) = spe.handleMap[h] = publish(t, spe.peer, a.value) method retract(se: SyncPeerEntity; t: var Turn; h: Handle) = var other: Handle if se.handleMap.pop(h, other): retract(t, other) method message(se: SyncPeerEntity; t: var Turn; a: AssertionRef) = if not se.e.isNil: se.relay.releaseRefOut(se.e) message(t, se.peer, a.value) method sync(se: SyncPeerEntity; t: var Turn; peer: Ref) = sync(t, se.peer, peer) proc newSyncPeerEntity(r: Relay; p: Ref): SyncPeerEntity = SyncPeerEntity(relay: r, peer: p) proc rewriteRefOut(relay: Relay; `ref`: Ref; exported: var seq[WireSymbol]): WireRef = if `ref`.target of RelayEntity and `ref`.target.RelayEntity.relay == relay and `ref`.attenuation.len == 0: WireRef(orKind: WireRefKind.yours, yours: WireRefYours[void](oid: `ref`.target.oid)) else: var ws = grab(relay.exported, `ref`) if ws.isNil: ws = newWireSymbol(relay.exported, relay.nextLocalOid, `ref`) inc relay.nextLocalOid exported.add ws WireRef( orKind: WireRefKind.mine, mine: WireRefMine(oid: ws.oid)) proc rewriteOut(relay: Relay; v: Assertion): tuple[rewritten: Value, exported: seq[WireSymbol]] {.gcsafe.} = var exported: seq[WireSymbol] result.rewritten = contract(v) do (r: Ref) -> Value: rewriteRefOut(relay, r, exported).toPreserve result.exported = exported proc register(relay: Relay; v: Assertion; h: Handle): tuple[rewritten: Value, exported: seq[WireSymbol]] = result = rewriteOut(relay, v) relay.outboundAssertions[h] = result.exported proc deregister(relay: Relay; h: Handle) = var outbound: seq[WireSymbol] if relay.outboundAssertions.pop(h, outbound): for e in outbound: releaseRefOut(relay, e) proc send(r: Relay; pkt: sink Packet): Future[void] = assert(not r.packetWriter.isNil, "missing packetWriter proc") r.packetWriter(pkt) proc send(r: Relay; rOid: protocol.Oid; m: Event) = if r.pendingTurn.len == 0: # do not send a packet immediately, # wait until the pending I/O is processed callSoon do (): r.facet.run do (turn: var Turn): var pkt = Packet( orKind: PacketKind.Turn, turn: move r.pendingTurn) trace "C: ", pkt asyncCheck(turn, r.send(pkt)) r.pendingTurn.add TurnEvent(oid: rOid, event: m) proc send(re: RelayEntity; ev: Event) = send(re.relay, protocol.Oid re.oid, ev) method publish(re: RelayEntity; t: var Turn; a: AssertionRef; h: Handle) {.gcsafe.} = re.send Event( orKind: EventKind.Assert, `assert`: protocol.Assert( assertion: re.relay.register(a.value, h).rewritten, handle: h)) method retract(re: RelayEntity; t: var Turn; h: Handle) {.gcsafe.} = re.relay.deregister h re.send Event( orKind: EventKind.Retract, retract: Retract(handle: h)) method message(re: RelayEntity; turn: var Turn; msg: AssertionRef) {.gcsafe.} = var (value, exported) = rewriteOut(re.relay, msg.value) assert(len(exported) == 0, "cannot send a reference in a message") if len(exported) == 0: re.send Event(orKind: EventKind.Message, message: Message(body: value)) method sync(re: RelayEntity; turn: var Turn; peer: Ref) {.gcsafe.} = var peerEntity = newSyncPeerEntity(re.relay, peer) exported: seq[WireSymbol] discard rewriteRefOut(re.relay, turn.newRef(peerEntity), exported) # TODO: discard? peerEntity.e = exported[0] re.send Event(orKind: EventKind.Sync) proc newRelayEntity(label: string; r: Relay; o: Oid): RelayEntity = RelayEntity(label: label, relay: r, oid: o) using relay: Relay facet: Facet proc lookupLocal(relay; oid: Oid): Ref = let sym = relay.exported.grab oid if sym.isNil: newInertRef() else: sym.`ref` proc isInert(r: Ref): bool = r.target.isNil proc rewriteRefIn(relay; facet; n: WireRef, imported: var seq[WireSymbol]): Ref = case n.orKind of WireRefKind.mine: var e = relay.imported.grab(n.mine.oid) if e.isNil: e = newWireSymbol( relay.imported, n.mine.oid, newRef(facet, newRelayEntity("rewriteRefIn", relay, n.mine.oid))) imported.add e result = e.`ref` of WireRefKind.yours: let r = relay.lookupLocal(n.yours.oid) if n.yours.attenuation.len == 0 or r.isInert: result = r else: raiseAssert "attenuation not implemented" proc rewriteIn(relay; facet; v: Value): tuple[rewritten: Assertion; imported: seq[WireSymbol]] {.gcsafe.} = var imported: seq[WireSymbol] result.rewritten = expand(v) do (pr: Value) -> Assertion: var wr: WireRef if not fromPreserve(wr, pr): raiseAssert "expansion of embedded value failed" rewriteRefIn(relay, facet, wr, imported).toPreserve(Ref) result.imported = imported proc close(r: Relay) = discard proc dispatch*(relay: Relay; turn: var Turn; `ref`: Ref; event: Event) {.gcsafe.} = case event.orKind of EventKind.Assert: let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion) relay.inboundAssertions[event.assert.handle] = (publish(turn, `ref`, a), imported,) of EventKind.Retract: let remoteHandle = event.retract.handle var outbound: tuple[localHandle: Handle, imported: seq[WireSymbol]] if relay.inboundAssertions.pop(remoteHandle, outbound): for e in outbound.imported: relay.imported.drop e turn.retract(outbound.localHandle) of EventKind.Message: let (a, imported) = rewriteIn(relay, turn.facet, event.message.body) assert imported.len == 0, "Cannot receive transient reference" turn.message(`ref`, a) of EventKind.Sync: discard # TODO #[ var imported: seq[WireSymbol] let k = relay.rewriteRefIn(turn, evenr.sync.peer, imported) turn.sync(`ref`) do (turn: var Turn): turn.message(k, true) for e in imported: relay.imported.del e ]# proc dispatch*(relay: Relay; v: Value) {.gcsafe.} = trace "S: ", v run(relay.facet) do (t: var Turn): var pkt: Packet if fromPreserve(pkt, v): case pkt.orKind of PacketKind.Turn: # https://synit.org/book/protocol.html#turn-packets for te in pkt.turn: let r = lookupLocal(relay, te.oid.Oid) if not r.isInert: dispatch(relay, t, r, te.event) else: stderr.writeLine("discarding event for unknown Ref; ", te.event) of PacketKind.Error: # https://synit.org/book/protocol.html#error-packets when defined(posix): stderr.writeLine("Error from server: ", pkt.error.message, " (detail: ", pkt.error.detail, ")") close relay of PacketKind.Extension: # https://synit.org/book/protocol.html#extension-packets discard else: when defined(posix): stderr.writeLine("discarding undecoded packet ", v) type RelayOptions* = object of RootObj packetWriter*: PacketWriter untrusted*: bool RelayActorOptions* = object of RelayOptions initialOid*: Option[Oid] initialRef*: Ref nextLocalOid*: Option[Oid] proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay = result = Relay( facet: turn.facet, packetWriter: opts.packetWriter, untrusted: opts.untrusted) discard result.facet.preventInertCheck() setup(turn, result) proc spawnRelay*(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup): Future[Ref] = var fut = newFuture[Ref]"spawnRelay" spawn(name, turn) do (turn: var Turn): let relay = newRelay(turn, opts, setup) if not opts.initialRef.isNil: var exported: seq[WireSymbol] discard rewriteRefOut(relay, opts.initialRef, exported) if opts.initialOid.isSome: var imported: seq[WireSymbol] var wr = WireRef( orKind: WireRefKind.mine, mine: WireRefMine(oid: opts.initialOid.get)) fut.complete rewriteRefIn(relay, turn.facet, wr, imported) else: fut.complete(nil) opts.nextLocalOid.map do (oid: Oid): relay.nextLocalOid = if oid == 0.Oid: 1.Oid else: oid fut when defined(posix): import std/asyncnet from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol import protocols/[gatekeeper, sturdy] type ShutdownEntity* = ref object of Entity method retract(e: ShutdownEntity; turn: var Turn; h: Handle) = stopActor(turn) type ConnectProc* = proc (turn: var Turn; ds: Ref) {.gcsafe.} export Tcp when defined(posix): export Unix proc connect*(turn: var Turn; socket: AsyncSocket; step: Preserve[Ref]; bootProc: ConnectProc) = proc socketWriter(packet: sink Packet): Future[void] = socket.send(cast[string](encode(packet))) const recvSize = 0x2000 var shutdownRef: Ref let reenable = turn.facet.preventInertCheck() connectionClosedRef = newRef(turn, ShutdownEntity()) fut = newFuture[void]"connect" discard bootActor("socket") do (turn: var Turn): var ops = RelayActorOptions( packetWriter: socketWriter, initialOid: 0.Oid.some) let refFut = spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay): let facet = turn.facet var wireBuf = newBufferedDecoder(0) proc recvCb(pktFut: Future[string]) {.gcsafe.} = if pktFut.failed: run(facet) do (turn: var Turn): stopActor(turn) else: var buf = pktFut.read if buf.len == 0: run(facet) do (turn: var Turn): stopActor(turn) else: feed(wireBuf, buf) var (success, pr) = decode(wireBuf) if success: dispatch(relay, pr) socket.recv(recvSize).addCallback(recvCb) socket.recv(recvSize).addCallback(recvCb) turn.facet.actor.atExit do (turn: var Turn): close(socket) discard publish(turn, connectionClosedRef, true) shutdownRef = newRef(turn, ShutdownEntity()) addCallback(refFut) do (): let gatekeeper = read refFut run(gatekeeper.relay) do (turn: var Turn): reenable() discard publish(turn, shutdownRef, true) proc duringCallback(turn: var Turn; a: Assertion; h: Handle): TurnAction = let facet = inFacet(turn) do (turn: var Turn): var accepted: ResolvedAccepted[Ref] rejected: Rejected[Ref] if fromPreserve(accepted, a): bootProc(turn, accepted.responderSession) elif fromPreserve(rejected, a): fail(fut, newException(CatchableError, $rejected.detail)) else: fail(fut, newException(CatchableError, $a)) proc action(turn: var Turn) = stop(turn, facet) result = action discard publish(turn, gatekeeper, Resolve[Ref]( step: step, observer: newRef(turn, during(duringCallback)), )) fut.complete() asyncCheck(turn, fut) proc connect*(turn: var Turn; transport: Tcp; step: Preserve[Ref]; bootProc: ConnectProc) = let socket = newAsyncSocket( domain = AF_INET, sockType = SOCK_STREAM, protocol = IPPROTO_TCP, buffered = false) let fut = connect(socket, transport.host, Port transport.port) addCallback(fut, turn) do (turn: var Turn): connect(turn, socket, step, bootProc) proc connect*(turn: var Turn; transport: Unix; step: Preserve[Ref]; bootProc: ConnectProc) = let socket = newAsyncSocket( domain = AF_UNIX, sockType = SOCK_STREAM, protocol = cast[Protocol](0), buffered = false) let fut = connectUnix(socket, transport.path) addCallback(fut, turn) do (turn: var Turn): connect(turn, socket, step, bootProc) import std/asyncfile const stdinReadSize = 128 proc connectStdio*(ds: Ref; turn: var Turn) = ## Connect to an external dataspace over stdin and stdout. proc stdoutWriter(packet: sink Packet): Future[void] {.async.} = var buf = encode(packet) doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len flushFile(stdout) var opts = RelayActorOptions( packetWriter: stdoutWriter, initialRef: ds, initialOid: 0.Oid.some) asyncCheck spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay): let facet = turn.facet asyncStdin = openAsync("/dev/stdin") # this is universal now? close(stdin) facet.actor.atExit do (turn: var Turn): close(asyncStdin) var wireBuf = newBufferedDecoder() proc readCb(pktFut: Future[string]) {.gcsafe.} = if not pktFut.failed: var buf = pktFut.read if buf.len == 0: run(facet) do (turn: var Turn): stopActor(turn) else: feed(wireBuf, buf) var (success, pr) = decode(wireBuf) if success: dispatch(relay, pr) asyncStdin.read(stdinReadSize).addCallback(readCb) asyncStdin.read(stdinReadSize).addCallback(readCb)