2021-09-24 19:25:47 +00:00
|
|
|
# SPDX-FileCopyrightText: ☭ 2021 Emery Hemingway
|
|
|
|
# SPDX-License-Identifier: Unlicense
|
|
|
|
|
|
|
|
import std/[asyncdispatch, options, tables]
|
|
|
|
import preserves, preserves/parse
|
|
|
|
import ../syndicate/protocols/[protocol, sturdy]
|
|
|
|
import ./actors, ./dataspaces
|
|
|
|
|
|
|
|
type Oid = sturdy.Oid
|
|
|
|
|
|
|
|
type
|
|
|
|
Assertion = Preserve[Ref]
|
|
|
|
WireAssertion = Preserve[WireRef]
|
|
|
|
WireRef = sturdy.WireRef[Ref]
|
|
|
|
Packet = protocol.Packet[WireRef]
|
|
|
|
|
|
|
|
Turn = actors.Turn
|
|
|
|
|
|
|
|
WireSymbol = ref object
|
|
|
|
oid: Oid
|
|
|
|
`ref`: Ref
|
|
|
|
membrane: Membrane
|
|
|
|
count: int
|
|
|
|
|
|
|
|
Membrane = object
|
|
|
|
byOid: Table[Oid, WireSymbol]
|
|
|
|
byRef: Table[Ref, WireSymbol]
|
|
|
|
|
|
|
|
#[
|
|
|
|
proc `$`(ws: WireSymbol): string =
|
|
|
|
"<ws:" & $ws.oid & "/" & $ws.count & "/" & $ws.`ref` &>"
|
|
|
|
]#
|
|
|
|
|
|
|
|
proc grab(mb: var Membrane; key: Oid|Ref; transient: bool; alloc: proc(): WireSymbol {.gcsafe.}): WireSymbol {.deprecated: "not idomatic Nim".} =
|
|
|
|
when key is Oid:
|
|
|
|
result = mb.byOid.getOrDefault(key)
|
|
|
|
elif key is ref:
|
|
|
|
result = mb.byRef.getOrDefault(key)
|
|
|
|
if result.isNil:
|
|
|
|
result = alloc()
|
|
|
|
mb.byOid[result.oid] = result
|
|
|
|
mb.byRef[result.`ref`] = result
|
|
|
|
if not transient: inc result.count
|
|
|
|
|
|
|
|
proc drop(mb: var Membrane; ws: WireSymbol) =
|
|
|
|
dec ws.count
|
|
|
|
if ws.count < 1:
|
|
|
|
mb.byOid.del ws.oid
|
|
|
|
mb.byRef.del ws.`ref`
|
|
|
|
|
|
|
|
type
|
|
|
|
PacketWriter = proc (bs: seq[byte]): Future[void] {.gcsafe.}
|
|
|
|
RelaySetup = proc (turn: var Turn; relay: Relay) {.gcsafe.}
|
|
|
|
|
|
|
|
Relay = ref object of RootObj
|
|
|
|
facet: Facet
|
|
|
|
inboundAssertions: Table[Handle,
|
|
|
|
tuple[localHandle: Handle, imported: seq[WireSymbol]]]
|
|
|
|
outboundAssertions: Table[Handle, seq[WireSymbol]]
|
|
|
|
exported: Membrane
|
|
|
|
imported: Membrane
|
|
|
|
nextLocalOid: Oid
|
|
|
|
pendingTurn: protocol.Turn[WireRef]
|
|
|
|
packetWriter: PacketWriter
|
|
|
|
untrusted: bool
|
|
|
|
|
|
|
|
SyncPeerEntity = ref object of Entity
|
|
|
|
relay: Relay
|
|
|
|
peer: Ref
|
|
|
|
handleMap: Table[Handle, Handle]
|
|
|
|
e: WireSymbol
|
|
|
|
|
|
|
|
RelayEntity = ref object of Entity
|
|
|
|
label: string
|
|
|
|
relay: Relay
|
|
|
|
|
|
|
|
#[
|
|
|
|
proc newSyncPeerEntity(r: Relay; p: Ref): SyncPeerEntity =
|
|
|
|
SyncPeerEntity(relay: r, peer: p)
|
|
|
|
]#
|
|
|
|
|
|
|
|
proc releaseRefOut(r: Relay; e: WireSymbol) =
|
|
|
|
r.exported.drop e
|
|
|
|
|
|
|
|
method publish(se: SyncPeerEntity; t: var Turn; v: Assertion; h: Handle) =
|
|
|
|
se.handleMap[h] = publish(t, se.peer, v)
|
|
|
|
|
|
|
|
method retract(se: SyncPeerEntity; t: var Turn; h: Handle) =
|
|
|
|
var other: Handle
|
|
|
|
if se.handleMap.pop(h, other):
|
|
|
|
retract(t, other)
|
|
|
|
|
|
|
|
method message(se: SyncPeerEntity; t: var Turn; v: Assertion) =
|
|
|
|
if not se.e.isNil:
|
|
|
|
se.relay.releaseRefOut(se.e)
|
|
|
|
message(t, se.peer, v)
|
|
|
|
|
|
|
|
method sync(se: SyncPeerEntity; t: var Turn; peer: Ref) =
|
|
|
|
sync(t, se.peer, peer)
|
|
|
|
|
|
|
|
proc newRelayEntity(label: string; r: Relay; o: Oid): RelayEntity =
|
|
|
|
RelayEntity(label: label, relay: r, oid: o)
|
|
|
|
|
|
|
|
#[
|
|
|
|
proc `$`(ws: WireSymbol): string =
|
|
|
|
"<ws:" & $ws.oid & "/" & $ws.count & "/" & $ws.`ref` &>"
|
|
|
|
]#
|
|
|
|
|
|
|
|
proc `$`(re: RelayEntity): string =
|
|
|
|
"<Relay:" & re.label & ":" & $re.oid & ">"
|
|
|
|
|
|
|
|
proc rewriteRefOut(relay: Relay; `ref`: Ref; transient: bool; exported: var seq[WireSymbol]): WireRef =
|
|
|
|
#[
|
|
|
|
if not relay.untrusted:
|
|
|
|
result = WireRef(
|
|
|
|
orKind: WirerefKind.yours,
|
|
|
|
yours: WireRefYours[Ref](
|
|
|
|
oid: `ref`.target.oid,
|
|
|
|
attenuation: `ref`.attenuation))
|
|
|
|
]#
|
|
|
|
let e = grab(relay.exported, `ref`, transient) do () -> WireSymbol:
|
|
|
|
assert(not transient, "Cannot send transient reference")
|
|
|
|
result = WireSymbol( oid: relay.nextLocalOid, `ref`: `ref`)
|
|
|
|
inc relay.nextLocalOid
|
|
|
|
exported.add e
|
|
|
|
WireRef(
|
|
|
|
orKind: WireRefKind.mine,
|
|
|
|
mine: WireRefMine(oid: e.oid))
|
|
|
|
|
|
|
|
proc rewriteOut(relay: Relay; v: Assertion; transient: bool):
|
|
|
|
tuple[rewritten: WireAssertion, exported: seq[WireSymbol]] =
|
|
|
|
var exported: seq[WireSymbol]
|
|
|
|
var rewritten = mapEmbeds[Ref, WireRef](v) do (r: Ref) -> WireRef:
|
|
|
|
result = rewriteRefOut(relay, r, transient, exported)
|
|
|
|
(rewritten, exported)
|
|
|
|
|
|
|
|
proc register(relay: Relay; v: Assertion; h: Handle): WireAssertion =
|
|
|
|
var (rewritten, exported) = rewriteOut(relay, v, false)
|
|
|
|
relay.outboundAssertions[h] = exported
|
|
|
|
rewritten
|
|
|
|
|
|
|
|
proc deregister(relay: Relay; h: Handle) =
|
|
|
|
var outbound: seq[WireSymbol]
|
|
|
|
if relay.outboundAssertions.pop(h, outbound):
|
|
|
|
for e in outbound: releaseRefOut(relay, e)
|
|
|
|
|
|
|
|
proc send(r: Relay; msg: seq[byte]): Future[void] =
|
|
|
|
assert(not r.packetWriter.isNil, "missing packetWriter proc")
|
|
|
|
r.packetWriter(msg)
|
|
|
|
|
|
|
|
proc send(r: Relay; rOid: protocol.Oid; m: Event[WireRef]) =
|
|
|
|
if r.pendingTurn.len == 0:
|
|
|
|
callSoon:
|
|
|
|
r.facet.run do (turn: var Turn):
|
|
|
|
var pkt = $Packet(
|
|
|
|
orKind: PacketKind.Turn,
|
|
|
|
turn: move r.pendingTurn)
|
|
|
|
echo "C: ", pkt
|
|
|
|
#asyncCheck(turn, r.send(encode pkt))
|
|
|
|
asyncCheck(turn, r.send(cast[seq[byte]](pkt)))
|
|
|
|
r.pendingTurn.add TurnEvent[WireRef](oid: rOid, event: m)
|
|
|
|
|
|
|
|
proc send(re: RelayEntity; ev: Event) =
|
|
|
|
send(re.relay, protocol.Oid re.oid, ev)
|
|
|
|
|
|
|
|
method publish(re: RelayEntity; t: var Turn; v: Assertion; h: Handle) =
|
|
|
|
var ev = protocol.Event[WireRef](
|
|
|
|
orKind: EventKind.Assert,
|
|
|
|
`assert`: protocol.Assert[WireRef](
|
|
|
|
assertion: re.relay.register(v, h),
|
|
|
|
handle: h))
|
|
|
|
re.send ev
|
|
|
|
|
|
|
|
method retract(re: RelayEntity; t: var Turn; h: Handle) =
|
|
|
|
re.relay.deregister h
|
|
|
|
re.send Event[WireRef](
|
|
|
|
orKind: EventKind.Retract,
|
|
|
|
retract: Retract(handle: h))
|
|
|
|
|
|
|
|
method message(re: RelayEntity; turn: var Turn; msg: Assertion) =
|
|
|
|
var ev = Event[WireRef](orKind: EventKind.Message)
|
|
|
|
var (body, _) = rewriteOut(re.relay, msg, true)
|
2021-10-28 15:41:28 +00:00
|
|
|
ev.message = Message[WireRef](body: body)
|
2021-09-24 19:25:47 +00:00
|
|
|
re.send ev
|
|
|
|
|
|
|
|
method sync(re: RelayEntity; turn: var Turn; peer: Ref) =
|
|
|
|
var
|
|
|
|
peerEntity = SyncPeerEntity(relay: re.relay, peer: peer)
|
|
|
|
exported: seq[WireSymbol]
|
|
|
|
discard rewriteRefOut(re.relay, turn.newRef(peerEntity), false, exported)
|
|
|
|
# TODO: discard?
|
|
|
|
peerEntity.e = exported[0]
|
|
|
|
re.send Event[WireRef](
|
|
|
|
orKind: EventKind.Sync,
|
|
|
|
sync: Sync[WireRef](peer: embed toPreserve(false, WireRef))) # TODO: send the WireRef?
|
|
|
|
|
|
|
|
using
|
|
|
|
relay: Relay
|
|
|
|
facet: Facet
|
|
|
|
|
|
|
|
proc lookupLocal(relay; oid: Oid): Ref =
|
|
|
|
try: relay.exported.byOid[oid].`ref`
|
|
|
|
except KeyError: newInertRef()
|
|
|
|
|
|
|
|
proc isInert(r: Ref): bool =
|
|
|
|
r.target.isNil
|
|
|
|
|
|
|
|
proc rewriteRefIn(relay; facet; n: WireRef, imported: var seq[WireSymbol]): Ref =
|
|
|
|
case n.orKind
|
|
|
|
of WireRefKind.mine:
|
|
|
|
let e = relay.imported.grab(n.mine.oid, false) do () -> WireSymbol:
|
|
|
|
WireSymbol(
|
|
|
|
oid: n.mine.oid,
|
|
|
|
`ref`: newRef(facet, newRelayEntity("rewriteRefIn", relay, n.mine.oid)))
|
|
|
|
imported.add e
|
|
|
|
result = e.`ref`
|
|
|
|
of WireRefKind.yours:
|
|
|
|
let r = relay.lookupLocal(n.yours.oid)
|
|
|
|
if n.yours.attenuation.len == 0 or r.isInert:
|
|
|
|
result = r
|
|
|
|
else:
|
|
|
|
raiseAssert "attenuation not implemented"
|
|
|
|
|
|
|
|
proc rewriteIn(relay; facet; a: Preserve[WireRef]):
|
|
|
|
tuple[rewritten: Assertion; imported: seq[WireSymbol]] =
|
|
|
|
var
|
|
|
|
imported: seq[WireSymbol]
|
|
|
|
rewritten = mapEmbeds(a) do (wr: WireRef) -> Ref:
|
|
|
|
rewriteRefIn(relay, facet, wr, imported)
|
|
|
|
(rewritten, imported)
|
|
|
|
|
|
|
|
proc close(r: Relay) = discard
|
|
|
|
|
|
|
|
proc dispatch(relay: Relay; turn: var Turn; `ref`: Ref; event: Event[WireRef]) =
|
|
|
|
case event.orKind
|
|
|
|
of EventKind.Assert:
|
|
|
|
let (a, imported) = rewriteIn(relay, turn.activeFacet, event.assert.assertion)
|
|
|
|
relay.inboundAssertions[event.assert.handle] =
|
|
|
|
(turn.publish(`ref`, a), imported,)
|
|
|
|
|
|
|
|
of EventKind.Retract:
|
|
|
|
let remoteHandle = event.retract.handle
|
|
|
|
var outbound: tuple[localHandle: Handle, imported: seq[WireSymbol]]
|
|
|
|
if relay.inboundAssertions.pop(remoteHandle, outbound):
|
|
|
|
for e in outbound.imported: relay.imported.drop e
|
|
|
|
turn.retract(outbound.localHandle)
|
|
|
|
|
|
|
|
of EventKind.Message:
|
|
|
|
let (a, imported) = rewriteIn(relay, turn.activeFacet, event.message.body)
|
|
|
|
assert imported.len == 0, "Cannot receive transient reference"
|
|
|
|
turn.message(`ref`, a)
|
|
|
|
|
|
|
|
of EventKind.Sync:
|
|
|
|
discard # TODO
|
|
|
|
#[
|
|
|
|
var imported: seq[WireSymbol]
|
|
|
|
let k = relay.rewriteRefIn(turn, evenr.sync.peer, imported)
|
|
|
|
turn.sync(`ref`) do (turn: var Turn):
|
|
|
|
turn.message(k, true)
|
|
|
|
for e in imported: relay.imported.del e
|
|
|
|
]#
|
|
|
|
|
|
|
|
proc dispatch(relay: Relay; v: Preserve[WireRef]) =
|
|
|
|
run(relay.facet) do (t: var Turn):
|
|
|
|
var pkt: Packet
|
|
|
|
if fromPreserve(pkt, v):
|
|
|
|
case pkt.orKind
|
|
|
|
of PacketKind.Turn:
|
|
|
|
for te in pkt.turn:
|
|
|
|
dispatch(relay, t, lookupLocal(relay, te.oid.Oid), te.event)
|
|
|
|
of PacketKind.Error:
|
|
|
|
relay.facet.log("Error from server: ", pkt.error.message, " (detail: ", pkt.error.detail, ")")
|
|
|
|
close relay
|
|
|
|
|
|
|
|
|
|
|
|
proc recv(relay: Relay; buf: seq[byte]) =
|
|
|
|
# var pkt = decodePreserves(buf, WireRef)
|
|
|
|
var pkt = cast[Preserve[WireRef]](
|
|
|
|
parsePreserves(cast[string](buf), sturdy.WireRef[void]))
|
|
|
|
# the compiler cannot convert `Preserve[void]` to `Preserve[WireRef[Ref]]`
|
|
|
|
# so convert to `Preserve[WireRef[void]]` and cast.
|
|
|
|
echo "S: ", pkt
|
|
|
|
dispatch(relay, pkt)
|
|
|
|
|
|
|
|
type
|
|
|
|
RelayOptions = object of RootObj
|
|
|
|
packetWriter: PacketWriter
|
|
|
|
setup: RelaySetup
|
|
|
|
untrusted: bool
|
|
|
|
RelayActorOptions = object of RelayOptions
|
|
|
|
initialOid: Option[Oid]
|
|
|
|
initialRef: Ref
|
|
|
|
nextLocalOid: Option[Oid]
|
|
|
|
|
|
|
|
proc newRelay(turn: var Turn; opts: RelayOptions): Relay =
|
|
|
|
result = Relay(
|
|
|
|
facet: turn.activeFacet,
|
|
|
|
packetWriter: opts.packetWriter,
|
|
|
|
untrusted: opts.untrusted)
|
|
|
|
discard result.facet.preventInertCheck()
|
|
|
|
opts.setup(turn, result)
|
|
|
|
|
|
|
|
proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions): Future[Ref] =
|
|
|
|
var fut = newFuture[Ref]"spawnRelay"
|
|
|
|
spawn(name, turn) do (turn: var Turn):
|
|
|
|
let relay = newRelay(turn, opts)
|
|
|
|
if not opts.initialRef.isNil:
|
|
|
|
var exported: seq[WireSymbol]
|
|
|
|
discard rewriteRefOut(relay, opts.initialRef, false, exported)
|
|
|
|
if opts.initialOid.isSome:
|
|
|
|
var imported: seq[WireSymbol]
|
|
|
|
var wr = WireRef(
|
|
|
|
orKind: WireRefKind.mine,
|
|
|
|
mine: WireRefMine(oid: opts.initialOid.get))
|
|
|
|
fut.complete rewriteRefIn(relay, turn.activeFacet, wr, imported)
|
|
|
|
else:
|
|
|
|
fut.complete(nil)
|
|
|
|
opts.nextLocalOid.map do (oid: Oid):
|
|
|
|
relay.nextLocalOid =
|
|
|
|
if oid == 0.Oid: 1.Oid
|
|
|
|
else: oid
|
|
|
|
fut
|
|
|
|
|
|
|
|
import std/[asyncdispatch, asyncnet]
|
|
|
|
from std/nativesockets import AF_UNIX, SOCK_STREAM, Protocol
|
|
|
|
|
|
|
|
import protocols/gatekeeper
|
|
|
|
|
|
|
|
type ShutdownEntity = ref object of Entity
|
|
|
|
|
|
|
|
method publish(e: ShutdownEntity; t: var Turn; v: Assertion; h: Handle) = discard
|
|
|
|
|
|
|
|
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
|
|
|
|
stopActor(turn)
|
|
|
|
|
|
|
|
type
|
|
|
|
SturdyRef = sturdy.SturdyRef[Ref]
|
|
|
|
Resolve = gatekeeper.Resolve[Ref]
|
|
|
|
|
|
|
|
proc connectUnix*(turn: var Turn; path: string; cap: SturdyRef; bootProc: DuringProc) =
|
|
|
|
var socket = newAsyncSocket(
|
|
|
|
domain = AF_UNIX,
|
|
|
|
sockType = SOCK_STREAM,
|
|
|
|
protocol = cast[Protocol](0),
|
|
|
|
buffered = false)
|
|
|
|
proc socketWriter(packet: seq[byte]): Future[void] =
|
|
|
|
socket.send cast[string](packet)
|
|
|
|
const recvSize = 4096
|
|
|
|
var shutdownRef: Ref
|
|
|
|
let reenable = turn.activeFacet.preventInertCheck()
|
|
|
|
let connectionClosedRef = newRef(turn, ShutdownEntity())
|
|
|
|
proc setup(turn: var Turn; relay: Relay) =
|
|
|
|
let facet = turn.activeFacet
|
|
|
|
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
|
|
|
|
let buf = cast[seq[byte]](pktFut.read)
|
|
|
|
if buf.len == 0:
|
|
|
|
run(facet) do (turn: var Turn): stopActor(turn)
|
|
|
|
else:
|
|
|
|
relay.recv(buf )
|
|
|
|
socket.recv(recvSize).addCallback(recvCb)
|
|
|
|
# TODO: should this need be callSoon?
|
|
|
|
socket.recv(recvSize).addCallback(recvCb)
|
|
|
|
turn.activeFacet.actor.atExit do (turn: var Turn): close(socket)
|
|
|
|
discard publish(turn, connectionClosedRef, true)
|
|
|
|
shutdownRef = newRef(turn, ShutdownEntity())
|
|
|
|
var fut = newFuture[void]"connectUnix"
|
|
|
|
connectUnix(socket, path).addCallback do (f: Future[void]):
|
|
|
|
read f
|
|
|
|
discard newActor("unix") do (turn: var Turn):
|
|
|
|
let relayFut = spawnRelay("unix", turn, RelayActorOptions(
|
|
|
|
packetWriter: socketWriter,
|
|
|
|
setup: setup,
|
|
|
|
initialOid: 0.Oid.some))
|
|
|
|
relayFut.addCallback do (refFut: Future[Ref]):
|
|
|
|
let gatekeeper = read refFut
|
|
|
|
run(gatekeeper.relay) do (turn: var Turn):
|
|
|
|
reenable()
|
|
|
|
discard publish(turn, shutdownRef, true)
|
|
|
|
proc duringCallback(turn: var Turn; ds: Preserve[Ref]): TurnAction =
|
|
|
|
let facet = facet(turn) do (turn: var Turn):
|
|
|
|
discard bootProc(turn, ds) # TODO: what to do with this?
|
|
|
|
proc action(turn: var Turn) =
|
|
|
|
stop(turn, facet)
|
|
|
|
result = action
|
|
|
|
var res = Resolve(
|
|
|
|
sturdyref: cap,
|
|
|
|
observer: embed newRef(turn, during(duringCallback)))
|
|
|
|
discard publish(turn, gatekeeper, res)
|
|
|
|
fut.complete()
|
|
|
|
asyncCheck(turn, fut)
|