diff --git a/README.md b/README.md index ac21e94..f9c0b79 100644 --- a/README.md +++ b/README.md @@ -2,21 +2,47 @@ An actor for interacting with the [Nix](https://nixos.org/) daemon via the [Syndicated Actor Model](https://syndicate-lang.org/). +See [protocol.prs](./protocol.prs) for the Syndicate protocol [schema](https://preserves.dev/preserves-schema.html). + *This is only a proof-of-concept and is not yet useful.* ## Example configuration + +A demo script for the [Syndicate server](https://git.syndicate-lang.org/syndicate-lang/syndicate-rs), see https://synit.org/book/operation/scripting.html ``` ? $nixspace [ ? {}; in pkgs.hello" { } ?drv> [ - ? [ ] + ? [ + $log ! + ] ] - ? [] - ? [] + ? [ + $log ! + ] + + ? [ + $log ! + ] + + ? [ + $log ! + ] + + ? [ + $log ! - ? ?any [ - $log ! ] $config [ @@ -24,10 +50,12 @@ An actor for interacting with the [Nix](https://nixos.org/) daemon via the [Synd ? ?cap> [ $cap { dataspace: $nixspace + daemon-socket: "/nix/var/nix/daemon-socket/socket" + listen-socket: "/tmp/translator.worker.nix.socket" } ] ] diff --git a/nix_actor.nimble b/nix_actor.nimble index c88a142..5a555eb 100644 --- a/nix_actor.nimble +++ b/nix_actor.nimble @@ -1,4 +1,4 @@ -version = "20230607" +version = "20230610" author = "Emery Hemingway" description = "Syndicated Nix Actor" license = "Unlicense" diff --git a/protocol.prs b/protocol.prs index 4e437ec..894fe35 100644 --- a/protocol.prs +++ b/protocol.prs @@ -22,8 +22,9 @@ ActionStop = . ActionResult = . ; TODO: why not make target a singleton? -Missing = . +Missing = . +; TODO keep a few critical fields and move the rest into a dictionary PathInfo = 0: + var s = newString((stringLen + 7) and (not 7)) + let n = await recvInto(socket, addr s[0], s.len) + if n != s.len: + raise newException(ProtocolError, "short read") + setLen(s, stringLen) + return s + return "" + +proc recvString(session: Session): Future[string] = + recvString(session.socket) + +proc recvStringSeq(session: Session): Future[StringSeq] {.async.} = + let count = int(await recvWord(session.socket)) + var strings = newSeq[string](count) + for i in 0.. session.buffer.len: setLen(session.buffer, wordCount) - session.buffer[0] = Word s.len - if wordCount > 0: - session.buffer[wordCount] = 0x00 - copyMem(addr session.buffer[1], unsafeAddr s[0], s.len) - send(sock, addr session.buffer[0], (1 + wordCount) shl 3) +proc newSession(): Session = + newSession(newAsyncSocket( + domain = AF_UNIX, + sockType = SOCK_STREAM, + protocol = cast[Protocol](0), + buffered = false)) -proc recvWord(sock: AsyncSocket): Future[Word] {.async.} = - var w: Word - let n = await recvInto(sock, addr w, sizeof(Word)) - if n != sizeof(Word): raise newException(ProtocolError, "short read of word") - return w +proc send(session: Session; miss: Missing) {.async.} = + await send(session, STDERR_LAST) + await send(session, miss.willBuild) + await send(session, miss.willSubstitute) + await send(session, miss.unknown) + await send(session, Word miss.downloadSize) + await send(session, Word miss.narSize) -proc passWord(a, b: AsyncSocket): Future[Word] {.async.} = - var w = await recvWord(a) - await send(b, addr w, sizeof(Word)) - return w +proc send(session: Session; info: PathInfo) {.async.} = + await send(session, STDERR_LAST) + await send(session, 1) + if info.path != "": + await send(session, info.path) + await send(session, info.deriver) + await send(session, info.narHash) + await send(session, info.references) + await send(session, Word info.registrationTime) + await send(session, Word info.narSize) + await send(session, Word info.ultimate) + await send(session, info.sigs) + await send(session, info.ca) -proc recvString(sock: AsyncSocket): Future[string] {.async.} = - let w = await recvWord(sock) - let stringLen = int w - var s: string - if stringLen > 0: - s.setLen((stringLen + 7) and (not 7)) - let n = await recvInto(sock, addr s[0], s.len) - if n != s.len: - raise newException(ProtocolError, "short string read") - setLen(s, stringLen) - return s +proc serveClient(facet: Facet; ds: Ref; session: Session) {.async.} = + block: + let clientMagic = await recvWord(session) + if clientMagic != WORKER_MAGIC_1: + raise newException(ProtocolError, "invalid protocol magic") + await send(session, WORKER_MAGIC_2, PROTOCOL_VERSION) + let clientVersion = Version(await recvWord(session)) + if clientVersion < 0x1_21: + raise newException(ProtocolError, "obsolete protocol version") + assert clientVersion.minor >= 14 + discard await(recvWord(session)) + # obsolete CPU affinity + assert clientVersion.minor >= 11 + discard await(recvWord(session)) + # obsolete reserveSpace + assert clientVersion.minor >= 33 + await send(session, "0.0.0") + await send(session, STDERR_LAST) + while not session.socket.isClosed: + let wop = await recvWord(session.socket) + case wop -proc passString(session: Session; a, b: AsyncSocket): Future[string] {.async.} = - var s = await recvString(a) - await send(session, b, s) - return s + of wopQueryPathInfo: + let + path = await recvString(session) + pat = inject(?PathInfo, { 0: ?path }) + await send(session, STDERR_NEXT) + await send(session, $pat) + run(facet) do (turn: var Turn): + onPublish(turn, ds, pat) do ( + deriver: string, + narHash: string, + references: StringSet, + registrationTime: BiggestInt, + narSize: BiggestInt, + ultimate: bool, + sigs: StringSet, + ca: string + ): + var info = PathInfo( + deriver: deriver, + narHash: narHash, + references: references, + registrationTime: registrationTime, + narSize: narSize, + ultimate: ultimate, + sigs: sigs, + ca: ca, + ) + asyncCheck(turn, send(session, info)) -proc passStringSeq(session: Session; a, b: AsyncSocket): Future[seq[string]] {.async.} = - let count = int(await passWord(a, b)) - var strings = newSeq[string](count) - for i in 0..= 16 - info.ultimate = (await passDaemonWord(session)) != 0 - info.sigs = await passDaemonStringSet(session) - info.ca = await passDaemonString(session) - return info - -proc passChunks(session: Session; a, b: AsyncSocket): Future[int] {.async.} = - var total: int - while true: - let chunkLen = int(await passWord(a, b)) - if chunkLen == 0: - break - else: - let wordLen = (chunkLen + 7) shr 3 - if session.buffer.len < wordLen: setLen(session.buffer, wordLen) - let recvLen = await recvInto(a, addr session.buffer[0], chunkLen) - # each chunk must be recved contiguously - if recvLen != chunkLen: - raise newException(ProtocolError, "invalid chunk read") - await send(b, addr session.buffer[0], recvLen) - inc(total, recvLen) - return total - -proc passClientChunks(session: Session): Future[int] = - passChunks(session, session.client, session.daemon) - -proc passErrorDaemonError(session: Session) {.async.} = - let - typ = await passDaemonString(session) - assert typ == "Error" - let - lvl = await passDaemonWord(session) - name = await passDaemonString(session) - msg = passDaemonString(session) - havePos = await passDaemonWord(session) - assert havePos == 0 - let - nrTraces = await passDaemonWord(session) - for i in 1..nrTraces: - let havPos = await passDaemonWord(session) - assert havPos == 0 - let msg = await passDaemonString(session) - -proc passDaemonFields(session: Session): Future[Fields] {.async.} = - let count = await passDaemonWord(session) - var fields = newSeq[Field](count) - for i in 0..= 26 - await passErrorDaemonError(session) - - of STDERR_NEXT: - let s = await passDaemonString(session) - - of STDERR_START_ACTIVITY: - var act: ActionStart - act.id = BiggestInt(await passDaemonWord(session)) - act.level = BiggestInt(await passDaemonWord(session)) - act.`type` = BiggestInt(await passDaemonWord(session)) - act.text = await passDaemonString(session) - act.fields = await passDaemonFields(session) - act.parent = BiggestInt(await passDaemonWord(session)) - - of STDERR_STOP_ACTIVITY: - var act: ActionStop - act.id = BiggestInt(await passDaemonWord(session)) - - of STDERR_RESULT: - var act: ActionResult - act.id = BiggestInt(await passDaemonWord(session)) - act.`type` = BiggestInt(await passDaemonWord(session)) - act.fields = await passDaemonFields(session) - - of STDERR_LAST: - break + of wopSetOptions: + await discardWords(session, 12) + # 01 keepFailed + # 02 keepGoing + # 03 tryFallback + # 04 verbosity + # 05 maxBuildJobs + # 06 maxSilentTime + # 07 useBuildHook + # 08 verboseBuild + # 09 logType + # 10 printBuildTrace + # 11 buildCores + # 12 useSubstitutes + let overridePairCount = await recvWord(session) + for _ in 1..overridePairCount: + discard await (recvString(session)) + discard await (recvString(session)) + await send(session, STDERR_LAST) + # all options from the client are ingored else: - raise newException(ProtocolError, "unknown work verb " & $word) + let msg = "unhandled worker op " & $wop.int + await send(session, STDERR_NEXT) + await send(session, msg) + await send(session, STDERR_LAST) + close(session.socket) -#[ -proc fromClient(miss: var Missing; socket: AsyncSocket) {.async.} = - result.targets = await passClientStringSet(session) +proc serveClientSide*(facet: Facet; ds: Ref; listener: AsyncSocket) {.async.} = + while not listener.isClosed: + let + client = await accept(listener) + fut = serveClient(facet, ds, newSession(client)) + addCallback(fut) do (): + if not client.isClosed: + close(client) -proc fromDaemon(miss: var Missing; socket: AsyncSocket) {.async.} = - miss.willBuild = await passDaemonStringSet(session) - miss.willSubstitute = await passDaemonStringSet(session) - miss.unknown = await passDaemonStringSet(session) - miss.downloadSize = BiggestInt await passDaemonWord(session) - miss.narSize = BiggestInt await passDaemonWord(session) -]# - -proc loop(session: Session) {.async.} = - var chunksTotal: int - try: - while not session.client.isClosed: - let wop = await passClientWord(session) - case wop - of wopIsValidPath: - let path = await passClientString(session) - stderr.writeLine "wopIsValidPath ", path - await passWork(session) - let word = await passDaemonWord(session) - - of wopAddToStore: - assert session.version.minor >= 25 - let - name = await passClientString(session) - caMethod = await passClientString(session) - refs = await passClientStringSet(session) - repairBool = await passClientWord(session) - stderr.writeLine "wopAddToStore ", name - let n = await passClientChunks(session) - inc(chunksTotal, n) - await passWork(session) - let info = await passDaemonValidPathInfo(session, true) - - of wopAddTempRoot: - let path = await passClientString(session) - stderr.writeLine "wopAddTempRoot ", path - await passWork(session) - discard await passDaemonWord(session) - - of wopAddIndirectRoot: - let path = await passClientString(session) - stderr.writeLine "wopAddIndirectRoot ", path - await passWork(session) - discard await passDaemonWord(session) - - of wopSetOptions: - discard passClientWord(session) # keepFailed - discard passClientWord(session) # keepGoing - discard passClientWord(session) # tryFallback - discard passClientWord(session) # verbosity - discard passClientWord(session) # maxBuildJobs - discard passClientWord(session) # maxSilentTime - discard passClientWord(session) # useBuildHook - discard passClientWord(session) # verboseBuild - discard passClientWord(session) # logType - discard passClientWord(session) # printBuildTrace - discard passClientWord(session) # buildCores - discard passClientWord(session) # useSubstitutes - assert session.version.minor >= 12 - let overrides = await passClientStringMap(session) - await passWork(session) - - of wopQueryPathInfo: - assert session.version >= 17 - let path = await passClientString(session) - stderr.writeLine "wopQueryPathInfo ", path - await passWork(session) - let valid = await passDaemonWord(session) - if valid != 0: - var info = await passDaemonValidPathInfo(session, false) - info.path = path - stderr.writeLine "wopQueryPathInfo ", $info - - of wopQueryMissing: - assert session.version >= 30 - var miss: Missing - miss.targets = await passClientStringSet(session) - await passWork(session) - miss.willBuild = await passDaemonStringSet(session) - miss.willSubstitute = await passDaemonStringSet(session) - miss.unknown = await passDaemonStringSet(session) - miss.downloadSize = BiggestInt await passDaemonWord(session) - miss.narSize = BiggestInt await passDaemonWord(session) - stderr.writeLine "wopQueryMissing ", $miss - - of wopBuildPathsWithResults: - assert session.version >= 34 - let - drvs = await passClientStringSeq(session) - buildMode = await passClientWord(session) - stderr.writeLine "wopBuildPathsWithResults drvs ", $drvs - await passWork(session) - let count = await passDaemonWord(session) - for _ in 1..count: - let - path = await passDaemonString(session) - status = await passDaemonWord(session) - errorMsg = await passDaemonString(session) - timesBUild = await passDaemonWord(session) - isNonDeterministic = await passDaemonWord(session) - startTime = await passDaemonWord(session) - stopTime = await passDaemonWord(session) - outputs = await passDaemonStringMap(session) - - else: - stderr.writeLine "unknown worker op ", wop.int - break - except ProtocolError as err: - stderr.writeLine "connection terminated" - stderr.writeLine "chunk bytes transfered: ", formatSize(chunksTotal) - finally: - close(session.daemon) - close(session.client) - -proc handshake(listener: AsyncSocket): Future[Session] {.async.} = - ## Take the next connection from `listener` and return a `Session`. - let session = Session(buffer: newSeq[Word](1024)) # 8KiB - session.client = await listener.accept() - session.daemon = newAsyncSocket( - domain = AF_UNIX, - sockType = SOCK_STREAM, - protocol = cast[Protocol](0), - buffered = false) - await connectUnix(session.daemon, daemonSocketPath()) - let clientMagic = await passClientWord(session) - if clientMagic != WORKER_MAGIC_1: - raise newException(ProtocolError, "invalid protocol magic") - let daemonMagic = await passDaemonWord(session) - let daemonVersion = await passDaemonWord(session) - session.version = Version(await passClientWord(session)) - if session.version < 0x1_0a: - raise newException(ProtocolError, "obsolete protocol version") - assert session.version.minor >= 14 - discard await(passClientWord(session)) - # obsolete CPU affinity - assert session.version.minor >= 11 - discard await(passClientWord(session)) - # obsolete reserveSpace - assert session.version.minor >= 33 - let daemonVersionString = await passDaemonString(session) - assert daemonVersionString == $store.nixVersion - await passWork(session) - return session - -proc emulateSocket*(path: string) {.async, gcsafe.} = +proc bootClientSide*(facet: Facet; ds: Ref; socketPath: string) = let listener = newAsyncSocket( domain = AF_UNIX, sockType = SOCK_STREAM, protocol = cast[Protocol](0), buffered = false) - bindUnix(listener, path) + onStop(facet) do (turn: var Turn): + close(listener) + removeFile(socketPath) + removeFile(socketPath) + bindUnix(listener, socketPath) listen(listener) - stderr.writeLine "listening on ", path - while not listener.isClosed: - try: - let session = await handshake(listener) - assert not session.isNil - asyncCheck loop(session) - except ProtocolError as err: - stderr.writeLine "failed to service client, ", err.msg + asyncCheck(facet, serveClientSide(facet, ds, listener)) -when isMainModule: - const path = "/tmp/worker.nix.socket" - if fileExists(path): removeFile(path) - try: waitFor emulateSocket(path) - finally: removeFile(path) +proc connectDaemon(session: Session; socketPath: string) {.async.} = + await connectUnix(session.socket, socketPath) + await send(session, WORKER_MAGIC_1) + let daemonMagic = await recvWord(session) + if daemonMagic != WORKER_MAGIC_2: + raise newException(ProtocolError, "bad magic from daemon") + let daemonVersion = await recvWord(session) + session.version = min(Version daemonVersion, PROTOCOL_VERSION) + await send(session, Word session.version) + await send(session, 0) # CPU affinity + await send(session, 0) # reserve space + if session.version.minor >= 33: + discard await recvString(session) # version + if session.version.minor >= 35: + discard await recvWord(session) # remoteTrustsUs + await recvWork(session) + +proc queryMissing(session: Session; targets: StringSeq): Future[Missing] {.async.} = + var miss = Missing(targets: targets) + await send(session, wopQueryMissing) + await send(session, miss.targets) + await recvWork(session) + miss.willBuild = await recvStringSet(session) + miss.willSubstitute = await recvStringSet(session) + miss.unknown = await recvStringSet(session) + miss.downloadSize = BiggestInt await recvWord(session) + miss.narSize = BiggestInt await recvWord(session) + return miss + +proc queryPathInfo(session: Session; path: string): Future[PathInfo] {.async.} = + var info = PathInfo(path: path) + await send(session, wopQueryPathInfo) + await send(session, info.path) + await recvWork(session) + let valid = await recvWord(session) + if valid != 0: + info.deriver = await recvString(session) + info.narHash = await recvString(session) + info.references = await recvStringSet(session) + info.registrationTime = BiggestInt await recvWord(session) + info.narSize = BiggestInt await recvWord(session) + info.ultimate = (await recvWord(session)) != 0 + info.sigs = await recvStringSet(session) + info.ca = await recvString(session) + return info + +proc bootDaemonSide*(turn: var Turn; ds: Ref; socketPath: string) = + + during(turn, ds, ?Observe(pattern: !Missing) ?? {0: grab()}) do (a: Preserve[Ref]): + # cannot use `grabLit` here because an array is a compound + let + session = newSession() + fut = connectDaemon(session, socketPath) + addCallback(fut, turn) do (turn: var Turn): + read(fut) + var targets: StringSeq + doAssert targets.fromPreserve(unpackLiterals(a)) + # unpack ]> + let missFut = queryMissing(session, targets) + addCallback(missFut, turn) do (turn: var Turn): + var miss = read(missFut) + discard publish(turn, ds, miss) + do: + close(session) + + during(turn, ds, ?Observe(pattern: !PathInfo) ?? {0: grabLit()}) do (path: string): + let + session = newSession() + fut = connectDaemon(session, socketPath) + addCallback(fut, turn) do (turn: var Turn): + read(fut) + let infoFut = queryPathInfo(session, path) + addCallback(infoFut, turn) do (turn: var Turn): + var info = read(infoFut) + discard publish(turn, ds, info) + do: + close(session)