454 lines
16 KiB
Nim
454 lines
16 KiB
Nim
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
|
# SPDX-License-Identifier: Unlicense
|
|
|
|
import std/[asyncdispatch, options, streams, tables]
|
|
from std/os import getEnv, `/`
|
|
import preserves
|
|
import ../syndicate, /capabilities, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress]
|
|
|
|
when defined(traceSyndicate):
|
|
when defined(posix):
|
|
template trace(args: varargs[untyped]): untyped = stderr.writeLine(args)
|
|
else:
|
|
template trace(args: varargs[untyped]): untyped = echo(args)
|
|
else:
|
|
template trace(args: varargs[untyped]): untyped = discard
|
|
|
|
export `$`
|
|
|
|
type
|
|
Oid = sturdy.Oid
|
|
|
|
export Stdio, Tcp, WebSocket, Unix
|
|
|
|
type
|
|
Value = Preserve[void]
|
|
Assertion = Preserve[Cap]
|
|
WireRef = sturdy.WireRef[void]
|
|
Turn = syndicate.Turn
|
|
|
|
type
|
|
PacketWriter = proc (pkt: sink Packet): Future[void] {.gcsafe.}
|
|
RelaySetup = proc (turn: var Turn; relay: Relay) {.gcsafe.}
|
|
|
|
Relay* = ref object of RootObj
|
|
facet: Facet
|
|
inboundAssertions: Table[Handle,
|
|
tuple[localHandle: Handle, imported: seq[WireSymbol]]]
|
|
outboundAssertions: Table[Handle, seq[WireSymbol]]
|
|
exported: Membrane
|
|
imported: Membrane
|
|
nextLocalOid: Oid
|
|
pendingTurn: protocol.Turn
|
|
packetWriter: PacketWriter
|
|
untrusted: bool
|
|
|
|
SyncPeerEntity = ref object of Entity
|
|
relay: Relay
|
|
peer: Cap
|
|
handleMap: Table[Handle, Handle]
|
|
e: WireSymbol
|
|
|
|
RelayEntity = ref object of Entity
|
|
## https://synit.org/book/protocol.html#relay-entities
|
|
label: string
|
|
relay: Relay
|
|
|
|
proc releaseCapOut(r: Relay; e: WireSymbol) =
|
|
r.exported.drop e
|
|
|
|
method publish(spe: SyncPeerEntity; t: var Turn; a: AssertionRef; h: Handle) =
|
|
spe.handleMap[h] = publish(t, spe.peer, a.value)
|
|
|
|
method retract(se: SyncPeerEntity; t: var Turn; h: Handle) =
|
|
var other: Handle
|
|
if se.handleMap.pop(h, other):
|
|
retract(t, other)
|
|
|
|
method message(se: SyncPeerEntity; t: var Turn; a: AssertionRef) =
|
|
if not se.e.isNil:
|
|
se.relay.releaseCapOut(se.e)
|
|
message(t, se.peer, a.value)
|
|
|
|
method sync(se: SyncPeerEntity; t: var Turn; peer: Cap) =
|
|
sync(t, se.peer, peer)
|
|
|
|
proc newSyncPeerEntity(r: Relay; p: Cap): SyncPeerEntity =
|
|
SyncPeerEntity(relay: r, peer: p)
|
|
|
|
proc rewriteCapOut(relay: Relay; cap: Cap; exported: var seq[WireSymbol]): WireRef =
|
|
if cap.target of RelayEntity and cap.target.RelayEntity.relay == relay and cap.attenuation.len == 0:
|
|
WireRef(orKind: WireRefKind.yours, yours: WireRefYours[void](oid: cap.target.oid))
|
|
else:
|
|
var ws = grab(relay.exported, cap)
|
|
if ws.isNil:
|
|
ws = newWireSymbol(relay.exported, relay.nextLocalOid, cap)
|
|
inc relay.nextLocalOid
|
|
exported.add ws
|
|
WireRef(
|
|
orKind: WireRefKind.mine,
|
|
mine: WireRefMine(oid: ws.oid))
|
|
|
|
proc rewriteOut(relay: Relay; v: Assertion):
|
|
tuple[rewritten: Value, exported: seq[WireSymbol]] {.gcsafe.} =
|
|
var exported: seq[WireSymbol]
|
|
result.rewritten = contract(v) do (r: Cap) -> Value:
|
|
rewriteCapOut(relay, r, exported).toPreserve
|
|
result.exported = exported
|
|
|
|
proc register(relay: Relay; v: Assertion; h: Handle): tuple[rewritten: Value, exported: seq[WireSymbol]] =
|
|
result = rewriteOut(relay, v)
|
|
relay.outboundAssertions[h] = result.exported
|
|
|
|
proc deregister(relay: Relay; h: Handle) =
|
|
var outbound: seq[WireSymbol]
|
|
if relay.outboundAssertions.pop(h, outbound):
|
|
for e in outbound: releaseCapOut(relay, e)
|
|
|
|
proc send(r: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
|
|
if r.pendingTurn.len == 0:
|
|
# If the pending queue is empty then schedule a packet
|
|
# to be sent after pending I/O is processed.
|
|
callSoon do ():
|
|
r.facet.run do (turn: var Turn):
|
|
var pkt = Packet(
|
|
orKind: PacketKind.Turn,
|
|
turn: move r.pendingTurn)
|
|
trace "C: ", pkt
|
|
assert(not r.packetWriter.isNil, "missing packetWriter proc")
|
|
asyncCheck(turn, r.packetWriter(pkt))
|
|
r.pendingTurn.add TurnEvent(oid: rOid, event: m)
|
|
|
|
proc send(re: RelayEntity; turn: var Turn; ev: Event) =
|
|
send(re.relay, turn, protocol.Oid re.oid, ev)
|
|
|
|
method publish(re: RelayEntity; t: var Turn; a: AssertionRef; h: Handle) {.gcsafe.} =
|
|
re.send(t, Event(
|
|
orKind: EventKind.Assert,
|
|
`assert`: protocol.Assert(
|
|
assertion: re.relay.register(a.value, h).rewritten,
|
|
handle: h)))
|
|
|
|
method retract(re: RelayEntity; t: var Turn; h: Handle) {.gcsafe.} =
|
|
re.relay.deregister h
|
|
re.send(t, Event(
|
|
orKind: EventKind.Retract,
|
|
retract: Retract(handle: h)))
|
|
|
|
method message(re: RelayEntity; turn: var Turn; msg: AssertionRef) {.gcsafe.} =
|
|
var (value, exported) = rewriteOut(re.relay, msg.value)
|
|
assert(len(exported) == 0, "cannot send a reference in a message")
|
|
if len(exported) == 0:
|
|
re.send(turn, Event(orKind: EventKind.Message, message: Message(body: value)))
|
|
|
|
method sync(re: RelayEntity; turn: var Turn; peer: Cap) {.gcsafe.} =
|
|
var
|
|
peerEntity = newSyncPeerEntity(re.relay, peer)
|
|
exported: seq[WireSymbol]
|
|
discard rewriteCapOut(re.relay, turn.newCap(peerEntity), exported)
|
|
# TODO: discard?
|
|
peerEntity.e = exported[0]
|
|
re.send(turn, Event(orKind: EventKind.Sync))
|
|
|
|
proc newRelayEntity(label: string; r: Relay; o: Oid): RelayEntity =
|
|
RelayEntity(label: label, relay: r, oid: o)
|
|
|
|
using
|
|
relay: Relay
|
|
facet: Facet
|
|
|
|
proc lookupLocal(relay; oid: Oid): Cap =
|
|
let sym = relay.exported.grab oid
|
|
if sym.isNil: newInertCap()
|
|
else: sym.cap
|
|
|
|
proc isInert(r: Cap): bool =
|
|
r.target.isNil
|
|
|
|
proc rewriteCapIn(relay; facet; n: WireRef, imported: var seq[WireSymbol]): Cap =
|
|
case n.orKind
|
|
of WireRefKind.mine:
|
|
var e = relay.imported.grab(n.mine.oid)
|
|
if e.isNil: e = newWireSymbol(
|
|
relay.imported,
|
|
n.mine.oid,
|
|
newCap(facet, newRelayEntity("rewriteCapIn", relay, n.mine.oid)))
|
|
imported.add e
|
|
result = e.cap
|
|
of WireRefKind.yours:
|
|
let r = relay.lookupLocal(n.yours.oid)
|
|
if n.yours.attenuation.len == 0 or r.isInert: result = r
|
|
else: raiseAssert "attenuation not implemented"
|
|
|
|
proc rewriteIn(relay; facet; v: Value):
|
|
tuple[rewritten: Assertion; imported: seq[WireSymbol]] {.gcsafe.} =
|
|
var imported: seq[WireSymbol]
|
|
result.rewritten = expand(v) do (pr: Value) -> Assertion:
|
|
var wr: WireRef
|
|
if not fromPreserve(wr, pr):
|
|
raiseAssert "expansion of embedded value failed"
|
|
rewriteCapIn(relay, facet, wr, imported).toPreserve(Cap)
|
|
result.imported = imported
|
|
|
|
proc close(r: Relay) = discard
|
|
|
|
proc dispatch*(relay: Relay; turn: var Turn; cap: Cap; event: Event) {.gcsafe.} =
|
|
case event.orKind
|
|
of EventKind.Assert:
|
|
let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion)
|
|
relay.inboundAssertions[event.assert.handle] = (publish(turn, cap, a), imported,)
|
|
|
|
of EventKind.Retract:
|
|
let remoteHandle = event.retract.handle
|
|
var outbound: tuple[localHandle: Handle, imported: seq[WireSymbol]]
|
|
if relay.inboundAssertions.pop(remoteHandle, outbound):
|
|
for e in outbound.imported: relay.imported.drop e
|
|
turn.retract(outbound.localHandle)
|
|
|
|
of EventKind.Message:
|
|
let (a, imported) = rewriteIn(relay, turn.facet, event.message.body)
|
|
assert imported.len == 0, "Cannot receive transient reference"
|
|
turn.message(cap, a)
|
|
|
|
of EventKind.Sync:
|
|
discard # TODO
|
|
#[
|
|
var imported: seq[WireSymbol]
|
|
let k = relay.rewriteCapIn(turn, evenr.sync.peer, imported)
|
|
turn.sync(cap) do (turn: var Turn):
|
|
turn.message(k, true)
|
|
for e in imported: relay.imported.del e
|
|
]#
|
|
|
|
proc dispatch*(relay: Relay; v: Value) {.gcsafe.} =
|
|
trace "S: ", v
|
|
run(relay.facet) do (t: var Turn):
|
|
var pkt: Packet
|
|
if fromPreserve(pkt, v):
|
|
case pkt.orKind
|
|
of PacketKind.Turn:
|
|
# https://synit.org/book/protocol.html#turn-packets
|
|
for te in pkt.turn:
|
|
let r = lookupLocal(relay, te.oid.Oid)
|
|
if not r.isInert:
|
|
dispatch(relay, t, r, te.event)
|
|
else:
|
|
stderr.writeLine("discarding event for unknown Cap; ", te.event)
|
|
of PacketKind.Error:
|
|
# https://synit.org/book/protocol.html#error-packets
|
|
when defined(posix):
|
|
stderr.writeLine("Error from server: ", pkt.error.message, " (detail: ", pkt.error.detail, ")")
|
|
close relay
|
|
of PacketKind.Extension:
|
|
# https://synit.org/book/protocol.html#extension-packets
|
|
discard
|
|
else:
|
|
when defined(posix):
|
|
stderr.writeLine("discarding undecoded packet ", v)
|
|
|
|
type
|
|
RelayOptions* = object of RootObj
|
|
packetWriter*: PacketWriter
|
|
untrusted*: bool
|
|
RelayActorOptions* = object of RelayOptions
|
|
initialOid*: Option[Oid]
|
|
initialCap*: Cap
|
|
nextLocalOid*: Option[Oid]
|
|
|
|
proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay =
|
|
result = Relay(
|
|
facet: turn.facet,
|
|
packetWriter: opts.packetWriter,
|
|
untrusted: opts.untrusted)
|
|
discard result.facet.preventInertCheck()
|
|
setup(turn, result)
|
|
|
|
proc transportConnectionResolve(addrAss: Assertion; ds: Cap): gatekeeper.TransportConnection[Cap] =
|
|
result.`addr` = addrAss
|
|
result.resolved = Resolved[Cap](orKind: ResolvedKind.accepted)
|
|
result.resolved.accepted.responderSession = ds
|
|
|
|
proc spawnRelay*(name: string; turn: var Turn; ds: Cap; addrAss: Assertion; opts: RelayActorOptions; setup: RelaySetup) =
|
|
discard spawn(name, turn) do (turn: var Turn):
|
|
let relay = newRelay(turn, opts, setup)
|
|
if not opts.initialCap.isNil:
|
|
var exported: seq[WireSymbol]
|
|
discard rewriteCapOut(relay, opts.initialCap, exported)
|
|
opts.nextLocalOid.map do (oid: Oid):
|
|
relay.nextLocalOid =
|
|
if oid == 0.Oid: 1.Oid
|
|
else: oid
|
|
if opts.initialOid.isSome:
|
|
var
|
|
imported: seq[WireSymbol]
|
|
wr = WireRef(
|
|
orKind: WireRefKind.mine,
|
|
mine: WireRefMine(oid: opts.initialOid.get))
|
|
res = rewriteCapIn(relay, turn.facet, wr, imported)
|
|
discard publish(turn, ds, transportConnectionResolve(addrAss, res))
|
|
else:
|
|
discard publish(turn, ds, transportConnectionResolve(addrAss, ds))
|
|
|
|
when defined(posix):
|
|
import std/asyncnet
|
|
from std/nativesockets import AF_INET, AF_UNIX, IPPROTO_TCP, SOCK_STREAM, Protocol
|
|
|
|
type ShutdownEntity* = ref object of Entity
|
|
|
|
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
|
|
stopActor(turn)
|
|
|
|
type ConnectProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
|
|
|
|
export Tcp
|
|
|
|
when defined(posix):
|
|
export Unix
|
|
|
|
proc connect*(turn: var Turn; ds: Cap; route: Route; addrAss: Assertion; socket: AsyncSocket; step: Preserve[Cap]) =
|
|
## Relay a dataspace over an open `AsyncSocket`.
|
|
proc socketWriter(packet: sink Packet): Future[void] =
|
|
socket.send(cast[string](encode(packet)))
|
|
const recvSize = 0x2000
|
|
var shutdownCap: Cap
|
|
let
|
|
reenable = turn.facet.preventInertCheck()
|
|
connectionClosedCap = newCap(turn, ShutdownEntity())
|
|
discard bootActor("socket") do (turn: var Turn):
|
|
var ops = RelayActorOptions(
|
|
packetWriter: socketWriter,
|
|
initialOid: 0.Oid.some)
|
|
spawnRelay("socket", turn, ds, addrAss, ops) do (turn: var Turn; relay: Relay):
|
|
let facet = turn.facet
|
|
var wireBuf = newBufferedDecoder(0)
|
|
proc recvCb(pktFut: Future[string]) {.gcsafe.} =
|
|
if pktFut.failed:
|
|
run(facet) do (turn: var Turn): stopActor(turn)
|
|
else:
|
|
var buf = pktFut.read
|
|
if buf.len == 0:
|
|
run(facet) do (turn: var Turn): stopActor(turn)
|
|
else:
|
|
feed(wireBuf, buf)
|
|
var (success, pr) = decode(wireBuf)
|
|
if success:
|
|
dispatch(relay, pr)
|
|
if not socket.isClosed:
|
|
socket.recv(recvSize).addCallback(recvCb)
|
|
socket.recv(recvSize).addCallback(recvCb)
|
|
turn.facet.actor.atExit do (turn: var Turn): close(socket)
|
|
discard publish(turn, connectionClosedCap, true)
|
|
shutdownCap = newCap(turn, ShutdownEntity())
|
|
onPublish(turn, ds, TransportConnection[Cap] ? {0: ?addrAss, 2: ?Rejected[Cap]}) do (detail: Assertion):
|
|
raise newException(IOError, $detail)
|
|
onPublish(turn, ds, TransportConnection[Cap] ? {0: ?addrAss, 2: ?ResolvedAccepted[Cap]}) do (gatekeeper: Cap):
|
|
run(gatekeeper.relay) do (turn: var Turn):
|
|
reenable()
|
|
discard publish(turn, shutdownCap, true)
|
|
proc duringCallback(turn: var Turn; ass: Assertion; h: Handle): TurnAction =
|
|
let facet = inFacet(turn) do (turn: var Turn):
|
|
var resolvePath = ResolvePath[Cap](route: route, `addr`: addrAss)
|
|
if resolvePath.resolved.fromPreserve(ass):
|
|
discard publish(turn, ds, resolvePath)
|
|
else:
|
|
raise newException(CatchableError, "unhandled gatekeeper response " & $ass)
|
|
proc action(turn: var Turn) =
|
|
stop(turn, facet)
|
|
result = action
|
|
discard publish(turn, gatekeeper, Resolve[Cap](
|
|
step: step,
|
|
observer: newCap(turn, during(duringCallback)),
|
|
))
|
|
|
|
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Tcp; step: Preserve[Cap]) =
|
|
## Relay a dataspace over TCP.
|
|
let socket = newAsyncSocket(
|
|
domain = AF_INET,
|
|
sockType = SOCK_STREAM,
|
|
protocol = IPPROTO_TCP,
|
|
buffered = false)
|
|
let fut = connect(socket, transport.host, Port transport.port)
|
|
addCallback(fut, turn) do (turn: var Turn):
|
|
connect(turn, ds, route, transport.toPreserve(Cap), socket, step)
|
|
|
|
proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Unix; step: Preserve[Cap]) =
|
|
## Relay a dataspace over a UNIX socket.
|
|
let socket = newAsyncSocket(
|
|
domain = AF_UNIX,
|
|
sockType = SOCK_STREAM,
|
|
protocol = cast[Protocol](0),
|
|
buffered = false)
|
|
let fut = connectUnix(socket, transport.path)
|
|
addCallback(fut, turn) do (turn: var Turn):
|
|
connect(turn, ds, route, transport.toPreserve(Cap), socket, step)
|
|
|
|
import std/asyncfile
|
|
|
|
const stdinReadSize = 128
|
|
|
|
proc connectStdio*(turn: var Turn; ds: Cap) =
|
|
## Connect to an external dataspace over stdin and stdout.
|
|
proc stdoutWriter(packet: sink Packet): Future[void] =
|
|
result = newFuture[void]()
|
|
var buf = encode(packet)
|
|
doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len
|
|
flushFile(stdout)
|
|
complete result
|
|
var opts = RelayActorOptions(
|
|
packetWriter: stdoutWriter,
|
|
initialCap: ds,
|
|
initialOid: 0.Oid.some)
|
|
spawnRelay("stdio", turn, ds, Stdio().toPreserve(Cap), opts) do (turn: var Turn; relay: Relay):
|
|
let
|
|
facet = turn.facet
|
|
asyncStdin = openAsync("/dev/stdin") # this is universal now?
|
|
close(stdin)
|
|
facet.actor.atExit do (turn: var Turn):
|
|
close(asyncStdin)
|
|
var wireBuf = newBufferedDecoder(0)
|
|
proc readCb(pktFut: Future[string]) {.gcsafe.} =
|
|
if not pktFut.failed:
|
|
var buf = pktFut.read
|
|
if buf.len == 0:
|
|
run(facet) do (turn: var Turn): stopActor(turn)
|
|
else:
|
|
feed(wireBuf, buf)
|
|
var (success, pr) = decode(wireBuf)
|
|
if success:
|
|
dispatch(relay, pr)
|
|
asyncStdin.read(stdinReadSize).addCallback(readCb)
|
|
asyncStdin.read(stdinReadSize).addCallback(readCb)
|
|
|
|
type BootProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
|
|
|
|
proc envRoute*: Route[Cap] =
|
|
var text = getEnv("SYNDICATE_ROUTE")
|
|
if text == "":
|
|
var tx = (getEnv("XDG_RUNTIME_DIR", "/run/user/1000") / "dataspace").toPreserve(Cap)
|
|
result.transports = @[initRecord("unix", tx)]
|
|
result.pathSteps = @[capabilities.mint().toPreserve(Cap)]
|
|
else:
|
|
var pr = parsePreserves(text, Cap)
|
|
if not result.fromPreserve(pr):
|
|
raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr)
|
|
|
|
proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
|
|
var
|
|
unix: Unix
|
|
tcp: Tcp
|
|
stdio: Stdio
|
|
doAssert(route.transports.len == 1, "only a single transport supported for routes")
|
|
doAssert(route.pathSteps.len < 2, "multiple path steps not supported for routes")
|
|
if unix.fromPreserve route.transports[0]:
|
|
connect(turn, ds, route, unix, route.pathSteps[0])
|
|
elif tcp.fromPreserve route.transports[0]:
|
|
connect(turn, ds, route, tcp, route.pathSteps[0])
|
|
elif stdio.fromPreserve route.transports[0]:
|
|
connectStdio(turn, ds)
|
|
bootProc(turn, ds)
|
|
else:
|
|
raise newException(ValueError, "unsupported route")
|
|
|
|
during(turn, ds, ResolvePath[Cap] ? { 0: ?route, 3: ?ResolvedAccepted[Cap]}) do (dest: Cap):
|
|
bootProc(turn, dest)
|