Compare commits

..

5 Commits

Author SHA1 Message Date
Emery Hemingway b3dbb89529 ssh server
Bad idea, use unix:// instead
2023-06-07 13:15:05 +01:00
Emery Hemingway 9b843905f3 Better stuff 2023-06-07 13:14:47 +01:00
Emery Hemingway a18cdd16b4 Even more stuff happens 2023-06-06 15:01:00 +01:00
Emery Hemingway a1068cd836 More stuff happens 2023-06-06 01:20:50 +01:00
Emery Hemingway e5148cc654 Stuff happens 2023-06-05 17:12:06 +01:00
27 changed files with 627 additions and 1681 deletions

2
.envrc
View File

@ -1,2 +1,2 @@
source_env ..
use nix
use flake syndicate#nix_actor

2
.gitignore vendored
View File

@ -1 +1 @@
/nim.cfg
/.direnv

View File

@ -2,46 +2,21 @@
An actor for interacting with the [Nix](https://nixos.org/) daemon via the [Syndicated Actor Model](https://syndicate-lang.org/).
See [protocol.prs](./protocol.prs) for the Syndicate protocol [schema](https://preserves.dev/preserves-schema.html).
*This is only a proof-of-concept and is not yet useful.*
## Example configuration
A demo script for the [Syndicate server](https://git.syndicate-lang.org/syndicate-lang/syndicate-rs), see https://synit.org/book/operation/scripting.html
```
? <nixspace ?nixspace> $nixspace [
? <instantiate "let pkgs = import <nixpkgs> {}; in pkgs.hello" { } ?drv> [
? <realise $drv ?outputs> [
$log ! <log "-" { "hello": $outputs }>
]
? <realise $drv ?outputs> [ ]
]
? <eval "3 * 4" {} ?result> [
$log ! <log "-" { "nix eval 3 * 4": $result }>
]
? <eval "builtins.getEnv \"PATH\"" {impure: ""} ?result> [
$log ! <log "-" { "nix impure path": $result }>
]
? <missing ["/nix/store/p7fnjrbvmpwl192ir8p2ixfym68j7sgv-invidious-unstable-2023-05-08"] _ ?subs _ ?dlSize ?narSize> [
$log ! <log "-" { invidious-unstable-2023-05-08: {
substitutes: $subs
downloadSize: $dlSize
narSize: $narSize
} }>
]
? <path-info "/nix/store/jhgh02lyizd1kyl71brvc01ygsmgi40a-tzdata-2023c" ?deriver ?narHash _ _ ?narSize _ ?sigs _> [
$log ! <log "-" { tzdata-2023c: {
deriver: $deriver
narHash: $narHash
narSize: $narSize
sigs: $sigs
} }>
? <eval "3 * 4" {} _> []
? <eval "builtins.getEnv \"PATH\"" {impure: ""} _> []
? ?any [
$log ! <log "-" { nix: $any }>
]
$config [
@ -49,12 +24,10 @@ A demo script for the [Syndicate server](https://git.syndicate-lang.org/syndicat
? <service-object <daemon nix_actor> ?cap> [
$cap {
dataspace: $nixspace
daemon-socket: "/nix/var/nix/daemon-socket/socket"
listen-socket: "/tmp/translator.worker.nix.socket"
}
]
<daemon nix_actor {
argv: "/bin/nix_actor"
argv: "/usr/local/nix_actor"
protocol: application/syndicate
}>
]

View File

@ -1,2 +0,0 @@
include_rules
: lock.json |> !nim_cfg |> | ./<lock>

View File

@ -1,7 +1,5 @@
include ../eris-nim/depends.tup
NIM_FLAGS += --path:$(TUP_CWD)/../eris-nim/src
include ../syndicate-nim/depends.tup
NIM_FLAGS += --path:$(TUP_CWD)/../syndicate-nim/src
NIM_FLAGS += --path:$(TUP_CWD)/../libssh/src
NIM_GROUPS += $(TUP_CWD)/<lock>
NIM_FLAGS += --backend:cpp

171
lock.json
View File

@ -1,171 +0,0 @@
{
"depends": [
{
"method": "fetchzip",
"packages": [
"base32"
],
"path": "/nix/store/qcnchjsak3hyn4c6r0zd6qvm7j8y1747-source",
"ref": "0.1.3",
"rev": "f541038fbe49fdb118cc2002d29824b9fc4bfd61",
"sha256": "16gh1ifp9hslsg0is0v1ya7rxqfhq5hjqzc3pfdqvcgibp5ybh06",
"srcDir": "",
"url": "https://github.com/OpenSystemsLab/base32.nim/archive/f541038fbe49fdb118cc2002d29824b9fc4bfd61.tar.gz"
},
{
"method": "fetchzip",
"packages": [
"cbor"
],
"path": "/nix/store/70cqa9s36dqnmsf179cn9psj77jhqi1l-source",
"ref": "20230619",
"rev": "a4a1affd45ba90bea24e08733ae2bd02fe058166",
"sha256": "005ib6im97x9pdbg6p0fy58zpdwdbkpmilxa8nhrrb1hnpjzz90p",
"srcDir": "src",
"url": "https://git.sr.ht/~ehmry/nim_cbor/archive/a4a1affd45ba90bea24e08733ae2bd02fe058166.tar.gz"
},
{
"method": "fetchzip",
"packages": [
"coap"
],
"path": "/nix/store/pqj933cnw7r7hp46jrpjlwh1yr0jvckp-source",
"ref": "20230331",
"rev": "a134213b51a8d250684f2ba26802ffa97fae4ffb",
"sha256": "1wbix6d8l26nj7m3xinh4m2f27n4ma0yzs3x5lpann2ha0y51k8b",
"srcDir": "src",
"url": "https://codeberg.org/eris/nim-coap/archive/a134213b51a8d250684f2ba26802ffa97fae4ffb.tar.gz"
},
{
"method": "fetchzip",
"packages": [
"configparser"
],
"path": "/nix/store/4zl5v7i6cj3f9sayvsjcx2h20lqwr9a6-source",
"ref": "newSection",
"rev": "695f1285d63f1954c25eb1f42798d90fa7bcbe14",
"sha256": "0b0pb5i0kir130ia2zf8zcgdz8awms161i6p83ri3nbgibbjnr37",
"srcDir": "src",
"url": "https://github.com/ehmry/nim-configparser/archive/695f1285d63f1954c25eb1f42798d90fa7bcbe14.tar.gz"
},
{
"method": "fetchzip",
"packages": [
"eris"
],
"path": "/nix/store/lxa6ba8r9hhs06k6f2iyznwjxix1klv1-source",
"ref": "20230823",
"rev": "49d8117367d3530533dc1d6a9111ddd134b08b1e",
"sha256": "0lq9a04cayf04nnhn0gvp5phlij0cis38v7cz7jmgks2xvz1bcbr",
"srcDir": "src",
"url": "https://codeberg.org/eris/nim-eris/archive/49d8117367d3530533dc1d6a9111ddd134b08b1e.tar.gz"
},
{
"method": "fetchzip",
"packages": [
"freedesktop_org"
],
"path": "/nix/store/98wncmx58cfnhv3y96lzwm22zvyk9b1h-source",
"ref": "20230210",
"rev": "fb04d0862aca4be2edcc0eafa94b1840030231c8",
"sha256": "0wj5m09x1pr36gv8p5r72p6l3wwl01y8scpnlzx7q0h5ij6jaj6s",
"srcDir": "src",
"url": "https://git.sr.ht/~ehmry/freedesktop_org/archive/fb04d0862aca4be2edcc0eafa94b1840030231c8.tar.gz"
},
{
"method": "fetchzip",
"packages": [
"getdns"
],
"path": "/nix/store/x9xmn7w4k6jg8nv5bnx148ibhnsfh362-source",
"ref": "20221222",
"rev": "c73cbe288d9f9480586b8fa87f6d794ffb6a6ce6",
"sha256": "1sbgx2x51szr22i72n7c8jglnfmr8m7y7ga0v85d58fwadiv7g6b",
"srcDir": "src",
"url": "https://git.sr.ht/~ehmry/getdns-nim/archive/c73cbe288d9f9480586b8fa87f6d794ffb6a6ce6.tar.gz"
},
{
"method": "fetchzip",
"packages": [
"hashlib"
],
"path": "/nix/store/fav82xdbicvlk34nmcbl89zx99lr3mbs-source",
"rev": "f9455d4be988e14e3dc7933eb7cc7d7c4820b7ac",
"sha256": "1sx6j952lj98629qfgr7ds5aipyw9d6lldcnnqs205wpj4pkcjb3",
"srcDir": "",
"url": "https://github.com/ehmry/hashlib/archive/f9455d4be988e14e3dc7933eb7cc7d7c4820b7ac.tar.gz"
},
{
"method": "fetchzip",
"packages": [
"nimcrypto"
],
"path": "/nix/store/zyr8zwh7vaiycn1s4r8cxwc71f2k5l0h-source",
"ref": "traditional-api",
"rev": "602c5d20c69c76137201b5d41f788f72afb95aa8",
"sha256": "1dmdmgb6b9m5f8dyxk781nnd61dsk3hdxqks7idk9ncnpj9fng65",
"srcDir": "",
"url": "https://github.com/cheatfate/nimcrypto/archive/602c5d20c69c76137201b5d41f788f72afb95aa8.tar.gz"
},
{
"method": "fetchzip",
"packages": [
"npeg"
],
"path": "/nix/store/ffkxmjmigfs7zhhiiqm0iw2c34smyciy-source",
"ref": "1.2.1",
"rev": "26d62fdc40feb84c6533956dc11d5ee9ea9b6c09",
"sha256": "0xpzifjkfp49w76qmaylan8q181bs45anmp46l4bwr3lkrr7bpwh",
"srcDir": "src",
"url": "https://github.com/zevv/npeg/archive/26d62fdc40feb84c6533956dc11d5ee9ea9b6c09.tar.gz"
},
{
"method": "fetchzip",
"packages": [
"preserves"
],
"path": "/nix/store/fmb2yckksz7iv3qdkk5gk1j060kppkq9-source",
"ref": "20231102",
"rev": "4faeb766dc3945bcfacaa1a836ef6ab29b20ceb0",
"sha256": "1a3g5bk1l1h250q3p6sqv6r1lpsplp330qqyp48r0i4a5r0jksq3",
"srcDir": "src",
"url": "https://git.syndicate-lang.org/ehmry/preserves-nim/archive/4faeb766dc3945bcfacaa1a836ef6ab29b20ceb0.tar.gz"
},
{
"method": "fetchzip",
"packages": [
"syndicate"
],
"path": "/nix/store/nhpvl223vbzdrlzikw7pgyfxs344w7ma-source",
"ref": "20231108",
"rev": "095418032180e360ea27ec7fcd63193944b68e2c",
"sha256": "09pbml2chzz0v5zpz67fs7raj0mfmg8qrih2vz85xxc51h7ncqvw",
"srcDir": "src",
"url": "https://git.syndicate-lang.org/ehmry/syndicate-nim/archive/095418032180e360ea27ec7fcd63193944b68e2c.tar.gz"
},
{
"method": "fetchzip",
"packages": [
"taps"
],
"path": "/nix/store/did1li0xk9qih80pvxqhjc4np3ijlfjj-source",
"ref": "20230331",
"rev": "4f9c9972d74eb39c662b43ed79d761e109bf00f1",
"sha256": "12qsizmisr1q0q4x37c5q6gmnqb5mp0bid7s3jlcsjvhc4jw2q57",
"srcDir": "src",
"url": "https://git.sr.ht/~ehmry/nim_taps/archive/4f9c9972d74eb39c662b43ed79d761e109bf00f1.tar.gz"
},
{
"method": "fetchzip",
"packages": [
"tkrzw"
],
"path": "/nix/store/4x9wxyli4dy719svg1zaww0c0b3xckp0-source",
"ref": "20220922",
"rev": "efd87edb7b063182c1a1fa018006a87b515d589b",
"sha256": "1h0sdvai4gkkz48xfh67wa1xz2k8bkkba8q6snnbllmhmywd9apb",
"srcDir": "src",
"url": "https://git.sr.ht/~ehmry/nim-tkrzw/archive/efd87edb7b063182c1a1fa018006a87b515d589b.tar.gz"
}
]
}

View File

@ -1,8 +1,9 @@
version = "20231208"
version = "20230607"
author = "Emery Hemingway"
description = "Syndicated Nix Actor"
license = "Unlicense"
srcDir = "src"
bin = @["nix_actor"]
backend = "cpp"
requires "nim >= 1.6.10", "syndicate >= 20231005", "eris >= 20230823"
requires "nim >= 1.6.10", "syndicate >= 20230530"

View File

@ -1,19 +1,19 @@
version 1 .
StringSeq = [string ...] .
StringSet = #{string} .
AttrSet = {symbol: any ...:...} .
Build = <nix-build @input string @output any> .
Realise = <realise @drv string @outputs StringSeq> .
Realise = <realise @drv string @outputs [string ...]> .
Instantiate = <instantiate @expr string @options AttrSet @result any> .
Instantiate = <instantiate @expr string @options Dict @result any> .
Eval = <eval @expr string @path string @result any> .
Eval = <eval @expr string @options {symbol: any ...:...} @result any> .
Narinfo = <narinfo @path string @info AttrSet> .
Narinfo = <narinfo @path string @info Dict> .
Dict = {symbol: any ...:...} .
FieldInt = int .
FieldString = string .
Field = int / string .
Fields = [Field ...] .
@ -22,43 +22,15 @@ ActionStop = <stop @id int> .
ActionResult = <result @id int @type int @fields Fields> .
; TODO: why not make target a singleton?
Missing = <missing @targets StringSeq @willBuild StringSet @willSubstitute StringSet @unknown StringSet @downloadSize int @narSize int> .
Missing = <missing @targets #{string} @willBuild #{string} @willSubstitute #{string} @unknown #{string} @downloadSize int @narSize int> .
; Path info for the worker protocol version 35.
LegacyPathAttrs = {
deriver: string
narHash: string
references: StringSeq ; prefer a set
registrationTime: int
narSize: int
ultimate: bool
sigs: StringSet
ca: string
} .
AddToStoreClientAttrs = {
name: string
eris: bytes
ca-method: symbol
references: StringSeq ; prefer a set
} .
; Intersection of the attributes needed to add a path to a store
; and the attributes returned by the daemon after adding the path.
AddToStoreAttrs = {
name: string
eris: bytes
ca-method: symbol
references: StringSeq ; prefer a set
deriver: string
narHash: string
registrationTime: int
narSize: int
ultimate: bool
sigs: StringSet
ca: string
} .
; Any collection of attributes describing a store path.
PathInfo = <path @path string @attrs AttrSet> .
PathInfo = <path-info
@path string
@deriver string
@narHash string
@references #{string}
@registrationTime int
@narSize int
@ultimate bool
@sigs #{string}
@ca string> .

View File

@ -1,17 +0,0 @@
{ pkgs ? import <nixpkgs> { } }:
let
nix' = pkgs.nix.overrideAttrs (final: prev: {
src = pkgs.fetchFromGitHub {
owner = "tweag";
repo = "nix";
rev = "nix-c-bindings";
hash = "sha256-xOyU79lsz0THOj1LccfsDS45089n2DhlkWxaJFeKriY=";
};
});
in pkgs.buildNimPackage {
name = "dummy";
nativeBuildInputs = [ pkgs.pkg-config ];
buildInputs = [ pkgs.boost nix' ];
lockFile = ./lock.json;
}

View File

@ -1,3 +1,3 @@
include_rules
: foreach *.nim | $(SYNDICATE_PROTOCOL) ./<protocol> |> !nim_bin |> {bin}
: foreach {bin} |> !assert_built |>
: nix_actor.nim | $(SYNDICATE_PROTOCOL) ./<protocol> |> !nim_bin |> {bin}
: {bin} |> !assert_built |>

View File

@ -1,81 +1,137 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[os, strutils, tables]
import preserves, syndicate, syndicate/relays
# from syndicate/protocols/dataspace import Observe
import ./nix_actor/[nix_api, nix_api_value]
import std/[asyncdispatch, httpclient, json, os, osproc, parseutils, strutils, tables]
import preserves, preserves/jsonhooks
import syndicate
from syndicate/protocols/dataspace import Observe
import ./nix_actor/protocol
proc toPreserve(state: State; value: Value; E = void): Preserve[E] {.gcsafe.} =
var ctx: NixContext
stderr.writeLine get_type(ctx, value).int
case get_type(ctx, value)
of NIX_TYPE_THUNK: raiseAssert "cannot preserve thunk"
of NIX_TYPE_INT:
result = getInt(ctx, value).toPreserve(E)
of NIX_TYPE_FLOAT:
result = getFloat(ctx, value).toPreserve(E)
of NIX_TYPE_BOOL:
result = getBool(ctx, value).toPreserve(E)
of NIX_TYPE_STRING:
result = ($getString(ctx, value)).toPreserve(E)
of NIX_TYPE_PATH:
result = ($getPathString(ctx, value)).toPreserve(E)
of NIX_TYPE_NULL:
result = initRecord[E]("null")
of NIX_TYPE_ATTRS:
result = initDictionary(E)
let n = getAttrsSize(ctx, value)
var i: cuint
while i < n:
var (key, val) = get_attr_byidx(ctx, value, state, i)
inc(i)
result[toSymbol($key, E)] = toPreserve(state, val, E)
stderr.writeLine(result)
# close(val)
of NIX_TYPE_LIST:
let n = getListSize(ctx, value)
result = initSequence(n, E)
var i: cuint
while i < n:
var val = getListByIdx(ctx, value, state, i)
result[i] = toPreserve(state, val, E)
inc(i)
# close(val)
of NIX_TYPE_FUNCTION, NIX_TYPE_EXTERNAL:
raiseAssert "TODO: need a failure type"
import ./nix_actor/[main, sockets]
type
BootArgs {.preservesDictionary.} = object
dataspace: Cap
Value = Preserve[void]
Observe = dataspace.Observe[Ref]
proc main() =
initLibexpr()
proc parseArgs(args: var seq[string]; opts: Dict) =
for sym, val in opts:
add(args, "--" & $sym)
if not val.isString "":
var js: JsonNode
if fromPreserve(js, val): add(args, $js)
else: stderr.writeLine "invalid option --", sym, " ", val
runActor("nix_actor") do (root: Cap; turn: var Turn):
connectStdio(turn, root)
proc parseNarinfo(info: var Dict; text: string) =
var
key, val: string
off: int
while off < len(text):
off = off + parseUntil(text, key, ':', off) + 1
off = off + skipWhitespace(text, off)
off = off + parseUntil(text, val, '\n', off) + 1
if key != "" and val != "":
if allCharsInSet(val, Digits):
info[Symbol key] = val.parsePreserves
else:
info[Symbol key] = val.toPreserve
during(turn, root, ?BootArgs) do (ds: Cap):
let
store = openStore()
state = newState(store)
proc narinfo(turn: var Turn; ds: Ref; path: string) =
let
client = newAsyncHttpClient()
url = "https://cache.nixos.org/" & path & ".narinfo"
futGet = get(client, url)
addCallback(futGet, turn) do (turn: var Turn):
let resp = read(futGet)
if code(resp) != Http200:
close(client)
else:
let futBody = body(resp)
addCallback(futBody, turn) do (turn: var Turn):
close(client)
var narinfo = Narinfo(path: path)
parseNarinfo(narinfo.info, read(futBody))
discard publish(turn, ds, narinfo)
let pat = ?Observe(pattern: !Eval) ?? {0: grabLit(), 1: grabLit()}
during(turn, ds, pat) do (expr: string, path: string):
var
value: Value
ass = Eval(expr: expr, path: path)
try:
value = evalFromString(state, ass.expr, ass.path)
force(state, value)
ass.result = toPreserve(state, value, void)
discard publish(turn, ds, ass)
except CatchableError as err:
stderr.writeLine "failed to evaluate ", ass.expr, ": ", err.msg
close(value)
do:
close(state)
close(store)
proc build(spec: string): Build =
var execOutput = execProcess("nix", args = ["build", "--json", "--no-link", spec], options = {poUsePath})
var js = parseJson(execOutput)
Build(input: spec, output: js[0].toPreserve)
main()
proc realise(realise: Realise): seq[string] =
var execlines = execProcess("nix-store", args = ["--realize", realise.drv], options = {poUsePath})
split(strip(execlines), '\n')
proc instantiate(instantiate: Instantiate): Value =
const cmd = "nix-instantiate"
var args = @["--expr", instantiate.expr]
parseArgs(args, instantiate.options)
var execOutput = strip execProcess(cmd, args = args, options = {poUsePath})
execOutput.toPreserve
proc eval(eval: Eval): Value =
const cmd = "nix"
var args = @["eval", "--expr", eval.expr]
parseArgs(args, eval.options)
var execOutput = strip execProcess(cmd, args = args, options = {poUsePath})
if execOutput != "":
var js = parseJson(execOutput)
result = js.toPreserve
proc bootNixFacet(ds: Ref; turn: var Turn): Facet =
# let store = openStore()
result = inFacet(turn) do (turn: var Turn):
during(turn, ds, ?Observe(pattern: !Build) ?? {0: grabLit()}) do (spec: string):
discard publish(turn, ds, build(spec))
during(turn, ds, ?Observe(pattern: !Realise) ?? {0: grabLit()}) do (drvPath: string):
var ass = Realise(drv: drvPath)
ass.outputs = realise(ass)
discard publish(turn, ds, ass)
during(turn, ds, ?Observe(pattern: !Instantiate) ?? {0: grabLit(), 1: grabDict()}) do (e: string, o: Value):
var ass = Instantiate(expr: e)
if not fromPreserve(ass.options, unpackLiterals(o)):
stderr.writeLine "invalid options ", o
else:
ass.result = instantiate(ass)
discard publish(turn, ds, ass)
during(turn, ds, ?Observe(pattern: !Eval) ?? {0: grabLit(), 1: grabDict()}) do (e: string, o: Value):
var ass = Eval(expr: e)
if not fromPreserve(ass.options, unpackLiterals(o)):
stderr.writeLine "invalid options ", o
else:
ass.result = eval(ass)
discard publish(turn, ds, ass)
during(turn, ds, ?Observe(pattern: !Narinfo) ?? {0: grabLit()}) do (path: string):
narinfo(turn, ds, path)
type
RefArgs {.preservesDictionary.} = object
dataspace: Ref
SocketArgs {.preservesDictionary.} = object
`listen-socket`: string
ServeSshArgs {.preservesDictionary.} = object
`keyfile`: string
`sshhost`: string
`sshport`: int
proc bootNixActor(root: Ref; turn: var Turn) =
connectStdio(root, turn)
during(turn, root, ?RefArgs) do (ds: Ref):
discard bootNixFacet(ds, turn)
during(turn, root, ?SocketArgs) do (path: string):
removeFile(path)
asyncCheck(turn, emulateSocket(path))
during(turn, root, ?ServeSshArgs) do (keyFile: string, host: string, port: int):
let srv = serveSsh(keyFile, host, port)
do:
stderr.writeLine "stop SSH server"
stop(srv)
initNix() # Nix lib isn't actually being used but it's nice to know that it links.
runActor("main", bootNixActor)

1
src/nix_actor.nim.cfg Normal file
View File

@ -0,0 +1 @@
define:ssl

View File

@ -1,174 +0,0 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[asyncdispatch, asyncnet, os, sets, strutils, tables]
from std/algorithm import sort
import eris
import preserves, syndicate
import ./protocol, ./sockets
proc sendNext(client: Session; msg: string) {.async.} =
await send(client, STDERR_NEXT)
await send(client, msg)
proc sendWorkEnd(client: Session): Future[void] =
send(client, STDERR_LAST)
proc send(client: Session; miss: Missing) {.async.} =
await sendWorkEnd(client)
await send(client, miss.willBuild)
await send(client, miss.willSubstitute)
await send(client, miss.unknown)
await send(client, Word miss.downloadSize)
await send(client, Word miss.narSize)
proc send(client: Session; info: LegacyPathAttrs) {.async.} =
await send(client, info.deriver)
await send(client, info.narHash)
await send(client, info.references)
await send(client, Word info.registrationTime)
await send(client, Word info.narSize)
await send(client, Word info.ultimate)
await send(client, info.sigs)
await send(client, info.ca)
proc sendValidInfo(client: Session; info: LegacyPathAttrs) {.async.} =
await sendWorkEnd(client)
await send(client, 1) # valid
await send(client, info)
proc completeAddToStore(client: Session; path: string; info: LegacyPathAttrs) {.async.} =
await sendWorkEnd(client)
await send(client, path)
await send(client, info)
proc serveClient(facet: Facet; ds: Cap; store: ErisStore; client: Session) {.async.} =
block:
let clientMagic = await recvWord(client)
if clientMagic != WORKER_MAGIC_1:
raise newException(ProtocolError, "invalid protocol magic")
await send(client, WORKER_MAGIC_2, PROTOCOL_VERSION)
let clientVersion = Version(await recvWord(client))
if clientVersion < 0x1_21:
raise newException(ProtocolError, "obsolete protocol version")
assert clientVersion.minor >= 14
discard await(recvWord(client))
# obsolete CPU affinity
assert clientVersion.minor >= 11
discard await(recvWord(client))
# obsolete reserveSpace
assert clientVersion.minor >= 33
await send(client, "0.0.0")
await sendWorkEnd(client)
while not client.socket.isClosed:
let
w = await recvWord(client.socket)
wop = WorkerOperation(w)
case wop
of wopAddToStore:
let
name = await recvString(client)
caMethod = await recvString(client)
var storeRefs = await recvStringSeq(client)
sort(storeRefs) # sets not valid for patterns so use a sorted list
discard await recvWord(client) # repair, not implemented
let cap = await ingestChunks(client, store)
await sendNext(client, $cap & " " & name)
let attrsPat = inject(?AddToStoreAttrs, {
"name".toSymbol(Cap): ?name,
"ca-method".toSymbol(Cap): ?caMethod.toSymbol,
"references".toSymbol(Cap): ?storeRefs,
"eris".toSymbol(Cap): ?cap.bytes,
})
# bind AddToStoreAttrs and override with some literal values
let pat = PathInfo ? { 0: grab(), 1: attrsPat }
run(facet) do (turn: var Turn):
onPublish(turn, ds, pat) do (path: string, ca: string, deriver: string, narHash: string, narSize: BiggestInt, regTime: BiggestInt, sigs: StringSet, ultimate: bool):
asyncCheck(turn, completeAddToStore(client, path, LegacyPathAttrs(
ca: ca,
deriver: deriver,
narHash: narHash,
narSize: narSize,
references: storeRefs,
registrationTime: regTime,
sigs: sigs,
ultimate: ultimate,
)))
of wopQueryPathInfo:
let
path = await recvString(client)
pat = PathInfo ? { 0: ?path, 1: grab() }
run(facet) do (turn: var Turn):
onPublish(turn, ds, pat) do (info: LegacyPathAttrs):
asyncCheck(turn, sendValidInfo(client, info))
of wopQueryMissing:
var targets = toPreserve(await recvStringSeq(client))
sort(targets.sequence)
# would prefer to use a set but that doesn't translate into a pattern
let pat = inject(?Missing, { 0: ?targets })
run(facet) do (turn: var Turn):
onPublish(turn, ds, pat) do (
willBuild: StringSet,
willSubstitute: StringSet,
unknown: StringSet,
downloadSize: BiggestInt,
narSize: BiggestInt
):
let miss = Missing(
willBuild: willBuild,
willSubstitute: willSubstitute,
unknown: unknown,
downloadSize: downloadSize,
narSize: narSize,
)
asyncCheck(turn, send(client, miss))
of wopSetOptions:
await discardWords(client, 12)
# 01 keepFailed
# 02 keepGoing
# 03 tryFallback
# 04 verbosity
# 05 maxBuildJobs
# 06 maxSilentTime
# 07 useBuildHook
# 08 verboseBuild
# 09 logType
# 10 printBuildTrace
# 11 buildCores
# 12 useSubstitutes
let overridePairCount = await recvWord(client)
for _ in 1..overridePairCount:
discard await (recvString(client))
discard await (recvString(client))
await sendWorkEnd(client)
# all options from the client are ingored
else:
let msg = "unhandled worker op " & $wop
await sendNext(client, msg)
await sendWorkEnd(client)
close(client.socket)
proc serveClientSide(facet: Facet; ds: Cap; store: ErisStore; listener: AsyncSocket) {.async.} =
while not listener.isClosed:
let
client = await accept(listener)
fut = serveClient(facet, ds, store, newSession(client))
addCallback(fut) do ():
if not client.isClosed:
close(client)
proc bootClientSide*(turn: var Turn; ds: Cap; store: ErisStore; socketPath: string) =
let listener = newUnixSocket()
onStop(turn.facet) do (turn: var Turn):
close(listener)
removeFile(socketPath)
removeFile(socketPath)
bindUnix(listener, socketPath)
listen(listener)
asyncCheck(turn, serveClientSide(turn.facet, ds, store, listener))

View File

@ -1,188 +0,0 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[asyncdispatch, asyncnet, sets, strutils]
from std/algorithm import sort
import eris
import preserves, syndicate
from syndicate/protocols/dataspace import Observe
import ./protocol, ./sockets
type
Value = Preserve[void]
Observe = dataspace.Observe[Cap]
proc recvError(daemon: Session): Future[string] {.async.} =
discard #[typ]# await recvString(daemon)
discard #[lvl]# await recvWord(daemon)
discard #[name]# await recvString(daemon)
let msg = #[msg]# await recvString(daemon)
discard #[havePos]# await recvWord(daemon)
let nrTraces = await recvWord(daemon)
for i in 1..nrTraces:
discard #[havPos]# await recvWord(daemon)
discard #[msg]# await recvString(daemon)
return msg
proc recvFields(daemon: Session) {.async.} =
let count = await recvWord(daemon)
for i in 0..<count:
let typ = await recvWord(daemon)
case typ
of 0: discard await recvWord(daemon)
of 1: discard await recvString(daemon)
else: raiseAssert "unknown field type " & $typ
proc recvWork(daemon: Session) {.async.} =
while true:
let word = await recvWord(daemon)
case word
of STDERR_WRITE:
discard await recvString(daemon)
of STDERR_READ:
await send(daemon, "")
of STDERR_ERROR:
let err = await recvError(daemon)
raise newException(ProtocolError, "Nix daemon: " & err)
of STDERR_NEXT:
let msg = await recvString(daemon)
stderr.writeLine("Nix daemon: ", msg)
of STDERR_START_ACTIVITY:
discard await recvWord(daemon) # id
discard await recvWord(daemon) # level
discard await recvWord(daemon) # type
discard await recvString(daemon) # text
await recvFields(daemon) # fields
discard await recvWord(daemon) # parent
of STDERR_STOP_ACTIVITY:
discard await recvWord(daemon) # id
of STDERR_RESULT:
discard await recvWord(daemon) # id
discard await recvWord(daemon) # type
await recvFields(daemon) # fields
of STDERR_LAST:
break
else:
raise newException(ProtocolError, "unknown work verb " & $word)
proc connectDaemon(daemon: Session; socketPath: string) {.async.} =
await connectUnix(daemon.socket, socketPath)
await send(daemon, WORKER_MAGIC_1)
let daemonMagic = await recvWord(daemon)
if daemonMagic != WORKER_MAGIC_2:
raise newException(ProtocolError, "bad magic from daemon")
let daemonVersion = await recvWord(daemon)
daemon.version = min(Version daemonVersion, PROTOCOL_VERSION)
await send(daemon, Word daemon.version)
await send(daemon, 0) # CPU affinity
await send(daemon, 0) # reserve space
if daemon.version.minor >= 33:
discard await recvString(daemon) # version
if daemon.version.minor >= 35:
discard await recvWord(daemon) # remoteTrustsUs
await recvWork(daemon)
proc queryMissing(daemon: Session; targets: StringSeq): Future[Missing] {.async.} =
var miss = Missing(targets: targets)
await send(daemon, Word wopQueryMissing)
await send(daemon, miss.targets)
await recvWork(daemon)
miss.willBuild = await recvStringSet(daemon)
miss.willSubstitute = await recvStringSet(daemon)
miss.unknown = await recvStringSet(daemon)
miss.downloadSize = BiggestInt await recvWord(daemon)
miss.narSize = BiggestInt await recvWord(daemon)
return miss
proc queryPathInfo(daemon: Session; path: string): Future[LegacyPathAttrs] {.async.} =
var info: LegacyPathAttrs
await send(daemon, Word wopQueryPathInfo)
await send(daemon, path)
await recvWork(daemon)
let valid = await recvWord(daemon)
if valid != 0:
info.deriver = await recvString(daemon)
info.narHash = await recvString(daemon)
info.references = await recvStringSeq(daemon)
sort(info.references)
info.registrationTime = BiggestInt await recvWord(daemon)
info.narSize = BiggestInt await recvWord(daemon)
info.ultimate = (await recvWord(daemon)) != 0
info.sigs = await recvStringSet(daemon)
info.ca = await recvString(daemon)
return info
proc recvLegacyPathAttrs(daemon: Session): Future[AddToStoreAttrs] {.async.} =
var info: AddToStoreAttrs
info.deriver = await recvString(daemon)
info.narHash = await recvString(daemon)
info.references = await recvStringSeq(daemon)
sort(info.references)
info.registrationTime = BiggestInt await recvWord(daemon)
info.narSize = BiggestInt await recvWord(daemon)
assert daemon.version.minor >= 16
info.ultimate = (await recvWord(daemon)) != 0
info.sigs = await recvStringSet(daemon)
info.ca = await recvString(daemon)
return info
proc addToStore(daemon: Session; store: ErisStore; request: AddToStoreClientAttrs): Future[(string, AddToStoreAttrs)] {.async.} =
let erisCap = parseCap(request.eris)
await send(daemon, Word wopAddToStore)
await send(daemon, request.name)
await send(daemon, string request.`ca-method`)
await send(daemon, request.references)
await send(daemon, 0) # repair
await recoverChunks(daemon, store, erisCap)
await recvWork(daemon)
let path = await recvString(daemon)
var info = await recvLegacyPathAttrs(daemon)
info.eris = request.eris
info.`ca-method` = request.`ca-method`
info.name = request.name
info.references = request.references
return (path, info)
proc callDaemon(turn: var Turn; path: string; action: proc (daemon: Session; turn: var Turn) {.gcsafe.}): Session =
let
daemon = newSession()
fut = connectDaemon(daemon, path)
addCallback(fut, turn) do (turn: var Turn):
read(fut)
action(daemon, turn)
return daemon
proc bootDaemonSide*(turn: var Turn; ds: Cap; store: ErisStore; socketPath: string) =
during(turn, ds, ?Observe(pattern: !Missing) ?? {0: grab()}) do (targets: Literal[StringSeq]):
# cannot use `grabLit` here because an array is a compound
# TODO: unpack to a `Pattern`
let daemon = callDaemon(turn, socketPath) do (daemon: Session; turn: var Turn):
let missFut = queryMissing(daemon, targets.value)
addCallback(missFut, turn) do (turn: var Turn):
close(daemon)
var miss = read(missFut)
discard publish(turn, ds, miss)
do:
close(daemon)
during(turn, ds, ?Observe(pattern: !PathInfo) ?? {0: grabLit()}) do (path: string):
let daemon = callDaemon(turn, socketPath) do (daemon: Session; turn: var Turn):
let infoFut = queryPathInfo(daemon, path)
addCallback(infoFut, turn) do (turn: var Turn):
close(daemon)
var info = read(infoFut)
discard publish(turn, ds, initRecord("path", path.toPreserve, info.toPreserve))
do:
close(daemon)
during(turn, ds, ?Observe(pattern: !PathInfo) ?? {1: grabDict()}) do (request: Literal[AddToStoreClientAttrs]):
let daemon = callDaemon(turn, socketPath) do (daemon: Session; turn: var Turn):
let fut = addToStore(daemon, store, request.value)
addCallback(fut, turn) do (turn: var Turn):
close(daemon)
var (path, info) = read(fut)
discard publish(turn, ds, initRecord("path", path.toPreserve, info.toPreserve))
do:
close(daemon)

View File

@ -1,106 +0,0 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import ./stdpuspus, ./store
{.passC: staticExec("pkg-config --cflags nix-expr").}
{.passL: staticExec("pkg-config --libs nix-expr").}
proc parentDir(path: string): string =
var i = path.high
while path[i] != '/': dec(i)
path[0..i]
{.passC: "-I" & parentDir(currentSourcePath).}
type
NixInt* = int64
NixFloat* = float64
ValueKind* {.importcpp: "nix::ValueType", header: "value.hh".} = enum
nThunk,
nInt,
nFloat,
nBool,
nString,
nPath,
nNull,
nAttrs,
nList,
nFunction,
nExternal,
Value* = ValueObj | ValuePtr
ValuePtr* = ptr ValueObj
ValueObj* {.importcpp: "nix::Value", header: "value.hh".} = object
integer*: NixInt
boolean*: bool
string: StringContext
path*: cstring
fpoint*: NixFloat
attrs: Bindings
StringContext = object
s: cstring
Symbol* {.importcpp: "nix::Symbol", header: "symbol-table.hh".} = object
discard
Attr {.importcpp: "nix::Attr", header: "attr-set.hh".} = object
name: Symbol
value: ValuePtr
Bindings = ptr BindginsObj
BindginsObj {.importcpp: "nix::Bindings", header: "attr-set.hh".} = object
discard
proc kind*(val: Value): ValueKind {.importcpp: "#.type()".}
proc showType*(val: Value): StdString {.importcpp.}
proc shallowString*(val: Value): string =
if val.kind != nString:
raise newException(FieldDefect, "Value not an attribute set")
$val.string.s
proc size(bindings: Bindings): csize_t {.importcpp.}
proc `[]`(b: Bindings; i: Natural): Attr {.importcpp: "(*#)[#]".}
iterator pairs*(val: Value): (Symbol, ValuePtr) =
if val.kind != nAttrs:
raise newException(FieldDefect, "Value not an attribute set")
for i in 0..<val.attrs.size():
let attr = val.attrs[i]
yield (attr.name, attr.value)
proc listSize(val: Value): csize_t {.importcpp.}
proc listElems(val: Value): ptr UncheckedArray[ValuePtr] {.importcpp.}
iterator items*(val: Value): ValuePtr =
if val.kind != nList:
raise newException(FieldDefect, "Value not a list")
for i in 0..<val.listSize:
yield val.listElems()[i]
type
ExprObj {.importcpp: "nix::Expr", header: "nixexpr.hh".} = object
discard
Expr* = ptr ExprObj
EvalState* {.importcpp: "std::shared_ptr<nix::EvalState>", header: "eval.hh".} = object
discard
proc newEvalState*(store: Store): EvalState {.
importcpp: "nix::newEvalState(@)", header: "seepuspus.hh", constructor.}
proc parseExprFromString*(state: EvalState; s, basePath: cstring): Expr {.
importcpp: "#->parseExprFromString(@)".}
proc eval*(state: EvalState; expr: Expr; value: var ValueObj) {.
importcpp: "#->eval(@)".}
proc forceValueDeep*(state: EvalState; value: var ValueObj) {.
importcpp: "#->forceValueDeep(@)".}
proc stringView(state: EvalState; sym: Symbol): StringView {.
importcpp: "((std::string_view)#->symbols[#])".}
proc symbolString*(state: EvalState; sym: Symbol): string = $stringView(state, sym)
proc initGC*() {.importcpp: "nix::initGC", header: "eval.hh".}

View File

@ -1,21 +0,0 @@
#pragma once
#include "eval.hh"
namespace nix {
std::shared_ptr<nix::EvalState> newEvalState(ref<Store> store)
{
auto searchPath = Strings();
auto evalState =
#if HAVE_BOEHMGC
std::allocate_shared<EvalState>(
traceable_allocator<EvalState>(), searchPath, store, store)
#else
std::make_shared<EvalState>(
searchPath, store, store)
#endif
;
return evalState;
}
}

View File

@ -1,33 +0,0 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
type StdException* {.importcpp: "std::exception", header: "<exception>".} = object
proc what*(ex: StdException): cstring {.importcpp: "((char *)#.what())", nodecl.}
type StdString* {.importcpp: "std::string", header: "<string>".} = object
proc c_str*(s: StdString): cstring {.importcpp.}
type StringView* {.importcpp: "std::string_view", header: "<string>".} = object
proc toStringView*(s: pointer; count: int): StringView {.
importcpp: "std::string_view(static_cast<const char *>(#), #)", constructor.}
proc toStringView*(s: string): StringView {.inline.} =
if s.len == 0: toStringView(nil, 0)
else: toStringView(unsafeAddr s[0], s.len)
proc toStringView*(buf: openarray[byte]): StringView {.inline.} =
if buf.len == 0: toStringView(nil, 0)
else: toStringView(unsafeAddr buf[0], buf.len)
proc toStringView*(sv: StringView): StringView {.inline.} = sv
proc data(sv: StringView): pointer {.importcpp.}
proc size(sv: StringView): csize_t {.importcpp.}
proc `$`*(sv: StringView): string =
result = newString(sv.size)
copyMem(addr result[0], sv.data, result.len)

View File

@ -1,58 +0,0 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import ./nix_api_types, ./nix_api_util, ./nix_api_value, ./nix_api_store, ./nix_api_expr
export NixContext, Store, State, Value, ValueType,
gc_decref, isNil
{.passC: staticExec("pkg-config --cflags nix-expr-c").}
{.passL: staticExec("pkg-config --libs nix-expr-c").}
# Always pass NixContext as a nil pointer.
proc initLibexpr* =
var ctx: NixContext
discard libexpr_init(ctx)
proc openStore*(uri: string, params: varargs[string]): Store =
var ctx: NixContext
var args = allocCStringArray(params)
defer: deallocCStringArray(args)
result = store_open(ctx, uri, addr args)
proc openStore*(): Store =
var ctx: NixContext
result = store_open(ctx, nil, nil)
proc close*(store: Store) = store_unref(store)
proc newState*(store: Store; searchPath: varargs[string]): State =
var ctx: NixContext
var path = allocCStringArray(searchPath)
defer: deallocCStringArray(path)
result = state_create(ctx, path, store)
proc close*(state: State) = state_free(state)
proc newValue*(state: State): Value =
var ctx: NixContext
alloc_value(ctx, state)
proc evalFromString*(state: State; expr, path: string): Value =
var ctx: NixContext
result = alloc_value(ctx, state)
discard expr_eval_from_string(ctx, state, expr, path, result)
proc close*(value: Value) =
var ctx: NixContext
discard gc_decref(ctx, cast[pointer](value))
proc force*(state: State; value: Value) =
var ctx: NixContext
discard value_force(ctx, state, value)
proc get_attr_byidx*(ctx: NixContext; value: Value; state: State; i: cuint): (cstring, Value) =
var ctx: NixContext
result[1] = get_attr_byidx(ctx, value, state, i, addr result[0])

View File

@ -1,28 +0,0 @@
## Module generated by c2nim for nix_api_expr.h
import ./nix_api_types
{.pragma: nix_api_expr, header: "nix_api_expr.h", importc: "nix_$1".}
proc libexpr_init*(context: NixContext): nix_err {.nix_api_expr.}
proc expr_eval_from_string*(context: NixContext; state: State; expr: cstring; path: cstring; value: Value): nix_err {.nix_api_expr.}
proc value_call*(context: NixContext; state: State; fn: Value; arg: Value; value: Value): nix_err {.nix_api_expr.}
proc value_force*(context: NixContext; state: State; value: Value): nix_err {.nix_api_expr.}
proc value_force_deep*(context: NixContext; state: State; value: Value): nix_err {.nix_api_expr.}
proc state_create*(context: NixContext; searchPath: cstringArray; store: Store): State {.nix_api_expr.}
proc state_free*(state: State) {.nix_api_expr.}
proc gc_incref*(context: NixContext; `object`: pointer): nix_err {.nix_api_expr.}
proc gc_decref*(context: NixContext; `object`: pointer): nix_err {.nix_api_expr.}
proc gc_now*() {.nix_api_expr.}
proc gc_register_finalizer*(obj: pointer; cd: pointer; finalizer: proc (obj: pointer; cd: pointer)) {.nix_api_expr.}

View File

@ -1,26 +0,0 @@
## Module generated by c2nim for nix_api_store.h
import ./nix_api_types
{.pragma: nix_api_store, header: "nix_api_store.h", importc: "nix_$1".}
proc libstore_init*(context: NixContext): nix_err {.nix_api_store.}
proc init_plugins*(context: NixContext): nix_err {.nix_api_store.}
proc store_open*(a1: NixContext; uri: cstring; params: ptr cstringArray): Store {.nix_api_store.}
proc store_unref*(store: Store) {.nix_api_store.}
proc store_get_uri*(context: NixContext; store: Store; dest: cstring; n: cuint): nix_err {.nix_api_store.}
proc store_parse_path*(context: NixContext; store: Store; path: cstring): StorePath {.nix_api_store.}
proc store_path_free*(p: StorePath) {.nix_api_store.}
proc store_is_valid_path*(context: NixContext; store: Store; path: StorePath): bool {.nix_api_store.}
proc store_build*(context: NixContext; store: Store; path: StorePath; userdata: pointer; callback: proc (userdata: pointer; outname: cstring; `out`: cstring)): nix_err {.nix_api_store.}
proc store_get_version*(a1: NixContext; store: Store; dest: cstring; n: cuint): nix_err {.nix_api_store.}

View File

@ -1,35 +0,0 @@
## Module generated by c2nim for nix_api_util.h
import ./nix_api_types
{.pragma: nix_api_util, header: "nix_api_util.h", importc: "nix_$1".}
{.pragma: importUtil, header: "nix_api_util.h", importc.}
var
NIX_OK* {.importUtil.}: cint
NIX_ERR_UNKNOWN* {.importUtil.}: cint
NIX_ERR_OVERFLOW* {.importUtil.}: cint
NIX_ERR_KEY* {.importUtil.}: cint
NIX_ERR_NIX_ERROR* {.importUtil.}: cint
proc c_context_create*(): NixContext {.nix_api_util.}
proc c_context_free*(context: NixContext) {.nix_api_util.}
proc libutil_init*(context: NixContext): nix_err {.nix_api_util.}
proc setting_get*(context: NixContext; key: cstring; value: cstring; n: cint): nix_err {.nix_api_util.}
proc setting_set*(context: NixContext; key: cstring; value: cstring): nix_err {.nix_api_util.}
proc version_get*(): cstring {.nix_api_util.}
proc err_msg*(context: NixContext; ctx: NixContext; n: ptr cuint): cstring {.nix_api_util.}
proc err_info_msg*(context: NixContext; read_context: NixContext; value: cstring; n: cint): nix_err {.nix_api_util.}
proc err_name*(context: NixContext; read_context: NixContext; value: cstring; n: cint): nix_err {.nix_api_util.}
proc err_code*(read_context: NixContext): nix_err {.nix_api_util.}
proc set_err_msg*(context: NixContext; err: nix_err; msg: cstring): nix_err {.nix_api_util.}

View File

@ -1,72 +0,0 @@
## Module generated by c2nim for nix_api_value.h
import ./nix_api_types
type
PrimOpFun* = proc (user_data: pointer; context: NixContext; state: ptr State; args: ptr Value; ret: Value)
# proc alloc_primop*(context: NixContext; fun: PrimOpFun; arity: cint; name: cstring; args: cstringArray; doc: cstring; user_data: pointer): ptr PrimOp {.importc: "nix_$1", header: "nix_api_value.h".}
# proc register_primop*(context: NixContext; primOp: ptr PrimOp): nix_err {.importc: "nix_$1", header: "nix_api_value.h".}
proc alloc_value*(context: NixContext; state: State): Value {.importc: "nix_$1", header: "nix_api_value.h".}
proc get_type*(context: NixContext; value: Value): ValueType {.importc: "nix_$1", header: "nix_api_value.h".}
proc get_typename*(context: NixContext; value: Value): cstring {.importc: "nix_$1", header: "nix_api_value.h".}
proc get_bool*(context: NixContext; value: Value): bool {.importc: "nix_$1", header: "nix_api_value.h".}
proc get_string*(context: NixContext; value: Value): cstring {.importc: "nix_$1", header: "nix_api_value.h".}
proc get_path_string*(context: NixContext; value: Value): cstring {.importc: "nix_$1", header: "nix_api_value.h".}
proc get_list_size*(context: NixContext; value: Value): cuint {.importc: "nix_$1", header: "nix_api_value.h".}
proc get_attrs_size*(context: NixContext; value: Value): cuint {.importc: "nix_$1", header: "nix_api_value.h".}
proc get_float*(context: NixContext; value: Value): cdouble {.importc: "nix_$1", header: "nix_api_value.h".}
proc get_int*(context: NixContext; value: Value): int64 {.importc: "nix_$1", header: "nix_api_value.h".}
# proc get_external*(context: NixContext; a2: Value): ptr ExternalValue {.importc: "nix_$1", header: "nix_api_value.h".}
proc get_list_byidx*(context: NixContext; value: Value; state: State; ix: cuint): Value {.importc: "nix_$1", header: "nix_api_value.h".}
proc get_attr_byname*(context: NixContext; value: Value; state: State; name: cstring): Value {.importc: "nix_$1", header: "nix_api_value.h".}
proc has_attr_byname*(context: NixContext; value: Value; state: State; name: cstring): bool {.importc: "nix_$1", header: "nix_api_value.h".}
proc get_attr_byidx*(context: NixContext; value: Value; state: State; i: cuint; name: ptr cstring): Value {.importc: "nix_$1", header: "nix_api_value.h".}
proc get_attr_name_byidx*(context: NixContext; value: Value; state: State; i: cuint): cstring {.importc: "nix_$1", header: "nix_api_value.h".}
proc set_bool*(context: NixContext; value: Value; b: bool): nix_err {.importc: "nix_$1", header: "nix_api_value.h".}
proc set_string*(context: NixContext; value: Value; str: cstring): nix_err {.importc: "nix_$1", header: "nix_api_value.h".}
proc set_path_string*(context: NixContext; value: Value; str: cstring): nix_err {.importc: "nix_$1", header: "nix_api_value.h".}
proc set_float*(context: NixContext; value: Value; d: cdouble): nix_err {.importc: "nix_$1", header: "nix_api_value.h".}
proc set_int*(context: NixContext; value: Value; i: int64): nix_err {.importc: "nix_$1", header: "nix_api_value.h".}
proc set_null*(context: NixContext; value: Value): nix_err {.importc: "nix_$1", header: "nix_api_value.h".}
# proc set_external*(context: NixContext; value: Value; val: ptr ExternalValue): nix_err {.importc: "nix_$1", header: "nix_api_value.h".}
proc make_list*(context: NixContext; s: State; value: Value; size: cuint): nix_err {.importc: "nix_$1", header: "nix_api_value.h".}
proc set_list_byidx*(context: NixContext; value: Value; ix: cuint; elem: Value): nix_err {.importc: "nix_$1", header: "nix_api_value.h".}
# proc make_attrs*(context: NixContext; value: Value; b: ptr BindingsBuilder): nix_err {.importc: "nix_$1", header: "nix_api_value.h".}
# proc set_primop*(context: NixContext; value: Value; op: ptr PrimOp): nix_err {.importc: "nix_$1", header: "nix_api_value.h".}
proc copy_value*(context: NixContext; value: Value; source: Value): nix_err {.importc: "nix_$1", header: "nix_api_value.h".}
# proc make_bindings_builder*(context: NixContext; state: State; capacity: csize_t): ptr BindingsBuilder {.importc: "nix_$1", header: "nix_api_value.h".}
# proc bindings_builder_insert*(context: NixContext; builder: ptr BindingsBuilder; name: cstring; value: Value): nix_err {.importc: "nix_$1", header: "nix_api_value.h".}
# proc bindings_builder_free*(builder: ptr BindingsBuilder) {.importc: "nix_$1", header: "nix_api_value.h".}

View File

@ -1,369 +0,0 @@
type
Snoop = ref object
client, daemon: AsyncSocket
buffer: seq[Word]
version: Version
type ValidPathInfo = object
path: string
deriver: string
narHash: string
references: StringSet
registrationTime, narSize: BiggestInt
ultimate: bool
sigs: StringSet
ca: string
proc send(session: Snoop; sock: AsyncSocket; words: varargs[Word]): Future[void] =
for i, word in words: session.buffer[i] = word
send(sock, addr session.buffer[0], words.len shl 3)
proc send(session: Snoop; sock: AsyncSocket; s: string): Future[void] =
let wordCount = (s.len + 7) shr 3
if wordCount > session.buffer.len: setLen(session.buffer, wordCount)
session.buffer[0] = Word s.len
if wordCount > 0:
session.buffer[wordCount] = 0x00
copyMem(addr session.buffer[1], unsafeAddr s[0], s.len)
send(sock, addr session.buffer[0], (succ wordCount) shl 3)
proc passWord(a, b: AsyncSocket): Future[Word] {.async.} =
var w = await recvWord(a)
await send(b, addr w, sizeof(Word))
return w
proc passString(session: Snoop; a, b: AsyncSocket): Future[string] {.async.} =
var s = await recvString(a)
await send(session, b, s)
return s
proc passStringSeq(session: Snoop; a, b: AsyncSocket): Future[seq[string]] {.async.} =
let count = int(await passWord(a, b))
var strings = newSeq[string](count)
for i in 0..<count: strings[i] = await passString(session, a, b)
return strings
proc passStringSet(session: Snoop; a, b: AsyncSocket): Future[StringSet] {.async.} =
let count = int(await passWord(a, b))
var strings = initHashSet[string](count)
for i in 0..<count: incl(strings, await passString(session, a, b))
return strings
proc passStringMap(session: Snoop; a, b: AsyncSocket): Future[StringTableCap] {.async.} =
var table = newStringTable(modeCaseSensitive)
let n = await passWord(a, b)
for i in 1..n:
var
key = await passString(session, a, b)
val = await passString(session, a, b)
table[key] = val
return table
proc passClientWord(session: Snoop): Future[Word] =
passWord(session.client, session.daemon)
proc passDaemonWord(session: Snoop): Future[Word] =
passWord(session.daemon, session.client)
proc passClientString(session: Snoop): Future[string] =
passString(session, session.client, session.daemon)
proc passDaemonString(session: Snoop): Future[string] =
passString(session, session.daemon, session.client)
proc passClientStringSeq(session: Snoop): Future[seq[string]] =
passStringSeq(session, session.client, session.daemon)
proc passDaemonStringSeq(session: Snoop): Future[seq[string]] =
passStringSeq(session, session.daemon, session.client)
proc passClientStringSet(session: Snoop): Future[StringSet] =
passStringSet(session, session.client, session.daemon)
proc passDaemonStringSet(session: Snoop): Future[StringSet] =
passStringSet(session, session.daemon, session.client)
proc passClientStringMap(session: Snoop): Future[StringTableCap] =
passStringMap(session, session.client, session.daemon)
proc passDaemonStringMap(session: Snoop): Future[StringTableCap] =
passStringMap(session, session.daemon, session.client)
proc passDaemonValidPathInfo(session: Snoop; includePath: bool): Future[PathInfo] {.async.} =
var info: PathInfo
if includePath:
info.path = await passDaemonString(session)
info.deriver = await passDaemonString(session)
info.narHash = await passDaemonString(session)
info.references = await passDaemonStringSet(session)
info.registrationTime = BiggestInt(await passDaemonWord(session))
info.narSize = BiggestInt(await passDaemonWord(session))
assert session.version.minor >= 16
info.ultimate = (await passDaemonWord(session)) != 0
info.sigs = await passDaemonStringSet(session)
info.ca = await passDaemonString(session)
return info
proc passChunks(session: Snoop; a, b: AsyncSocket): Future[int] {.async.} =
var total: int
while true:
let chunkLen = int(await passWord(a, b))
if chunkLen == 0:
break
else:
let wordLen = (chunkLen + 7) shr 3
if session.buffer.len < wordLen: setLen(session.buffer, wordLen)
let recvLen = await recvInto(a, addr session.buffer[0], chunkLen)
# each chunk must be recved contiguously
if recvLen != chunkLen:
raise newException(ProtocolError, "invalid chunk read")
await send(b, addr session.buffer[0], recvLen)
inc(total, recvLen)
return total
proc passClientChunks(session: Snoop): Future[int] =
passChunks(session, session.client, session.daemon)
proc passErrorDaemonError(session: Snoop) {.async.} =
let
typ = await passDaemonString(session)
assert typ == "Error"
let
lvl = await passDaemonWord(session)
name = await passDaemonString(session)
msg = passDaemonString(session)
havePos = await passDaemonWord(session)
assert havePos == 0
let
nrTraces = await passDaemonWord(session)
for i in 1..nrTraces:
let havPos = await passDaemonWord(session)
assert havPos == 0
let msg = await passDaemonString(session)
proc passDaemonFields(session: Snoop): Future[Fields] {.async.} =
let count = await passDaemonWord(session)
var fields = newSeq[Field](count)
for i in 0..<count:
let typ = await passDaemonWord(session)
case typ
of 0:
let num = await passDaemonWord(session)
fields[i] = Field(orKind: FieldKind.int, int: int num)
of 1:
let str = await passDaemonString(session)
fields[i] = Field(orKind: FieldKind.string, string: str)
else:
raiseAssert "unknown field type " & $typ
return fields
proc passWork(session: Snoop) {.async.} =
while true:
let word = await passDaemonWord(session)
case word
of STDERR_WRITE:
discard await passDaemonString(session)
of STDERR_READ:
discard await passClientString(session)
of STDERR_ERROR:
assert session.version.minor >= 26
await passErrorDaemonError(session)
of STDERR_NEXT:
let s = await passDaemonString(session)
of STDERR_START_ACTIVITY:
var act: ActionStart
act.id = BiggestInt(await passDaemonWord(session))
act.level = BiggestInt(await passDaemonWord(session))
act.`type` = BiggestInt(await passDaemonWord(session))
act.text = await passDaemonString(session)
act.fields = await passDaemonFields(session)
act.parent = BiggestInt(await passDaemonWord(session))
of STDERR_STOP_ACTIVITY:
var act: ActionStop
act.id = BiggestInt(await passDaemonWord(session))
of STDERR_RESULT:
var act: ActionResult
act.id = BiggestInt(await passDaemonWord(session))
act.`type` = BiggestInt(await passDaemonWord(session))
act.fields = await passDaemonFields(session)
of STDERR_LAST:
break
else:
raise newException(ProtocolError, "unknown work verb " & $word)
#[
proc fromClient(miss: var Missing; socket: AsyncSocket) {.async.} =
result.targets = await passClientStringSet(session)
proc fromDaemon(miss: var Missing; socket: AsyncSocket) {.async.} =
miss.willBuild = await passDaemonStringSet(session)
miss.willSubstitute = await passDaemonStringSet(session)
miss.unknown = await passDaemonStringSet(session)
miss.downloadSize = BiggestInt await passDaemonWord(session)
miss.narSize = BiggestInt await passDaemonWord(session)
]#
proc loop(session: Snoop) {.async.} =
var chunksTotal: int
try:
while not session.client.isClosed:
let wop = await passClientWord(session)
case wop
of wopIsValidPath:
let path = await passClientString(session)
stderr.writeLine "wopIsValidPath ", path
await passWork(session)
let word = await passDaemonWord(session)
of wopAddToStore:
assert session.version.minor >= 25
let
name = await passClientString(session)
caMethod = await passClientString(session)
refs = await passClientStringSet(session)
repairBool = await passClientWord(session)
stderr.writeLine "wopAddToStore ", name
let n = await passClientChunks(session)
inc(chunksTotal, n)
await passWork(session)
let info = await passDaemonValidPathInfo(session, true)
of wopAddTempRoot:
let path = await passClientString(session)
stderr.writeLine "wopAddTempRoot ", path
await passWork(session)
discard await passDaemonWord(session)
of wopAddIndirectRoot:
let path = await passClientString(session)
stderr.writeLine "wopAddIndirectRoot ", path
await passWork(session)
discard await passDaemonWord(session)
of wopSetOptions:
discard passClientWord(session) # keepFailed
discard passClientWord(session) # keepGoing
discard passClientWord(session) # tryFallback
discard passClientWord(session) # verbosity
discard passClientWord(session) # maxBuildJobs
discard passClientWord(session) # maxSilentTime
discard passClientWord(session) # useBuildHook
discard passClientWord(session) # verboseBuild
discard passClientWord(session) # logType
discard passClientWord(session) # printBuildTrace
discard passClientWord(session) # buildCores
discard passClientWord(session) # useSubstitutes
assert session.version.minor >= 12
let overrides = await passClientStringMap(session)
await passWork(session)
of wopQueryPathInfo:
assert session.version >= 17
let path = await passClientString(session)
stderr.writeLine "wopQueryPathInfo ", path
await passWork(session)
let valid = await passDaemonWord(session)
if valid != 0:
var info = await passDaemonValidPathInfo(session, false)
info.path = path
stderr.writeLine "wopQueryPathInfo ", $info
of wopQueryMissing:
assert session.version >= 30
var miss: Missing
miss.targets = await passClientStringSeq(session)
await passWork(session)
miss.willBuild = await passDaemonStringSet(session)
miss.willSubstitute = await passDaemonStringSet(session)
miss.unknown = await passDaemonStringSet(session)
miss.downloadSize = BiggestInt await passDaemonWord(session)
miss.narSize = BiggestInt await passDaemonWord(session)
stderr.writeLine "wopQueryMissing ", $miss
of wopBuildPathsWithResults:
assert session.version >= 34
let
drvs = await passClientStringSeq(session)
buildMode = await passClientWord(session)
stderr.writeLine "wopBuildPathsWithResults drvs ", $drvs
await passWork(session)
let count = await passDaemonWord(session)
for _ in 1..count:
let
path = await passDaemonString(session)
status = await passDaemonWord(session)
errorMsg = await passDaemonString(session)
timesBUild = await passDaemonWord(session)
isNonDeterministic = await passDaemonWord(session)
startTime = await passDaemonWord(session)
stopTime = await passDaemonWord(session)
outputs = await passDaemonStringMap(session)
else:
stderr.writeLine "unknown worker op ", wop.int
break
except ProtocolError as err:
stderr.writeLine "connection terminated"
stderr.writeLine "chunk bytes transfered: ", formatSize(chunksTotal)
finally:
close(session.daemon)
close(session.client)
proc handshake(listener: AsyncSocket): Future[Snoop] {.async.} =
## Take the next connection from `listener` and return a `Session`.
let session = Snoop(buffer: newSeq[Word](1024)) # 8KiB
session.client = await listener.accept()
session.daemon = newAsyncSocket(
domain = AF_UNIX,
sockType = SOCK_STREAM,
protocol = cast[Protocol](0),
buffered = false,
)
await connectUnix(session.daemon, daemonSocketPath())
let clientMagic = await passClientWord(session)
if clientMagic != WORKER_MAGIC_1:
raise newException(ProtocolError, "invalid protocol magic")
let daemonMagic = await passDaemonWord(session)
let daemonVersion = await passDaemonWord(session)
session.version = Version(await passClientWord(session))
if session.version < PROTOCOL_VERSION:
raise newException(ProtocolError, "obsolete protocol version")
assert session.version.minor >= 14
discard await(passClientWord(session))
# obsolete CPU affinity
assert session.version.minor >= 11
discard await(passClientWord(session))
# obsolete reserveSpace
assert session.version.minor >= 33
let daemonVersionString = await passDaemonString(session)
assert daemonVersionString == $store.nixVersion
await passWork(session)
return session
proc emulateSocket*(path: string) {.async, gcsafe.} =
let listener = newAsyncSocket(
domain = AF_UNIX,
sockType = SOCK_STREAM,
protocol = cast[Protocol](0),
buffered = false)
bindUnix(listener, path)
listen(listener)
stderr.writeLine "listening on ", path
while not listener.isClosed:
try:
let session = await handshake(listener)
assert not session.isNil
asyncCheck loop(session)
except ProtocolError:
close(session)
finally:
close(session)

View File

@ -5,35 +5,24 @@ import
type
Eval* {.preservesRecord: "eval".} = object
`expr`*: string
`path`*: string
`options`*: Table[Symbol, Preserve[void]]
`result`*: Preserve[void]
AttrSet* = Table[Symbol, Preserve[void]]
Realise* {.preservesRecord: "realise".} = object
`drv`*: string
`outputs`*: StringSeq
LegacyPathAttrs* {.preservesDictionary.} = object
`ca`*: string
`deriver`*: string
`narHash`*: string
`narSize`*: BiggestInt
`references`*: StringSeq
`registrationTime`*: BiggestInt
`sigs`*: StringSet
`ultimate`*: bool
`outputs`*: seq[string]
Missing* {.preservesRecord: "missing".} = object
`targets`*: StringSeq
`willBuild`*: StringSet
`willSubstitute`*: StringSet
`unknown`*: StringSet
`targets`*: HashSet[string]
`willBuild`*: HashSet[string]
`willSubstitute`*: HashSet[string]
`unknown`*: HashSet[string]
`downloadSize`*: BiggestInt
`narSize`*: BiggestInt
Narinfo* {.preservesRecord: "narinfo".} = object
`path`*: string
`info`*: AttrSet
`info`*: Dict
FieldKind* {.pure.} = enum
`int`, `string`
@ -46,30 +35,18 @@ type
`string`*: string
StringSet* = HashSet[string]
AddToStoreAttrs* {.preservesDictionary.} = object
`ca`*: string
`ca-method`*: Symbol
`deriver`*: string
`eris`*: seq[byte]
`name`*: string
`narHash`*: string
`narSize`*: BiggestInt
`references`*: StringSeq
`registrationTime`*: BiggestInt
`sigs`*: StringSet
`ultimate`*: bool
AddToStoreClientAttrs* {.preservesDictionary.} = object
`ca-method`*: Symbol
`eris`*: seq[byte]
`name`*: string
`references`*: StringSeq
PathInfo* {.preservesRecord: "path".} = object
PathInfo* {.preservesRecord: "path-info".} = object
`path`*: string
`attrs`*: AttrSet
`deriver`*: string
`narHash`*: string
`references`*: HashSet[string]
`registrationTime`*: BiggestInt
`narSize`*: BiggestInt
`ultimate`*: bool
`sigs`*: HashSet[string]
`ca`*: string
Dict* = Table[Symbol, Preserve[void]]
Build* {.preservesRecord: "nix-build".} = object
`input`*: string
`output`*: Preserve[void]
@ -83,12 +60,13 @@ type
`fields`*: Fields
`parent`*: BiggestInt
FieldString* = string
Instantiate* {.preservesRecord: "instantiate".} = object
`expr`*: string
`options`*: AttrSet
`options`*: Dict
`result`*: Preserve[void]
StringSeq* = seq[string]
FieldInt* = BiggestInt
ActionStop* {.preservesRecord: "stop".} = object
`id`*: BiggestInt
@ -97,32 +75,24 @@ type
`type`*: BiggestInt
`fields`*: Fields
proc `$`*(x: Eval | AttrSet | Realise | LegacyPathAttrs | Missing | Narinfo |
Field |
StringSet |
AddToStoreAttrs |
AddToStoreClientAttrs |
PathInfo |
proc `$`*(x: Eval | Realise | Missing | Narinfo | Field | PathInfo | Dict |
Build |
Fields |
ActionStart |
FieldString |
Instantiate |
StringSeq |
FieldInt |
ActionStop |
ActionResult): string =
`$`(toPreserve(x))
proc encode*(x: Eval | AttrSet | Realise | LegacyPathAttrs | Missing | Narinfo |
Field |
StringSet |
AddToStoreAttrs |
AddToStoreClientAttrs |
PathInfo |
proc encode*(x: Eval | Realise | Missing | Narinfo | Field | PathInfo | Dict |
Build |
Fields |
ActionStart |
FieldString |
Instantiate |
StringSeq |
FieldInt |
ActionStop |
ActionResult): seq[byte] =
encode(toPreserve(x))

View File

@ -1,213 +1,488 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
## Common module for communicating with Nix clients and daemons.
import std/[asyncdispatch, asyncnet, os, sets, strtabs, strutils]
from std/nativesockets import AF_INET, AF_UNIX, SOCK_STREAM, Protocol
import std/[asyncdispatch, asyncnet, sets, strtabs, strutils, tables]
from std/nativesockets import AF_UNIX, SOCK_STREAM, Protocol
import eris
import preserves, syndicate
import ./protocol
import preserves
import ./protocol, ./store
{.pragma: workerProtocol, importc, header: "worker-protocol.hh".}
type Word* = uint64
proc `[]=`*[T](attrs: var AttrSet; key: string; val: T) =
attrs[Symbol key] = val.toPreserve
type Word = uint64
proc `$`(w: Word): string = toHex(w)
const
WORKER_MAGIC_1* = 0x6E697863
WORKER_MAGIC_2* = 0x6478696F
PROTOCOL_VERSION* = 0x100 or 34
WORKER_MAGIC_1 = 0x6E697863
WORKER_MAGIC_2 = 0x6478696F
PROTOCOL_VERSION = 256 or 35
STDERR_NEXT* = 0x6F6C6d67
STDERR_READ* = 0x64617461
STDERR_WRITE* = 0x64617416
STDERR_LAST* = 0x616C7473
STDERR_ERROR* = 0x63787470
STDERR_START_ACTIVITY* = 0x53545254
STDERR_STOP_ACTIVITY* = 0x53544F50
STDERR_RESULT* = 0x52534C54
STDERR_NEXT = 0x6F6C6d67
STDERR_READ = 0x64617461
STDERR_WRITE = 0x64617416
STDERR_LAST = 0x616C7473
STDERR_ERROR = 0x63787470
STDERR_START_ACTIVITY = 0x53545254
STDERR_STOP_ACTIVITY = 0x53544F50
STDERR_RESULT = 0x52534C54
type WorkerOperation* = enum
wopInvalid = 0,
wopIsValidPath = 1,
wopHasSubstitutes = 3,
wopQueryPathHash = 4, # obsolete
wopQueryReferences = 5, # obsolete
wopQueryReferrers = 6,
wopAddToStore = 7,
wopAddTextToStore = 8, # obsolete since 1.25, Nix 3.0. Use wopAddToStore
wopBuildPaths = 9,
wopEnsurePath = 10,
wopAddTempRoot = 11,
wopAddIndirectRoot = 12,
wopSyncWithGC = 13,
wopFindRoots = 14,
wopExportPath = 16, # obsolete
wopQueryDeriver = 18, # obsolete
wopSetOptions = 19,
wopCollectGarbage = 20,
wopQuerySubstitutablePathInfo = 21,
wopQueryDerivationOutputs = 22, # obsolete
wopQueryAllValidPaths = 23,
wopQueryFailedPaths = 24,
wopClearFailedPaths = 25,
wopQueryPathInfo = 26,
wopImportPaths = 27, # obsolete
wopQueryDerivationOutputNames = 28, # obsolete
wopQueryPathFromHashPart = 29,
wopQuerySubstitutablePathInfos = 30,
wopQueryValidPaths = 31,
wopQuerySubstitutablePaths = 32,
wopQueryValidDerivers = 33,
wopOptimiseStore = 34,
wopVerifyStore = 35,
wopBuildDerivation = 36,
wopAddSignatures = 37,
wopNarFromPath = 38,
wopAddToStoreNar = 39,
wopQueryMissing = 40,
wopQueryDerivationOutputMap = 41,
wopRegisterDrvOutput = 42,
wopQueryRealisation = 43,
wopAddMultipleToStore = 44,
wopAddBuildLog = 45,
wopBuildPathsWithResults = 46,
wopIsValidPath = 1
wopHasSubstitutes = 3
wopQueryReferrers = 6
wopAddToStore = 7
wopBuildPaths = 9
wopEnsurePath = 10
wopAddTempRoot = 11
wopAddIndirectRoot = 12
wopSyncWithGC = 13
wopFindRoots = 14
wopSetOptions = 19
wopCollectGarbage = 20
wopQuerySubstitutablePathInfo = 21
wopQueryAllValidPaths = 23
wopQueryFailedPaths = 24
wopClearFailedPaths = 25
wopQueryPathInfo = 26
wopQueryPathFromHashPart = 29
wopQuerySubstitutablePathInfos = 30
wopQueryValidPaths = 31
wopQuerySubstitutablePaths = 32
wopQueryValidDerivers = 33
wopOptimiseStore = 34
wopVerifyStore = 35
wopBuildDerivation = 36
wopAddSignatures = 37
wopNarFromPath = 38
wopAddToStoreNar = 39
wopQueryMissing = 40
wopQueryDerivationOutputMap = 41
wopRegisterDrvOutput = 42
wopQueryRealisation = 43
wopAddMultipleToStore = 44
wopAddBuildLog = 45
wopBuildPathsWithResults = 46
type
ProtocolError* = object of IOError
Version* = uint16
ProtocolError = object of IOError
Version = uint16
Session = ref object
client, daemon: AsyncSocket
buffer: seq[Word]
version: Version
Session* = ref object
socket*: AsyncSocket
buffer*: seq[Word]
version*: Version
func major(version: Version): uint16 = version and 0xff00
func minor(version: Version): uint16 = version and 0x00ff
func major*(version: Version): uint16 = version and 0xff00
func minor*(version: Version): uint16 = version and 0x00ff
proc daemonSocketPath: string =
getEnv(
"NIX_DAEMON_SOCKET_PATH",
"/nix/var/nix/daemon-socket/socket")
proc close*(session: Session) =
close(session.socket)
reset(session.buffer)
proc send*(session: Session; words: varargs[Word]): Future[void] =
if session.buffer.len < words.len:
session.buffer.setLen(words.len)
proc send(session: Session; sock: AsyncSocket; words: varargs[Word]): Future[void] =
for i, word in words: session.buffer[i] = word
send(session.socket, addr session.buffer[0], words.len shl 3)
send(sock, addr session.buffer[0], words.len shl 3)
proc send*(session: Session; s: string): Future[void] =
let wordCount = 1 + ((s.len + 7) shr 3)
if session.buffer.len < wordCount: setLen(session.buffer, wordCount)
proc send(session: Session; sock: AsyncSocket; s: string): Future[void] =
let wordCount = (s.len + 7) shr 3
if wordCount > session.buffer.len: setLen(session.buffer, wordCount)
session.buffer[0] = Word s.len
if s != "":
session.buffer[pred wordCount] = 0x00
if wordCount > 0:
session.buffer[wordCount] = 0x00
copyMem(addr session.buffer[1], unsafeAddr s[0], s.len)
send(session.socket, addr session.buffer[0], wordCount shl 3)
send(sock, addr session.buffer[0], (1 + wordCount) shl 3)
proc send*(session: Session; ss: StringSeq|StringSet): Future[void] =
## Send a set of strings. The set is sent as a contiguous buffer.
session.buffer[0] = Word ss.len
var off = 1
for s in ss:
let
stringWordLen = (s.len + 7) shr 3
bufferWordLen = off+1+stringWordLen
if session.buffer.len < bufferWordLen:
setLen(session.buffer, bufferWordLen)
session.buffer[off] = Word s.len
session.buffer[off+stringWordLen] = 0 # clear the aligning bits
inc(off)
copyMem(addr session.buffer[off], unsafeAddr s[0], s.len)
inc(off, stringWordLen)
send(session.socket, addr session.buffer[0], off shl 3)
proc recvWord*(sock: AsyncSocket): Future[Word] {.async.} =
proc recvWord(sock: AsyncSocket): Future[Word] {.async.} =
var w: Word
let n = await recvInto(sock, addr w, sizeof(Word))
if n != sizeof(Word): raise newException(ProtocolError, "short read")
if n != sizeof(Word): raise newException(ProtocolError, "short read of word")
return w
proc recvWord*(session: Session): Future[Word] =
recvWord(session.socket)
proc passWord(a, b: AsyncSocket): Future[Word] {.async.} =
var w = await recvWord(a)
await send(b, addr w, sizeof(Word))
return w
proc discardWords*(session: Session; n: int): Future[void] {.async.} =
if session.buffer.len < n: setLen(session.buffer, n)
let byteCount = n shl 3
let n = await recvInto(session.socket, addr session.buffer[0], byteCount)
if n != byteCount:
raise newException(ProtocolError, "short read")
proc recvString*(socket: AsyncSocket): Future[string] {.async.} =
let stringLen = int (await recvWord(socket))
proc recvString(sock: AsyncSocket): Future[string] {.async.} =
let w = await recvWord(sock)
let stringLen = int w
var s: string
if stringLen > 0:
var s = newString((stringLen + 7) and (not 7))
let n = await recvInto(socket, addr s[0], s.len)
s.setLen((stringLen + 7) and (not 7))
let n = await recvInto(sock, addr s[0], s.len)
if n != s.len:
raise newException(ProtocolError, "short read")
raise newException(ProtocolError, "short string read")
setLen(s, stringLen)
return s
return ""
return s
proc recvString*(session: Session): Future[string] =
recvString(session.socket)
proc passString(session: Session; a, b: AsyncSocket): Future[string] {.async.} =
var s = await recvString(a)
await send(session, b, s)
return s
proc recvStringSeq*(session: Session): Future[StringSeq] {.async.} =
let count = int(await recvWord(session.socket))
proc passStringSeq(session: Session; a, b: AsyncSocket): Future[seq[string]] {.async.} =
let count = int(await passWord(a, b))
var strings = newSeq[string](count)
for i in 0..<count: strings[i] = await recvString(session)
for i in 0..<count: strings[i] = await passString(session, a, b)
return strings
proc recvStringSet*(session: Session): Future[StringSet] {.async.} =
let count = int(await recvWord(session.socket))
proc passStringSet(session: Session; a, b: AsyncSocket): Future[HashSet[string]] {.async.} =
let count = int(await passWord(a, b))
var strings = initHashSet[string](count)
for i in 0..<count: incl(strings, await recvString(session))
for i in 0..<count: incl(strings, await passString(session, a, b))
return strings
proc newUnixSocket*(): AsyncSocket =
newAsyncSocket(
domain = AF_UNIX,
sockType = SOCK_STREAM,
protocol = cast[Protocol](0),
buffered = false,
)
proc passStringMap(session: Session; a, b: AsyncSocket): Future[StringTableRef] {.async.} =
var table = newStringTable(modeCaseSensitive)
let n = await passWord(a, b)
for i in 1..n:
var
key = await passString(session, a, b)
val = await passString(session, a, b)
table[key] = val
return table
proc newSession*(socket: AsyncSocket): Session =
Session(socket: socket, buffer: newSeq[Word](512))
proc passClientWord(session: Session): Future[Word] =
passWord(session.client, session.daemon)
proc newSession*(): Session =
newUnixSocket().newSession()
proc passDaemonWord(session: Session): Future[Word] =
passWord(session.daemon, session.client)
proc ingestChunks*(session: Session; store: ErisStore): Future[ErisCap] {.async.} =
var ingest: ErisIngest
proc passClientString(session: Session): Future[string] =
passString(session, session.client, session.daemon)
proc passDaemonString(session: Session): Future[string] =
passString(session, session.daemon, session.client)
proc passClientStringSeq(session: Session): Future[seq[string]] =
passStringSeq(session, session.client, session.daemon)
proc passDaemonStringSeq(session: Session): Future[seq[string]] =
passStringSeq(session, session.daemon, session.client)
proc passClientStringSet(session: Session): Future[HashSet[string]] =
passStringSet(session, session.client, session.daemon)
proc passDaemonStringSet(session: Session): Future[HashSet[string]] =
passStringSet(session, session.daemon, session.client)
proc passClientStringMap(session: Session): Future[StringTableRef] =
passStringMap(session, session.client, session.daemon)
proc passDaemonStringMap(session: Session): Future[StringTableRef] =
passStringMap(session, session.daemon, session.client)
type ValidPathInfo = object
path: string
deriver: string
narHash: string
references: HashSet[string]
registrationTime, narSize: BiggestInt
ultimate: bool
sigs: HashSet[string]
ca: string
proc passDaemonValidPathInfo(session: Session; includePath: bool): Future[PathInfo] {.async.} =
var info: PathInfo
if includePath:
info.path = await passDaemonString(session)
info.deriver = await passDaemonString(session)
info.narHash = await passDaemonString(session)
info.references = await passDaemonStringSet(session)
info.registrationTime = BiggestInt(await passDaemonWord(session))
info.narSize = BiggestInt(await passDaemonWord(session))
assert session.version.minor >= 16
info.ultimate = (await passDaemonWord(session)) != 0
info.sigs = await passDaemonStringSet(session)
info.ca = await passDaemonString(session)
return info
proc passChunks(session: Session; a, b: AsyncSocket): Future[int] {.async.} =
var total: int
while true:
let chunkLen = int await recvWord(session)
if ingest.isNil:
ingest = newErisIngest(
store, recommendedChunkSize(chunkLen), convergentMode)
let chunkLen = int(await passWord(a, b))
if chunkLen == 0:
break
else:
let wordLen = (chunkLen + 7) shr 3
if session.buffer.len < wordLen: setLen(session.buffer, wordLen)
let recvLen = await recvInto(session.socket, addr session.buffer[0], chunkLen)
# each chunk must be received contiguously
let recvLen = await recvInto(a, addr session.buffer[0], chunkLen)
# each chunk must be recved contiguously
if recvLen != chunkLen:
raise newException(ProtocolError, "invalid chunk read")
await append(ingest, addr session.buffer[0], chunkLen)
var cap = await cap(ingest)
return cap
raise newException(ProtocolError, "invalid chunk read")
await send(b, addr session.buffer[0], recvLen)
inc(total, recvLen)
return total
proc recoverChunks*(session: Session; store: ErisStore; cap: ErisCap) {.async.} =
let stream = newErisStream(store, cap)
session.buffer.setLen(succ(cap.chunkSize.int shr 3))
proc passClientChunks(session: Session): Future[int] =
passChunks(session, session.client, session.daemon)
proc passErrorDaemonError(session: Session) {.async.} =
let
typ = await passDaemonString(session)
assert typ == "Error"
let
lvl = await passDaemonWord(session)
name = await passDaemonString(session)
msg = passDaemonString(session)
havePos = await passDaemonWord(session)
assert havePos == 0
let
nrTraces = await passDaemonWord(session)
for i in 1..nrTraces:
let havPos = await passDaemonWord(session)
assert havPos == 0
let msg = await passDaemonString(session)
proc passDaemonFields(session: Session): Future[Fields] {.async.} =
let count = await passDaemonWord(session)
var fields = newSeq[Field](count)
for i in 0..<count:
let typ = await passDaemonWord(session)
case typ
of 0:
let num = await passDaemonWord(session)
fields[i] = Field(orKind: FieldKind.int, int: int num)
of 1:
let str = await passDaemonString(session)
fields[i] = Field(orKind: FieldKind.string, string: str)
else:
raiseAssert "unknown field type " & $typ
return fields
proc passWork(session: Session) {.async.} =
while true:
let n = await stream.readBuffer(addr session.buffer[1], cap.chunkSize.int)
session.buffer[0] = Word n
await send(session.socket, addr session.buffer[0], 8+n)
if n == 0: break
close(stream)
let word = await passDaemonWord(session)
case word
of STDERR_WRITE:
discard await passDaemonString(session)
of STDERR_READ:
discard await passClientString(session)
of STDERR_ERROR:
assert session.version.minor >= 26
await passErrorDaemonError(session)
of STDERR_NEXT:
let s = await passDaemonString(session)
of STDERR_START_ACTIVITY:
var act: ActionStart
act.id = BiggestInt(await passDaemonWord(session))
act.level = BiggestInt(await passDaemonWord(session))
act.`type` = BiggestInt(await passDaemonWord(session))
act.text = await passDaemonString(session)
act.fields = await passDaemonFields(session)
act.parent = BiggestInt(await passDaemonWord(session))
of STDERR_STOP_ACTIVITY:
var act: ActionStop
act.id = BiggestInt(await passDaemonWord(session))
of STDERR_RESULT:
var act: ActionResult
act.id = BiggestInt(await passDaemonWord(session))
act.`type` = BiggestInt(await passDaemonWord(session))
act.fields = await passDaemonFields(session)
of STDERR_LAST:
break
else:
raise newException(ProtocolError, "unknown work verb " & $word)
#[
proc fromClient(miss: var Missing; socket: AsyncSocket) {.async.} =
result.targets = await passClientStringSet(session)
proc fromDaemon(miss: var Missing; socket: AsyncSocket) {.async.} =
miss.willBuild = await passDaemonStringSet(session)
miss.willSubstitute = await passDaemonStringSet(session)
miss.unknown = await passDaemonStringSet(session)
miss.downloadSize = BiggestInt await passDaemonWord(session)
miss.narSize = BiggestInt await passDaemonWord(session)
]#
proc loop(session: Session) {.async.} =
var chunksTotal: int
try:
while not session.client.isClosed:
let wop = await passClientWord(session)
case wop
of wopIsValidPath:
let path = await passClientString(session)
stderr.writeLine "wopIsValidPath ", path
await passWork(session)
let word = await passDaemonWord(session)
of wopAddToStore:
assert session.version.minor >= 25
let
name = await passClientString(session)
caMethod = await passClientString(session)
refs = await passClientStringSet(session)
repairBool = await passClientWord(session)
stderr.writeLine "wopAddToStore ", name
let n = await passClientChunks(session)
inc(chunksTotal, n)
await passWork(session)
let info = await passDaemonValidPathInfo(session, true)
of wopAddTempRoot:
let path = await passClientString(session)
stderr.writeLine "wopAddTempRoot ", path
await passWork(session)
discard await passDaemonWord(session)
of wopAddIndirectRoot:
let path = await passClientString(session)
stderr.writeLine "wopAddIndirectRoot ", path
await passWork(session)
discard await passDaemonWord(session)
of wopSetOptions:
discard passClientWord(session) # keepFailed
discard passClientWord(session) # keepGoing
discard passClientWord(session) # tryFallback
discard passClientWord(session) # verbosity
discard passClientWord(session) # maxBuildJobs
discard passClientWord(session) # maxSilentTime
discard passClientWord(session) # useBuildHook
discard passClientWord(session) # verboseBuild
discard passClientWord(session) # logType
discard passClientWord(session) # printBuildTrace
discard passClientWord(session) # buildCores
discard passClientWord(session) # useSubstitutes
assert session.version.minor >= 12
let overrides = await passClientStringMap(session)
await passWork(session)
of wopQueryPathInfo:
assert session.version >= 17
let path = await passClientString(session)
stderr.writeLine "wopQueryPathInfo ", path
await passWork(session)
let valid = await passDaemonWord(session)
if valid != 0:
var info = await passDaemonValidPathInfo(session, false)
info.path = path
stderr.writeLine "wopQueryPathInfo ", $info
of wopQueryMissing:
assert session.version >= 30
var miss: Missing
miss.targets = await passClientStringSet(session)
await passWork(session)
miss.willBuild = await passDaemonStringSet(session)
miss.willSubstitute = await passDaemonStringSet(session)
miss.unknown = await passDaemonStringSet(session)
miss.downloadSize = BiggestInt await passDaemonWord(session)
miss.narSize = BiggestInt await passDaemonWord(session)
stderr.writeLine "wopQueryMissing ", $miss
of wopBuildPathsWithResults:
assert session.version >= 34
let
drvs = await passClientStringSeq(session)
buildMode = await passClientWord(session)
stderr.writeLine "wopBuildPathsWithResults drvs ", $drvs
await passWork(session)
let count = await passDaemonWord(session)
for _ in 1..count:
let
path = await passDaemonString(session)
status = await passDaemonWord(session)
errorMsg = await passDaemonString(session)
timesBUild = await passDaemonWord(session)
isNonDeterministic = await passDaemonWord(session)
startTime = await passDaemonWord(session)
stopTime = await passDaemonWord(session)
outputs = await passDaemonStringMap(session)
else:
stderr.writeLine "unknown worker op ", wop.int
break
except ProtocolError as err:
stderr.writeLine "connection terminated"
stderr.writeLine "chunk bytes transfered: ", formatSize(chunksTotal)
finally:
close(session.daemon)
close(session.client)
proc handshake(listener: AsyncSocket): Future[Session] {.async.} =
## Take the next connection from `listener` and return a `Session`.
let session = Session(buffer: newSeq[Word](1024)) # 8KiB
session.client = await listener.accept()
session.daemon = newAsyncSocket(
domain = AF_UNIX,
sockType = SOCK_STREAM,
protocol = cast[Protocol](0),
buffered = false)
await connectUnix(session.daemon, daemonSocketPath())
let clientMagic = await passClientWord(session)
if clientMagic != WORKER_MAGIC_1:
raise newException(ProtocolError, "invalid protocol magic")
let daemonMagic = await passDaemonWord(session)
let daemonVersion = await passDaemonWord(session)
session.version = Version(await passClientWord(session))
if session.version < 0x1_0a:
raise newException(ProtocolError, "obsolete protocol version")
assert session.version.minor >= 14
discard await(passClientWord(session))
# obsolete CPU affinity
assert session.version.minor >= 11
discard await(passClientWord(session))
# obsolete reserveSpace
assert session.version.minor >= 33
let daemonVersionString = await passDaemonString(session)
assert daemonVersionString == $store.nixVersion
await passWork(session)
return session
proc emulateSocket*(path: string) {.async, gcsafe.} =
let listener = newAsyncSocket(
domain = AF_UNIX,
sockType = SOCK_STREAM,
protocol = cast[Protocol](0),
buffered = false)
bindUnix(listener, path)
listen(listener)
stderr.writeLine "listening on ", path
while not listener.isClosed:
try:
let session = await handshake(listener)
assert not session.isNil
asyncCheck loop(session)
except ProtocolError as err:
stderr.writeLine "failed to service client, ", err.msg
import libssh
type Server* = ref object
bnd: Bind
callbacks: BindCallbacks
proc stop*(srv: Server) =
free(srv.bnd)
proc serveSsh*(keyFile: string, host: string, port: int): Server =
stderr.writeLine "initialize libssh"
libssh.init()
let srv = Server(bnd: newBind())
try:
stderr.writeLine "load key ", keyFile
setOption(srv.bnd, SSH_BIND_OPTIONS_HOSTKEY, keyFile)
setOption(srv.bnd, SSH_BIND_OPTIONS_BINDADDR, host)
setOption(srv.bnd, SSH_BIND_OPTIONS_BINDPORT, uint port)
srv.callbacks.incoming_connection = proc (b: Bind; data: pointer) {.cdecl.} =
let srv = cast[Server](data)
stderr.writeLine "got an incoming connection"
setCallbacks(srv.bnd, addr srv.callbacks, cast[pointer](srv))
listen(srv.bnd)
return srv
except CatchableError as err:
stop(srv)
raise err

View File

@ -23,7 +23,7 @@ var nixVersion* {.importc: "nix::nixVersion", header: "globals.hh".}: StdString
proc isDerivation*(path: StorePath): bool {.importcpp.}
type
Store* {.importcpp: "nix::ref<nix::Store>", header: "store-api.hh".} = object
Store {.importcpp: "nix::ref<nix::Store>", header: "store-api.hh".} = object
discard
proc ensurePath*(store: Store; path: StorePath) {.importcpp.}