diff --git a/README.md b/README.md index f9c0b79..1d07f44 100644 --- a/README.md +++ b/README.md @@ -34,8 +34,7 @@ A demo script for the [Syndicate server](https://git.syndicate-lang.org/syndicat } }> ] - ? [ + ? [ $log ! = 1.6.10", "syndicate >= 20230530" diff --git a/protocol.prs b/protocol.prs index 894fe35..335f549 100644 --- a/protocol.prs +++ b/protocol.prs @@ -1,19 +1,19 @@ version 1 . +StringSeq = [string ...] . +StringSet = #{string} . +AttrSet = {symbol: any ...:...} . + Build = . -Realise = . +Realise = . -Instantiate = . +Instantiate = . Eval = . -Narinfo = . +Narinfo = . -Dict = {symbol: any ...:...} . - -FieldInt = int . -FieldString = string . Field = int / string . Fields = [Field ...] . @@ -22,16 +22,43 @@ ActionStop = . ActionResult = . ; TODO: why not make target a singleton? -Missing = . +Missing = . -; TODO keep a few critical fields and move the rest into a dictionary -PathInfo = . +; Path info for the worker protocol version 35. +LegacyPathAttrs = { + deriver: string + narHash: string + references: StringSeq ; prefer a set + registrationTime: int + narSize: int + ultimate: bool + sigs: StringSet + ca: string +} . + +AddToStoreClientAttrs = { + name: string + eris: bytes + ca-method: symbol + references: StringSeq ; prefer a set +} . + +; Intersection of the attributes needed to add a path to a store +; and the attributes returned by the daemon after adding the path. +AddToStoreAttrs = { + name: string + eris: bytes + ca-method: symbol + references: StringSeq ; prefer a set + + deriver: string + narHash: string + registrationTime: int + narSize: int + ultimate: bool + sigs: StringSet + ca: string +} . + +; Any collection of attributes describing a store path. +PathInfo = . diff --git a/src/nix_actor.nim b/src/nix_actor.nim index 0473aa3..2ad40e4 100644 --- a/src/nix_actor.nim +++ b/src/nix_actor.nim @@ -1,18 +1,20 @@ # SPDX-FileCopyrightText: ☭ Emery Hemingway # SPDX-License-Identifier: Unlicense -import std/[asyncdispatch, httpclient, json, osproc, parseutils, strutils, tables] +import std/[json, osproc, parseutils, strutils, tables] +import eris/memory_stores import preserves, preserves/jsonhooks import syndicate from syndicate/protocols/dataspace import Observe import ./nix_actor/protocol -import ./nix_actor/[main, sockets] +import ./nix_actor/[clients, daemons] + type Value = Preserve[void] Observe = dataspace.Observe[Ref] -proc parseArgs(args: var seq[string]; opts: Dict) = +proc parseArgs(args: var seq[string]; opts: AttrSet) = for sym, val in opts: add(args, "--" & $sym) if not val.isString "": @@ -20,7 +22,8 @@ proc parseArgs(args: var seq[string]; opts: Dict) = if fromPreserve(js, val): add(args, $js) else: stderr.writeLine "invalid option --", sym, " ", val -proc parseNarinfo(info: var Dict; text: string) = +#[ +proc parseNarinfo(info: var AttrSet; text: string) = var key, val: string off: int @@ -50,6 +53,7 @@ proc narinfo(turn: var Turn; ds: Ref; path: string) = var narinfo = Narinfo(path: path) parseNarinfo(narinfo.info, read(futBody)) discard publish(turn, ds, narinfo) +]# # I never link to openssl if I can avoid it. proc build(spec: string): Build = var execOutput = execProcess("nix", args = ["build", "--json", "--no-link", spec], options = {poUsePath}) @@ -104,8 +108,10 @@ proc bootNixFacet(turn: var Turn; ds: Ref): Facet = ass.result = eval(ass) discard publish(turn, ds, ass) + #[ during(turn, ds, ?Observe(pattern: !Narinfo) ?? {0: grabLit()}) do (path: string): narinfo(turn, ds, path) + ]# type RefArgs {.preservesDictionary.} = object @@ -115,17 +121,15 @@ type DaemonSideArgs {.preservesDictionary.} = object `daemon-socket`: string -proc bootNixActor(root: Ref; turn: var Turn) = +runActor("main") do (root: Ref; turn: var Turn): + let store = newMemoryStore() connectStdio(root, turn) during(turn, root, ?RefArgs) do (ds: Ref): discard bootNixFacet(turn, ds) during(turn, root, ?ClientSideArgs) do (socketPath: string): - bootClientSide(turn.facet, ds, socketPath) + bootClientSide(turn, ds, store, socketPath) during(turn, root, ?DaemonSideArgs) do (socketPath: string): - bootDaemonSide(turn, ds, socketPath) - -initNix() # Nix lib isn't actually being used but it's nice to know that it links. -runActor("main", bootNixActor) + bootDaemonSide(turn, ds, store, socketPath) diff --git a/src/nix_actor.nim.cfg b/src/nix_actor.nim.cfg deleted file mode 100644 index 1f92ea5..0000000 --- a/src/nix_actor.nim.cfg +++ /dev/null @@ -1 +0,0 @@ -define:ssl diff --git a/src/nix_actor/clients.nim b/src/nix_actor/clients.nim new file mode 100644 index 0000000..86c50e3 --- /dev/null +++ b/src/nix_actor/clients.nim @@ -0,0 +1,172 @@ +# SPDX-FileCopyrightText: ☭ Emery Hemingway +# SPDX-License-Identifier: Unlicense + +import std/[asyncdispatch, asyncnet, os, sets, strutils, tables] +from std/algorithm import sort + +import eris +import preserves, syndicate +import ./protocol, ./sockets + +proc sendNext(client: Session; msg: string) {.async.} = + await send(client, STDERR_NEXT) + await send(client, msg) + +proc sendWorkEnd(client: Session): Future[void] = + send(client, STDERR_LAST) + +proc send(client: Session; miss: Missing) {.async.} = + await sendWorkEnd(client) + await send(client, miss.willBuild) + await send(client, miss.willSubstitute) + await send(client, miss.unknown) + await send(client, Word miss.downloadSize) + await send(client, Word miss.narSize) + +proc send(client: Session; info: LegacyPathAttrs) {.async.} = + await send(client, info.deriver) + await send(client, info.narHash) + await send(client, info.references) + await send(client, Word info.registrationTime) + await send(client, Word info.narSize) + await send(client, Word info.ultimate) + await send(client, info.sigs) + await send(client, info.ca) + +proc sendValidInfo(client: Session; info: LegacyPathAttrs) {.async.} = + await sendWorkEnd(client) + await send(client, 1) # valid + await send(client, info) + +proc completeAddToStore(client: Session; path: string; info: LegacyPathAttrs) {.async.} = + await sendWorkEnd(client) + await send(client, path) + await send(client, info) + +proc serveClient(facet: Facet; ds: Ref; store: ErisStore; client: Session) {.async.} = + block: + let clientMagic = await recvWord(client) + if clientMagic != WORKER_MAGIC_1: + raise newException(ProtocolError, "invalid protocol magic") + await send(client, WORKER_MAGIC_2, PROTOCOL_VERSION) + let clientVersion = Version(await recvWord(client)) + if clientVersion < 0x1_21: + raise newException(ProtocolError, "obsolete protocol version") + assert clientVersion.minor >= 14 + discard await(recvWord(client)) + # obsolete CPU affinity + assert clientVersion.minor >= 11 + discard await(recvWord(client)) + # obsolete reserveSpace + assert clientVersion.minor >= 33 + await send(client, "0.0.0") + await sendWorkEnd(client) + while not client.socket.isClosed: + let wop = await recvWord(client.socket) + case wop + + of wopAddToStore: + let + name = await recvString(client) + caMethod = await recvString(client) + var storeRefs = await recvStringSeq(client) + sort(storeRefs) # sets not valid for patterns so use a sorted list + discard await recvWord(client) # repair, not implemented + let cap = await ingestChunks(client, store) + await sendNext(client, $cap & " " & name) + let attrsPat = inject(?AddToStoreAttrs, { + "name".toSymbol(Ref): ?name, + "ca-method".toSymbol(Ref): ?caMethod.toSymbol, + "references".toSymbol(Ref): ?storeRefs, + "eris".toSymbol(Ref): ?cap.bytes, + }) + # bind AddToStoreAttrs and override with some literal values + let pat = PathInfo ? { 0: grab(), 1: attrsPat } + run(facet) do (turn: var Turn): + onPublish(turn, ds, pat) do (path: string, ca: string, deriver: string, narHash: string, narSize: BiggestInt, regTime: BiggestInt, sigs: StringSet, ultimate: bool): + asyncCheck(turn, completeAddToStore(client, path, LegacyPathAttrs( + ca: ca, + deriver: deriver, + narHash: narHash, + narSize: narSize, + references: storeRefs, + registrationTime: regTime, + sigs: sigs, + ultimate: ultimate, + ))) + + of wopQueryPathInfo: + let + path = await recvString(client) + pat = PathInfo ? { 0: ?path, 1: grab() } + run(facet) do (turn: var Turn): + onPublish(turn, ds, pat) do (info: LegacyPathAttrs): + asyncCheck(turn, sendValidInfo(client, info)) + + of wopQueryMissing: + var targets = toPreserve(await recvStringSeq(client)) + sort(targets.sequence) + # would prefer to use a set but that doesn't translate into a pattern + let pat = inject(?Missing, { 0: ?targets }) + run(facet) do (turn: var Turn): + onPublish(turn, ds, pat) do ( + willBuild: StringSet, + willSubstitute: StringSet, + unknown: StringSet, + downloadSize: BiggestInt, + narSize: BiggestInt + ): + let miss = Missing( + willBuild: willBuild, + willSubstitute: willSubstitute, + unknown: unknown, + downloadSize: downloadSize, + narSize: narSize, + ) + asyncCheck(turn, send(client, miss)) + + of wopSetOptions: + await discardWords(client, 12) + # 01 keepFailed + # 02 keepGoing + # 03 tryFallback + # 04 verbosity + # 05 maxBuildJobs + # 06 maxSilentTime + # 07 useBuildHook + # 08 verboseBuild + # 09 logType + # 10 printBuildTrace + # 11 buildCores + # 12 useSubstitutes + let overridePairCount = await recvWord(client) + for _ in 1..overridePairCount: + discard await (recvString(client)) + discard await (recvString(client)) + await sendWorkEnd(client) + # all options from the client are ingored + + else: + let msg = "unhandled worker op " & $wop.int + await sendNext(client, msg) + await sendWorkEnd(client) + close(client.socket) + +proc serveClientSide(facet: Facet; ds: Ref; store: ErisStore; listener: AsyncSocket) {.async.} = + while not listener.isClosed: + let + client = await accept(listener) + fut = serveClient(facet, ds, store, newSession(client)) + addCallback(fut) do (): + if not client.isClosed: + close(client) + +proc bootClientSide*(turn: var Turn; ds: Ref; store: ErisStore; socketPath: string) = + let listener = newUnixSocket() + onStop(turn.facet) do (turn: var Turn): + close(listener) + removeFile(socketPath) + removeFile(socketPath) + bindUnix(listener, socketPath) + listen(listener) + asyncCheck(turn, serveClientSide(turn.facet, ds, store, listener)) diff --git a/src/nix_actor/daemons.nim b/src/nix_actor/daemons.nim new file mode 100644 index 0000000..8afa7a0 --- /dev/null +++ b/src/nix_actor/daemons.nim @@ -0,0 +1,205 @@ +# SPDX-FileCopyrightText: ☭ Emery Hemingway +# SPDX-License-Identifier: Unlicense + +import std/[asyncdispatch, asyncnet, sets, streams, strutils] +from std/algorithm import sort + +import eris +import preserves, syndicate +from syndicate/protocols/dataspace import Observe +import ./protocol, ./sockets + +type Value = Preserve[void] + +proc merge(items: varargs[Value]): Value = + # TODO: just a hack, not a proper imlementation + # https://preserves.dev/preserves.html#appendix-merging-values + result = initDictionary() + for e in items: + for (key, val) in e.pairs: + result[key] = val + cannonicalize(result) + +type Observe = dataspace.Observe[Ref] + +proc recvError(daemon: Session): Future[string] {.async.} = + discard #[typ]# await recvString(daemon) + discard #[lvl]# await recvWord(daemon) + discard #[name]# await recvString(daemon) + let msg = #[msg]# await recvString(daemon) + discard #[havePos]# await recvWord(daemon) + let nrTraces = await recvWord(daemon) + for i in 1..nrTraces: + discard #[havPos]# await recvWord(daemon) + discard #[msg]# await recvString(daemon) + return msg + +proc recvFields(daemon: Session) {.async.} = + let count = await recvWord(daemon) + for i in 0..= 33: + discard await recvString(daemon) # version + if daemon.version.minor >= 35: + discard await recvWord(daemon) # remoteTrustsUs + await recvWork(daemon) + +proc queryMissing(daemon: Session; targets: StringSeq): Future[Missing] {.async.} = + var miss = Missing(targets: targets) + await send(daemon, wopQueryMissing) + await send(daemon, miss.targets) + await recvWork(daemon) + miss.willBuild = await recvStringSet(daemon) + miss.willSubstitute = await recvStringSet(daemon) + miss.unknown = await recvStringSet(daemon) + miss.downloadSize = BiggestInt await recvWord(daemon) + miss.narSize = BiggestInt await recvWord(daemon) + return miss + +proc queryPathInfo(daemon: Session; path: string): Future[LegacyPathAttrs] {.async.} = + var info: LegacyPathAttrs + await send(daemon, wopQueryPathInfo) + await send(daemon, path) + await recvWork(daemon) + let valid = await recvWord(daemon) + if valid != 0: + info.deriver = await recvString(daemon) + info.narHash = await recvString(daemon) + info.references = await recvStringSeq(daemon) + sort(info.references) + info.registrationTime = BiggestInt await recvWord(daemon) + info.narSize = BiggestInt await recvWord(daemon) + info.ultimate = (await recvWord(daemon)) != 0 + info.sigs = await recvStringSet(daemon) + info.ca = await recvString(daemon) + return info + +proc recvLegacyPathAttrs(daemon: Session): Future[AddToStoreAttrs] {.async.} = + var info: AddToStoreAttrs + info.deriver = await recvString(daemon) + info.narHash = await recvString(daemon) + info.references = await recvStringSeq(daemon) + sort(info.references) + info.registrationTime = BiggestInt await recvWord(daemon) + info.narSize = BiggestInt await recvWord(daemon) + assert daemon.version.minor >= 16 + info.ultimate = (await recvWord(daemon)) != 0 + info.sigs = await recvStringSet(daemon) + info.ca = await recvString(daemon) + return info + +proc addToStore(daemon: Session; store: ErisStore; request: AddToStoreClientAttrs): Future[(string, AddToStoreAttrs)] {.async.} = + let + erisCap = parseCap(request.eris) + stream = newErisStream(store, erisCap) + await send(daemon, wopAddToStore) + await send(daemon, request.name) + await send(daemon, string request.`ca-method`) + await send(daemon, request.references) + await send(daemon, 0) # repair + await recoverChunks(daemon, store, erisCap) + await recvWork(daemon) + let path = await recvString(daemon) + var info = await recvLegacyPathAttrs(daemon) + info.eris = request.eris + info.`ca-method` = request.`ca-method` + info.name = request.name + info.references = request.references + return (path, info) + +proc callDaemon(turn: var Turn; path: string; action: proc (daemon: Session; turn: var Turn) {.gcsafe.}): Session = + let + daemon = newSession() + fut = connectDaemon(daemon, path) + addCallback(fut, turn) do (turn: var Turn): + read(fut) + action(daemon, turn) + return daemon + +proc bootDaemonSide*(turn: var Turn; ds: Ref; store: ErisStore; socketPath: string) = + + during(turn, ds, ?Observe(pattern: !Missing) ?? {0: grab()}) do (a: Preserve[Ref]): + # cannot use `grabLit` here because an array is a compound + # TODO: unpack to a `Pattern` + let daemon = callDaemon(turn, socketPath) do (daemon: Session; turn: var Turn): + var targets: StringSeq + doAssert targets.fromPreserve(unpackLiterals(a)) + # unpack ]> + let missFut = queryMissing(daemon, targets) + addCallback(missFut, turn) do (turn: var Turn): + close(daemon) + var miss = read(missFut) + discard publish(turn, ds, miss) + do: + close(daemon) + + during(turn, ds, ?Observe(pattern: !PathInfo) ?? {0: grabLit()}) do (path: string): + let daemon = callDaemon(turn, socketPath) do (daemon: Session; turn: var Turn): + let infoFut = queryPathInfo(daemon, path) + addCallback(infoFut, turn) do (turn: var Turn): + close(daemon) + var info = read(infoFut) + discard publish(turn, ds, initRecord("path", path.toPreserve, info.toPreserve)) + do: + close(daemon) + + during(turn, ds, ?Observe(pattern: !PathInfo) ?? {1: grabDict()}) do (pat: Value): + var daemon: Session + var request: AddToStoreClientAttrs + if request.fromPreserve(unpackLiterals pat): + daemon = callDaemon(turn, socketPath) do (daemon: Session; turn: var Turn): + let fut = addToStore(daemon, store, request) + addCallback(fut, turn) do (turn: var Turn): + close(daemon) + var (path, info) = read(fut) + discard publish(turn, ds, initRecord("path", path.toPreserve, info.toPreserve)) + do: + close(daemon) diff --git a/src/nix_actor/main.nim b/src/nix_actor/main.nim deleted file mode 100644 index 072e97e..0000000 --- a/src/nix_actor/main.nim +++ /dev/null @@ -1,7 +0,0 @@ -# SPDX-FileCopyrightText: ☭ Emery Hemingway -# SPDX-License-Identifier: Unlicense - -{.passC: staticExec("pkg-config --cflags nix-main").} -{.passL: staticExec("pkg-config --libs nix-main").} - -proc initNix*() {.importcpp: "nix::initNix", header: "shared.hh".} diff --git a/src/nix_actor/protocol.nim b/src/nix_actor/protocol.nim index 1b38082..6f31627 100644 --- a/src/nix_actor/protocol.nim +++ b/src/nix_actor/protocol.nim @@ -8,21 +8,32 @@ type `options`*: Table[Symbol, Preserve[void]] `result`*: Preserve[void] + AttrSet* = Table[Symbol, Preserve[void]] Realise* {.preservesRecord: "realise".} = object `drv`*: string - `outputs`*: seq[string] + `outputs`*: StringSeq + + LegacyPathAttrs* {.preservesDictionary.} = object + `ca`*: string + `deriver`*: string + `narHash`*: string + `narSize`*: BiggestInt + `references`*: StringSeq + `registrationTime`*: BiggestInt + `sigs`*: StringSet + `ultimate`*: bool Missing* {.preservesRecord: "missing".} = object - `targets`*: seq[string] - `willBuild`*: HashSet[string] - `willSubstitute`*: HashSet[string] - `unknown`*: HashSet[string] + `targets`*: StringSeq + `willBuild`*: StringSet + `willSubstitute`*: StringSet + `unknown`*: StringSet `downloadSize`*: BiggestInt `narSize`*: BiggestInt Narinfo* {.preservesRecord: "narinfo".} = object `path`*: string - `info`*: Dict + `info`*: AttrSet FieldKind* {.pure.} = enum `int`, `string` @@ -35,18 +46,30 @@ type `string`*: string - PathInfo* {.preservesRecord: "path-info".} = object - `path`*: string - `deriver`*: string - `narHash`*: string - `references`*: HashSet[string] - `registrationTime`*: BiggestInt - `narSize`*: BiggestInt - `ultimate`*: bool - `sigs`*: HashSet[string] + StringSet* = HashSet[string] + AddToStoreAttrs* {.preservesDictionary.} = object `ca`*: string + `ca-method`*: Symbol + `deriver`*: string + `eris`*: seq[byte] + `name`*: string + `narHash`*: string + `narSize`*: BiggestInt + `references`*: StringSeq + `registrationTime`*: BiggestInt + `sigs`*: StringSet + `ultimate`*: bool + + AddToStoreClientAttrs* {.preservesDictionary.} = object + `ca-method`*: Symbol + `eris`*: seq[byte] + `name`*: string + `references`*: StringSeq + + PathInfo* {.preservesRecord: "path".} = object + `path`*: string + `attrs`*: AttrSet - Dict* = Table[Symbol, Preserve[void]] Build* {.preservesRecord: "nix-build".} = object `input`*: string `output`*: Preserve[void] @@ -60,13 +83,12 @@ type `fields`*: Fields `parent`*: BiggestInt - FieldString* = string Instantiate* {.preservesRecord: "instantiate".} = object `expr`*: string - `options`*: Dict + `options`*: AttrSet `result`*: Preserve[void] - FieldInt* = BiggestInt + StringSeq* = seq[string] ActionStop* {.preservesRecord: "stop".} = object `id`*: BiggestInt @@ -75,24 +97,32 @@ type `type`*: BiggestInt `fields`*: Fields -proc `$`*(x: Eval | Realise | Missing | Narinfo | Field | PathInfo | Dict | +proc `$`*(x: Eval | AttrSet | Realise | LegacyPathAttrs | Missing | Narinfo | + Field | + StringSet | + AddToStoreAttrs | + AddToStoreClientAttrs | + PathInfo | Build | Fields | ActionStart | - FieldString | Instantiate | - FieldInt | + StringSeq | ActionStop | ActionResult): string = `$`(toPreserve(x)) -proc encode*(x: Eval | Realise | Missing | Narinfo | Field | PathInfo | Dict | +proc encode*(x: Eval | AttrSet | Realise | LegacyPathAttrs | Missing | Narinfo | + Field | + StringSet | + AddToStoreAttrs | + AddToStoreClientAttrs | + PathInfo | Build | Fields | ActionStart | - FieldString | Instantiate | - FieldInt | + StringSeq | ActionStop | ActionResult): seq[byte] = encode(toPreserve(x)) diff --git a/src/nix_actor/sockets.nim b/src/nix_actor/sockets.nim index b333c04..2bc02b5 100644 --- a/src/nix_actor/sockets.nim +++ b/src/nix_actor/sockets.nim @@ -1,93 +1,96 @@ # SPDX-FileCopyrightText: ☭ Emery Hemingway # SPDX-License-Identifier: Unlicense -import std/[algorithm, asyncdispatch, asyncnet, os, sets, strtabs, strutils] -from std/nativesockets import AF_INET, AF_UNIX, SOCK_STREAM, Protocol +## Common module for communicating with Nix clients and daemons. +import std/[asyncdispatch, asyncnet, sets, strtabs, strutils, tables] +from std/nativesockets import AF_UNIX, SOCK_STREAM, Protocol + +import eris import preserves, syndicate -from syndicate/protocols/dataspace import Observe + import ./protocol {.pragma: workerProtocol, importc, header: "worker-protocol.hh".} -type Word = uint64 -proc `$`(w: Word): string = toHex(w) +type Word* = uint64 + +proc `[]=`*[T](attrs: var AttrSet; key: string; val: T) = + attrs[Symbol key] = val.toPreserve const - WORKER_MAGIC_1 = 0x6E697863 - WORKER_MAGIC_2 = 0x6478696F - PROTOCOL_VERSION = 0x100 or 35 + WORKER_MAGIC_1* = 0x6E697863 + WORKER_MAGIC_2* = 0x6478696F + PROTOCOL_VERSION* = 0x100 or 35 - STDERR_NEXT = 0x6F6C6d67 - STDERR_READ = 0x64617461 - STDERR_WRITE = 0x64617416 - STDERR_LAST = 0x616C7473 - STDERR_ERROR = 0x63787470 - STDERR_START_ACTIVITY = 0x53545254 - STDERR_STOP_ACTIVITY = 0x53544F50 - STDERR_RESULT = 0x52534C54 + STDERR_NEXT* = 0x6F6C6d67 + STDERR_READ* = 0x64617461 + STDERR_WRITE* = 0x64617416 + STDERR_LAST* = 0x616C7473 + STDERR_ERROR* = 0x63787470 + STDERR_START_ACTIVITY* = 0x53545254 + STDERR_STOP_ACTIVITY* = 0x53544F50 + STDERR_RESULT* = 0x52534C54 - wopIsValidPath = 1 - wopHasSubstitutes = 3 - wopQueryReferrers = 6 - wopAddToStore = 7 - wopBuildPaths = 9 - wopEnsurePath = 10 - wopAddTempRoot = 11 - wopAddIndirectRoot = 12 - wopSyncWithGC = 13 - wopFindRoots = 14 - wopSetOptions = 19 - wopCollectGarbage = 20 - wopQuerySubstitutablePathInfo = 21 - wopQueryAllValidPaths = 23 - wopQueryFailedPaths = 24 - wopClearFailedPaths = 25 - wopQueryPathInfo = 26 - wopQueryPathFromHashPart = 29 - wopQuerySubstitutablePathInfos = 30 - wopQueryValidPaths = 31 - wopQuerySubstitutablePaths = 32 - wopQueryValidDerivers = 33 - wopOptimiseStore = 34 - wopVerifyStore = 35 - wopBuildDerivation = 36 - wopAddSignatures = 37 - wopNarFromPath = 38 - wopAddToStoreNar = 39 - wopQueryMissing = 40 - wopQueryDerivationOutputMap = 41 - wopRegisterDrvOutput = 42 - wopQueryRealisation = 43 - wopAddMultipleToStore = 44 - wopAddBuildLog = 45 - wopBuildPathsWithResults = 46 + wopIsValidPath* = 1 + wopHasSubstitutes* = 3 + wopQueryReferrers* = 6 + wopAddToStore* = 7 + wopBuildPaths* = 9 + wopEnsurePath* = 10 + wopAddTempRoot* = 11 + wopAddIndirectRoot* = 12 + wopSyncWithGC* = 13 + wopFindRoots* = 14 + wopSetOptions* = 19 + wopCollectGarbage* = 20 + wopQuerySubstitutablePathInfo* = 21 + wopQueryAllValidPaths* = 23 + wopQueryFailedPaths* = 24 + wopClearFailedPaths* = 25 + wopQueryPathInfo* = 26 + wopQueryPathFromHashPart* = 29 + wopQuerySubstitutablePathInfos* = 30 + wopQueryValidPaths* = 31 + wopQuerySubstitutablePaths* = 32 + wopQueryValidDerivers* = 33 + wopOptimiseStore* = 34 + wopVerifyStore* = 35 + wopBuildDerivation* = 36 + wopAddSignatures* = 37 + wopNarFromPath* = 38 + wopAddToStoreNar* = 39 + wopQueryMissing* = 40 + wopQueryDerivationOutputMap* = 41 + wopRegisterDrvOutput* = 42 + wopQueryRealisation* = 43 + wopAddMultipleToStore* = 44 + wopAddBuildLog* = 45 + wopBuildPathsWithResults* = 46 type - ProtocolError = object of IOError - Version = uint16 - StringSeq = seq[string] - StringSet = HashSet[string] - Session = ref object - socket: AsyncSocket - buffer: seq[Word] - version: Version - Observe = dataspace.Observe[Ref] + ProtocolError* = object of IOError + Version* = uint16 -func major(version: Version): uint16 = version and 0xff00 -func minor(version: Version): uint16 = version and 0x00ff + Session* = ref object + socket*: AsyncSocket + buffer*: seq[Word] + version*: Version -proc close(session: Session) = +func major*(version: Version): uint16 = version and 0xff00 +func minor*(version: Version): uint16 = version and 0x00ff + +proc close*(session: Session) = close(session.socket) reset(session.buffer) -proc send(session: Session; words: varargs[Word]): Future[void] = +proc send*(session: Session; words: varargs[Word]): Future[void] = if session.buffer.len < words.len: session.buffer.setLen(words.len) for i, word in words: session.buffer[i] = word send(session.socket, addr session.buffer[0], words.len shl 3) -proc send(session: Session; s: string): Future[void] = +proc send*(session: Session; s: string): Future[void] = let wordCount = 1 + ((s.len + 7) shr 3) if session.buffer.len < wordCount: setLen(session.buffer, wordCount) session.buffer[0] = Word s.len @@ -96,7 +99,7 @@ proc send(session: Session; s: string): Future[void] = copyMem(addr session.buffer[1], unsafeAddr s[0], s.len) send(session.socket, addr session.buffer[0], wordCount shl 3) -proc send(session: Session; ss: StringSeq|StringSet): Future[void] = +proc send*(session: Session; ss: StringSeq|StringSet): Future[void] = ## Send a set of strings. The set is sent as a contiguous buffer. session.buffer[0] = Word ss.len var off = 1 @@ -113,23 +116,23 @@ proc send(session: Session; ss: StringSeq|StringSet): Future[void] = inc(off, stringWordLen) send(session.socket, addr session.buffer[0], off shl 3) -proc recvWord(sock: AsyncSocket): Future[Word] {.async.} = +proc recvWord*(sock: AsyncSocket): Future[Word] {.async.} = var w: Word let n = await recvInto(sock, addr w, sizeof(Word)) if n != sizeof(Word): raise newException(ProtocolError, "short read") return w -proc recvWord(session: Session): Future[Word] = +proc recvWord*(session: Session): Future[Word] = recvWord(session.socket) -proc discardWords(session: Session; n: int): Future[void] {.async.} = +proc discardWords*(session: Session; n: int): Future[void] {.async.} = if session.buffer.len < n: setLen(session.buffer, n) let byteCount = n shl 3 let n = await recvInto(session.socket, addr session.buffer[0], byteCount) if n != byteCount: raise newException(ProtocolError, "short read") -proc recvString(socket: AsyncSocket): Future[string] {.async.} = +proc recvString*(socket: AsyncSocket): Future[string] {.async.} = let stringLen = int (await recvWord(socket)) if stringLen > 0: var s = newString((stringLen + 7) and (not 7)) @@ -140,310 +143,61 @@ proc recvString(socket: AsyncSocket): Future[string] {.async.} = return s return "" -proc recvString(session: Session): Future[string] = +proc recvString*(session: Session): Future[string] = recvString(session.socket) -proc recvStringSeq(session: Session): Future[StringSeq] {.async.} = +proc recvStringSeq*(session: Session): Future[StringSeq] {.async.} = let count = int(await recvWord(session.socket)) var strings = newSeq[string](count) for i in 0..= 14 - discard await(recvWord(session)) - # obsolete CPU affinity - assert clientVersion.minor >= 11 - discard await(recvWord(session)) - # obsolete reserveSpace - assert clientVersion.minor >= 33 - await send(session, "0.0.0") - await send(session, STDERR_LAST) - while not session.socket.isClosed: - let wop = await recvWord(session.socket) - case wop - - of wopQueryPathInfo: - let - path = await recvString(session) - pat = inject(?PathInfo, { 0: ?path }) - await send(session, STDERR_NEXT) - await send(session, $pat) - run(facet) do (turn: var Turn): - onPublish(turn, ds, pat) do ( - deriver: string, - narHash: string, - references: StringSet, - registrationTime: BiggestInt, - narSize: BiggestInt, - ultimate: bool, - sigs: StringSet, - ca: string - ): - var info = PathInfo( - deriver: deriver, - narHash: narHash, - references: references, - registrationTime: registrationTime, - narSize: narSize, - ultimate: ultimate, - sigs: sigs, - ca: ca, - ) - asyncCheck(turn, send(session, info)) - - of wopQueryMissing: - var targets = toPreserve(await recvStringSeq(session)) - sort(targets.sequence) - # would prefer to use a set but that doesn't translate into a pattern - let pat = inject(?Missing, { 0: ?targets }) - # TODO send the pattern to the client as a log line - await send(session, STDERR_NEXT) - await send(session, $pat) - run(facet) do (turn: var Turn): - onPublish(turn, ds, pat) do ( - willBuild: StringSet, - willSubstitute: StringSet, - unknown: StringSet, - downloadSize: BiggestInt, - narSize: BiggestInt - ): - let miss = Missing( - willBuild: willBuild, - willSubstitute: willSubstitute, - unknown: unknown, - downloadSize: downloadSize, - narSize: narSize, - ) - asyncCheck(turn, send(session, miss)) - - of wopSetOptions: - await discardWords(session, 12) - # 01 keepFailed - # 02 keepGoing - # 03 tryFallback - # 04 verbosity - # 05 maxBuildJobs - # 06 maxSilentTime - # 07 useBuildHook - # 08 verboseBuild - # 09 logType - # 10 printBuildTrace - # 11 buildCores - # 12 useSubstitutes - let overridePairCount = await recvWord(session) - for _ in 1..overridePairCount: - discard await (recvString(session)) - discard await (recvString(session)) - await send(session, STDERR_LAST) - # all options from the client are ingored - - else: - let msg = "unhandled worker op " & $wop.int - await send(session, STDERR_NEXT) - await send(session, msg) - await send(session, STDERR_LAST) - close(session.socket) - -proc serveClientSide*(facet: Facet; ds: Ref; listener: AsyncSocket) {.async.} = - while not listener.isClosed: - let - client = await accept(listener) - fut = serveClient(facet, ds, newSession(client)) - addCallback(fut) do (): - if not client.isClosed: - close(client) - -proc bootClientSide*(facet: Facet; ds: Ref; socketPath: string) = - let listener = newAsyncSocket( +proc newUnixSocket*(): AsyncSocket = + newAsyncSocket( domain = AF_UNIX, sockType = SOCK_STREAM, protocol = cast[Protocol](0), - buffered = false) - onStop(facet) do (turn: var Turn): - close(listener) - removeFile(socketPath) - removeFile(socketPath) - bindUnix(listener, socketPath) - listen(listener) - asyncCheck(facet, serveClientSide(facet, ds, listener)) + buffered = false, + ) -proc connectDaemon(session: Session; socketPath: string) {.async.} = - await connectUnix(session.socket, socketPath) - await send(session, WORKER_MAGIC_1) - let daemonMagic = await recvWord(session) - if daemonMagic != WORKER_MAGIC_2: - raise newException(ProtocolError, "bad magic from daemon") - let daemonVersion = await recvWord(session) - session.version = min(Version daemonVersion, PROTOCOL_VERSION) - await send(session, Word session.version) - await send(session, 0) # CPU affinity - await send(session, 0) # reserve space - if session.version.minor >= 33: - discard await recvString(session) # version - if session.version.minor >= 35: - discard await recvWord(session) # remoteTrustsUs - await recvWork(session) +proc newSession*(socket: AsyncSocket): Session = + Session(socket: socket, buffer: newSeq[Word](512)) -proc queryMissing(session: Session; targets: StringSeq): Future[Missing] {.async.} = - var miss = Missing(targets: targets) - await send(session, wopQueryMissing) - await send(session, miss.targets) - await recvWork(session) - miss.willBuild = await recvStringSet(session) - miss.willSubstitute = await recvStringSet(session) - miss.unknown = await recvStringSet(session) - miss.downloadSize = BiggestInt await recvWord(session) - miss.narSize = BiggestInt await recvWord(session) - return miss +proc newSession*(): Session = + newUnixSocket().newSession() -proc queryPathInfo(session: Session; path: string): Future[PathInfo] {.async.} = - var info = PathInfo(path: path) - await send(session, wopQueryPathInfo) - await send(session, info.path) - await recvWork(session) - let valid = await recvWord(session) - if valid != 0: - info.deriver = await recvString(session) - info.narHash = await recvString(session) - info.references = await recvStringSet(session) - info.registrationTime = BiggestInt await recvWord(session) - info.narSize = BiggestInt await recvWord(session) - info.ultimate = (await recvWord(session)) != 0 - info.sigs = await recvStringSet(session) - info.ca = await recvString(session) - return info +proc ingestChunks*(session: Session; store: ErisStore): Future[ErisCap] {.async.} = + var ingest: ErisIngest + while true: + let chunkLen = int await recvWord(session) + if ingest.isNil: + ingest = newErisIngest( + store, recommendedChunkSize(chunkLen), convergentMode) + if chunkLen == 0: + break + else: + let wordLen = (chunkLen + 7) shr 3 + if session.buffer.len < wordLen: setLen(session.buffer, wordLen) + let recvLen = await recvInto(session.socket, addr session.buffer[0], chunkLen) + # each chunk must be received contiguously + if recvLen != chunkLen: + raise newException(ProtocolError, "invalid chunk read") + await append(ingest, addr session.buffer[0], chunkLen) + var cap = await cap(ingest) + return cap -proc bootDaemonSide*(turn: var Turn; ds: Ref; socketPath: string) = - - during(turn, ds, ?Observe(pattern: !Missing) ?? {0: grab()}) do (a: Preserve[Ref]): - # cannot use `grabLit` here because an array is a compound - let - session = newSession() - fut = connectDaemon(session, socketPath) - addCallback(fut, turn) do (turn: var Turn): - read(fut) - var targets: StringSeq - doAssert targets.fromPreserve(unpackLiterals(a)) - # unpack ]> - let missFut = queryMissing(session, targets) - addCallback(missFut, turn) do (turn: var Turn): - var miss = read(missFut) - discard publish(turn, ds, miss) - do: - close(session) - - during(turn, ds, ?Observe(pattern: !PathInfo) ?? {0: grabLit()}) do (path: string): - let - session = newSession() - fut = connectDaemon(session, socketPath) - addCallback(fut, turn) do (turn: var Turn): - read(fut) - let infoFut = queryPathInfo(session, path) - addCallback(infoFut, turn) do (turn: var Turn): - var info = read(infoFut) - discard publish(turn, ds, info) - do: - close(session) +proc recoverChunks*(session: Session; store: ErisStore; cap: ErisCap) {.async.} = + let stream = newErisStream(store, cap) + session.buffer.setLen(succ(cap.chunkSize.int shr 3)) + while true: + let n = await stream.readBuffer(addr session.buffer[1], cap.chunkSize.int) + session.buffer[0] = Word n + await send(session.socket, addr session.buffer[0], 8+n) + if n == 0: break + close(stream)