diff --git a/src/syndicate.nim b/src/syndicate.nim index 595028d..e8f289d 100644 --- a/src/syndicate.nim +++ b/src/syndicate.nim @@ -182,7 +182,7 @@ macro during*(turn: untyped; ds: Cap; pattern: Pattern; publishBody: untyped) = callbackSym = callbackProc[0] result = quote do: if `argCount` != 0 and `pattern`.analyse.capturePaths.len != `argCount`: - raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments") + raiseAssert($`pattern`.analyse.capturePaths.len & " values captured but handler has " & $`argCount` & " arguments - " & $`pattern`) `callbackProc` discard observe(`turn`, `ds`, `pattern`, during(`callbackSym`)) diff --git a/src/syndicate/relays.nim b/src/syndicate/relays.nim index ea87818..80e3c96 100644 --- a/src/syndicate/relays.nim +++ b/src/syndicate/relays.nim @@ -263,26 +263,31 @@ proc newRelay(turn: var Turn; opts: RelayOptions; setup: RelaySetup): Relay = discard result.facet.preventInertCheck() setup(turn, result) -proc spawnRelay*(name: string; turn: var Turn; opts: RelayActorOptions; setup: RelaySetup): Future[Cap] = - var fut = newFuture[Cap]"spawnRelay" +proc transportConnectionResolve(addrAss: Assertion; ds: Cap): gatekeeper.TransportConnection[Cap] = + result.`addr` = addrAss + result.resolved = Resolved[Cap](orKind: ResolvedKind.accepted) + result.resolved.accepted.responderSession = ds + +proc spawnRelay*(name: string; turn: var Turn; ds: Cap; addrAss: Assertion; opts: RelayActorOptions; setup: RelaySetup) = discard spawn(name, turn) do (turn: var Turn): let relay = newRelay(turn, opts, setup) if not opts.initialCap.isNil: var exported: seq[WireSymbol] discard rewriteCapOut(relay, opts.initialCap, exported) - if opts.initialOid.isSome: - var imported: seq[WireSymbol] - var wr = WireRef( - orKind: WireRefKind.mine, - mine: WireRefMine(oid: opts.initialOid.get)) - fut.complete rewriteCapIn(relay, turn.facet, wr, imported) - else: - fut.complete(nil) opts.nextLocalOid.map do (oid: Oid): relay.nextLocalOid = if oid == 0.Oid: 1.Oid else: oid - fut + if opts.initialOid.isSome: + var + imported: seq[WireSymbol] + wr = WireRef( + orKind: WireRefKind.mine, + mine: WireRefMine(oid: opts.initialOid.get)) + res = rewriteCapIn(relay, turn.facet, wr, imported) + discard publish(turn, ds, transportConnectionResolve(addrAss, res)) + else: + discard publish(turn, ds, transportConnectionResolve(addrAss, ds)) when defined(posix): import std/asyncnet @@ -300,9 +305,8 @@ export Tcp when defined(posix): export Unix - proc connect*(turn: var Turn; socket: AsyncSocket; step: Preserve[Cap]; bootProc: ConnectProc) = + proc connect*(turn: var Turn; ds: Cap; route: Route; addrAss: Assertion; socket: AsyncSocket; step: Preserve[Cap]) = ## Relay a dataspace over an open `AsyncSocket`. - ## *`bootProc` may be called multiple times for multiple remote gatekeepers.* proc socketWriter(packet: sink Packet): Future[void] = socket.send(cast[string](encode(packet))) const recvSize = 0x2000 @@ -310,12 +314,11 @@ when defined(posix): let reenable = turn.facet.preventInertCheck() connectionClosedCap = newCap(turn, ShutdownEntity()) - fut = newFuture[void]"connect" discard bootActor("socket") do (turn: var Turn): var ops = RelayActorOptions( packetWriter: socketWriter, initialOid: 0.Oid.some) - let refFut = spawnRelay("socket", turn, ops) do (turn: var Turn; relay: Relay): + spawnRelay("socket", turn, ds, addrAss, ops) do (turn: var Turn; relay: Relay): let facet = turn.facet var wireBuf = newBufferedDecoder(0) proc recvCb(pktFut: Future[string]) {.gcsafe.} = @@ -336,22 +339,19 @@ when defined(posix): turn.facet.actor.atExit do (turn: var Turn): close(socket) discard publish(turn, connectionClosedCap, true) shutdownCap = newCap(turn, ShutdownEntity()) - addCallback(refFut) do (): - let gatekeeper = read refFut + onPublish(turn, ds, TransportConnection[Cap] ? {0: ?addrAss, 2: ?Rejected[Cap]}) do (detail: Assertion): + raise newException(IOError, $detail) + onPublish(turn, ds, TransportConnection[Cap] ? {0: ?addrAss, 2: ?ResolvedAccepted[Cap]}) do (gatekeeper: Cap): run(gatekeeper.relay) do (turn: var Turn): reenable() discard publish(turn, shutdownCap, true) - proc duringCallback(turn: var Turn; a: Assertion; h: Handle): TurnAction = + proc duringCallback(turn: var Turn; ass: Assertion; h: Handle): TurnAction = let facet = inFacet(turn) do (turn: var Turn): - var - accepted: ResolvedAccepted[Cap] - rejected: Rejected[Cap] - if fromPreserve(accepted, a): - bootProc(turn, accepted.responderSession) - elif fromPreserve(rejected, a): - fail(fut, newException(CatchableError, $rejected.detail)) + var resolvePath = ResolvePath[Cap](route: route, `addr`: addrAss) + if resolvePath.resolved.fromPreserve(ass): + discard publish(turn, ds, resolvePath) else: - fail(fut, newException(CatchableError, $a)) + raise newException(CatchableError, "unhandled gatekeeper response " & $ass) proc action(turn: var Turn) = stop(turn, facet) result = action @@ -359,12 +359,9 @@ when defined(posix): step: step, observer: newCap(turn, during(duringCallback)), )) - fut.complete() - asyncCheck(turn, fut) - proc connect*(turn: var Turn; transport: Tcp; step: Preserve[Cap]; bootProc: ConnectProc) = + proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Tcp; step: Preserve[Cap]) = ## Relay a dataspace over TCP. - ## *`bootProc` may be called multiple times for multiple remote gatekeepers.* let socket = newAsyncSocket( domain = AF_INET, sockType = SOCK_STREAM, @@ -372,11 +369,10 @@ when defined(posix): buffered = false) let fut = connect(socket, transport.host, Port transport.port) addCallback(fut, turn) do (turn: var Turn): - connect(turn, socket, step, bootProc) + connect(turn, ds, route, transport.toPreserve(Cap), socket, step) - proc connect*(turn: var Turn; transport: Unix; step: Preserve[Cap]; bootProc: ConnectProc) = + proc connect*(turn: var Turn; ds: Cap; route: Route; transport: Unix; step: Preserve[Cap]) = ## Relay a dataspace over a UNIX socket. - ## *`bootProc` may be called multiple times for multiple remote gatekeepers.* let socket = newAsyncSocket( domain = AF_UNIX, sockType = SOCK_STREAM, @@ -384,7 +380,7 @@ when defined(posix): buffered = false) let fut = connectUnix(socket, transport.path) addCallback(fut, turn) do (turn: var Turn): - connect(turn, socket, step, bootProc) + connect(turn, ds, route, transport.toPreserve(Cap), socket, step) import std/asyncfile @@ -392,15 +388,17 @@ when defined(posix): proc connectStdio*(turn: var Turn; ds: Cap) = ## Connect to an external dataspace over stdin and stdout. - proc stdoutWriter(packet: sink Packet): Future[void] {.async.} = + proc stdoutWriter(packet: sink Packet): Future[void] = + result = newFuture[void]() var buf = encode(packet) doAssert writeBytes(stdout, buf, 0, buf.len) == buf.len flushFile(stdout) + complete result var opts = RelayActorOptions( packetWriter: stdoutWriter, initialCap: ds, initialOid: 0.Oid.some) - asyncCheck spawnRelay("stdio", turn, opts) do (turn: var Turn; relay: Relay): + spawnRelay("stdio", turn, ds, Stdio().toPreserve(Cap), opts) do (turn: var Turn; relay: Relay): let facet = turn.facet asyncStdin = openAsync("/dev/stdin") # this is universal now? @@ -421,8 +419,6 @@ when defined(posix): asyncStdin.read(stdinReadSize).addCallback(readCb) asyncStdin.read(stdinReadSize).addCallback(readCb) - proc connectStdio*(ds: Cap; turn: var Turn) {.deprecated.} = connectStdio(turn, ds) - type BootProc* = proc (turn: var Turn; ds: Cap) {.gcsafe.} proc envRoute*: Route[Cap] = @@ -444,11 +440,14 @@ proc resolve*(turn: var Turn; ds: Cap; route: Route; bootProc: BootProc) = doAssert(route.transports.len == 1, "only a single transport supported for routes") doAssert(route.pathSteps.len < 2, "multiple path steps not supported for routes") if unix.fromPreserve route.transports[0]: - connect(turn, unix, route.pathSteps[0], bootProc) + connect(turn, ds, route, unix, route.pathSteps[0]) elif tcp.fromPreserve route.transports[0]: - connect(turn, tcp, route.pathSteps[0], bootProc) + connect(turn, ds, route, tcp, route.pathSteps[0]) elif stdio.fromPreserve route.transports[0]: connectStdio(turn, ds) bootProc(turn, ds) else: raise newException(ValueError, "unsupported route") + + during(turn, ds, ResolvePath[Cap] ? { 0: ?route, 3: ?ResolvedAccepted[Cap]}) do (dest: Cap): + bootProc(turn, dest) diff --git a/syndicate.nimble b/syndicate.nimble index 97a20eb..a856304 100644 --- a/syndicate.nimble +++ b/syndicate.nimble @@ -1,6 +1,6 @@ # Package -version = "20231028" +version = "20231102" author = "Emery Hemingway" description = "Syndicated actors for conversational concurrency" license = "Unlicense" @@ -9,4 +9,4 @@ srcDir = "src" # Dependencies -requires "https://github.com/khchen/hashlib.git#84e0247555e4488594975900401baaf5bbbfb531", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20231028" +requires "https://github.com/khchen/hashlib.git#84e0247555e4488594975900401baaf5bbbfb531", "nim >= 2.0.0", "https://git.syndicate-lang.org/ehmry/preserves-nim.git >= 20231102" diff --git a/tests/test_chat.nim b/tests/test_chat.nim index ab6124c..01400d7 100644 --- a/tests/test_chat.nim +++ b/tests/test_chat.nim @@ -45,7 +45,9 @@ proc main = of "user", "username": username = val - if username != "": + if username == "": + stderr.writeLine "--user: unspecified" + else: runActor("chat") do (turn: var Turn; root: Cap): resolve(turn, root, route) do (turn: var Turn; ds: Cap): chat(turn, ds, username)