syndicate-nim/src/syndicate/relays.nim

425 lines
15 KiB
Nim

# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[asyncdispatch, options, streams, tables]
import preserves
import ./actors, ./durings, ./membranes, ./protocols/[protocol, sturdy, transportAddress]
when defined(traceSyndicate):
when defined(posix):
template trace(args: varargs[untyped]): untyped = stderr.writeLine(args)
else:
template trace(args: varargs[untyped]): untyped = echo(args)
else:
template trace(args: varargs[untyped]): untyped = discard
export `$`
type Oid = sturdy.Oid
type
Value = Preserve[void]
Assertion = Preserve[Ref]
WireRef = sturdy.WireRef[void]
Turn = actors.Turn
type
PacketWriter = proc (pkt: sink Packet): Future[void] {.gcsafe.}
RelaySetup = proc (turn: var Turn; relay: Relay) {.gcsafe.}
Relay* = ref object of RootObj
facet: Facet
inboundAssertions: Table[Handle,
tuple[localHandle: Handle, imported: seq[WireSymbol]]]
outboundAssertions: Table[Handle, seq[WireSymbol]]
exported: Membrane
imported: Membrane
nextLocalOid: Oid
pendingTurn: protocol.Turn
packetWriter: PacketWriter
untrusted: bool
SyncPeerEntity = ref object of Entity
relay: Relay
peer: Ref
handleMap: Table[Handle, Handle]
e: WireSymbol
RelayEntity = ref object of Entity
## https://synit.org/book/protocol.html#relay-entities
label: string
relay: Relay
proc releaseRefOut(r: Relay; e: WireSymbol) =
r.exported.drop e
method publish(spe: SyncPeerEntity; t: var Turn; a: AssertionRef; h: Handle) =
spe.handleMap[h] = publish(t, spe.peer, a.value)
method retract(se: SyncPeerEntity; t: var Turn; h: Handle) =
var other: Handle
if se.handleMap.pop(h, other):
retract(t, other)
method message(se: SyncPeerEntity; t: var Turn; a: AssertionRef) =
if not se.e.isNil:
se.relay.releaseRefOut(se.e)
message(t, se.peer, a.value)
method sync(se: SyncPeerEntity; t: var Turn; peer: Ref) =
sync(t, se.peer, peer)
proc newSyncPeerEntity(r: Relay; p: Ref): SyncPeerEntity =
SyncPeerEntity(relay: r, peer: p)
proc rewriteRefOut(relay: Relay; `ref`: Ref; exported: var seq[WireSymbol]): WireRef =
if `ref`.target of RelayEntity and `ref`.target.RelayEntity.relay == relay and `ref`.attenuation.len == 0:
WireRef(orKind: WireRefKind.yours, yours: WireRefYours[void](oid: `ref`.target.oid))
else:
var ws = grab(relay.exported, `ref`)
if ws.isNil:
ws = newWireSymbol(relay.exported, relay.nextLocalOid, `ref`)
inc relay.nextLocalOid
exported.add ws
WireRef(
orKind: WireRefKind.mine,
mine: WireRefMine(oid: ws.oid))
proc rewriteOut(relay: Relay; v: Assertion):
tuple[rewritten: Value, exported: seq[WireSymbol]] {.gcsafe.} =
var exported: seq[WireSymbol]
result.rewritten = contract(v) do (r: Ref) -> Value:
rewriteRefOut(relay, r, exported).toPreserve
result.exported = exported
proc register(relay: Relay; v: Assertion; h: Handle): tuple[rewritten: Value, exported: seq[WireSymbol]] =
result = rewriteOut(relay, v)
relay.outboundAssertions[h] = result.exported
proc deregister(relay: Relay; h: Handle) =
var outbound: seq[WireSymbol]
if relay.outboundAssertions.pop(h, outbound):
for e in outbound: releaseRefOut(relay, e)
proc send(r: Relay; pkt: sink Packet): Future[void] =
assert(not r.packetWriter.isNil, "missing packetWriter proc")
r.packetWriter(pkt)
proc send(r: Relay; rOid: protocol.Oid; m: Event) =
if r.pendingTurn.len == 0:
# do not send a packet immediately,
# wait until the pending I/O is processed
callSoon do ():
r.facet.run do (turn: var Turn):
var pkt = Packet(
orKind: PacketKind.Turn,
turn: move r.pendingTurn)
trace "C: ", pkt
asyncCheck(turn, r.send(pkt))
r.pendingTurn.add TurnEvent(oid: rOid, event: m)
proc send(re: RelayEntity; ev: Event) =
send(re.relay, protocol.Oid re.oid, ev)
method publish(re: RelayEntity; t: var Turn; a: AssertionRef; h: Handle) {.gcsafe.} =
re.send Event(
orKind: EventKind.Assert,
`assert`: protocol.Assert(
assertion: re.relay.register(a.value, h).rewritten,
handle: h))
method retract(re: RelayEntity; t: var Turn; h: Handle) {.gcsafe.} =
re.relay.deregister h
re.send Event(
orKind: EventKind.Retract,
retract: Retract(handle: h))
method message(re: RelayEntity; turn: var Turn; msg: AssertionRef) {.gcsafe.} =
var (value, exported) = rewriteOut(re.relay, msg.value)
assert(len(exported) == 0, "cannot send a reference in a message")
if len(exported) == 0:
re.send Event(orKind: EventKind.Message, message: Message(body: value))
method sync(re: RelayEntity; turn: var Turn; peer: Ref) {.gcsafe.} =
var
peerEntity = newSyncPeerEntity(re.relay, peer)
exported: seq[WireSymbol]
discard rewriteRefOut(re.relay, turn.newRef(peerEntity), exported)
# TODO: discard?
peerEntity.e = exported[0]
re.send Event(orKind: EventKind.Sync)
proc newRelayEntity(label: string; r: Relay; o: Oid): RelayEntity =
RelayEntity(label: label, relay: r, oid: o)
using
relay: Relay
facet: Facet
proc lookupLocal(relay; oid: Oid): Ref =
let sym = relay.exported.grab oid
if sym.isNil: newInertRef()
else: sym.`ref`
proc isInert(r: Ref): bool =
r.target.isNil
proc rewriteRefIn(relay; facet; n: WireRef, imported: var seq[WireSymbol]): Ref =
case n.orKind
of WireRefKind.mine:
var e = relay.imported.grab(n.mine.oid)
if e.isNil: e = newWireSymbol(
relay.imported,
n.mine.oid,
newRef(facet, newRelayEntity("rewriteRefIn", relay, n.mine.oid)))
imported.add e
result = e.`ref`
of WireRefKind.yours:
let r = relay.lookupLocal(n.yours.oid)
if n.yours.attenuation.len == 0 or r.isInert: result = r
else: raiseAssert "attenuation not implemented"
proc rewriteIn(relay; facet; v: Value):
tuple[rewritten: Assertion; imported: seq[WireSymbol]] {.gcsafe.} =
var imported: seq[WireSymbol]
result.rewritten = expand(v) do (pr: Value) -> Assertion:
var wr: WireRef
if not fromPreserve(wr, pr):
raiseAssert "expansion of embedded value failed"
rewriteRefIn(relay, facet, wr, imported).toPreserve(Ref)
result.imported = imported
proc close(r: Relay) = discard
proc dispatch*(relay: Relay; turn: var Turn; `ref`: Ref; event: Event) {.gcsafe.} =
case event.orKind
of EventKind.Assert:
let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion)
relay.inboundAssertions[event.assert.handle] = (publish(turn, `ref`, a), imported,)
of EventKind.Retract:
let remoteHandle = event.retract.handle
var outbound: tuple[localHandle: Handle, imported: seq[WireSymbol]]
if relay.inboundAssertions.pop(remoteHandle, outbound):
for e in outbound.imported: relay.imported.drop e
turn.retract(outbound.localHandle)
of EventKind.Message:
let (a, imported) = rewriteIn(relay, turn.facet, event.message.body)
assert imported.len == 0, "Cannot receive transient reference"
turn.message(`ref`, a)
of EventKind.Sync:
discard # TODO
#[
var imported: seq[WireSymbol]
let k = relay.rewriteRefIn(turn, evenr.sync.peer, imported)
turn.sync(`ref`) do (turn: var Turn):
turn.message(k, true)
for e in imported: relay.imported.del e
]#
proc dispatch*(relay: Relay; v: Value) {.gcsafe.} =
trace "S: ", v
run(relay.facet) do (t: var Turn):
var pkt: Packet
if fromPreserve(pkt, v):
case pkt.orKind
of PacketKind.Turn:
# https://synit.org/book/protocol.html#turn-packets
for te in pkt.turn:
let r = lookupLocal(relay, te.oid.Oid)
if not r.isInert:
dispatch(relay, t, r, te.event)
else:
stderr.writeLine("discarding event for unknown Ref; ", te.event)
of PacketKind.Error:
# https://synit.org/book/protocol.html#error-packets
when defined(posix):
stderr.writeLine("Error from server: ", pkt.error.message, " (detail: ", pkt.error.detail, ")")
close relay
of PacketKind.Extension:
# https://synit.org/book/protocol.html#extension-packets
discard
else:
when defined(posix):
stderr.writeLine("discarding undecoded packet ", v)
type
RelayOptions* = object of RootObj
packetWriter*: PacketWriter
untrusted*: bool
RelayActorOptions* = object of RelayOptions
initialOid*: Option[Oid]
initialRef*: Ref
nextLocalOid*: Option[Oid]
proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay =
result = Relay(
facet: turn.facet,
packetWriter: opts.packetWriter,
untrusted: opts.untrusted)
discard result.facet.preventInertCheck()
setup(turn, result)
proc spawnRelay*(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup): Future[Ref] =
var fut = newFuture[Ref]"spawnRelay"
discard spawn(name, turn) do (turn: var Turn):
let relay = newRelay(turn, opts, setup)
if not opts.initialRef.isNil:
var exported: seq[WireSymbol]
discard rewriteRefOut(relay, opts.initialRef, exported)
if opts.initialOid.isSome:
var imported: seq[WireSymbol]
var wr = WireRef(
orKind: WireRefKind.mine,
mine: WireRefMine(oid: opts.initialOid.get))
fut.complete rewriteRefIn(relay, turn.facet, wr, imported)
else:
fut.complete(nil)
opts.nextLocalOid.map do (oid: Oid):
relay.nextLocalOid =
if oid == 0.Oid: 1.Oid
else: oid
fut
when defined(posix):
import std/asyncnet
from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol
import protocols/gatekeeper
type ShutdownEntity* = ref object of Entity
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
stopActor(turn)
type ConnectProc* = proc (turn: var Turn; ds: Ref) {.gcsafe.}
export Tcp
when defined(posix):
export Unix
proc connect*(turn: var Turn; socket: AsyncSocket; step: Preserve[Ref]; bootProc: ConnectProc) =
## Relay a dataspace over an open `AsyncSocket`.
## *`bootProc` may be called multiple times for multiple remote gatekeepers.*
proc socketWriter(packet: sink Packet): Future[void] =
socket.send(cast[string](encode(packet)))
const recvSize = 0x2000
var shutdownRef: Ref
let
reenable = turn.facet.preventInertCheck()
connectionClosedRef = newRef(turn, ShutdownEntity())
fut = newFuture[void]"connect"
discard bootActor("socket") do (turn: var Turn):
var ops = RelayActorOptions(
packetWriter: socketWriter,
initialOid: 0.Oid.some)
let refFut = spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay):
let facet = turn.facet
var wireBuf = newBufferedDecoder(0)
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
if pktFut.failed:
run(facet) do (turn: var Turn): stopActor(turn)
else:
var buf = pktFut.read
if buf.len == 0:
run(facet) do (turn: var Turn): stopActor(turn)
else:
feed(wireBuf, buf)
var (success, pr) = decode(wireBuf)
if success:
dispatch(relay, pr)
if not socket.isClosed:
socket.recv(recvSize).addCallback(recvCb)
socket.recv(recvSize).addCallback(recvCb)
turn.facet.actor.atExit do (turn: var Turn): close(socket)
discard publish(turn, connectionClosedRef, true)
shutdownRef = newRef(turn, ShutdownEntity())
addCallback(refFut) do ():
let gatekeeper = read refFut
run(gatekeeper.relay) do (turn: var Turn):
reenable()
discard publish(turn, shutdownRef, true)
proc duringCallback(turn: var Turn; a: Assertion; h: Handle): TurnAction =
let facet = inFacet(turn) do (turn: var Turn):
var
accepted: ResolvedAccepted[Ref]
rejected: Rejected[Ref]
if fromPreserve(accepted, a):
bootProc(turn, accepted.responderSession)
elif fromPreserve(rejected, a):
fail(fut, newException(CatchableError, $rejected.detail))
else:
fail(fut, newException(CatchableError, $a))
proc action(turn: var Turn) =
stop(turn, facet)
result = action
discard publish(turn, gatekeeper, Resolve[Ref](
step: step,
observer: newRef(turn, during(duringCallback)),
))
fut.complete()
asyncCheck(turn, fut)
proc connect*(turn: var Turn; transport: Tcp; step: Preserve[Ref]; bootProc: ConnectProc) =
## Relay a dataspace over TCP.
## *`bootProc` may be called multiple times for multiple remote gatekeepers.*
let socket = newAsyncSocket(
domain = AF_INET,
sockType = SOCK_STREAM,
protocol = IPPROTO_TCP,
buffered = false)
let fut = connect(socket, transport.host, Port transport.port)
addCallback(fut, turn) do (turn: var Turn):
connect(turn, socket, step, bootProc)
proc connect*(turn: var Turn; transport: Unix; step: Preserve[Ref]; bootProc: ConnectProc) =
## Relay a dataspace over a UNIX socket.
## *`bootProc` may be called multiple times for multiple remote gatekeepers.*
let socket = newAsyncSocket(
domain = AF_UNIX,
sockType = SOCK_STREAM,
protocol = cast[Protocol](0),
buffered = false)
let fut = connectUnix(socket, transport.path)
addCallback(fut, turn) do (turn: var Turn):
connect(turn, socket, step, bootProc)
import std/asyncfile
const stdinReadSize = 128
proc connectStdio*(ds: Ref; turn: var Turn) =
## Connect to an external dataspace over stdin and stdout.
proc stdoutWriter(packet: sink Packet): Future[void] {.async.} =
var buf = encode(packet)
doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len
flushFile(stdout)
var opts = RelayActorOptions(
packetWriter: stdoutWriter,
initialRef: ds,
initialOid: 0.Oid.some)
asyncCheck spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay):
let
facet = turn.facet
asyncStdin = openAsync("/dev/stdin") # this is universal now?
close(stdin)
facet.actor.atExit do (turn: var Turn):
close(asyncStdin)
var wireBuf = newBufferedDecoder(0)
proc readCb(pktFut: Future[string]) {.gcsafe.} =
if not pktFut.failed:
var buf = pktFut.read
if buf.len == 0:
run(facet) do (turn: var Turn): stopActor(turn)
else:
feed(wireBuf, buf)
var (success, pr) = decode(wireBuf)
if success:
dispatch(relay, pr)
asyncStdin.read(stdinReadSize).addCallback(readCb)
asyncStdin.read(stdinReadSize).addCallback(readCb)