# SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway # SPDX-License-Identifier: Unlicense import std/[asyncdispatch, options, tables] import preserves import ./actors, ./durings, ./membranes, ./protocols/[protocol, sturdy] from ./patterns import grab when defined(traceSyndicate): template trace(args: varargs[untyped]): untyped = stderr.writeLine(args) else: template trace(args: varargs[untyped]): untyped = discard type Oid = sturdy.Oid type Assertion = Preserve[Ref] WireRef = sturdy.WireRef[void] WireAssertion = Preserve[WireRef] Event = protocol.Event[WireRef] TurnEvent = protocol.TurnEvent[WireRef] Packet = protocol.Packet[WireRef] Turn = actors.Turn type PacketWriter = proc (pkt: sink Packet): Future[void] {.gcsafe.} RelaySetup = proc (turn: var Turn; relay: Relay) {.gcsafe.} Relay = ref object of RootObj facet: Facet inboundAssertions: Table[Handle, tuple[localHandle: Handle, imported: seq[WireSymbol]]] outboundAssertions: Table[Handle, seq[WireSymbol]] exported: Membrane imported: Membrane nextLocalOid: Oid pendingTurn: protocol.Turn[WireRef] packetWriter: PacketWriter untrusted: bool SyncPeerEntity = ref object of Entity relay: Relay peer: Ref handleMap: Table[Handle, Handle] e: WireSymbol RelayEntity = ref object of Entity label: string relay: Relay proc releaseRefOut(r: Relay; e: WireSymbol) = r.exported.drop e method publish(spe: SyncPeerEntity; t: var Turn; v: Assertion; h: Handle) = spe.handleMap[h] = publish(t, spe.peer, v) method retract(se: SyncPeerEntity; t: var Turn; h: Handle) = var other: Handle if se.handleMap.pop(h, other): retract(t, other) method message(se: SyncPeerEntity; t: var Turn; v: Assertion) = if not se.e.isNil: se.relay.releaseRefOut(se.e) message(t, se.peer, v) method sync(se: SyncPeerEntity; t: var Turn; peer: Ref) = sync(t, se.peer, peer) proc newSyncPeerEntity(r: Relay; p: Ref): SyncPeerEntity = SyncPeerEntity(relay: r, peer: p) proc rewriteRefOut(relay: Relay; `ref`: Ref; transient: bool; exported: var seq[WireSymbol]): WireRef = if `ref`.target of RelayEntity and `ref`.target.RelayEntity.relay == relay and `ref`.attenuation.len == 0: WireRef(orKind: WireRefKind.yours, yours: WireRefYours[void](oid: `ref`.target.oid)) else: var ws = grab(relay.exported, `ref`) if ws.isNil: doAssert(not transient, "Cannot send transient reference") ws = newWireSymbol(relay.exported, relay.nextLocalOid, `ref`) inc relay.nextLocalOid exported.add ws WireRef( orKind: WireRefKind.mine, mine: WireRefMine(oid: ws.oid)) proc rewriteOut(relay: Relay; v: Assertion; transient: bool): tuple[rewritten: WireAssertion, exported: seq[WireSymbol]] = var exported: seq[WireSymbol] result.rewritten = mapEmbeds[Ref, WireRef](v) do (r: Ref) -> WireRef: rewriteRefOut(relay, r, transient, exported) result.exported = exported proc register(relay: Relay; v: Assertion): WireAssertion = rewriteOut(relay, v, false).rewritten proc register(relay: Relay; v: Assertion; h: Handle): WireAssertion = var (rewritten, exported) = rewriteOut(relay, v, false) relay.outboundAssertions[h] = exported rewritten proc deregister(relay: Relay; h: Handle) = var outbound: seq[WireSymbol] if relay.outboundAssertions.pop(h, outbound): for e in outbound: releaseRefOut(relay, e) proc send(r: Relay; pkt: sink Packet): Future[void] = assert(not r.packetWriter.isNil, "missing packetWriter proc") r.packetWriter(pkt) proc send(r: Relay; rOid: protocol.Oid; m: Event) = if r.pendingTurn.len == 0: callSoon: r.facet.run do (turn: var Turn): var pkt = Packet( orKind: PacketKind.Turn, turn: move r.pendingTurn) trace "C: ", pkt asyncCheck(turn, r.send(pkt)) r.pendingTurn.add TurnEvent(oid: rOid, event: m) proc send(re: RelayEntity; ev: Event) = send(re.relay, protocol.Oid re.oid, ev) method publish(re: RelayEntity; t: var Turn; v: Assertion; h: Handle) = re.send Event( orKind: EventKind.Assert, `assert`: protocol.Assert[WireRef]( assertion: re.relay.register(v, h), handle: h)) method retract(re: RelayEntity; t: var Turn; h: Handle) = re.relay.deregister h re.send Event( orKind: EventKind.Retract, retract: Retract(handle: h)) method message(re: RelayEntity; turn: var Turn; msg: Assertion) = re.send Event(orKind: EventKind.Message, message: Message[WireRef](body: register(re.relay, msg))) method sync(re: RelayEntity; turn: var Turn; peer: Ref) = var peerEntity = newSyncPeerEntity(re.relay, peer) exported: seq[WireSymbol] discard rewriteRefOut(re.relay, turn.newRef(peerEntity), false, exported) # TODO: discard? peerEntity.e = exported[0] re.send Event( orKind: EventKind.Sync, sync: Sync[WireRef](peer: embed toPreserve(false, WireRef))) # TODO: send the WireRef? proc newRelayEntity(label: string; r: Relay; o: Oid): RelayEntity = RelayEntity(label: label, relay: r, oid: o) using relay: Relay facet: Facet proc lookupLocal(relay; oid: Oid): Ref = let sym = relay.exported.grab oid if sym.isNil: newInertRef() else: sym.`ref` proc isInert(r: Ref): bool = r.target.isNil proc rewriteRefIn(relay; facet; n: WireRef, imported: var seq[WireSymbol]): Ref = case n.orKind of WireRefKind.mine: var e = relay.imported.grab(n.mine.oid) if e.isNil: e = newWireSymbol( relay.imported, n.mine.oid, newRef(facet, newRelayEntity("rewriteRefIn", relay, n.mine.oid))) imported.add e result = e.`ref` of WireRefKind.yours: let r = relay.lookupLocal(n.yours.oid) if n.yours.attenuation.len == 0 or r.isInert: result = r else: raiseAssert "attenuation not implemented" proc rewriteIn(relay; facet; a: Preserve[WireRef]): tuple[rewritten: Assertion; imported: seq[WireSymbol]] = var imported: seq[WireSymbol] result.rewritten = mapEmbeds(a) do (wr: WireRef) -> Ref: rewriteRefIn(relay, facet, wr, imported) result.imported = imported proc close(r: Relay) = discard proc dispatch(relay: Relay; turn: var Turn; `ref`: Ref; event: Event) = case event.orKind of EventKind.Assert: let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion) relay.inboundAssertions[event.assert.handle] = (publish(turn, `ref`, a), imported,) of EventKind.Retract: let remoteHandle = event.retract.handle var outbound: tuple[localHandle: Handle, imported: seq[WireSymbol]] if relay.inboundAssertions.pop(remoteHandle, outbound): for e in outbound.imported: relay.imported.drop e turn.retract(outbound.localHandle) of EventKind.Message: let (a, imported) = rewriteIn(relay, turn.facet, event.message.body) assert imported.len == 0, "Cannot receive transient reference" turn.message(`ref`, a) of EventKind.Sync: discard # TODO #[ var imported: seq[WireSymbol] let k = relay.rewriteRefIn(turn, evenr.sync.peer, imported) turn.sync(`ref`) do (turn: var Turn): turn.message(k, true) for e in imported: relay.imported.del e ]# proc dispatch(relay: Relay; v: Preserve[WireRef]) = trace "S: ", v run(relay.facet) do (t: var Turn): var pkt: Packet if fromPreserve(pkt, v): case pkt.orKind of PacketKind.Turn: for te in pkt.turn: dispatch(relay, t, lookupLocal(relay, te.oid.Oid), te.event) of PacketKind.Error: stderr.writeLine ("Error from server: ", pkt.error.message, " (detail: ", pkt.error.detail, ")") close relay of PacketKind.Extension: discard else: stderr.writeLine "discarding unparsed packet ", v type RelayOptions = object of RootObj packetWriter: PacketWriter untrusted: bool RelayActorOptions = object of RelayOptions initialOid: Option[Oid] initialRef: Ref nextLocalOid: Option[Oid] proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay = result = Relay( facet: turn.facet, packetWriter: opts.packetWriter, untrusted: opts.untrusted) discard result.facet.preventInertCheck() setup(turn, result) proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup): Future[Ref] = var fut = newFuture[Ref]"spawnRelay" spawn(name, turn) do (turn: var Turn): let relay = newRelay(turn, opts, setup) if not opts.initialRef.isNil: var exported: seq[WireSymbol] discard rewriteRefOut(relay, opts.initialRef, false, exported) if opts.initialOid.isSome: var imported: seq[WireSymbol] var wr = WireRef( orKind: WireRefKind.mine, mine: WireRefMine(oid: opts.initialOid.get)) fut.complete rewriteRefIn(relay, turn.facet, wr, imported) else: fut.complete(nil) opts.nextLocalOid.map do (oid: Oid): relay.nextLocalOid = if oid == 0.Oid: 1.Oid else: oid fut import std/asyncnet from std/nativesockets import AF_UNIX, SOCK_STREAM, Protocol import protocols/gatekeeper type ShutdownEntity = ref object of Entity method retract(e: ShutdownEntity; turn: var Turn; h: Handle) = stopActor(turn) type SturdyRef = sturdy.SturdyRef[Ref] Resolve = gatekeeper.Resolve[Ref] ConnectProc* = proc (turn: var Turn; a: Assertion): TurnAction {.gcsafe.} proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: ConnectProc) = var socket = newAsyncSocket( domain = AF_UNIX, sockType = SOCK_STREAM, protocol = cast[Protocol](0), buffered = false) proc socketWriter(packet: sink Packet): Future[void] = socket.send($packet) const recvSize = 1 shl 18 # this is an excessive buffer size but the PEG parser # can only read complete documents # TODO: use a binary protocol and improve that # parser to stream data in chunks var shutdownRef: Ref let reenable = turn.facet.preventInertCheck() let connectionClosedRef = newRef(turn, ShutdownEntity()) var fut = newFuture[void]"connectUnix" connectUnix(socket, path).addCallback do (f: Future[void]): read f discard bootActor("unix") do (turn: var Turn): var ops = RelayActorOptions( packetWriter: socketWriter, initialOid: 0.Oid.some) let relayFut = spawnRelay("unix", turn, ops) do (turn: var Turn; relay: Relay): let facet = turn.facet proc recvCb(pktFut: Future[string]) {.gcsafe.} = if pktFut.failed: run(facet) do (turn: var Turn): stopActor(turn) else: let buf = pktFut.read if buf.len == 0: run(facet) do (turn: var Turn): stopActor(turn) else: var pr = parsePreserves(buf, WireRef) dispatch(relay, cast[Preserve[WireRef]](pr)) socket.recv(recvSize).addCallback(recvCb) # TODO: should this need be callSoon? socket.recv(recvSize).addCallback(recvCb) turn.facet.actor.atExit do (turn: var Turn): close(socket) discard publish(turn, connectionClosedRef, true) shutdownRef = newRef(turn, ShutdownEntity()) relayFut.addCallback do (refFut: Future[Ref]): let gatekeeper = read refFut run(gatekeeper.relay) do (turn: var Turn): reenable() discard publish(turn, shutdownRef, true) proc duringCallback(turn: var Turn; ds: Assertion; h: Handle): TurnAction = let facet = facet(turn) do (turn: var Turn): discard bootProc(turn, ds) # TODO: what to do with this? proc action(turn: var Turn) = stop(turn, facet) result = action var res = Resolve( sturdyref: cap, observer: embed newRef(turn, during(duringCallback))) discard publish(turn, gatekeeper, res) fut.complete() asyncCheck(turn, fut) import std/asyncfile proc connectStdio*(ds: Ref; turn: var Turn) = ## Connect to an external dataspace over stdin and stdout. proc stdoutWriter(packet: sink Packet): Future[void] {.async.} = # var buf = encode(packet) # doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len write(stdout, packet) flushFile(stdout) var opts = RelayActorOptions( packetWriter: stdoutWriter, initialRef: ds, initialOid: 0.Oid.some) asyncCheck spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay): let facet = turn.facet asyncStdin = openAsync("/dev/stdin") facet.actor.atExit do (turn: var Turn): close(asyncStdin) proc recvCb(pktFut: Future[string]) {.gcsafe.} = if pktFut.failed: discard else: let buf = pktFut.read if buf.len == 0: run(facet) do (turn: var Turn): stopActor(turn) else: var v = parsePreserves(buf) dispatch(relay, cast[Preserve[WireRef]](v)) callSoon: asyncStdin.readLine().addCallback(recvCb) # do not process the next line immediately asyncStdin.readLine().addCallback(recvCb)