diff --git a/nix_actor.nimble b/nix_actor.nimble index af6bf4c..c88a142 100644 --- a/nix_actor.nimble +++ b/nix_actor.nimble @@ -1,4 +1,4 @@ -version = "20230530" +version = "20230607" author = "Emery Hemingway" description = "Syndicated Nix Actor" license = "Unlicense" diff --git a/protocol.prs b/protocol.prs index 7260783..4e437ec 100644 --- a/protocol.prs +++ b/protocol.prs @@ -1,6 +1,5 @@ version 1 . - Build = . Realise = . @@ -12,3 +11,26 @@ Eval = . Narinfo = . Dict = {symbol: any ...:...} . + +FieldInt = int . +FieldString = string . +Field = int / string . +Fields = [Field ...] . + +ActionStart = . +ActionStop = . +ActionResult = . + +; TODO: why not make target a singleton? +Missing = . + +PathInfo = . diff --git a/src/nix_actor.nim b/src/nix_actor.nim index fbf9458..443dc0b 100644 --- a/src/nix_actor.nim +++ b/src/nix_actor.nim @@ -1,12 +1,12 @@ # SPDX-FileCopyrightText: ☭ Emery Hemingway # SPDX-License-Identifier: Unlicense -import std/[asyncdispatch, httpclient, json, osproc, parseutils, strutils, tables] +import std/[asyncdispatch, httpclient, json, os, osproc, parseutils, strutils, tables] import preserves, preserves/jsonhooks import syndicate from syndicate/protocols/dataspace import Observe import ./nix_actor/protocol -import ./nix_actor/[main, store] +import ./nix_actor/[main, sockets] type Value = Preserve[void] @@ -39,7 +39,6 @@ proc narinfo(turn: var Turn; ds: Ref; path: string) = client = newAsyncHttpClient() url = "https://cache.nixos.org/" & path & ".narinfo" futGet = get(client, url) - stderr.writeLine "fetching ", url addCallback(futGet, turn) do (turn: var Turn): let resp = read(futGet) if code(resp) != Http200: @@ -108,13 +107,21 @@ proc bootNixFacet(ds: Ref; turn: var Turn): Facet = during(turn, ds, ?Observe(pattern: !Narinfo) ?? {0: grabLit()}) do (path: string): narinfo(turn, ds, path) -type Args {.preservesDictionary.} = object - dataspace: Ref +type + RefArgs {.preservesDictionary.} = object + dataspace: Ref + SocketArgs {.preservesDictionary.} = object + `listen-socket`: string proc bootNixActor(root: Ref; turn: var Turn) = connectStdio(root, turn) - during(turn, root, ?Args) do (ds: Ref): + during(turn, root, ?RefArgs) do (ds: Ref): discard bootNixFacet(ds, turn) + during(turn, root, ?SocketArgs) do (path: string): + removeFile(path) + asyncCheck(turn, emulateSocket(path)) + do: + removeFile(path) initNix() # Nix lib isn't actually being used but it's nice to know that it links. runActor("main", bootNixActor) diff --git a/src/nix_actor/protocol.nim b/src/nix_actor/protocol.nim index 08989ef..cf893f4 100644 --- a/src/nix_actor/protocol.nim +++ b/src/nix_actor/protocol.nim @@ -1,6 +1,6 @@ import - preserves, std/tables + preserves, std/sets, std/tables type Eval* {.preservesRecord: "eval".} = object @@ -12,22 +12,87 @@ type `drv`*: string `outputs`*: seq[string] + Missing* {.preservesRecord: "missing".} = object + `targets`*: HashSet[string] + `willBuild`*: HashSet[string] + `willSubstitute`*: HashSet[string] + `unknown`*: HashSet[string] + `downloadSize`*: BiggestInt + `narSize`*: BiggestInt + Narinfo* {.preservesRecord: "narinfo".} = object `path`*: string `info`*: Dict + FieldKind* {.pure.} = enum + `int`, `string` + `Field`* {.preservesOr.} = object + case orKind*: FieldKind + of FieldKind.`int`: + `int`*: int + + of FieldKind.`string`: + `string`*: string + + + PathInfo* {.preservesRecord: "path-info".} = object + `path`*: string + `deriver`*: string + `narHash`*: string + `references`*: HashSet[string] + `registrationTime`*: BiggestInt + `narSize`*: BiggestInt + `ultimate`*: bool + `sigs`*: HashSet[string] + `ca`*: string + Dict* = Table[Symbol, Preserve[void]] Build* {.preservesRecord: "nix-build".} = object `input`*: string `output`*: Preserve[void] + Fields* = seq[Field] + ActionStart* {.preservesRecord: "start".} = object + `id`*: BiggestInt + `level`*: BiggestInt + `type`*: BiggestInt + `text`*: string + `fields`*: Fields + `parent`*: BiggestInt + + FieldString* = string Instantiate* {.preservesRecord: "instantiate".} = object `expr`*: string `options`*: Dict `result`*: Preserve[void] -proc `$`*(x: Eval | Realise | Narinfo | Dict | Build | Instantiate): string = + FieldInt* = BiggestInt + ActionStop* {.preservesRecord: "stop".} = object + `id`*: BiggestInt + + ActionResult* {.preservesRecord: "result".} = object + `id`*: BiggestInt + `type`*: BiggestInt + `fields`*: Fields + +proc `$`*(x: Eval | Realise | Missing | Narinfo | Field | PathInfo | Dict | + Build | + Fields | + ActionStart | + FieldString | + Instantiate | + FieldInt | + ActionStop | + ActionResult): string = `$`(toPreserve(x)) -proc encode*(x: Eval | Realise | Narinfo | Dict | Build | Instantiate): seq[byte] = +proc encode*(x: Eval | Realise | Missing | Narinfo | Field | PathInfo | Dict | + Build | + Fields | + ActionStart | + FieldString | + Instantiate | + FieldInt | + ActionStop | + ActionResult): seq[byte] = encode(toPreserve(x)) diff --git a/src/nix_actor/sockets.nim b/src/nix_actor/sockets.nim new file mode 100644 index 0000000..8aa9157 --- /dev/null +++ b/src/nix_actor/sockets.nim @@ -0,0 +1,464 @@ +# SPDX-FileCopyrightText: ☭ Emery Hemingway +# SPDX-License-Identifier: Unlicense + +import std/[asyncdispatch, asyncnet, os, sets, strtabs, strutils] +from std/nativesockets import AF_INET, AF_UNIX, SOCK_STREAM, Protocol + +import preserves +import ./protocol, ./store + +{.pragma: workerProtocol, importc, header: "worker-protocol.hh".} + +type Word = uint64 +proc `$`(w: Word): string = toHex(w) + +const + WORKER_MAGIC_1 = 0x6E697863 + WORKER_MAGIC_2 = 0x6478696F + PROTOCOL_VERSION = 256 or 35 + + STDERR_NEXT = 0x6F6C6d67 + STDERR_READ = 0x64617461 + STDERR_WRITE = 0x64617416 + STDERR_LAST = 0x616C7473 + STDERR_ERROR = 0x63787470 + STDERR_START_ACTIVITY = 0x53545254 + STDERR_STOP_ACTIVITY = 0x53544F50 + STDERR_RESULT = 0x52534C54 + + wopIsValidPath = 1 + wopHasSubstitutes = 3 + wopQueryReferrers = 6 + wopAddToStore = 7 + wopBuildPaths = 9 + wopEnsurePath = 10 + wopAddTempRoot = 11 + wopAddIndirectRoot = 12 + wopSyncWithGC = 13 + wopFindRoots = 14 + wopSetOptions = 19 + wopCollectGarbage = 20 + wopQuerySubstitutablePathInfo = 21 + wopQueryAllValidPaths = 23 + wopQueryFailedPaths = 24 + wopClearFailedPaths = 25 + wopQueryPathInfo = 26 + wopQueryPathFromHashPart = 29 + wopQuerySubstitutablePathInfos = 30 + wopQueryValidPaths = 31 + wopQuerySubstitutablePaths = 32 + wopQueryValidDerivers = 33 + wopOptimiseStore = 34 + wopVerifyStore = 35 + wopBuildDerivation = 36 + wopAddSignatures = 37 + wopNarFromPath = 38 + wopAddToStoreNar = 39 + wopQueryMissing = 40 + wopQueryDerivationOutputMap = 41 + wopRegisterDrvOutput = 42 + wopQueryRealisation = 43 + wopAddMultipleToStore = 44 + wopAddBuildLog = 45 + wopBuildPathsWithResults = 46 + +type + ProtocolError = object of IOError + Version = uint16 + Session = ref object + client, daemon: AsyncSocket + buffer: seq[Word] + version: Version + +func major(version: Version): uint16 = version and 0xff00 +func minor(version: Version): uint16 = version and 0x00ff + +proc daemonSocketPath: string = + getEnv( + "NIX_DAEMON_SOCKET_PATH", + "/nix/var/nix/daemon-socket/socket") + +proc send(session: Session; sock: AsyncSocket; words: varargs[Word]): Future[void] = + for i, word in words: session.buffer[i] = word + send(sock, addr session.buffer[0], words.len shl 3) + +proc send(session: Session; sock: AsyncSocket; s: string): Future[void] = + let wordCount = (s.len + 7) shr 3 + if wordCount > session.buffer.len: setLen(session.buffer, wordCount) + session.buffer[0] = Word s.len + if wordCount > 0: + session.buffer[wordCount] = 0x00 + copyMem(addr session.buffer[1], unsafeAddr s[0], s.len) + send(sock, addr session.buffer[0], (1 + wordCount) shl 3) + +proc recvWord(sock: AsyncSocket): Future[Word] {.async.} = + var w: Word + let n = await recvInto(sock, addr w, sizeof(Word)) + if n != sizeof(Word): raise newException(ProtocolError, "short read of word") + return w + +proc passWord(a, b: AsyncSocket): Future[Word] {.async.} = + var w = await recvWord(a) + await send(b, addr w, sizeof(Word)) + return w + +proc recvString(sock: AsyncSocket): Future[string] {.async.} = + let w = await recvWord(sock) + let stringLen = int w + var s: string + if stringLen > 0: + s.setLen((stringLen + 7) and (not 7)) + let n = await recvInto(sock, addr s[0], s.len) + if n != s.len: + raise newException(ProtocolError, "short string read") + setLen(s, stringLen) + return s + +proc passString(session: Session; a, b: AsyncSocket): Future[string] {.async.} = + var s = await recvString(a) + await send(session, b, s) + return s + +proc passStringSeq(session: Session; a, b: AsyncSocket): Future[seq[string]] {.async.} = + let count = int(await passWord(a, b)) + var strings = newSeq[string](count) + for i in 0..= 16 + info.ultimate = (await passDaemonWord(session)) != 0 + info.sigs = await passDaemonStringSet(session) + info.ca = await passDaemonString(session) + return info + +proc passChunks(session: Session; a, b: AsyncSocket): Future[int] {.async.} = + var total: int + while true: + let chunkLen = int(await passWord(a, b)) + if chunkLen == 0: + break + else: + let wordLen = (chunkLen + 7) shr 3 + if session.buffer.len < wordLen: setLen(session.buffer, wordLen) + let recvLen = await recvInto(a, addr session.buffer[0], chunkLen) + # each chunk must be recved contiguously + if recvLen != chunkLen: + raise newException(ProtocolError, "invalid chunk read") + await send(b, addr session.buffer[0], recvLen) + inc(total, recvLen) + return total + +proc passClientChunks(session: Session): Future[int] = + passChunks(session, session.client, session.daemon) + +proc passErrorDaemonError(session: Session) {.async.} = + let + typ = await passDaemonString(session) + assert typ == "Error" + let + lvl = await passDaemonWord(session) + name = await passDaemonString(session) + msg = passDaemonString(session) + havePos = await passDaemonWord(session) + assert havePos == 0 + let + nrTraces = await passDaemonWord(session) + for i in 1..nrTraces: + let havPos = await passDaemonWord(session) + assert havPos == 0 + let msg = await passDaemonString(session) + +proc passDaemonFields(session: Session): Future[Fields] {.async.} = + let count = await passDaemonWord(session) + var fields = newSeq[Field](count) + for i in 0..= 26 + await passErrorDaemonError(session) + + of STDERR_NEXT: + let s = await passDaemonString(session) + + of STDERR_START_ACTIVITY: + var act: ActionStart + act.id = BiggestInt(await passDaemonWord(session)) + act.level = BiggestInt(await passDaemonWord(session)) + act.`type` = BiggestInt(await passDaemonWord(session)) + act.text = await passDaemonString(session) + act.fields = await passDaemonFields(session) + act.parent = BiggestInt(await passDaemonWord(session)) + + of STDERR_STOP_ACTIVITY: + var act: ActionStop + act.id = BiggestInt(await passDaemonWord(session)) + + of STDERR_RESULT: + var act: ActionResult + act.id = BiggestInt(await passDaemonWord(session)) + act.`type` = BiggestInt(await passDaemonWord(session)) + act.fields = await passDaemonFields(session) + + of STDERR_LAST: + break + + else: + raise newException(ProtocolError, "unknown work verb " & $word) + +#[ +proc fromClient(miss: var Missing; socket: AsyncSocket) {.async.} = + result.targets = await passClientStringSet(session) + +proc fromDaemon(miss: var Missing; socket: AsyncSocket) {.async.} = + miss.willBuild = await passDaemonStringSet(session) + miss.willSubstitute = await passDaemonStringSet(session) + miss.unknown = await passDaemonStringSet(session) + miss.downloadSize = BiggestInt await passDaemonWord(session) + miss.narSize = BiggestInt await passDaemonWord(session) +]# + +proc loop(session: Session) {.async.} = + var chunksTotal: int + try: + while not session.client.isClosed: + let wop = await passClientWord(session) + case wop + of wopIsValidPath: + let path = await passClientString(session) + stderr.writeLine "wopIsValidPath ", path + await passWork(session) + let word = await passDaemonWord(session) + + of wopAddToStore: + assert session.version.minor >= 25 + let + name = await passClientString(session) + caMethod = await passClientString(session) + refs = await passClientStringSet(session) + repairBool = await passClientWord(session) + stderr.writeLine "wopAddToStore ", name + let n = await passClientChunks(session) + inc(chunksTotal, n) + await passWork(session) + let info = await passDaemonValidPathInfo(session, true) + + of wopAddTempRoot: + let path = await passClientString(session) + stderr.writeLine "wopAddTempRoot ", path + await passWork(session) + discard await passDaemonWord(session) + + of wopAddIndirectRoot: + let path = await passClientString(session) + stderr.writeLine "wopAddIndirectRoot ", path + await passWork(session) + discard await passDaemonWord(session) + + of wopSetOptions: + discard passClientWord(session) # keepFailed + discard passClientWord(session) # keepGoing + discard passClientWord(session) # tryFallback + discard passClientWord(session) # verbosity + discard passClientWord(session) # maxBuildJobs + discard passClientWord(session) # maxSilentTime + discard passClientWord(session) # useBuildHook + discard passClientWord(session) # verboseBuild + discard passClientWord(session) # logType + discard passClientWord(session) # printBuildTrace + discard passClientWord(session) # buildCores + discard passClientWord(session) # useSubstitutes + assert session.version.minor >= 12 + let overrides = await passClientStringMap(session) + await passWork(session) + + of wopQueryPathInfo: + assert session.version >= 17 + let path = await passClientString(session) + stderr.writeLine "wopQueryPathInfo ", path + await passWork(session) + let valid = await passDaemonWord(session) + if valid != 0: + var info = await passDaemonValidPathInfo(session, false) + info.path = path + stderr.writeLine "wopQueryPathInfo ", $info + + of wopQueryMissing: + assert session.version >= 30 + var miss: Missing + miss.targets = await passClientStringSet(session) + await passWork(session) + miss.willBuild = await passDaemonStringSet(session) + miss.willSubstitute = await passDaemonStringSet(session) + miss.unknown = await passDaemonStringSet(session) + miss.downloadSize = BiggestInt await passDaemonWord(session) + miss.narSize = BiggestInt await passDaemonWord(session) + stderr.writeLine "wopQueryMissing ", $miss + + of wopBuildPathsWithResults: + assert session.version >= 34 + let + drvs = await passClientStringSeq(session) + buildMode = await passClientWord(session) + stderr.writeLine "wopBuildPathsWithResults drvs ", $drvs + await passWork(session) + let count = await passDaemonWord(session) + for _ in 1..count: + let + path = await passDaemonString(session) + status = await passDaemonWord(session) + errorMsg = await passDaemonString(session) + timesBUild = await passDaemonWord(session) + isNonDeterministic = await passDaemonWord(session) + startTime = await passDaemonWord(session) + stopTime = await passDaemonWord(session) + outputs = await passDaemonStringMap(session) + + else: + stderr.writeLine "unknown worker op ", wop.int + break + except ProtocolError as err: + stderr.writeLine "connection terminated" + stderr.writeLine "chunk bytes transfered: ", formatSize(chunksTotal) + finally: + close(session.daemon) + close(session.client) + +proc handshake(listener: AsyncSocket): Future[Session] {.async.} = + ## Take the next connection from `listener` and return a `Session`. + let session = Session(buffer: newSeq[Word](1024)) # 8KiB + session.client = await listener.accept() + session.daemon = newAsyncSocket( + domain = AF_UNIX, + sockType = SOCK_STREAM, + protocol = cast[Protocol](0), + buffered = false) + await connectUnix(session.daemon, daemonSocketPath()) + let clientMagic = await passClientWord(session) + if clientMagic != WORKER_MAGIC_1: + raise newException(ProtocolError, "invalid protocol magic") + let daemonMagic = await passDaemonWord(session) + let daemonVersion = await passDaemonWord(session) + session.version = Version(await passClientWord(session)) + if session.version < 0x1_0a: + raise newException(ProtocolError, "obsolete protocol version") + assert session.version.minor >= 14 + discard await(passClientWord(session)) + # obsolete CPU affinity + assert session.version.minor >= 11 + discard await(passClientWord(session)) + # obsolete reserveSpace + assert session.version.minor >= 33 + let daemonVersionString = await passDaemonString(session) + assert daemonVersionString == $store.nixVersion + await passWork(session) + return session + +proc emulateSocket*(path: string) {.async, gcsafe.} = + let listener = newAsyncSocket( + domain = AF_UNIX, + sockType = SOCK_STREAM, + protocol = cast[Protocol](0), + buffered = false) + bindUnix(listener, path) + listen(listener) + stderr.writeLine "listening on ", path + while not listener.isClosed: + try: + let session = await handshake(listener) + assert not session.isNil + asyncCheck loop(session) + except ProtocolError as err: + stderr.writeLine "failed to service client, ", err.msg + +when isMainModule: + const path = "/tmp/worker.nix.socket" + if fileExists(path): removeFile(path) + try: waitFor emulateSocket(path) + finally: removeFile(path) diff --git a/src/nix_actor/store.nim b/src/nix_actor/store.nim index 997720d..5808608 100644 --- a/src/nix_actor/store.nim +++ b/src/nix_actor/store.nim @@ -6,10 +6,20 @@ {.passC: "'-DSYSTEM=\"x86_64-linux\"'".} +type StdString {.importcpp: "std::string", header: "".} = object +proc data(s: StdString): pointer {.importcpp: "#.data()".} +proc len(s: StdString): csize_t {.importcpp: "#.length()".} +proc `$`*(cpp: StdString): string = + result.setLen(cpp.len) + if result.len > 0: + copyMem(addr result[0], cpp.data, result.len) + type StorePath {.importcpp: "nix::StorePath", header: "path.hh".} = object discard +var nixVersion* {.importc: "nix::nixVersion", header: "globals.hh".}: StdString + proc isDerivation*(path: StorePath): bool {.importcpp.} type diff --git a/src/nix_actor/the_protocol.nim b/src/nix_actor/the_protocol.nim deleted file mode 100644 index d721694..0000000 --- a/src/nix_actor/the_protocol.nim +++ /dev/null @@ -1,14 +0,0 @@ - -import - std/typetraits, preserves - -type - Build* {.preservesRecord: "nix-build".} = object - `input`*: string - `output`*: string - -proc `$`*(x: Build): string = - `$`(toPreserve(x)) - -proc encode*(x: Build): seq[byte] = - encode(toPreserve(x))