relays: publish gatekeeper rather then pass by future

This commit is contained in:
Emery Hemingway 2023-11-02 13:53:59 +00:00
parent 090b4d77ef
commit 8bc0ee2ae5
4 changed files with 45 additions and 44 deletions

View File

@ -182,7 +182,7 @@ macro during*(turn: untyped; ds: Cap; pattern: Pattern; publishBody: untyped) =
callbackSym = callbackProc[0] callbackSym = callbackProc[0]
result = quote do: result = quote do:
if `argCount` != 0 and `pattern`.analyse.capturePaths.len != `argCount`: if `argCount` != 0 and `pattern`.analyse.capturePaths.len != `argCount`:
raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments") raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments - " & $`pattern`)
`callbackProc` `callbackProc`
discard observe(`turn`, `ds`, `pattern`, during(`callbackSym`)) discard observe(`turn`, `ds`, `pattern`, during(`callbackSym`))

View File

@ -263,26 +263,31 @@ proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay =
discard result.facet.preventInertCheck() discard result.facet.preventInertCheck()
setup(turn, result) setup(turn, result)
proc spawnRelay*(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup): Future[Cap] = proc transportConnectionResolve(addrAss: Assertion; ds: Cap): gatekeeper.TransportConnection[Cap] =
var fut = newFuture[Cap]"spawnRelay" result.`addr` = addrAss
result.resolved = Resolved[Cap](orKind: ResolvedKind.accepted)
result.resolved.accepted.responderSession = ds
proc spawnRelay*(name: string; turn: var Turn; ds: Cap; addrAss: Assertion; opts: RelayActorOptions; setup: RelaySetup) =
discard spawn(name, turn) do (turn: var Turn): discard spawn(name, turn) do (turn: var Turn):
let relay = newRelay(turn, opts, setup) let relay = newRelay(turn, opts, setup)
if not opts.initialCap.isNil: if not opts.initialCap.isNil:
var exported: seq[WireSymbol] var exported: seq[WireSymbol]
discard rewriteCapOut(relay, opts.initialCap, exported) discard rewriteCapOut(relay, opts.initialCap, exported)
if opts.initialOid.isSome:
var imported: seq[WireSymbol]
var wr = WireRef(
orKind: WireRefKind.mine,
mine: WireRefMine(oid: opts.initialOid.get))
fut.complete rewriteCapIn(relay, turn.facet, wr, imported)
else:
fut.complete(nil)
opts.nextLocalOid.map do (oid: Oid): opts.nextLocalOid.map do (oid: Oid):
relay.nextLocalOid = relay.nextLocalOid =
if oid == 0.Oid: 1.Oid if oid == 0.Oid: 1.Oid
else: oid else: oid
fut if opts.initialOid.isSome:
var
imported: seq[WireSymbol]
wr = WireRef(
orKind: WireRefKind.mine,
mine: WireRefMine(oid: opts.initialOid.get))
res = rewriteCapIn(relay, turn.facet, wr, imported)
discard publish(turn, ds, transportConnectionResolve(addrAss, res))
else:
discard publish(turn, ds, transportConnectionResolve(addrAss, ds))
when defined(posix): when defined(posix):
import std/asyncnet import std/asyncnet
@ -300,9 +305,8 @@ export Tcp
when defined(posix): when defined(posix):
export Unix export Unix
proc connect*(turn: var Turn; socket: AsyncSocket; step: Preserve[Cap]; bootProc: ConnectProc) = proc connect*(turn: var Turn; ds: Cap; route: Route; addrAss: Assertion; socket: AsyncSocket; step: Preserve[Cap]) =
## Relay a dataspace over an open `AsyncSocket`. ## Relay a dataspace over an open `AsyncSocket`.
## *`bootProc` may be called multiple times for multiple remote gatekeepers.*
proc socketWriter(packet: sink Packet): Future[void] = proc socketWriter(packet: sink Packet): Future[void] =
socket.send(cast[string](encode(packet))) socket.send(cast[string](encode(packet)))
const recvSize = 0x2000 const recvSize = 0x2000
@ -310,12 +314,11 @@ when defined(posix):
let let
reenable = turn.facet.preventInertCheck() reenable = turn.facet.preventInertCheck()
connectionClosedCap = newCap(turn, ShutdownEntity()) connectionClosedCap = newCap(turn, ShutdownEntity())
fut = newFuture[void]"connect"
discard bootActor("socket") do (turn: var Turn): discard bootActor("socket") do (turn: var Turn):
var ops = RelayActorOptions( var ops = RelayActorOptions(
packetWriter: socketWriter, packetWriter: socketWriter,
initialOid: 0.Oid.some) initialOid: 0.Oid.some)
let refFut = spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay): spawnRelay("socket", turn, ds, addrAss, ops) do (turn: var Turn; relay: Relay):
let facet = turn.facet let facet = turn.facet
var wireBuf = newBufferedDecoder(0) var wireBuf = newBufferedDecoder(0)
proc recvCb(pktFut: Future[string]) {.gcsafe.} = proc recvCb(pktFut: Future[string]) {.gcsafe.} =
@ -336,22 +339,19 @@ when defined(posix):
turn.facet.actor.atExit do (turn: var Turn): close(socket) turn.facet.actor.atExit do (turn: var Turn): close(socket)
discard publish(turn, connectionClosedCap, true) discard publish(turn, connectionClosedCap, true)
shutdownCap = newCap(turn, ShutdownEntity()) shutdownCap = newCap(turn, ShutdownEntity())
addCallback(refFut) do (): onPublish(turn, ds, TransportConnection[Cap] ? {0: ?addrAss, 2: ?Rejected[Cap]}) do (detail: Assertion):
let gatekeeper = read refFut raise newException(IOError, $detail)
onPublish(turn, ds, TransportConnection[Cap] ? {0: ?addrAss, 2: ?ResolvedAccepted[Cap]}) do (gatekeeper: Cap):
run(gatekeeper.relay) do (turn: var Turn): run(gatekeeper.relay) do (turn: var Turn):
reenable() reenable()
discard publish(turn, shutdownCap, true) discard publish(turn, shutdownCap, true)
proc duringCallback(turn: var Turn; a: Assertion; h: Handle): TurnAction = proc duringCallback(turn: var Turn; ass: Assertion; h: Handle): TurnAction =
let facet = inFacet(turn) do (turn: var Turn): let facet = inFacet(turn) do (turn: var Turn):
var var resolvePath = ResolvePath[Cap](route: route, `addr`: addrAss)
accepted: ResolvedAccepted[Cap] if resolvePath.resolved.fromPreserve(ass):
rejected: Rejected[Cap] discard publish(turn, ds, resolvePath)
if fromPreserve(accepted, a):
bootProc(turn, accepted.responderSession)
elif fromPreserve(rejected, a):
fail(fut, newException(CatchableError, $rejected.detail))
else: else:
fail(fut, newException(CatchableError, $a)) raise newException(CatchableError, "unhandled gatekeeper response " & $ass)
proc action(turn: var Turn) = proc action(turn: var Turn) =
stop(turn, facet) stop(turn, facet)
result = action result = action
@ -359,12 +359,9 @@ when defined(posix):
step: step, step: step,
observer: newCap(turn, during(duringCallback)), observer: newCap(turn, during(duringCallback)),
)) ))
fut.complete()
asyncCheck(turn, fut)
proc connect*(turn: var Turn; transport: Tcp; step: Preserve[Cap]; bootProc: ConnectProc) = proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Tcp; step: Preserve[Cap]) =
## Relay a dataspace over TCP. ## Relay a dataspace over TCP.
## *`bootProc` may be called multiple times for multiple remote gatekeepers.*
let socket = newAsyncSocket( let socket = newAsyncSocket(
domain = AF_INET, domain = AF_INET,
sockType = SOCK_STREAM, sockType = SOCK_STREAM,
@ -372,11 +369,10 @@ when defined(posix):
buffered = false) buffered = false)
let fut = connect(socket, transport.host, Port transport.port) let fut = connect(socket, transport.host, Port transport.port)
addCallback(fut, turn) do (turn: var Turn): addCallback(fut, turn) do (turn: var Turn):
connect(turn, socket, step, bootProc) connect(turn, ds, route, transport.toPreserve(Cap), socket, step)
proc connect*(turn: var Turn; transport: Unix; step: Preserve[Cap]; bootProc: ConnectProc) = proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Unix; step: Preserve[Cap]) =
## Relay a dataspace over a UNIX socket. ## Relay a dataspace over a UNIX socket.
## *`bootProc` may be called multiple times for multiple remote gatekeepers.*
let socket = newAsyncSocket( let socket = newAsyncSocket(
domain = AF_UNIX, domain = AF_UNIX,
sockType = SOCK_STREAM, sockType = SOCK_STREAM,
@ -384,7 +380,7 @@ when defined(posix):
buffered = false) buffered = false)
let fut = connectUnix(socket, transport.path) let fut = connectUnix(socket, transport.path)
addCallback(fut, turn) do (turn: var Turn): addCallback(fut, turn) do (turn: var Turn):
connect(turn, socket, step, bootProc) connect(turn, ds, route, transport.toPreserve(Cap), socket, step)
import std/asyncfile import std/asyncfile
@ -392,15 +388,17 @@ when defined(posix):
proc connectStdio*(turn: var Turn; ds: Cap) = proc connectStdio*(turn: var Turn; ds: Cap) =
## Connect to an external dataspace over stdin and stdout. ## Connect to an external dataspace over stdin and stdout.
proc stdoutWriter(packet: sink Packet): Future[void] {.async.} = proc stdoutWriter(packet: sink Packet): Future[void] =
result = newFuture[void]()
var buf = encode(packet) var buf = encode(packet)
doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len
flushFile(stdout) flushFile(stdout)
complete result
var opts = RelayActorOptions( var opts = RelayActorOptions(
packetWriter: stdoutWriter, packetWriter: stdoutWriter,
initialCap: ds, initialCap: ds,
initialOid: 0.Oid.some) initialOid: 0.Oid.some)
asyncCheck spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay): spawnRelay("stdio", turn, ds, Stdio().toPreserve(Cap), opts) do (turn: var Turn; relay: Relay):
let let
facet = turn.facet facet = turn.facet
asyncStdin = openAsync("/dev/stdin") # this is universal now? asyncStdin = openAsync("/dev/stdin") # this is universal now?
@ -421,8 +419,6 @@ when defined(posix):
asyncStdin.read(stdinReadSize).addCallback(readCb) asyncStdin.read(stdinReadSize).addCallback(readCb)
asyncStdin.read(stdinReadSize).addCallback(readCb) asyncStdin.read(stdinReadSize).addCallback(readCb)
proc connectStdio*(ds: Cap; turn: var Turn) {.deprecated.} = connectStdio(turn, ds)
type BootProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.} type BootProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.}
proc envRoute*: Route[Cap] = proc envRoute*: Route[Cap] =
@ -444,11 +440,14 @@ proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) =
doAssert(route.transports.len == 1, "only a single transport supported for routes") doAssert(route.transports.len == 1, "only a single transport supported for routes")
doAssert(route.pathSteps.len < 2, "multiple path steps not supported for routes") doAssert(route.pathSteps.len < 2, "multiple path steps not supported for routes")
if unix.fromPreserve route.transports[0]: if unix.fromPreserve route.transports[0]:
connect(turn, unix, route.pathSteps[0], bootProc) connect(turn, ds, route, unix, route.pathSteps[0])
elif tcp.fromPreserve route.transports[0]: elif tcp.fromPreserve route.transports[0]:
connect(turn, tcp, route.pathSteps[0], bootProc) connect(turn, ds, route, tcp, route.pathSteps[0])
elif stdio.fromPreserve route.transports[0]: elif stdio.fromPreserve route.transports[0]:
connectStdio(turn, ds) connectStdio(turn, ds)
bootProc(turn, ds) bootProc(turn, ds)
else: else:
raise newException(ValueError, "unsupported route") raise newException(ValueError, "unsupported route")
during(turn, ds, ResolvePath[Cap] ? { 0: ?route, 3: ?ResolvedAccepted[Cap]}) do (dest: Cap):
bootProc(turn, dest)

View File

@ -1,6 +1,6 @@
# Package # Package
version = "20231028" version = "20231102"
author = "Emery Hemingway" author = "Emery Hemingway"
description = "Syndicated actors for conversational concurrency" description = "Syndicated actors for conversational concurrency"
license = "Unlicense" license = "Unlicense"
@ -9,4 +9,4 @@ srcDir = "src"
# Dependencies # Dependencies
requires "https://github.com/khchen/hashlib.git#84e0247555e4488594975900401baaf5bbbfb531", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20231028" requires "https://github.com/khchen/hashlib.git#84e0247555e4488594975900401baaf5bbbfb531", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20231102"

View File

@ -45,7 +45,9 @@ proc main =
of "user", "username": of "user", "username":
username = val username = val
if username != "": if username == "":
stderr.writeLine "--user: unspecified"
else:
runActor("chat") do (turn: var Turn; root: Cap): runActor("chat") do (turn: var Turn; root: Cap):
resolve(turn, root, route) do (turn: var Turn; ds: Cap): resolve(turn, root, route) do (turn: var Turn; ds: Cap):
chat(turn, ds, username) chat(turn, ds, username)