Compare commits
5 Commits
trunk
...
ssh-server
Author | SHA1 | Date |
---|---|---|
Emery Hemingway | b3dbb89529 | |
Emery Hemingway | 9b843905f3 | |
Emery Hemingway | a18cdd16b4 | |
Emery Hemingway | a1068cd836 | |
Emery Hemingway | e5148cc654 |
|
@ -1,4 +1,5 @@
|
||||||
include ../syndicate-nim/depends.tup
|
include ../syndicate-nim/depends.tup
|
||||||
NIM_FLAGS += --path:$(TUP_CWD)/../syndicate-nim/src
|
NIM_FLAGS += --path:$(TUP_CWD)/../syndicate-nim/src
|
||||||
|
NIM_FLAGS += --path:$(TUP_CWD)/../libssh/src
|
||||||
|
|
||||||
NIM_FLAGS += --backend:cpp
|
NIM_FLAGS += --backend:cpp
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
version = "20230530"
|
version = "20230607"
|
||||||
author = "Emery Hemingway"
|
author = "Emery Hemingway"
|
||||||
description = "Syndicated Nix Actor"
|
description = "Syndicated Nix Actor"
|
||||||
license = "Unlicense"
|
license = "Unlicense"
|
||||||
|
|
24
protocol.prs
24
protocol.prs
|
@ -1,6 +1,5 @@
|
||||||
version 1 .
|
version 1 .
|
||||||
|
|
||||||
|
|
||||||
Build = <nix-build @input string @output any> .
|
Build = <nix-build @input string @output any> .
|
||||||
|
|
||||||
Realise = <realise @drv string @outputs [string ...]> .
|
Realise = <realise @drv string @outputs [string ...]> .
|
||||||
|
@ -12,3 +11,26 @@ Eval = <eval @expr string @options {symbol: any ...:...} @result any> .
|
||||||
Narinfo = <narinfo @path string @info Dict> .
|
Narinfo = <narinfo @path string @info Dict> .
|
||||||
|
|
||||||
Dict = {symbol: any ...:...} .
|
Dict = {symbol: any ...:...} .
|
||||||
|
|
||||||
|
FieldInt = int .
|
||||||
|
FieldString = string .
|
||||||
|
Field = int / string .
|
||||||
|
Fields = [Field ...] .
|
||||||
|
|
||||||
|
ActionStart = <start @id int @level int @type int @text string @fields Fields @parent int> .
|
||||||
|
ActionStop = <stop @id int> .
|
||||||
|
ActionResult = <result @id int @type int @fields Fields> .
|
||||||
|
|
||||||
|
; TODO: why not make target a singleton?
|
||||||
|
Missing = <missing @targets #{string} @willBuild #{string} @willSubstitute #{string} @unknown #{string} @downloadSize int @narSize int> .
|
||||||
|
|
||||||
|
PathInfo = <path-info
|
||||||
|
@path string
|
||||||
|
@deriver string
|
||||||
|
@narHash string
|
||||||
|
@references #{string}
|
||||||
|
@registrationTime int
|
||||||
|
@narSize int
|
||||||
|
@ultimate bool
|
||||||
|
@sigs #{string}
|
||||||
|
@ca string> .
|
||||||
|
|
|
@ -1,12 +1,12 @@
|
||||||
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||||
# SPDX-License-Identifier: Unlicense
|
# SPDX-License-Identifier: Unlicense
|
||||||
|
|
||||||
import std/[asyncdispatch, httpclient, json, osproc, parseutils, strutils, tables]
|
import std/[asyncdispatch, httpclient, json, os, osproc, parseutils, strutils, tables]
|
||||||
import preserves, preserves/jsonhooks
|
import preserves, preserves/jsonhooks
|
||||||
import syndicate
|
import syndicate
|
||||||
from syndicate/protocols/dataspace import Observe
|
from syndicate/protocols/dataspace import Observe
|
||||||
import ./nix_actor/protocol
|
import ./nix_actor/protocol
|
||||||
import ./nix_actor/[main, store]
|
import ./nix_actor/[main, sockets]
|
||||||
|
|
||||||
type
|
type
|
||||||
Value = Preserve[void]
|
Value = Preserve[void]
|
||||||
|
@ -39,7 +39,6 @@ proc narinfo(turn: var Turn; ds: Ref; path: string) =
|
||||||
client = newAsyncHttpClient()
|
client = newAsyncHttpClient()
|
||||||
url = "https://cache.nixos.org/" & path & ".narinfo"
|
url = "https://cache.nixos.org/" & path & ".narinfo"
|
||||||
futGet = get(client, url)
|
futGet = get(client, url)
|
||||||
stderr.writeLine "fetching ", url
|
|
||||||
addCallback(futGet, turn) do (turn: var Turn):
|
addCallback(futGet, turn) do (turn: var Turn):
|
||||||
let resp = read(futGet)
|
let resp = read(futGet)
|
||||||
if code(resp) != Http200:
|
if code(resp) != Http200:
|
||||||
|
@ -108,13 +107,31 @@ proc bootNixFacet(ds: Ref; turn: var Turn): Facet =
|
||||||
during(turn, ds, ?Observe(pattern: !Narinfo) ?? {0: grabLit()}) do (path: string):
|
during(turn, ds, ?Observe(pattern: !Narinfo) ?? {0: grabLit()}) do (path: string):
|
||||||
narinfo(turn, ds, path)
|
narinfo(turn, ds, path)
|
||||||
|
|
||||||
type Args {.preservesDictionary.} = object
|
type
|
||||||
dataspace: Ref
|
RefArgs {.preservesDictionary.} = object
|
||||||
|
dataspace: Ref
|
||||||
|
SocketArgs {.preservesDictionary.} = object
|
||||||
|
`listen-socket`: string
|
||||||
|
ServeSshArgs {.preservesDictionary.} = object
|
||||||
|
`keyfile`: string
|
||||||
|
`sshhost`: string
|
||||||
|
`sshport`: int
|
||||||
|
|
||||||
proc bootNixActor(root: Ref; turn: var Turn) =
|
proc bootNixActor(root: Ref; turn: var Turn) =
|
||||||
connectStdio(root, turn)
|
connectStdio(root, turn)
|
||||||
during(turn, root, ?Args) do (ds: Ref):
|
|
||||||
|
during(turn, root, ?RefArgs) do (ds: Ref):
|
||||||
discard bootNixFacet(ds, turn)
|
discard bootNixFacet(ds, turn)
|
||||||
|
|
||||||
|
during(turn, root, ?SocketArgs) do (path: string):
|
||||||
|
removeFile(path)
|
||||||
|
asyncCheck(turn, emulateSocket(path))
|
||||||
|
|
||||||
|
during(turn, root, ?ServeSshArgs) do (keyFile: string, host: string, port: int):
|
||||||
|
let srv = serveSsh(keyFile, host, port)
|
||||||
|
do:
|
||||||
|
stderr.writeLine "stop SSH server"
|
||||||
|
stop(srv)
|
||||||
|
|
||||||
initNix() # Nix lib isn't actually being used but it's nice to know that it links.
|
initNix() # Nix lib isn't actually being used but it's nice to know that it links.
|
||||||
runActor("main", bootNixActor)
|
runActor("main", bootNixActor)
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
|
|
||||||
import
|
import
|
||||||
preserves, std/tables
|
preserves, std/sets, std/tables
|
||||||
|
|
||||||
type
|
type
|
||||||
Eval* {.preservesRecord: "eval".} = object
|
Eval* {.preservesRecord: "eval".} = object
|
||||||
|
@ -12,22 +12,87 @@ type
|
||||||
`drv`*: string
|
`drv`*: string
|
||||||
`outputs`*: seq[string]
|
`outputs`*: seq[string]
|
||||||
|
|
||||||
|
Missing* {.preservesRecord: "missing".} = object
|
||||||
|
`targets`*: HashSet[string]
|
||||||
|
`willBuild`*: HashSet[string]
|
||||||
|
`willSubstitute`*: HashSet[string]
|
||||||
|
`unknown`*: HashSet[string]
|
||||||
|
`downloadSize`*: BiggestInt
|
||||||
|
`narSize`*: BiggestInt
|
||||||
|
|
||||||
Narinfo* {.preservesRecord: "narinfo".} = object
|
Narinfo* {.preservesRecord: "narinfo".} = object
|
||||||
`path`*: string
|
`path`*: string
|
||||||
`info`*: Dict
|
`info`*: Dict
|
||||||
|
|
||||||
|
FieldKind* {.pure.} = enum
|
||||||
|
`int`, `string`
|
||||||
|
`Field`* {.preservesOr.} = object
|
||||||
|
case orKind*: FieldKind
|
||||||
|
of FieldKind.`int`:
|
||||||
|
`int`*: int
|
||||||
|
|
||||||
|
of FieldKind.`string`:
|
||||||
|
`string`*: string
|
||||||
|
|
||||||
|
|
||||||
|
PathInfo* {.preservesRecord: "path-info".} = object
|
||||||
|
`path`*: string
|
||||||
|
`deriver`*: string
|
||||||
|
`narHash`*: string
|
||||||
|
`references`*: HashSet[string]
|
||||||
|
`registrationTime`*: BiggestInt
|
||||||
|
`narSize`*: BiggestInt
|
||||||
|
`ultimate`*: bool
|
||||||
|
`sigs`*: HashSet[string]
|
||||||
|
`ca`*: string
|
||||||
|
|
||||||
Dict* = Table[Symbol, Preserve[void]]
|
Dict* = Table[Symbol, Preserve[void]]
|
||||||
Build* {.preservesRecord: "nix-build".} = object
|
Build* {.preservesRecord: "nix-build".} = object
|
||||||
`input`*: string
|
`input`*: string
|
||||||
`output`*: Preserve[void]
|
`output`*: Preserve[void]
|
||||||
|
|
||||||
|
Fields* = seq[Field]
|
||||||
|
ActionStart* {.preservesRecord: "start".} = object
|
||||||
|
`id`*: BiggestInt
|
||||||
|
`level`*: BiggestInt
|
||||||
|
`type`*: BiggestInt
|
||||||
|
`text`*: string
|
||||||
|
`fields`*: Fields
|
||||||
|
`parent`*: BiggestInt
|
||||||
|
|
||||||
|
FieldString* = string
|
||||||
Instantiate* {.preservesRecord: "instantiate".} = object
|
Instantiate* {.preservesRecord: "instantiate".} = object
|
||||||
`expr`*: string
|
`expr`*: string
|
||||||
`options`*: Dict
|
`options`*: Dict
|
||||||
`result`*: Preserve[void]
|
`result`*: Preserve[void]
|
||||||
|
|
||||||
proc `$`*(x: Eval | Realise | Narinfo | Dict | Build | Instantiate): string =
|
FieldInt* = BiggestInt
|
||||||
|
ActionStop* {.preservesRecord: "stop".} = object
|
||||||
|
`id`*: BiggestInt
|
||||||
|
|
||||||
|
ActionResult* {.preservesRecord: "result".} = object
|
||||||
|
`id`*: BiggestInt
|
||||||
|
`type`*: BiggestInt
|
||||||
|
`fields`*: Fields
|
||||||
|
|
||||||
|
proc `$`*(x: Eval | Realise | Missing | Narinfo | Field | PathInfo | Dict |
|
||||||
|
Build |
|
||||||
|
Fields |
|
||||||
|
ActionStart |
|
||||||
|
FieldString |
|
||||||
|
Instantiate |
|
||||||
|
FieldInt |
|
||||||
|
ActionStop |
|
||||||
|
ActionResult): string =
|
||||||
`$`(toPreserve(x))
|
`$`(toPreserve(x))
|
||||||
|
|
||||||
proc encode*(x: Eval | Realise | Narinfo | Dict | Build | Instantiate): seq[byte] =
|
proc encode*(x: Eval | Realise | Missing | Narinfo | Field | PathInfo | Dict |
|
||||||
|
Build |
|
||||||
|
Fields |
|
||||||
|
ActionStart |
|
||||||
|
FieldString |
|
||||||
|
Instantiate |
|
||||||
|
FieldInt |
|
||||||
|
ActionStop |
|
||||||
|
ActionResult): seq[byte] =
|
||||||
encode(toPreserve(x))
|
encode(toPreserve(x))
|
||||||
|
|
|
@ -0,0 +1,488 @@
|
||||||
|
# SPDX-FileCopyrightText: ☭ Emery Hemingway
|
||||||
|
# SPDX-License-Identifier: Unlicense
|
||||||
|
|
||||||
|
import std/[asyncdispatch, asyncnet, os, sets, strtabs, strutils]
|
||||||
|
from std/nativesockets import AF_INET, AF_UNIX, SOCK_STREAM, Protocol
|
||||||
|
|
||||||
|
import preserves
|
||||||
|
import ./protocol, ./store
|
||||||
|
|
||||||
|
{.pragma: workerProtocol, importc, header: "worker-protocol.hh".}
|
||||||
|
|
||||||
|
type Word = uint64
|
||||||
|
proc `$`(w: Word): string = toHex(w)
|
||||||
|
|
||||||
|
const
|
||||||
|
WORKER_MAGIC_1 = 0x6E697863
|
||||||
|
WORKER_MAGIC_2 = 0x6478696F
|
||||||
|
PROTOCOL_VERSION = 256 or 35
|
||||||
|
|
||||||
|
STDERR_NEXT = 0x6F6C6d67
|
||||||
|
STDERR_READ = 0x64617461
|
||||||
|
STDERR_WRITE = 0x64617416
|
||||||
|
STDERR_LAST = 0x616C7473
|
||||||
|
STDERR_ERROR = 0x63787470
|
||||||
|
STDERR_START_ACTIVITY = 0x53545254
|
||||||
|
STDERR_STOP_ACTIVITY = 0x53544F50
|
||||||
|
STDERR_RESULT = 0x52534C54
|
||||||
|
|
||||||
|
wopIsValidPath = 1
|
||||||
|
wopHasSubstitutes = 3
|
||||||
|
wopQueryReferrers = 6
|
||||||
|
wopAddToStore = 7
|
||||||
|
wopBuildPaths = 9
|
||||||
|
wopEnsurePath = 10
|
||||||
|
wopAddTempRoot = 11
|
||||||
|
wopAddIndirectRoot = 12
|
||||||
|
wopSyncWithGC = 13
|
||||||
|
wopFindRoots = 14
|
||||||
|
wopSetOptions = 19
|
||||||
|
wopCollectGarbage = 20
|
||||||
|
wopQuerySubstitutablePathInfo = 21
|
||||||
|
wopQueryAllValidPaths = 23
|
||||||
|
wopQueryFailedPaths = 24
|
||||||
|
wopClearFailedPaths = 25
|
||||||
|
wopQueryPathInfo = 26
|
||||||
|
wopQueryPathFromHashPart = 29
|
||||||
|
wopQuerySubstitutablePathInfos = 30
|
||||||
|
wopQueryValidPaths = 31
|
||||||
|
wopQuerySubstitutablePaths = 32
|
||||||
|
wopQueryValidDerivers = 33
|
||||||
|
wopOptimiseStore = 34
|
||||||
|
wopVerifyStore = 35
|
||||||
|
wopBuildDerivation = 36
|
||||||
|
wopAddSignatures = 37
|
||||||
|
wopNarFromPath = 38
|
||||||
|
wopAddToStoreNar = 39
|
||||||
|
wopQueryMissing = 40
|
||||||
|
wopQueryDerivationOutputMap = 41
|
||||||
|
wopRegisterDrvOutput = 42
|
||||||
|
wopQueryRealisation = 43
|
||||||
|
wopAddMultipleToStore = 44
|
||||||
|
wopAddBuildLog = 45
|
||||||
|
wopBuildPathsWithResults = 46
|
||||||
|
|
||||||
|
type
|
||||||
|
ProtocolError = object of IOError
|
||||||
|
Version = uint16
|
||||||
|
Session = ref object
|
||||||
|
client, daemon: AsyncSocket
|
||||||
|
buffer: seq[Word]
|
||||||
|
version: Version
|
||||||
|
|
||||||
|
func major(version: Version): uint16 = version and 0xff00
|
||||||
|
func minor(version: Version): uint16 = version and 0x00ff
|
||||||
|
|
||||||
|
proc daemonSocketPath: string =
|
||||||
|
getEnv(
|
||||||
|
"NIX_DAEMON_SOCKET_PATH",
|
||||||
|
"/nix/var/nix/daemon-socket/socket")
|
||||||
|
|
||||||
|
proc send(session: Session; sock: AsyncSocket; words: varargs[Word]): Future[void] =
|
||||||
|
for i, word in words: session.buffer[i] = word
|
||||||
|
send(sock, addr session.buffer[0], words.len shl 3)
|
||||||
|
|
||||||
|
proc send(session: Session; sock: AsyncSocket; s: string): Future[void] =
|
||||||
|
let wordCount = (s.len + 7) shr 3
|
||||||
|
if wordCount > session.buffer.len: setLen(session.buffer, wordCount)
|
||||||
|
session.buffer[0] = Word s.len
|
||||||
|
if wordCount > 0:
|
||||||
|
session.buffer[wordCount] = 0x00
|
||||||
|
copyMem(addr session.buffer[1], unsafeAddr s[0], s.len)
|
||||||
|
send(sock, addr session.buffer[0], (1 + wordCount) shl 3)
|
||||||
|
|
||||||
|
proc recvWord(sock: AsyncSocket): Future[Word] {.async.} =
|
||||||
|
var w: Word
|
||||||
|
let n = await recvInto(sock, addr w, sizeof(Word))
|
||||||
|
if n != sizeof(Word): raise newException(ProtocolError, "short read of word")
|
||||||
|
return w
|
||||||
|
|
||||||
|
proc passWord(a, b: AsyncSocket): Future[Word] {.async.} =
|
||||||
|
var w = await recvWord(a)
|
||||||
|
await send(b, addr w, sizeof(Word))
|
||||||
|
return w
|
||||||
|
|
||||||
|
proc recvString(sock: AsyncSocket): Future[string] {.async.} =
|
||||||
|
let w = await recvWord(sock)
|
||||||
|
let stringLen = int w
|
||||||
|
var s: string
|
||||||
|
if stringLen > 0:
|
||||||
|
s.setLen((stringLen + 7) and (not 7))
|
||||||
|
let n = await recvInto(sock, addr s[0], s.len)
|
||||||
|
if n != s.len:
|
||||||
|
raise newException(ProtocolError, "short string read")
|
||||||
|
setLen(s, stringLen)
|
||||||
|
return s
|
||||||
|
|
||||||
|
proc passString(session: Session; a, b: AsyncSocket): Future[string] {.async.} =
|
||||||
|
var s = await recvString(a)
|
||||||
|
await send(session, b, s)
|
||||||
|
return s
|
||||||
|
|
||||||
|
proc passStringSeq(session: Session; a, b: AsyncSocket): Future[seq[string]] {.async.} =
|
||||||
|
let count = int(await passWord(a, b))
|
||||||
|
var strings = newSeq[string](count)
|
||||||
|
for i in 0..<count: strings[i] = await passString(session, a, b)
|
||||||
|
return strings
|
||||||
|
|
||||||
|
proc passStringSet(session: Session; a, b: AsyncSocket): Future[HashSet[string]] {.async.} =
|
||||||
|
let count = int(await passWord(a, b))
|
||||||
|
var strings = initHashSet[string](count)
|
||||||
|
for i in 0..<count: incl(strings, await passString(session, a, b))
|
||||||
|
return strings
|
||||||
|
|
||||||
|
proc passStringMap(session: Session; a, b: AsyncSocket): Future[StringTableRef] {.async.} =
|
||||||
|
var table = newStringTable(modeCaseSensitive)
|
||||||
|
let n = await passWord(a, b)
|
||||||
|
for i in 1..n:
|
||||||
|
var
|
||||||
|
key = await passString(session, a, b)
|
||||||
|
val = await passString(session, a, b)
|
||||||
|
table[key] = val
|
||||||
|
return table
|
||||||
|
|
||||||
|
proc passClientWord(session: Session): Future[Word] =
|
||||||
|
passWord(session.client, session.daemon)
|
||||||
|
|
||||||
|
proc passDaemonWord(session: Session): Future[Word] =
|
||||||
|
passWord(session.daemon, session.client)
|
||||||
|
|
||||||
|
proc passClientString(session: Session): Future[string] =
|
||||||
|
passString(session, session.client, session.daemon)
|
||||||
|
|
||||||
|
proc passDaemonString(session: Session): Future[string] =
|
||||||
|
passString(session, session.daemon, session.client)
|
||||||
|
|
||||||
|
proc passClientStringSeq(session: Session): Future[seq[string]] =
|
||||||
|
passStringSeq(session, session.client, session.daemon)
|
||||||
|
|
||||||
|
proc passDaemonStringSeq(session: Session): Future[seq[string]] =
|
||||||
|
passStringSeq(session, session.daemon, session.client)
|
||||||
|
|
||||||
|
proc passClientStringSet(session: Session): Future[HashSet[string]] =
|
||||||
|
passStringSet(session, session.client, session.daemon)
|
||||||
|
|
||||||
|
proc passDaemonStringSet(session: Session): Future[HashSet[string]] =
|
||||||
|
passStringSet(session, session.daemon, session.client)
|
||||||
|
|
||||||
|
proc passClientStringMap(session: Session): Future[StringTableRef] =
|
||||||
|
passStringMap(session, session.client, session.daemon)
|
||||||
|
|
||||||
|
proc passDaemonStringMap(session: Session): Future[StringTableRef] =
|
||||||
|
passStringMap(session, session.daemon, session.client)
|
||||||
|
|
||||||
|
type ValidPathInfo = object
|
||||||
|
path: string
|
||||||
|
deriver: string
|
||||||
|
narHash: string
|
||||||
|
references: HashSet[string]
|
||||||
|
registrationTime, narSize: BiggestInt
|
||||||
|
ultimate: bool
|
||||||
|
sigs: HashSet[string]
|
||||||
|
ca: string
|
||||||
|
|
||||||
|
proc passDaemonValidPathInfo(session: Session; includePath: bool): Future[PathInfo] {.async.} =
|
||||||
|
var info: PathInfo
|
||||||
|
if includePath:
|
||||||
|
info.path = await passDaemonString(session)
|
||||||
|
info.deriver = await passDaemonString(session)
|
||||||
|
info.narHash = await passDaemonString(session)
|
||||||
|
info.references = await passDaemonStringSet(session)
|
||||||
|
info.registrationTime = BiggestInt(await passDaemonWord(session))
|
||||||
|
info.narSize = BiggestInt(await passDaemonWord(session))
|
||||||
|
assert session.version.minor >= 16
|
||||||
|
info.ultimate = (await passDaemonWord(session)) != 0
|
||||||
|
info.sigs = await passDaemonStringSet(session)
|
||||||
|
info.ca = await passDaemonString(session)
|
||||||
|
return info
|
||||||
|
|
||||||
|
proc passChunks(session: Session; a, b: AsyncSocket): Future[int] {.async.} =
|
||||||
|
var total: int
|
||||||
|
while true:
|
||||||
|
let chunkLen = int(await passWord(a, b))
|
||||||
|
if chunkLen == 0:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
let wordLen = (chunkLen + 7) shr 3
|
||||||
|
if session.buffer.len < wordLen: setLen(session.buffer, wordLen)
|
||||||
|
let recvLen = await recvInto(a, addr session.buffer[0], chunkLen)
|
||||||
|
# each chunk must be recved contiguously
|
||||||
|
if recvLen != chunkLen:
|
||||||
|
raise newException(ProtocolError, "invalid chunk read")
|
||||||
|
await send(b, addr session.buffer[0], recvLen)
|
||||||
|
inc(total, recvLen)
|
||||||
|
return total
|
||||||
|
|
||||||
|
proc passClientChunks(session: Session): Future[int] =
|
||||||
|
passChunks(session, session.client, session.daemon)
|
||||||
|
|
||||||
|
proc passErrorDaemonError(session: Session) {.async.} =
|
||||||
|
let
|
||||||
|
typ = await passDaemonString(session)
|
||||||
|
assert typ == "Error"
|
||||||
|
let
|
||||||
|
lvl = await passDaemonWord(session)
|
||||||
|
name = await passDaemonString(session)
|
||||||
|
msg = passDaemonString(session)
|
||||||
|
havePos = await passDaemonWord(session)
|
||||||
|
assert havePos == 0
|
||||||
|
let
|
||||||
|
nrTraces = await passDaemonWord(session)
|
||||||
|
for i in 1..nrTraces:
|
||||||
|
let havPos = await passDaemonWord(session)
|
||||||
|
assert havPos == 0
|
||||||
|
let msg = await passDaemonString(session)
|
||||||
|
|
||||||
|
proc passDaemonFields(session: Session): Future[Fields] {.async.} =
|
||||||
|
let count = await passDaemonWord(session)
|
||||||
|
var fields = newSeq[Field](count)
|
||||||
|
for i in 0..<count:
|
||||||
|
let typ = await passDaemonWord(session)
|
||||||
|
case typ
|
||||||
|
of 0:
|
||||||
|
let num = await passDaemonWord(session)
|
||||||
|
fields[i] = Field(orKind: FieldKind.int, int: int num)
|
||||||
|
of 1:
|
||||||
|
let str = await passDaemonString(session)
|
||||||
|
fields[i] = Field(orKind: FieldKind.string, string: str)
|
||||||
|
else:
|
||||||
|
raiseAssert "unknown field type " & $typ
|
||||||
|
return fields
|
||||||
|
|
||||||
|
proc passWork(session: Session) {.async.} =
|
||||||
|
while true:
|
||||||
|
let word = await passDaemonWord(session)
|
||||||
|
case word
|
||||||
|
|
||||||
|
of STDERR_WRITE:
|
||||||
|
discard await passDaemonString(session)
|
||||||
|
|
||||||
|
of STDERR_READ:
|
||||||
|
discard await passClientString(session)
|
||||||
|
|
||||||
|
of STDERR_ERROR:
|
||||||
|
assert session.version.minor >= 26
|
||||||
|
await passErrorDaemonError(session)
|
||||||
|
|
||||||
|
of STDERR_NEXT:
|
||||||
|
let s = await passDaemonString(session)
|
||||||
|
|
||||||
|
of STDERR_START_ACTIVITY:
|
||||||
|
var act: ActionStart
|
||||||
|
act.id = BiggestInt(await passDaemonWord(session))
|
||||||
|
act.level = BiggestInt(await passDaemonWord(session))
|
||||||
|
act.`type` = BiggestInt(await passDaemonWord(session))
|
||||||
|
act.text = await passDaemonString(session)
|
||||||
|
act.fields = await passDaemonFields(session)
|
||||||
|
act.parent = BiggestInt(await passDaemonWord(session))
|
||||||
|
|
||||||
|
of STDERR_STOP_ACTIVITY:
|
||||||
|
var act: ActionStop
|
||||||
|
act.id = BiggestInt(await passDaemonWord(session))
|
||||||
|
|
||||||
|
of STDERR_RESULT:
|
||||||
|
var act: ActionResult
|
||||||
|
act.id = BiggestInt(await passDaemonWord(session))
|
||||||
|
act.`type` = BiggestInt(await passDaemonWord(session))
|
||||||
|
act.fields = await passDaemonFields(session)
|
||||||
|
|
||||||
|
of STDERR_LAST:
|
||||||
|
break
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise newException(ProtocolError, "unknown work verb " & $word)
|
||||||
|
|
||||||
|
#[
|
||||||
|
proc fromClient(miss: var Missing; socket: AsyncSocket) {.async.} =
|
||||||
|
result.targets = await passClientStringSet(session)
|
||||||
|
|
||||||
|
proc fromDaemon(miss: var Missing; socket: AsyncSocket) {.async.} =
|
||||||
|
miss.willBuild = await passDaemonStringSet(session)
|
||||||
|
miss.willSubstitute = await passDaemonStringSet(session)
|
||||||
|
miss.unknown = await passDaemonStringSet(session)
|
||||||
|
miss.downloadSize = BiggestInt await passDaemonWord(session)
|
||||||
|
miss.narSize = BiggestInt await passDaemonWord(session)
|
||||||
|
]#
|
||||||
|
|
||||||
|
proc loop(session: Session) {.async.} =
|
||||||
|
var chunksTotal: int
|
||||||
|
try:
|
||||||
|
while not session.client.isClosed:
|
||||||
|
let wop = await passClientWord(session)
|
||||||
|
case wop
|
||||||
|
of wopIsValidPath:
|
||||||
|
let path = await passClientString(session)
|
||||||
|
stderr.writeLine "wopIsValidPath ", path
|
||||||
|
await passWork(session)
|
||||||
|
let word = await passDaemonWord(session)
|
||||||
|
|
||||||
|
of wopAddToStore:
|
||||||
|
assert session.version.minor >= 25
|
||||||
|
let
|
||||||
|
name = await passClientString(session)
|
||||||
|
caMethod = await passClientString(session)
|
||||||
|
refs = await passClientStringSet(session)
|
||||||
|
repairBool = await passClientWord(session)
|
||||||
|
stderr.writeLine "wopAddToStore ", name
|
||||||
|
let n = await passClientChunks(session)
|
||||||
|
inc(chunksTotal, n)
|
||||||
|
await passWork(session)
|
||||||
|
let info = await passDaemonValidPathInfo(session, true)
|
||||||
|
|
||||||
|
of wopAddTempRoot:
|
||||||
|
let path = await passClientString(session)
|
||||||
|
stderr.writeLine "wopAddTempRoot ", path
|
||||||
|
await passWork(session)
|
||||||
|
discard await passDaemonWord(session)
|
||||||
|
|
||||||
|
of wopAddIndirectRoot:
|
||||||
|
let path = await passClientString(session)
|
||||||
|
stderr.writeLine "wopAddIndirectRoot ", path
|
||||||
|
await passWork(session)
|
||||||
|
discard await passDaemonWord(session)
|
||||||
|
|
||||||
|
of wopSetOptions:
|
||||||
|
discard passClientWord(session) # keepFailed
|
||||||
|
discard passClientWord(session) # keepGoing
|
||||||
|
discard passClientWord(session) # tryFallback
|
||||||
|
discard passClientWord(session) # verbosity
|
||||||
|
discard passClientWord(session) # maxBuildJobs
|
||||||
|
discard passClientWord(session) # maxSilentTime
|
||||||
|
discard passClientWord(session) # useBuildHook
|
||||||
|
discard passClientWord(session) # verboseBuild
|
||||||
|
discard passClientWord(session) # logType
|
||||||
|
discard passClientWord(session) # printBuildTrace
|
||||||
|
discard passClientWord(session) # buildCores
|
||||||
|
discard passClientWord(session) # useSubstitutes
|
||||||
|
assert session.version.minor >= 12
|
||||||
|
let overrides = await passClientStringMap(session)
|
||||||
|
await passWork(session)
|
||||||
|
|
||||||
|
of wopQueryPathInfo:
|
||||||
|
assert session.version >= 17
|
||||||
|
let path = await passClientString(session)
|
||||||
|
stderr.writeLine "wopQueryPathInfo ", path
|
||||||
|
await passWork(session)
|
||||||
|
let valid = await passDaemonWord(session)
|
||||||
|
if valid != 0:
|
||||||
|
var info = await passDaemonValidPathInfo(session, false)
|
||||||
|
info.path = path
|
||||||
|
stderr.writeLine "wopQueryPathInfo ", $info
|
||||||
|
|
||||||
|
of wopQueryMissing:
|
||||||
|
assert session.version >= 30
|
||||||
|
var miss: Missing
|
||||||
|
miss.targets = await passClientStringSet(session)
|
||||||
|
await passWork(session)
|
||||||
|
miss.willBuild = await passDaemonStringSet(session)
|
||||||
|
miss.willSubstitute = await passDaemonStringSet(session)
|
||||||
|
miss.unknown = await passDaemonStringSet(session)
|
||||||
|
miss.downloadSize = BiggestInt await passDaemonWord(session)
|
||||||
|
miss.narSize = BiggestInt await passDaemonWord(session)
|
||||||
|
stderr.writeLine "wopQueryMissing ", $miss
|
||||||
|
|
||||||
|
of wopBuildPathsWithResults:
|
||||||
|
assert session.version >= 34
|
||||||
|
let
|
||||||
|
drvs = await passClientStringSeq(session)
|
||||||
|
buildMode = await passClientWord(session)
|
||||||
|
stderr.writeLine "wopBuildPathsWithResults drvs ", $drvs
|
||||||
|
await passWork(session)
|
||||||
|
let count = await passDaemonWord(session)
|
||||||
|
for _ in 1..count:
|
||||||
|
let
|
||||||
|
path = await passDaemonString(session)
|
||||||
|
status = await passDaemonWord(session)
|
||||||
|
errorMsg = await passDaemonString(session)
|
||||||
|
timesBUild = await passDaemonWord(session)
|
||||||
|
isNonDeterministic = await passDaemonWord(session)
|
||||||
|
startTime = await passDaemonWord(session)
|
||||||
|
stopTime = await passDaemonWord(session)
|
||||||
|
outputs = await passDaemonStringMap(session)
|
||||||
|
|
||||||
|
else:
|
||||||
|
stderr.writeLine "unknown worker op ", wop.int
|
||||||
|
break
|
||||||
|
except ProtocolError as err:
|
||||||
|
stderr.writeLine "connection terminated"
|
||||||
|
stderr.writeLine "chunk bytes transfered: ", formatSize(chunksTotal)
|
||||||
|
finally:
|
||||||
|
close(session.daemon)
|
||||||
|
close(session.client)
|
||||||
|
|
||||||
|
proc handshake(listener: AsyncSocket): Future[Session] {.async.} =
|
||||||
|
## Take the next connection from `listener` and return a `Session`.
|
||||||
|
let session = Session(buffer: newSeq[Word](1024)) # 8KiB
|
||||||
|
session.client = await listener.accept()
|
||||||
|
session.daemon = newAsyncSocket(
|
||||||
|
domain = AF_UNIX,
|
||||||
|
sockType = SOCK_STREAM,
|
||||||
|
protocol = cast[Protocol](0),
|
||||||
|
buffered = false)
|
||||||
|
await connectUnix(session.daemon, daemonSocketPath())
|
||||||
|
let clientMagic = await passClientWord(session)
|
||||||
|
if clientMagic != WORKER_MAGIC_1:
|
||||||
|
raise newException(ProtocolError, "invalid protocol magic")
|
||||||
|
let daemonMagic = await passDaemonWord(session)
|
||||||
|
let daemonVersion = await passDaemonWord(session)
|
||||||
|
session.version = Version(await passClientWord(session))
|
||||||
|
if session.version < 0x1_0a:
|
||||||
|
raise newException(ProtocolError, "obsolete protocol version")
|
||||||
|
assert session.version.minor >= 14
|
||||||
|
discard await(passClientWord(session))
|
||||||
|
# obsolete CPU affinity
|
||||||
|
assert session.version.minor >= 11
|
||||||
|
discard await(passClientWord(session))
|
||||||
|
# obsolete reserveSpace
|
||||||
|
assert session.version.minor >= 33
|
||||||
|
let daemonVersionString = await passDaemonString(session)
|
||||||
|
assert daemonVersionString == $store.nixVersion
|
||||||
|
await passWork(session)
|
||||||
|
return session
|
||||||
|
|
||||||
|
proc emulateSocket*(path: string) {.async, gcsafe.} =
|
||||||
|
let listener = newAsyncSocket(
|
||||||
|
domain = AF_UNIX,
|
||||||
|
sockType = SOCK_STREAM,
|
||||||
|
protocol = cast[Protocol](0),
|
||||||
|
buffered = false)
|
||||||
|
bindUnix(listener, path)
|
||||||
|
listen(listener)
|
||||||
|
stderr.writeLine "listening on ", path
|
||||||
|
while not listener.isClosed:
|
||||||
|
try:
|
||||||
|
let session = await handshake(listener)
|
||||||
|
assert not session.isNil
|
||||||
|
asyncCheck loop(session)
|
||||||
|
except ProtocolError as err:
|
||||||
|
stderr.writeLine "failed to service client, ", err.msg
|
||||||
|
|
||||||
|
import libssh
|
||||||
|
|
||||||
|
type Server* = ref object
|
||||||
|
bnd: Bind
|
||||||
|
callbacks: BindCallbacks
|
||||||
|
|
||||||
|
proc stop*(srv: Server) =
|
||||||
|
free(srv.bnd)
|
||||||
|
|
||||||
|
proc serveSsh*(keyFile: string, host: string, port: int): Server =
|
||||||
|
stderr.writeLine "initialize libssh"
|
||||||
|
libssh.init()
|
||||||
|
|
||||||
|
let srv = Server(bnd: newBind())
|
||||||
|
try:
|
||||||
|
stderr.writeLine "load key ", keyFile
|
||||||
|
setOption(srv.bnd, SSH_BIND_OPTIONS_HOSTKEY, keyFile)
|
||||||
|
setOption(srv.bnd, SSH_BIND_OPTIONS_BINDADDR, host)
|
||||||
|
setOption(srv.bnd, SSH_BIND_OPTIONS_BINDPORT, uint port)
|
||||||
|
|
||||||
|
srv.callbacks.incoming_connection = proc (b: Bind; data: pointer) {.cdecl.} =
|
||||||
|
let srv = cast[Server](data)
|
||||||
|
stderr.writeLine "got an incoming connection"
|
||||||
|
setCallbacks(srv.bnd, addr srv.callbacks, cast[pointer](srv))
|
||||||
|
listen(srv.bnd)
|
||||||
|
return srv
|
||||||
|
except CatchableError as err:
|
||||||
|
stop(srv)
|
||||||
|
raise err
|
|
@ -6,10 +6,20 @@
|
||||||
|
|
||||||
{.passC: "'-DSYSTEM=\"x86_64-linux\"'".}
|
{.passC: "'-DSYSTEM=\"x86_64-linux\"'".}
|
||||||
|
|
||||||
|
type StdString {.importcpp: "std::string", header: "<string>".} = object
|
||||||
|
proc data(s: StdString): pointer {.importcpp: "#.data()".}
|
||||||
|
proc len(s: StdString): csize_t {.importcpp: "#.length()".}
|
||||||
|
proc `$`*(cpp: StdString): string =
|
||||||
|
result.setLen(cpp.len)
|
||||||
|
if result.len > 0:
|
||||||
|
copyMem(addr result[0], cpp.data, result.len)
|
||||||
|
|
||||||
type
|
type
|
||||||
StorePath {.importcpp: "nix::StorePath", header: "path.hh".} = object
|
StorePath {.importcpp: "nix::StorePath", header: "path.hh".} = object
|
||||||
discard
|
discard
|
||||||
|
|
||||||
|
var nixVersion* {.importc: "nix::nixVersion", header: "globals.hh".}: StdString
|
||||||
|
|
||||||
proc isDerivation*(path: StorePath): bool {.importcpp.}
|
proc isDerivation*(path: StorePath): bool {.importcpp.}
|
||||||
|
|
||||||
type
|
type
|
||||||
|
|
|
@ -1,14 +0,0 @@
|
||||||
|
|
||||||
import
|
|
||||||
std/typetraits, preserves
|
|
||||||
|
|
||||||
type
|
|
||||||
Build* {.preservesRecord: "nix-build".} = object
|
|
||||||
`input`*: string
|
|
||||||
`output`*: string
|
|
||||||
|
|
||||||
proc `$`*(x: Build): string =
|
|
||||||
`$`(toPreserve(x))
|
|
||||||
|
|
||||||
proc encode*(x: Build): seq[byte] =
|
|
||||||
encode(toPreserve(x))
|
|
Loading…
Reference in New Issue