# SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway # SPDX-License-Identifier: Unlicense import std/[asyncdispatch, options, tables] import preserves, preserves/parse import ./actors, ./dataspaces, ./protocols/[protocol, sturdy] when defined(traceSyndicate): template trace(args: varargs[untyped]): untyped = echo(args) else: template trace(args: varargs[untyped]): untyped = discard type Oid = sturdy.Oid type Assertion = Preserve[Ref] WireRef = sturdy.WireRef[Ref] WireAssertion = Preserve[WireRef] Event = protocol.Event[WireRef] TurnEvent = protocol.TurnEvent[WireRef] Packet = protocol.Packet[WireRef] Turn = actors.Turn WireSymbol = ref object oid: Oid `ref`: Ref membrane: Membrane count: int Membrane = object byOid: Table[Oid, WireSymbol] byRef: Table[Ref, WireSymbol] #[ proc `$`(ws: WireSymbol): string = "" ]# proc grab(mb: var Membrane; key: Oid|Ref; transient: bool; alloc: proc(): WireSymbol {.gcsafe.}): WireSymbol {.deprecated: "not idomatic Nim".} = when key is Oid: result = mb.byOid.getOrDefault(key) elif key is ref: result = mb.byRef.getOrDefault(key) if result.isNil: result = alloc() mb.byOid[result.oid] = result mb.byRef[result.`ref`] = result if not transient: inc result.count proc drop(mb: var Membrane; ws: WireSymbol) = dec ws.count if ws.count < 1: mb.byOid.del ws.oid mb.byRef.del ws.`ref` type PacketWriter = proc (bs: seq[byte]): Future[void] {.gcsafe.} RelaySetup = proc (turn: var Turn; relay: Relay) {.gcsafe.} Relay = ref object of RootObj facet: Facet inboundAssertions: Table[Handle, tuple[localHandle: Handle, imported: seq[WireSymbol]]] outboundAssertions: Table[Handle, seq[WireSymbol]] exported: Membrane imported: Membrane nextLocalOid: Oid pendingTurn: protocol.Turn[WireRef] packetWriter: PacketWriter untrusted: bool SyncPeerEntity = ref object of Entity relay: Relay peer: Ref handleMap: Table[Handle, Handle] e: WireSymbol RelayEntity = ref object of Entity label: string relay: Relay proc releaseRefOut(r: Relay; e: WireSymbol) = r.exported.drop e proc syncPeerPublish(e: Entity; t: var Turn; v: Assertion; h: Handle) = var se = SyncPeerEntity(e) se.handleMap[h] = publish(t, se.peer, v) proc syncPeerRetract(e: Entity; t: var Turn; h: Handle) = var se = SyncPeerEntity(e) var other: Handle if se.handleMap.pop(h, other): retract(t, other) proc syncPeerMessage(e: Entity; t: var Turn; v: Assertion) = var se = SyncPeerEntity(e) if not se.e.isNil: se.relay.releaseRefOut(se.e) message(t, se.peer, v) proc syncPeerSync(e: Entity; t: var Turn; peer: Ref) = var se = SyncPeerEntity(e) sync(t, se.peer, peer) proc newSyncPeerEntity(r: Relay; p: Ref): SyncPeerEntity = result = SyncPeerEntity(relay: r, peer: p) result.setProcs( syncPeerPublish, syncPeerRetract, syncPeerMessage, syncPeerSync) proc rewriteRefOut(relay: Relay; `ref`: Ref; transient: bool; exported: var seq[WireSymbol]): WireRef = #[ if not relay.untrusted: result = WireRef( orKind: WirerefKind.yours, yours: WireRefYours[Ref]( oid: `ref`.target.oid, attenuation: `ref`.attenuation)) ]# let e = grab(relay.exported, `ref`, transient) do () -> WireSymbol: assert(not transient, "Cannot send transient reference") result = WireSymbol( oid: relay.nextLocalOid, `ref`: `ref`) inc relay.nextLocalOid exported.add e WireRef( orKind: WireRefKind.mine, mine: WireRefMine(oid: e.oid)) proc rewriteOut(relay: Relay; v: Assertion; transient: bool): tuple[rewritten: WireAssertion, exported: seq[WireSymbol]] = var exported: seq[WireSymbol] var rewritten = mapEmbeds[Ref, WireRef](v) do (r: Ref) -> WireRef: result = rewriteRefOut(relay, r, transient, exported) (rewritten, exported) proc register(relay: Relay; v: Assertion; h: Handle): WireAssertion = var (rewritten, exported) = rewriteOut(relay, v, false) relay.outboundAssertions[h] = exported rewritten proc deregister(relay: Relay; h: Handle) = var outbound: seq[WireSymbol] if relay.outboundAssertions.pop(h, outbound): for e in outbound: releaseRefOut(relay, e) proc send(r: Relay; msg: seq[byte]): Future[void] = assert(not r.packetWriter.isNil, "missing packetWriter proc") r.packetWriter(msg) proc send(r: Relay; rOid: protocol.Oid; m: Event) = if r.pendingTurn.len == 0: callSoon: r.facet.run do (turn: var Turn): var pkt = $Packet( orKind: PacketKind.Turn, turn: move r.pendingTurn) trace "C: ", pkt #asyncCheck(turn, r.send(encode pkt)) asyncCheck(turn, r.send(cast[seq[byte]](pkt))) r.pendingTurn.add TurnEvent(oid: rOid, event: m) proc send(re: RelayEntity; ev: Event) = send(re.relay, protocol.Oid re.oid, ev) proc relayPublish(e: Entity; t: var Turn; v: Assertion; h: Handle) = var re = RelayEntity(e) re.send Event( orKind: EventKind.Assert, `assert`: protocol.Assert[WireRef]( assertion: re.relay.register(v, h), handle: h)) proc relayRetract(e: Entity; t: var Turn; h: Handle) = var re = RelayEntity(e) re.relay.deregister h re.send Event( orKind: EventKind.Retract, retract: Retract(handle: h)) proc relayMessage(e: Entity; turn: var Turn; msg: Assertion) = var re = RelayEntity(e) ev = Event(orKind: EventKind.Message) (body, _) = rewriteOut(re.relay, msg, true) ev.message = Message[WireRef](body: body) re.send ev proc relaySync(e: Entity; turn: var Turn; peer: Ref) = var re = RelayEntity(e) peerEntity = newSyncPeerEntity(re.relay, peer) exported: seq[WireSymbol] discard rewriteRefOut(re.relay, turn.newRef(peerEntity), false, exported) # TODO: discard? peerEntity.e = exported[0] re.send Event( orKind: EventKind.Sync, sync: Sync[WireRef](peer: embed toPreserve(false, WireRef))) # TODO: send the WireRef? proc newRelayEntity(label: string; r: Relay; o: Oid): RelayEntity = result = RelayEntity(label: label, relay: r, oid: o) result.setProcs(relayPublish, relayRetract, relayMessage, relaySync) using relay: Relay facet: Facet proc lookupLocal(relay; oid: Oid): Ref = try: relay.exported.byOid[oid].`ref` except KeyError: newInertRef() proc isInert(r: Ref): bool = r.target.isNil proc rewriteRefIn(relay; facet; n: WireRef, imported: var seq[WireSymbol]): Ref = case n.orKind of WireRefKind.mine: let e = relay.imported.grab(n.mine.oid, false) do () -> WireSymbol: WireSymbol( oid: n.mine.oid, `ref`: newRef(facet, newRelayEntity("rewriteRefIn", relay, n.mine.oid))) imported.add e result = e.`ref` of WireRefKind.yours: let r = relay.lookupLocal(n.yours.oid) if n.yours.attenuation.len == 0 or r.isInert: result = r else: raiseAssert "attenuation not implemented" proc rewriteIn(relay; facet; a: Preserve[WireRef]): tuple[rewritten: Assertion; imported: seq[WireSymbol]] = var imported: seq[WireSymbol] rewritten = mapEmbeds(a) do (wr: WireRef) -> Ref: rewriteRefIn(relay, facet, wr, imported) (rewritten, imported) proc close(r: Relay) = discard proc dispatch(relay: Relay; turn: var Turn; `ref`: Ref; event: Event) = case event.orKind of EventKind.Assert: let (a, imported) = rewriteIn(relay, turn.activeFacet, event.assert.assertion) relay.inboundAssertions[event.assert.handle] = (turn.publish(`ref`, a), imported,) of EventKind.Retract: let remoteHandle = event.retract.handle var outbound: tuple[localHandle: Handle, imported: seq[WireSymbol]] if relay.inboundAssertions.pop(remoteHandle, outbound): for e in outbound.imported: relay.imported.drop e turn.retract(outbound.localHandle) of EventKind.Message: let (a, imported) = rewriteIn(relay, turn.activeFacet, event.message.body) assert imported.len == 0, "Cannot receive transient reference" turn.message(`ref`, a) of EventKind.Sync: discard # TODO #[ var imported: seq[WireSymbol] let k = relay.rewriteRefIn(turn, evenr.sync.peer, imported) turn.sync(`ref`) do (turn: var Turn): turn.message(k, true) for e in imported: relay.imported.del e ]# proc dispatch(relay: Relay; v: Preserve[WireRef]) = run(relay.facet) do (t: var Turn): var pkt: Packet if fromPreserve(pkt, v): case pkt.orKind of PacketKind.Turn: for te in pkt.turn: dispatch(relay, t, lookupLocal(relay, te.oid.Oid), te.event) of PacketKind.Error: relay.facet.log("Error from server: ", pkt.error.message, " (detail: ", pkt.error.detail, ")") close relay proc recv(relay: Relay; buf: string) = # var pkt = decodePreserves(buf, WireRef) trace "S: ", buf var pkt = cast[Preserve[WireRef]]( parsePreserves(buf, sturdy.WireRef[void])) # the compiler cannot convert `Preserve[void]` to `Preserve[WireRef[Ref]]` # so convert to `Preserve[WireRef[void]]` and cast. dispatch(relay, pkt) type RelayOptions = object of RootObj packetWriter: PacketWriter setup: RelaySetup untrusted: bool RelayActorOptions = object of RelayOptions initialOid: Option[Oid] initialRef: Ref nextLocalOid: Option[Oid] proc newRelay(turn: var Turn; opts: RelayOptions): Relay = result = Relay( facet: turn.activeFacet, packetWriter: opts.packetWriter, untrusted: opts.untrusted) discard result.facet.preventInertCheck() opts.setup(turn, result) proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions): Future[Ref] = var fut = newFuture[Ref]"spawnRelay" spawn(name, turn) do (turn: var Turn): let relay = newRelay(turn, opts) if not opts.initialRef.isNil: var exported: seq[WireSymbol] discard rewriteRefOut(relay, opts.initialRef, false, exported) if opts.initialOid.isSome: var imported: seq[WireSymbol] var wr = WireRef( orKind: WireRefKind.mine, mine: WireRefMine(oid: opts.initialOid.get)) fut.complete rewriteRefIn(relay, turn.activeFacet, wr, imported) else: fut.complete(nil) opts.nextLocalOid.map do (oid: Oid): relay.nextLocalOid = if oid == 0.Oid: 1.Oid else: oid fut import std/asyncnet from std/nativesockets import AF_UNIX, SOCK_STREAM, Protocol import protocols/gatekeeper type ShutdownEntity = ref object of Entity proc shutdownRetract(e: Entity; turn: var Turn; h: Handle) = stopActor(turn) proc newShutdownEntity(): ShutdownEntity = new result result.setProcs(retract = shutdownRetract) type SturdyRef = sturdy.SturdyRef[Ref] Resolve = gatekeeper.Resolve[Ref] proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: DuringProc) = var socket = newAsyncSocket( domain = AF_UNIX, sockType = SOCK_STREAM, protocol = cast[Protocol](0), buffered = false) proc socketWriter(packet: seq[byte]): Future[void] = socket.send cast[string](packet) const recvSize = 1 shl 18 # this is an excessive buffer size but the PEG parser # can only read complete documents # TODO: use a binary protocol and improve that # parser to stream data in chunks var shutdownRef: Ref let reenable = turn.activeFacet.preventInertCheck() let connectionClosedRef = newRef(turn, newShutdownEntity()) proc setup(turn: var Turn; relay: Relay) = let facet = turn.activeFacet proc recvCb(pktFut: Future[string]) {.gcsafe.} = if pktFut.failed: run(facet) do (turn: var Turn): stopActor(turn) else: let buf = pktFut.read if buf.len == 0: run(facet) do (turn: var Turn): stopActor(turn) else: relay.recv(buf) socket.recv(recvSize).addCallback(recvCb) # TODO: should this need be callSoon? socket.recv(recvSize).addCallback(recvCb) turn.activeFacet.actor.atExit do (turn: var Turn): close(socket) discard publish(turn, connectionClosedRef, true) shutdownRef = newRef(turn, newShutdownEntity()) var fut = newFuture[void]"connectUnix" connectUnix(socket, path).addCallback do (f: Future[void]): read f discard newActor("unix") do (turn: var Turn): let relayFut = spawnRelay("unix", turn, RelayActorOptions( packetWriter: socketWriter, setup: setup, initialOid: 0.Oid.some)) relayFut.addCallback do (refFut: Future[Ref]): let gatekeeper = read refFut run(gatekeeper.relay) do (turn: var Turn): reenable() discard publish(turn, shutdownRef, true) proc duringCallback(turn: var Turn; ds: Preserve[Ref]): TurnAction = let facet = facet(turn) do (turn: var Turn): discard bootProc(turn, ds) # TODO: what to do with this? proc action(turn: var Turn) = stop(turn, facet) result = action var res = Resolve( sturdyref: cap, observer: embed newRef(turn, during(duringCallback))) discard publish(turn, gatekeeper, res) fut.complete() asyncCheck(turn, fut)