syndicate-nim/src/syndicate/relays.nim

654 lines
22 KiB
Nim

# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[options, tables]
import preserves
import ../syndicate, ./durings, ./membranes, ./protocols/[gatekeeper, protocol, sturdy, transportAddress]
when defined(posix):
import ./capabilities
from std/os import getEnv, `/`
when defined(traceSyndicate):
when defined(posix):
template trace(args: varargs[untyped]): untyped = stderr.writeLine(args)
else:
template trace(args: varargs[untyped]): untyped = echo(args)
else:
template trace(args: varargs[untyped]): untyped = discard
export `$`
export Route, Stdio, Tcp, WebSocket, Unix
type
Assertion = Value
Event = protocol.Event
Handle = actors.Handle
Oid = sturdy.Oid
Turn = syndicate.Turn
WireRef = sturdy.WireRef
PacketWriter = proc (turn: var Turn; buf: seq[byte]) {.closure.}
RelaySetup = proc (turn: var Turn; relay: Relay) {.closure.}
Relay* = ref object
facet: Facet
inboundAssertions: Table[Handle,
tuple[localHandle: Handle, imported: seq[WireSymbol]]]
outboundAssertions: Table[Handle, seq[WireSymbol]]
pendingTurn: protocol.Turn
exported: Membrane
imported: Membrane
nextLocalOid: Oid
wireBuf: BufferedDecoder
packetWriter: PacketWriter
peer: Cap
SyncPeerEntity = ref object of Entity
relay: Relay
peer: Cap
handleMap: Table[Handle, Handle]
e: WireSymbol
RelayEntity = ref object of Entity
## https://synit.org/book/protocol.html#relay-entities
label: string
relay: Relay
proc releaseCapOut(r: Relay; e: WireSymbol) =
r.exported.drop e
method publish(spe: SyncPeerEntity; t: var Turn; a: AssertionRef; h: Handle) =
spe.handleMap[h] = publish(t, spe.peer, a.value)
method retract(se: SyncPeerEntity; t: var Turn; h: Handle) =
var other: Handle
if se.handleMap.pop(h, other):
retract(t, other)
method message(se: SyncPeerEntity; t: var Turn; a: AssertionRef) =
if not se.e.isNil:
se.relay.releaseCapOut(se.e)
message(t, se.peer, a.value)
method sync(se: SyncPeerEntity; t: var Turn; peer: Cap) =
sync(t, se.peer, peer)
proc newSyncPeerEntity(r: Relay; p: Cap): SyncPeerEntity =
SyncPeerEntity(relay: r, peer: p)
proc rewriteCapOut(relay: Relay; cap: Cap; exported: var seq[WireSymbol]): WireRef =
if cap.target of RelayEntity and cap.target.RelayEntity.relay == relay and cap.attenuation.len == 0:
result = WireRef(orKind: WireRefKind.yours, yours: WireRefYours(oid: cap.target.oid))
else:
var ws = grab(relay.exported, cap)
if ws.isNil:
ws = newWireSymbol(relay.exported, relay.nextLocalOid, cap)
inc relay.nextLocalOid
exported.add ws
result = WireRef(
orKind: WireRefKind.mine,
mine: WireRefMine(oid: ws.oid))
proc rewriteOut(relay: Relay; v: Assertion):
tuple[rewritten: Value, exported: seq[WireSymbol]] =
var exported: seq[WireSymbol]
result.rewritten = mapEmbeds(v) do (pr: Value) -> Value:
let o = pr.unembed(Cap); if o.isSome:
rewriteCapOut(relay, o.get, exported).toPreserves
else: pr
result.exported = exported
proc register(relay: Relay; v: Assertion; h: Handle): tuple[rewritten: Value, exported: seq[WireSymbol]] =
result = rewriteOut(relay, v)
relay.outboundAssertions[h] = result.exported
proc deregister(relay: Relay; h: Handle) =
var outbound: seq[WireSymbol]
if relay.outboundAssertions.pop(h, outbound):
for e in outbound: releaseCapOut(relay, e)
proc send(relay: Relay; turn: var Turn; rOid: protocol.Oid; m: Event) =
# TODO: don't send right away.
relay.pendingTurn.add TurnEvent(oid: rOid, event: m)
queueEffect(turn, relay.facet) do (turn: var Turn):
if relay.pendingTurn.len > 0:
var pkt = Packet(
orKind: PacketKind.Turn,
turn: move relay.pendingTurn)
trace "C: ", pkt
relay.packetWriter(turn, encode pkt)
proc send(re: RelayEntity; turn: var Turn; ev: Event) =
send(re.relay, turn, protocol.Oid re.oid, ev)
method publish(re: RelayEntity; t: var Turn; a: AssertionRef; h: Handle) =
re.send(t, Event(
orKind: EventKind.Assert,
`assert`: protocol.Assert(
assertion: re.relay.register(a.value, h).rewritten,
handle: h)))
method retract(re: RelayEntity; t: var Turn; h: Handle) =
re.relay.deregister h
re.send(t, Event(
orKind: EventKind.Retract,
retract: Retract(handle: h)))
method message(re: RelayEntity; turn: var Turn; msg: AssertionRef) =
var (value, exported) = rewriteOut(re.relay, msg.value)
assert(len(exported) == 0, "cannot send a reference in a message")
if len(exported) == 0:
re.send(turn, Event(orKind: EventKind.Message, message: Message(body: value)))
method sync(re: RelayEntity; turn: var Turn; peer: Cap) =
var
peerEntity = newSyncPeerEntity(re.relay, peer)
exported: seq[WireSymbol]
wr = rewriteCapOut(re.relay, turn.newCap(peerEntity), exported)
peerEntity.e = exported[0]
var ev = Event(orKind: EventKind.Sync)
ev.sync.peer = wr.toPreserves.embed
re.send(turn, ev)
proc newRelayEntity(label: string; r: Relay; o: Oid): RelayEntity =
RelayEntity(label: label, relay: r, oid: o)
using
relay: Relay
facet: Facet
proc lookupLocal(relay; oid: Oid): Cap =
let sym = relay.exported.grab oid
if not sym.isNil:
result = sym.cap
proc rewriteCapIn(relay; facet; n: WireRef, imported: var seq[WireSymbol]): Cap =
case n.orKind
of WireRefKind.mine:
var e = relay.imported.grab(n.mine.oid)
if e.isNil:
e = newWireSymbol(
relay.imported,
n.mine.oid,
newCap(facet, newRelayEntity("rewriteCapIn", relay, n.mine.oid)),
)
imported.add e
result = e.cap
of WireRefKind.yours:
result = relay.lookupLocal(n.yours.oid)
if result.isNil:
result = newInertCap()
elif n.yours.attenuation.len > 0:
result = attenuate(result, n.yours.attenuation)
proc rewriteIn(relay; facet; v: Value):
tuple[rewritten: Assertion; imported: seq[WireSymbol]] =
var imported: seq[WireSymbol]
result.rewritten = mapEmbeds(v) do (pr: Value) -> Value:
let wr = pr.preservesTo WireRef; if wr.isSome:
result = rewriteCapIn(relay, facet, wr.get, imported).embed
else:
result = pr
result.imported = imported
proc close(r: Relay) = discard
proc dispatch(relay: Relay; turn: var Turn; cap: Cap; event: Event) =
case event.orKind
of EventKind.Assert:
let (a, imported) = rewriteIn(relay, turn.facet, event.assert.assertion)
relay.inboundAssertions[event.assert.handle] = (publish(turn, cap, a), imported,)
of EventKind.Retract:
let remoteHandle = event.retract.handle
var outbound: tuple[localHandle: Handle, imported: seq[WireSymbol]]
if relay.inboundAssertions.pop(remoteHandle, outbound):
for e in outbound.imported: relay.imported.drop e
turn.retract(outbound.localHandle)
of EventKind.Message:
let (a, imported) = rewriteIn(relay, turn.facet, event.message.body)
assert imported.len == 0, "Cannot receive transient reference"
turn.message(cap, a)
of EventKind.Sync:
turn.sync(cap) do (turn: var Turn):
var
(v, imported) = rewriteIn(relay, turn.facet, event.sync.peer)
peer = unembed(v, Cap)
if peer.isSome:
turn.message(get peer, true)
for e in imported: relay.imported.drop e
proc dispatch(relay: Relay; v: Value) =
trace "S: ", v
run(relay.facet) do (t: var Turn):
var pkt: Packet
if pkt.fromPreserves(v):
case pkt.orKind
of PacketKind.Turn:
# https://synit.org/book/protocol.html#turn-packets
for te in pkt.turn:
let r = lookupLocal(relay, te.oid.Oid)
if not r.isNil:
dispatch(relay, t, r, te.event)
of PacketKind.Error:
# https://synit.org/book/protocol.html#error-packets
when defined(posix):
stderr.writeLine("Error from server: ", pkt.error.message, " (detail: ", pkt.error.detail, ")")
close relay
of PacketKind.Extension:
# https://synit.org/book/protocol.html#extension-packets
discard
else:
when defined(posix):
stderr.writeLine("discarding undecoded packet ", v)
proc recv(relay: Relay; buf: openarray[byte]; slice: Slice[int]) =
feed(relay.wireBuf, buf, slice)
var pr = decode(relay.wireBuf)
if pr.isSome: dispatch(relay, pr.get)
proc recv(relay: Relay; buf: openarray[byte]) =
feed(relay.wireBuf, buf)
var pr = decode(relay.wireBuf)
if pr.isSome: dispatch(relay, pr.get)
type
RelayOptions* = object of RootObj
packetWriter*: PacketWriter
RelayActorOptions* = object of RelayOptions
initialOid*: Option[Oid]
initialCap*: Cap
nextLocalOid*: Option[Oid]
proc spawnRelay(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup) =
linkActor(turn, name) do (turn: var Turn):
turn.preventInertCheck()
let relay = Relay(
facet: turn.facet,
packetWriter: opts.packetWriter,
wireBuf: newBufferedDecoder(0),
)
if not opts.initialCap.isNil:
var exported: seq[WireSymbol]
discard rewriteCapOut(relay, opts.initialCap, exported)
opts.nextLocalOid.map do (oid: Oid):
relay.nextLocalOid =
if oid == 0.Oid: 1.Oid
else: oid
assert opts.initialOid.isSome
if opts.initialOid.isSome:
var
imported: seq[WireSymbol]
wr = WireRef(
orKind: WireRefKind.mine,
mine: WireRefMine(oid: opts.initialOid.get))
relay.peer = rewriteCapIn(relay, turn.facet, wr, imported)
assert not relay.peer.isNil
setup(turn, relay)
proc rejected(detail: Value): Resolved =
result = Resolved(orKind: ResolvedKind.Rejected)
result.rejected.detail = detail
proc accepted(cap: Cap): Resolved =
result = Resolved(orKind: ResolvedKind.accepted)
result.accepted.responderSession = cap
type ShutdownEntity = ref object of Entity
method retract(e: ShutdownEntity; turn: var Turn; h: Handle) =
stopActor(e.facet)
when defined(posix):
import std/[oserrors, posix]
import pkg/sys/[files, handles, ioqueue, sockets]
export transportAddress.Unix
type StdioEntity = ref object of Entity
relay: Relay
stdin: AsyncFile
alive: bool
method message(entity: StdioEntity; turn: var Turn; ass: AssertionRef) =
if ass.value.preservesTo(ForceDisconnect).isSome:
entity.alive = false
proc loop(entity: StdioEntity) {.asyncio.} =
let buf = new seq[byte]
entity.alive = true
while entity.alive:
buf[].setLen(0x1000)
let n = read(entity.stdin, buf)
if n == 0:
stopActor(entity.facet)
else:
entity.relay.recv(buf[], 0..<n)
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Stdio) =
## Connect to an external dataspace over stdio.
let localDataspace = newDataspace(turn)
proc stdoutWriter(turn: var Turn; buf: seq[byte]) =
## Blocking write to stdout.
let n = writeBytes(stdout, buf, 0, buf.len)
flushFile(stdout)
if n != buf.len:
stopActor(turn)
var opts = RelayActorOptions(
packetWriter: stdoutWriter,
initialCap: localDataspace,
initialOid: 0.Oid.some,
)
spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay):
let
facet = turn.facet
fd = stdin.getOsFileHandle()
flags = fcntl(fd.cint, F_GETFL, 0)
if flags < 0: raiseOSError(osLastError())
if fcntl(fd.cint, F_SETFL, flags or O_NONBLOCK) < 0:
raiseOSError(osLastError())
let entity = StdioEntity(
facet: turn.facet, relay: relay, stdin: newAsyncFile(FD fd))
onStop(entity.facet) do (turn: var Turn):
entity.alive = false
close(entity.stdin)
# Close stdin to remove it from the ioqueue
discard trampoline:
whelp loop(entity)
publish(turn, ds, TransportConnection(
`addr`: ta.toPreserves,
control: newCap(entity, turn),
resolved: localDataspace.accepted,
))
proc connectStdio*(turn: var Turn; ds: Cap) =
## Connect to an external dataspace over stdin and stdout.
connectTransport(turn, ds, transportAddress.Stdio())
type
TcpEntity = ref object of Entity
relay: Relay
sock: AsyncConn[sockets.Protocol.TCP]
alive: bool
UnixEntity = ref object of Entity
relay: Relay
sock: AsyncConn[sockets.Protocol.Unix]
alive: bool
SocketEntity = TcpEntity | UnixEntity
method message(entity: SocketEntity; turn: var Turn; ass: AssertionRef) =
if ass.value.preservesTo(ForceDisconnect).isSome:
entity.alive = false
template bootSocketEntity() {.dirty.} =
proc setup(turn: var Turn) {.closure.} =
proc kill(turn: var Turn) =
if entity.alive:
entity.alive = false
close(entity.sock)
onStop(turn, kill)
publish(turn, ds, TransportConnection(
`addr`: ta.toPreserves,
control: newCap(entity, turn),
resolved: entity.relay.peer.accepted,
))
run(entity.relay.facet, setup)
let buf = new seq[byte]
entity.alive = true
while entity.alive:
buf[].setLen(0x1000)
let n = read(entity.sock, buf)
if n < 0: raiseOSError(osLastError())
elif n == 0:
stopActor(entity.facet)
else:
entity.relay.recv(buf[], 0..<n)
# the socket closes when the actor is stopped
proc boot(entity: TcpEntity; ta: transportAddress.Tcp; ds: Cap) {.asyncio.} =
entity.sock = connectTcpAsync(ta.host, Port ta.port)
bootSocketEntity()
proc boot(entity: UnixEntity; ta: transportAddress.Unix; ds: Cap) {.asyncio.} =
entity.sock = connectUnixAsync(ta.path)
bootSocketEntity()
template spawnSocketRelay() {.dirty.} =
proc writeConn(turn: var Turn; buf: seq[byte]) =
discard trampoline:
whelp write(entity.sock, buf)
var ops = RelayActorOptions(
packetWriter: writeConn,
initialOid: 0.Oid.some,
)
spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay):
entity.facet = turn.facet
entity.relay = relay
discard trampoline:
whelp boot(entity, ta, ds)
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Tcp) =
let entity = TcpEntity()
spawnSocketRelay()
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Unix) =
let entity = UnixEntity()
spawnSocketRelay()
elif defined(solo5):
import solo5_dispatcher
import taps
type
TcpEntity = ref object of Entity
relay: Relay
conn: Connection
decoder: BufferedDecoder
method message(entity: TcpEntity; turn: var Turn; ass: AssertionRef) =
if ass.value.preservesTo(ForceDisconnect).isSome:
entity.conn.abort()
proc connectTransport(turn: var Turn; ds: Cap; ta: transportAddress.Tcp) =
let entity = TcpEntity(facet: turn.facet)
proc writeConn(turn: var Turn; buf: seq[byte]) =
assert not entity.conn.isNil
entity.conn.batch:
entity.conn.send(buf)
var ops = RelayActorOptions(
packetWriter: writeConn,
initialOid: 0.Oid.some,
)
spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay):
entity.facet = turn.facet
entity.relay = relay
var ep = newRemoteEndpoint()
if ta.host.isIpAddress:
ep.with ta.host.parseIpAddress
else:
ep.withHostname ta.host
ep.with ta.port.Port
var tp = newTransportProperties()
tp.require "reliability"
tp.ignore "congestion-control"
tp.ignore "preserve-order"
var preconn = newPreconnection(
remote=[ep], transport=tp.some)
entity.conn = preconn.initiate()
entity.facet.onStop do (turn: var Turn):
entity.conn.close()
entity.conn.onConnectionError do (err: ref Exception):
run(entity.facet) do (turn: var Turn):
terminate(turn, err)
entity.conn.onClosed():
stop(entity.facet)
entity.conn.onReceivedPartial do (data: seq[byte]; ctx: MessageContext; eom: bool):
entity.relay.recv(data)
if eom:
stop(entity.facet)
else:
entity.conn.receive()
entity.conn.onReady do ():
entity.facet.run do (turn: var Turn):
publish(turn, ds, TransportConnection(
`addr`: ta.toPreserves,
control: newCap(entity, turn),
resolved: entity.relay.peer.accepted,
))
entity.conn.receive()
proc walk(turn: var Turn; ds, origin: Cap; route: Route; transOff, stepOff: int) =
if stepOff < route.pathSteps.len:
let
step = route.pathSteps[stepOff]
rejectPat = ResolvedPathStep?:{
0: ?(origin.embed), 1: ?step, 2: ?:Rejected}
acceptPat = ResolvedPathStep?:{
0: ?(origin.embed), 1: ?step, 2: ?:ResolvedAccepted}
onPublish(turn, ds, rejectPat) do (detail: Value):
publish(turn, ds, ResolvePath(
route: route,
`addr`: route.transports[transOff],
resolved: detail.rejected,
))
during(turn, ds, acceptPat) do (next: Cap):
walk(turn, ds, next, route, transOff, stepOff.succ)
else:
publish(turn, ds, ResolvePath(
route: route,
`addr`: route.transports[transOff],
resolved: origin.accepted,
))
proc connectRoute(turn: var Turn; ds: Cap; route: Route; transOff: int) =
let rejectPat = TransportConnection ?: {
0: ?route.transports[transOff],
2: ?:Rejected,
}
during(turn, ds, rejectPat) do (detail: Value):
publish(turn, ds, ResolvePath(
route: route,
`addr`: route.transports[transOff],
resolved: detail.rejected,
))
let acceptPat = TransportConnection?:{
0: ?route.transports[transOff],
2: ?:ResolvedAccepted,
}
onPublish(turn, ds, acceptPat) do (origin: Cap):
walk(turn, ds, origin, route, transOff, 0)
type StepCallback = proc (turn: var Turn; step: Value; origin, next: Cap) {.closure.}
proc spawnStepResolver(turn: var Turn; ds: Cap; stepType: Value; cb: StepCallback) =
let stepPat = grabRecord(stepType, grab())
let pat = ?Observe(pattern: ResolvedPathStep?:{1: stepPat}) ?? {0: grabLit(), 1: grab()}
during(turn, ds, pat) do (origin: Cap; stepDetail: Literal[Value]):
let step = toRecord(stepType, stepDetail.value)
proc duringCallback(turn: var Turn; ass: Value; h: Handle): TurnAction =
var res = ass.preservesTo Resolved
if res.isSome:
if res.get.orKind == ResolvedKind.accepted and
res.get.accepted.responderSession of Cap:
cb(turn, step, origin, res.get.accepted.responderSession.Cap)
else:
publish(turn, ds, ResolvedPathStep(
origin: origin, pathStep: step, resolved: res.get))
proc action(turn: var Turn) =
stop(turn)
result = action
publish(turn, origin, Resolve(
step: step, observer: newCap(turn, during(duringCallback))))
proc spawnRelays*(turn: var Turn; ds: Cap) =
## Spawn actors that manage routes and appeasing gatekeepers.
let transPat = ?Observe(pattern: !TransportConnection) ?? { 0: grab() }
# Use a generic pattern and type matching
# in the during handler because it is easy.
when defined(posix):
let stdioPat = ?Observe(pattern: TransportConnection?:{0: ?:Stdio})
during(turn, ds, stdioPat) do:
connectTransport(turn, ds, Stdio())
# TODO: unix pattern
during(turn, ds, transPat) do (ta: Literal[transportAddress.Unix]):
try: connectTransport(turn, ds, ta.value)
except exceptions.IOError as e:
publish(turn, ds, TransportConnection(
`addr`: ta.toPreserve,
resolved: rejected(embed e),
))
# TODO: tcp pattern
during(turn, ds, transPat) do (ta: Literal[transportAddress.Tcp]):
try: connectTransport(turn, ds, ta.value)
except exceptions.IOError as e:
publish(turn, ds, TransportConnection(
`addr`: ta.toPreserve,
resolved: rejected(embed e),
))
let resolvePat = ?Observe(pattern: !ResolvePath) ?? {0: grab()}
during(turn, ds, resolvePat) do (route: Literal[Route]):
for i, transAddr in route.value.transports:
connectRoute(turn, ds, route.value, i)
spawnStepResolver(turn, ds, "ref".toSymbol) do (
turn: var Turn, step: Value, origin: Cap, next: Cap):
publish(turn, ds, ResolvedPathStep(
origin: origin, pathStep: step, resolved: next.accepted))
type BootProc* = proc (turn: var Turn; ds: Cap) {.closure.}
proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
## Resolve `route` within `ds` and call `bootProc` with resolved capabilities.
during(turn, ds, ResolvePath ?: {0: ?route, 3: ?:ResolvedAccepted}) do (dst: Cap):
bootProc(turn, dst)
when defined(posix):
const defaultRoute* = "<route [<stdio>]>"
proc envRoute*: Route =
## Get an route to a Syndicate capability from the calling environment.
## On UNIX this is the SYNDICATE_ROUTE environmental variable with a
## fallack to a defaultRoute_.
## See https://git.syndicate-lang.org/syndicate-lang/syndicate-protocols/raw/branch/main/schemas/gatekeeper.prs.
var text = getEnv("SYNDICATE_ROUTE", defaultRoute)
if text == "":
var tx = (getEnv("XDG_RUNTIME_DIR", "/run/user/1000") / "dataspace").toPreserves
result.transports = @[initRecord("unix", tx)]
result.pathSteps = @[capabilities.mint().toPreserves]
else:
var pr = parsePreserves(text)
if not result.fromPreserves(pr):
raise newException(ValueError, "failed to parse $SYNDICATE_ROUTE " & $pr)
proc resolveEnvironment*(turn: var Turn; bootProc: BootProc) =
## Resolve a capability from the calling environment
## and call `bootProc`. See envRoute_.
var resolved = false
let
ds = newDataspace(turn)
pat = ResolvePath ?: {0: ?envRoute(), 3: ?:ResolvedAccepted}
during(turn, ds, pat) do (dst: Cap):
if not resolved:
resolved = true
bootProc(turn, dst)
do:
resolved = false
spawnRelays(turn, ds)
# TODO: define a runActor that comes preloaded with relaying