Break up sockets module, don't link libnix, wopAddToStore

This commit is contained in:
Emery Hemingway 2023-06-11 12:24:25 +01:00
parent 9a0d2a22ec
commit 584c01ef08
11 changed files with 609 additions and 426 deletions

View File

@ -34,8 +34,7 @@ A demo script for the [Syndicate server](https://git.syndicate-lang.org/syndicat
} }>
]
? <path-info "/nix/store/jhgh02lyizd1kyl71brvc01ygsmgi40a-tzdata-2023c"
?deriver ?narHash _ _ ?narSize _ ?sigs _> [
? <path-info "/nix/store/jhgh02lyizd1kyl71brvc01ygsmgi40a-tzdata-2023c" ?deriver ?narHash _ _ ?narSize _ ?sigs _> [
$log ! <log "-" { tzdata-2023c: {
deriver: $deriver
narHash: $narHash

View File

@ -1,4 +1,5 @@
include ../eris-nim/depends.tup
NIM_FLAGS += --path:$(TUP_CWD)/../eris-nim/src
include ../syndicate-nim/depends.tup
NIM_FLAGS += --path:$(TUP_CWD)/../syndicate-nim/src
NIM_FLAGS += --backend:cpp

View File

@ -1,9 +1,8 @@
version = "20230610"
version = "20230611"
author = "Emery Hemingway"
description = "Syndicated Nix Actor"
license = "Unlicense"
srcDir = "src"
bin = @["nix_actor"]
backend = "cpp"
requires "nim >= 1.6.10", "syndicate >= 20230530"

View File

@ -1,19 +1,19 @@
version 1 .
StringSeq = [string ...] .
StringSet = #{string} .
AttrSet = {symbol: any ...:...} .
Build = <nix-build @input string @output any> .
Realise = <realise @drv string @outputs [string ...]> .
Realise = <realise @drv string @outputs StringSeq> .
Instantiate = <instantiate @expr string @options Dict @result any> .
Instantiate = <instantiate @expr string @options AttrSet @result any> .
Eval = <eval @expr string @options {symbol: any ...:...} @result any> .
Narinfo = <narinfo @path string @info Dict> .
Narinfo = <narinfo @path string @info AttrSet> .
Dict = {symbol: any ...:...} .
FieldInt = int .
FieldString = string .
Field = int / string .
Fields = [Field ...] .
@ -22,16 +22,43 @@ ActionStop = <stop @id int> .
ActionResult = <result @id int @type int @fields Fields> .
; TODO: why not make target a singleton?
Missing = <missing @targets [string ...] @willBuild #{string} @willSubstitute #{string} @unknown #{string} @downloadSize int @narSize int> .
Missing = <missing @targets StringSeq @willBuild StringSet @willSubstitute StringSet @unknown StringSet @downloadSize int @narSize int> .
; TODO keep a few critical fields and move the rest into a dictionary
PathInfo = <path-info
@path string
@deriver string
@narHash string
@references #{string}
@registrationTime int
@narSize int
@ultimate bool
@sigs #{string}
@ca string> .
; Path info for the worker protocol version 35.
LegacyPathAttrs = {
deriver: string
narHash: string
references: StringSeq ; prefer a set
registrationTime: int
narSize: int
ultimate: bool
sigs: StringSet
ca: string
} .
AddToStoreClientAttrs = {
name: string
eris: bytes
ca-method: symbol
references: StringSeq ; prefer a set
} .
; Intersection of the attributes needed to add a path to a store
; and the attributes returned by the daemon after adding the path.
AddToStoreAttrs = {
name: string
eris: bytes
ca-method: symbol
references: StringSeq ; prefer a set
deriver: string
narHash: string
registrationTime: int
narSize: int
ultimate: bool
sigs: StringSet
ca: string
} .
; Any collection of attributes describing a store path.
PathInfo = <path @path string @attrs AttrSet> .

View File

@ -1,18 +1,20 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[asyncdispatch, httpclient, json, osproc, parseutils, strutils, tables]
import std/[json, osproc, parseutils, strutils, tables]
import eris/memory_stores
import preserves, preserves/jsonhooks
import syndicate
from syndicate/protocols/dataspace import Observe
import ./nix_actor/protocol
import ./nix_actor/[main, sockets]
import ./nix_actor/[clients, daemons]
type
Value = Preserve[void]
Observe = dataspace.Observe[Ref]
proc parseArgs(args: var seq[string]; opts: Dict) =
proc parseArgs(args: var seq[string]; opts: AttrSet) =
for sym, val in opts:
add(args, "--" & $sym)
if not val.isString "":
@ -20,7 +22,8 @@ proc parseArgs(args: var seq[string]; opts: Dict) =
if fromPreserve(js, val): add(args, $js)
else: stderr.writeLine "invalid option --", sym, " ", val
proc parseNarinfo(info: var Dict; text: string) =
#[
proc parseNarinfo(info: var AttrSet; text: string) =
var
key, val: string
off: int
@ -50,6 +53,7 @@ proc narinfo(turn: var Turn; ds: Ref; path: string) =
var narinfo = Narinfo(path: path)
parseNarinfo(narinfo.info, read(futBody))
discard publish(turn, ds, narinfo)
]# # I never link to openssl if I can avoid it.
proc build(spec: string): Build =
var execOutput = execProcess("nix", args = ["build", "--json", "--no-link", spec], options = {poUsePath})
@ -104,8 +108,10 @@ proc bootNixFacet(turn: var Turn; ds: Ref): Facet =
ass.result = eval(ass)
discard publish(turn, ds, ass)
#[
during(turn, ds, ?Observe(pattern: !Narinfo) ?? {0: grabLit()}) do (path: string):
narinfo(turn, ds, path)
]#
type
RefArgs {.preservesDictionary.} = object
@ -115,17 +121,15 @@ type
DaemonSideArgs {.preservesDictionary.} = object
`daemon-socket`: string
proc bootNixActor(root: Ref; turn: var Turn) =
runActor("main") do (root: Ref; turn: var Turn):
let store = newMemoryStore()
connectStdio(root, turn)
during(turn, root, ?RefArgs) do (ds: Ref):
discard bootNixFacet(turn, ds)
during(turn, root, ?ClientSideArgs) do (socketPath: string):
bootClientSide(turn.facet, ds, socketPath)
bootClientSide(turn, ds, store, socketPath)
during(turn, root, ?DaemonSideArgs) do (socketPath: string):
bootDaemonSide(turn, ds, socketPath)
initNix() # Nix lib isn't actually being used but it's nice to know that it links.
runActor("main", bootNixActor)
bootDaemonSide(turn, ds, store, socketPath)

View File

@ -1 +0,0 @@
define:ssl

172
src/nix_actor/clients.nim Normal file
View File

@ -0,0 +1,172 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[asyncdispatch, asyncnet, os, sets, strutils, tables]
from std/algorithm import sort
import eris
import preserves, syndicate
import ./protocol, ./sockets
proc sendNext(client: Session; msg: string) {.async.} =
await send(client, STDERR_NEXT)
await send(client, msg)
proc sendWorkEnd(client: Session): Future[void] =
send(client, STDERR_LAST)
proc send(client: Session; miss: Missing) {.async.} =
await sendWorkEnd(client)
await send(client, miss.willBuild)
await send(client, miss.willSubstitute)
await send(client, miss.unknown)
await send(client, Word miss.downloadSize)
await send(client, Word miss.narSize)
proc send(client: Session; info: LegacyPathAttrs) {.async.} =
await send(client, info.deriver)
await send(client, info.narHash)
await send(client, info.references)
await send(client, Word info.registrationTime)
await send(client, Word info.narSize)
await send(client, Word info.ultimate)
await send(client, info.sigs)
await send(client, info.ca)
proc sendValidInfo(client: Session; info: LegacyPathAttrs) {.async.} =
await sendWorkEnd(client)
await send(client, 1) # valid
await send(client, info)
proc completeAddToStore(client: Session; path: string; info: LegacyPathAttrs) {.async.} =
await sendWorkEnd(client)
await send(client, path)
await send(client, info)
proc serveClient(facet: Facet; ds: Ref; store: ErisStore; client: Session) {.async.} =
block:
let clientMagic = await recvWord(client)
if clientMagic != WORKER_MAGIC_1:
raise newException(ProtocolError, "invalid protocol magic")
await send(client, WORKER_MAGIC_2, PROTOCOL_VERSION)
let clientVersion = Version(await recvWord(client))
if clientVersion < 0x1_21:
raise newException(ProtocolError, "obsolete protocol version")
assert clientVersion.minor >= 14
discard await(recvWord(client))
# obsolete CPU affinity
assert clientVersion.minor >= 11
discard await(recvWord(client))
# obsolete reserveSpace
assert clientVersion.minor >= 33
await send(client, "0.0.0")
await sendWorkEnd(client)
while not client.socket.isClosed:
let wop = await recvWord(client.socket)
case wop
of wopAddToStore:
let
name = await recvString(client)
caMethod = await recvString(client)
var storeRefs = await recvStringSeq(client)
sort(storeRefs) # sets not valid for patterns so use a sorted list
discard await recvWord(client) # repair, not implemented
let cap = await ingestChunks(client, store)
await sendNext(client, $cap & " " & name)
let attrsPat = inject(?AddToStoreAttrs, {
"name".toSymbol(Ref): ?name,
"ca-method".toSymbol(Ref): ?caMethod.toSymbol,
"references".toSymbol(Ref): ?storeRefs,
"eris".toSymbol(Ref): ?cap.bytes,
})
# bind AddToStoreAttrs and override with some literal values
let pat = PathInfo ? { 0: grab(), 1: attrsPat }
run(facet) do (turn: var Turn):
onPublish(turn, ds, pat) do (path: string, ca: string, deriver: string, narHash: string, narSize: BiggestInt, regTime: BiggestInt, sigs: StringSet, ultimate: bool):
asyncCheck(turn, completeAddToStore(client, path, LegacyPathAttrs(
ca: ca,
deriver: deriver,
narHash: narHash,
narSize: narSize,
references: storeRefs,
registrationTime: regTime,
sigs: sigs,
ultimate: ultimate,
)))
of wopQueryPathInfo:
let
path = await recvString(client)
pat = PathInfo ? { 0: ?path, 1: grab() }
run(facet) do (turn: var Turn):
onPublish(turn, ds, pat) do (info: LegacyPathAttrs):
asyncCheck(turn, sendValidInfo(client, info))
of wopQueryMissing:
var targets = toPreserve(await recvStringSeq(client))
sort(targets.sequence)
# would prefer to use a set but that doesn't translate into a pattern
let pat = inject(?Missing, { 0: ?targets })
run(facet) do (turn: var Turn):
onPublish(turn, ds, pat) do (
willBuild: StringSet,
willSubstitute: StringSet,
unknown: StringSet,
downloadSize: BiggestInt,
narSize: BiggestInt
):
let miss = Missing(
willBuild: willBuild,
willSubstitute: willSubstitute,
unknown: unknown,
downloadSize: downloadSize,
narSize: narSize,
)
asyncCheck(turn, send(client, miss))
of wopSetOptions:
await discardWords(client, 12)
# 01 keepFailed
# 02 keepGoing
# 03 tryFallback
# 04 verbosity
# 05 maxBuildJobs
# 06 maxSilentTime
# 07 useBuildHook
# 08 verboseBuild
# 09 logType
# 10 printBuildTrace
# 11 buildCores
# 12 useSubstitutes
let overridePairCount = await recvWord(client)
for _ in 1..overridePairCount:
discard await (recvString(client))
discard await (recvString(client))
await sendWorkEnd(client)
# all options from the client are ingored
else:
let msg = "unhandled worker op " & $wop.int
await sendNext(client, msg)
await sendWorkEnd(client)
close(client.socket)
proc serveClientSide(facet: Facet; ds: Ref; store: ErisStore; listener: AsyncSocket) {.async.} =
while not listener.isClosed:
let
client = await accept(listener)
fut = serveClient(facet, ds, store, newSession(client))
addCallback(fut) do ():
if not client.isClosed:
close(client)
proc bootClientSide*(turn: var Turn; ds: Ref; store: ErisStore; socketPath: string) =
let listener = newUnixSocket()
onStop(turn.facet) do (turn: var Turn):
close(listener)
removeFile(socketPath)
removeFile(socketPath)
bindUnix(listener, socketPath)
listen(listener)
asyncCheck(turn, serveClientSide(turn.facet, ds, store, listener))

205
src/nix_actor/daemons.nim Normal file
View File

@ -0,0 +1,205 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[asyncdispatch, asyncnet, sets, streams, strutils]
from std/algorithm import sort
import eris
import preserves, syndicate
from syndicate/protocols/dataspace import Observe
import ./protocol, ./sockets
type Value = Preserve[void]
proc merge(items: varargs[Value]): Value =
# TODO: just a hack, not a proper imlementation
# https://preserves.dev/preserves.html#appendix-merging-values
result = initDictionary()
for e in items:
for (key, val) in e.pairs:
result[key] = val
cannonicalize(result)
type Observe = dataspace.Observe[Ref]
proc recvError(daemon: Session): Future[string] {.async.} =
discard #[typ]# await recvString(daemon)
discard #[lvl]# await recvWord(daemon)
discard #[name]# await recvString(daemon)
let msg = #[msg]# await recvString(daemon)
discard #[havePos]# await recvWord(daemon)
let nrTraces = await recvWord(daemon)
for i in 1..nrTraces:
discard #[havPos]# await recvWord(daemon)
discard #[msg]# await recvString(daemon)
return msg
proc recvFields(daemon: Session) {.async.} =
let count = await recvWord(daemon)
for i in 0..<count:
let typ = await recvWord(daemon)
case typ
of 0: discard await recvWord(daemon)
of 1: discard await recvString(daemon)
else: raiseAssert "unknown field type " & $typ
proc recvWork(daemon: Session) {.async.} =
while true:
let word = await recvWord(daemon)
case word
of STDERR_WRITE:
discard await recvString(daemon)
of STDERR_READ:
await send(daemon, "")
of STDERR_ERROR:
let err = await recvError(daemon)
raise newException(ProtocolError, "Nix daemon: " & err)
of STDERR_NEXT:
let msg = await recvString(daemon)
stderr.writeLine("Nix daemon: ", msg)
of STDERR_START_ACTIVITY:
discard await recvWord(daemon) # id
discard await recvWord(daemon) # level
discard await recvWord(daemon) # type
discard await recvString(daemon) # text
await recvFields(daemon) # fields
discard await recvWord(daemon) # parent
of STDERR_STOP_ACTIVITY:
discard await recvWord(daemon) # id
of STDERR_RESULT:
discard await recvWord(daemon) # id
discard await recvWord(daemon) # type
await recvFields(daemon) # fields
of STDERR_LAST:
break
else:
raise newException(ProtocolError, "unknown work verb " & $word)
proc connectDaemon(daemon: Session; socketPath: string) {.async.} =
await connectUnix(daemon.socket, socketPath)
await send(daemon, WORKER_MAGIC_1)
let daemonMagic = await recvWord(daemon)
if daemonMagic != WORKER_MAGIC_2:
raise newException(ProtocolError, "bad magic from daemon")
let daemonVersion = await recvWord(daemon)
daemon.version = min(Version daemonVersion, PROTOCOL_VERSION)
await send(daemon, Word daemon.version)
await send(daemon, 0) # CPU affinity
await send(daemon, 0) # reserve space
if daemon.version.minor >= 33:
discard await recvString(daemon) # version
if daemon.version.minor >= 35:
discard await recvWord(daemon) # remoteTrustsUs
await recvWork(daemon)
proc queryMissing(daemon: Session; targets: StringSeq): Future[Missing] {.async.} =
var miss = Missing(targets: targets)
await send(daemon, wopQueryMissing)
await send(daemon, miss.targets)
await recvWork(daemon)
miss.willBuild = await recvStringSet(daemon)
miss.willSubstitute = await recvStringSet(daemon)
miss.unknown = await recvStringSet(daemon)
miss.downloadSize = BiggestInt await recvWord(daemon)
miss.narSize = BiggestInt await recvWord(daemon)
return miss
proc queryPathInfo(daemon: Session; path: string): Future[LegacyPathAttrs] {.async.} =
var info: LegacyPathAttrs
await send(daemon, wopQueryPathInfo)
await send(daemon, path)
await recvWork(daemon)
let valid = await recvWord(daemon)
if valid != 0:
info.deriver = await recvString(daemon)
info.narHash = await recvString(daemon)
info.references = await recvStringSeq(daemon)
sort(info.references)
info.registrationTime = BiggestInt await recvWord(daemon)
info.narSize = BiggestInt await recvWord(daemon)
info.ultimate = (await recvWord(daemon)) != 0
info.sigs = await recvStringSet(daemon)
info.ca = await recvString(daemon)
return info
proc recvLegacyPathAttrs(daemon: Session): Future[AddToStoreAttrs] {.async.} =
var info: AddToStoreAttrs
info.deriver = await recvString(daemon)
info.narHash = await recvString(daemon)
info.references = await recvStringSeq(daemon)
sort(info.references)
info.registrationTime = BiggestInt await recvWord(daemon)
info.narSize = BiggestInt await recvWord(daemon)
assert daemon.version.minor >= 16
info.ultimate = (await recvWord(daemon)) != 0
info.sigs = await recvStringSet(daemon)
info.ca = await recvString(daemon)
return info
proc addToStore(daemon: Session; store: ErisStore; request: AddToStoreClientAttrs): Future[(string, AddToStoreAttrs)] {.async.} =
let
erisCap = parseCap(request.eris)
stream = newErisStream(store, erisCap)
await send(daemon, wopAddToStore)
await send(daemon, request.name)
await send(daemon, string request.`ca-method`)
await send(daemon, request.references)
await send(daemon, 0) # repair
await recoverChunks(daemon, store, erisCap)
await recvWork(daemon)
let path = await recvString(daemon)
var info = await recvLegacyPathAttrs(daemon)
info.eris = request.eris
info.`ca-method` = request.`ca-method`
info.name = request.name
info.references = request.references
return (path, info)
proc callDaemon(turn: var Turn; path: string; action: proc (daemon: Session; turn: var Turn) {.gcsafe.}): Session =
let
daemon = newSession()
fut = connectDaemon(daemon, path)
addCallback(fut, turn) do (turn: var Turn):
read(fut)
action(daemon, turn)
return daemon
proc bootDaemonSide*(turn: var Turn; ds: Ref; store: ErisStore; socketPath: string) =
during(turn, ds, ?Observe(pattern: !Missing) ?? {0: grab()}) do (a: Preserve[Ref]):
# cannot use `grabLit` here because an array is a compound
# TODO: unpack to a `Pattern`
let daemon = callDaemon(turn, socketPath) do (daemon: Session; turn: var Turn):
var targets: StringSeq
doAssert targets.fromPreserve(unpackLiterals(a))
# unpack <arr [<lit " …">]>
let missFut = queryMissing(daemon, targets)
addCallback(missFut, turn) do (turn: var Turn):
close(daemon)
var miss = read(missFut)
discard publish(turn, ds, miss)
do:
close(daemon)
during(turn, ds, ?Observe(pattern: !PathInfo) ?? {0: grabLit()}) do (path: string):
let daemon = callDaemon(turn, socketPath) do (daemon: Session; turn: var Turn):
let infoFut = queryPathInfo(daemon, path)
addCallback(infoFut, turn) do (turn: var Turn):
close(daemon)
var info = read(infoFut)
discard publish(turn, ds, initRecord("path", path.toPreserve, info.toPreserve))
do:
close(daemon)
during(turn, ds, ?Observe(pattern: !PathInfo) ?? {1: grabDict()}) do (pat: Value):
var daemon: Session
var request: AddToStoreClientAttrs
if request.fromPreserve(unpackLiterals pat):
daemon = callDaemon(turn, socketPath) do (daemon: Session; turn: var Turn):
let fut = addToStore(daemon, store, request)
addCallback(fut, turn) do (turn: var Turn):
close(daemon)
var (path, info) = read(fut)
discard publish(turn, ds, initRecord("path", path.toPreserve, info.toPreserve))
do:
close(daemon)

View File

@ -1,7 +0,0 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
{.passC: staticExec("pkg-config --cflags nix-main").}
{.passL: staticExec("pkg-config --libs nix-main").}
proc initNix*() {.importcpp: "nix::initNix", header: "shared.hh".}

View File

@ -8,21 +8,32 @@ type
`options`*: Table[Symbol, Preserve[void]]
`result`*: Preserve[void]
AttrSet* = Table[Symbol, Preserve[void]]
Realise* {.preservesRecord: "realise".} = object
`drv`*: string
`outputs`*: seq[string]
`outputs`*: StringSeq
LegacyPathAttrs* {.preservesDictionary.} = object
`ca`*: string
`deriver`*: string
`narHash`*: string
`narSize`*: BiggestInt
`references`*: StringSeq
`registrationTime`*: BiggestInt
`sigs`*: StringSet
`ultimate`*: bool
Missing* {.preservesRecord: "missing".} = object
`targets`*: seq[string]
`willBuild`*: HashSet[string]
`willSubstitute`*: HashSet[string]
`unknown`*: HashSet[string]
`targets`*: StringSeq
`willBuild`*: StringSet
`willSubstitute`*: StringSet
`unknown`*: StringSet
`downloadSize`*: BiggestInt
`narSize`*: BiggestInt
Narinfo* {.preservesRecord: "narinfo".} = object
`path`*: string
`info`*: Dict
`info`*: AttrSet
FieldKind* {.pure.} = enum
`int`, `string`
@ -35,18 +46,30 @@ type
`string`*: string
PathInfo* {.preservesRecord: "path-info".} = object
`path`*: string
`deriver`*: string
`narHash`*: string
`references`*: HashSet[string]
`registrationTime`*: BiggestInt
`narSize`*: BiggestInt
`ultimate`*: bool
`sigs`*: HashSet[string]
StringSet* = HashSet[string]
AddToStoreAttrs* {.preservesDictionary.} = object
`ca`*: string
`ca-method`*: Symbol
`deriver`*: string
`eris`*: seq[byte]
`name`*: string
`narHash`*: string
`narSize`*: BiggestInt
`references`*: StringSeq
`registrationTime`*: BiggestInt
`sigs`*: StringSet
`ultimate`*: bool
AddToStoreClientAttrs* {.preservesDictionary.} = object
`ca-method`*: Symbol
`eris`*: seq[byte]
`name`*: string
`references`*: StringSeq
PathInfo* {.preservesRecord: "path".} = object
`path`*: string
`attrs`*: AttrSet
Dict* = Table[Symbol, Preserve[void]]
Build* {.preservesRecord: "nix-build".} = object
`input`*: string
`output`*: Preserve[void]
@ -60,13 +83,12 @@ type
`fields`*: Fields
`parent`*: BiggestInt
FieldString* = string
Instantiate* {.preservesRecord: "instantiate".} = object
`expr`*: string
`options`*: Dict
`options`*: AttrSet
`result`*: Preserve[void]
FieldInt* = BiggestInt
StringSeq* = seq[string]
ActionStop* {.preservesRecord: "stop".} = object
`id`*: BiggestInt
@ -75,24 +97,32 @@ type
`type`*: BiggestInt
`fields`*: Fields
proc `$`*(x: Eval | Realise | Missing | Narinfo | Field | PathInfo | Dict |
proc `$`*(x: Eval | AttrSet | Realise | LegacyPathAttrs | Missing | Narinfo |
Field |
StringSet |
AddToStoreAttrs |
AddToStoreClientAttrs |
PathInfo |
Build |
Fields |
ActionStart |
FieldString |
Instantiate |
FieldInt |
StringSeq |
ActionStop |
ActionResult): string =
`$`(toPreserve(x))
proc encode*(x: Eval | Realise | Missing | Narinfo | Field | PathInfo | Dict |
proc encode*(x: Eval | AttrSet | Realise | LegacyPathAttrs | Missing | Narinfo |
Field |
StringSet |
AddToStoreAttrs |
AddToStoreClientAttrs |
PathInfo |
Build |
Fields |
ActionStart |
FieldString |
Instantiate |
FieldInt |
StringSeq |
ActionStop |
ActionResult): seq[byte] =
encode(toPreserve(x))

View File

@ -1,93 +1,96 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[algorithm, asyncdispatch, asyncnet, os, sets, strtabs, strutils]
from std/nativesockets import AF_INET, AF_UNIX, SOCK_STREAM, Protocol
## Common module for communicating with Nix clients and daemons.
import std/[asyncdispatch, asyncnet, sets, strtabs, strutils, tables]
from std/nativesockets import AF_UNIX, SOCK_STREAM, Protocol
import eris
import preserves, syndicate
from syndicate/protocols/dataspace import Observe
import ./protocol
{.pragma: workerProtocol, importc, header: "worker-protocol.hh".}
type Word = uint64
proc `$`(w: Word): string = toHex(w)
type Word* = uint64
proc `[]=`*[T](attrs: var AttrSet; key: string; val: T) =
attrs[Symbol key] = val.toPreserve
const
WORKER_MAGIC_1 = 0x6E697863
WORKER_MAGIC_2 = 0x6478696F
PROTOCOL_VERSION = 0x100 or 35
WORKER_MAGIC_1* = 0x6E697863
WORKER_MAGIC_2* = 0x6478696F
PROTOCOL_VERSION* = 0x100 or 35
STDERR_NEXT = 0x6F6C6d67
STDERR_READ = 0x64617461
STDERR_WRITE = 0x64617416
STDERR_LAST = 0x616C7473
STDERR_ERROR = 0x63787470
STDERR_START_ACTIVITY = 0x53545254
STDERR_STOP_ACTIVITY = 0x53544F50
STDERR_RESULT = 0x52534C54
STDERR_NEXT* = 0x6F6C6d67
STDERR_READ* = 0x64617461
STDERR_WRITE* = 0x64617416
STDERR_LAST* = 0x616C7473
STDERR_ERROR* = 0x63787470
STDERR_START_ACTIVITY* = 0x53545254
STDERR_STOP_ACTIVITY* = 0x53544F50
STDERR_RESULT* = 0x52534C54
wopIsValidPath = 1
wopHasSubstitutes = 3
wopQueryReferrers = 6
wopAddToStore = 7
wopBuildPaths = 9
wopEnsurePath = 10
wopAddTempRoot = 11
wopAddIndirectRoot = 12
wopSyncWithGC = 13
wopFindRoots = 14
wopSetOptions = 19
wopCollectGarbage = 20
wopQuerySubstitutablePathInfo = 21
wopQueryAllValidPaths = 23
wopQueryFailedPaths = 24
wopClearFailedPaths = 25
wopQueryPathInfo = 26
wopQueryPathFromHashPart = 29
wopQuerySubstitutablePathInfos = 30
wopQueryValidPaths = 31
wopQuerySubstitutablePaths = 32
wopQueryValidDerivers = 33
wopOptimiseStore = 34
wopVerifyStore = 35
wopBuildDerivation = 36
wopAddSignatures = 37
wopNarFromPath = 38
wopAddToStoreNar = 39
wopQueryMissing = 40
wopQueryDerivationOutputMap = 41
wopRegisterDrvOutput = 42
wopQueryRealisation = 43
wopAddMultipleToStore = 44
wopAddBuildLog = 45
wopBuildPathsWithResults = 46
wopIsValidPath* = 1
wopHasSubstitutes* = 3
wopQueryReferrers* = 6
wopAddToStore* = 7
wopBuildPaths* = 9
wopEnsurePath* = 10
wopAddTempRoot* = 11
wopAddIndirectRoot* = 12
wopSyncWithGC* = 13
wopFindRoots* = 14
wopSetOptions* = 19
wopCollectGarbage* = 20
wopQuerySubstitutablePathInfo* = 21
wopQueryAllValidPaths* = 23
wopQueryFailedPaths* = 24
wopClearFailedPaths* = 25
wopQueryPathInfo* = 26
wopQueryPathFromHashPart* = 29
wopQuerySubstitutablePathInfos* = 30
wopQueryValidPaths* = 31
wopQuerySubstitutablePaths* = 32
wopQueryValidDerivers* = 33
wopOptimiseStore* = 34
wopVerifyStore* = 35
wopBuildDerivation* = 36
wopAddSignatures* = 37
wopNarFromPath* = 38
wopAddToStoreNar* = 39
wopQueryMissing* = 40
wopQueryDerivationOutputMap* = 41
wopRegisterDrvOutput* = 42
wopQueryRealisation* = 43
wopAddMultipleToStore* = 44
wopAddBuildLog* = 45
wopBuildPathsWithResults* = 46
type
ProtocolError = object of IOError
Version = uint16
StringSeq = seq[string]
StringSet = HashSet[string]
Session = ref object
socket: AsyncSocket
buffer: seq[Word]
version: Version
Observe = dataspace.Observe[Ref]
ProtocolError* = object of IOError
Version* = uint16
func major(version: Version): uint16 = version and 0xff00
func minor(version: Version): uint16 = version and 0x00ff
Session* = ref object
socket*: AsyncSocket
buffer*: seq[Word]
version*: Version
proc close(session: Session) =
func major*(version: Version): uint16 = version and 0xff00
func minor*(version: Version): uint16 = version and 0x00ff
proc close*(session: Session) =
close(session.socket)
reset(session.buffer)
proc send(session: Session; words: varargs[Word]): Future[void] =
proc send*(session: Session; words: varargs[Word]): Future[void] =
if session.buffer.len < words.len:
session.buffer.setLen(words.len)
for i, word in words: session.buffer[i] = word
send(session.socket, addr session.buffer[0], words.len shl 3)
proc send(session: Session; s: string): Future[void] =
proc send*(session: Session; s: string): Future[void] =
let wordCount = 1 + ((s.len + 7) shr 3)
if session.buffer.len < wordCount: setLen(session.buffer, wordCount)
session.buffer[0] = Word s.len
@ -96,7 +99,7 @@ proc send(session: Session; s: string): Future[void] =
copyMem(addr session.buffer[1], unsafeAddr s[0], s.len)
send(session.socket, addr session.buffer[0], wordCount shl 3)
proc send(session: Session; ss: StringSeq|StringSet): Future[void] =
proc send*(session: Session; ss: StringSeq|StringSet): Future[void] =
## Send a set of strings. The set is sent as a contiguous buffer.
session.buffer[0] = Word ss.len
var off = 1
@ -113,23 +116,23 @@ proc send(session: Session; ss: StringSeq|StringSet): Future[void] =
inc(off, stringWordLen)
send(session.socket, addr session.buffer[0], off shl 3)
proc recvWord(sock: AsyncSocket): Future[Word] {.async.} =
proc recvWord*(sock: AsyncSocket): Future[Word] {.async.} =
var w: Word
let n = await recvInto(sock, addr w, sizeof(Word))
if n != sizeof(Word): raise newException(ProtocolError, "short read")
return w
proc recvWord(session: Session): Future[Word] =
proc recvWord*(session: Session): Future[Word] =
recvWord(session.socket)
proc discardWords(session: Session; n: int): Future[void] {.async.} =
proc discardWords*(session: Session; n: int): Future[void] {.async.} =
if session.buffer.len < n: setLen(session.buffer, n)
let byteCount = n shl 3
let n = await recvInto(session.socket, addr session.buffer[0], byteCount)
if n != byteCount:
raise newException(ProtocolError, "short read")
proc recvString(socket: AsyncSocket): Future[string] {.async.} =
proc recvString*(socket: AsyncSocket): Future[string] {.async.} =
let stringLen = int (await recvWord(socket))
if stringLen > 0:
var s = newString((stringLen + 7) and (not 7))
@ -140,310 +143,61 @@ proc recvString(socket: AsyncSocket): Future[string] {.async.} =
return s
return ""
proc recvString(session: Session): Future[string] =
proc recvString*(session: Session): Future[string] =
recvString(session.socket)
proc recvStringSeq(session: Session): Future[StringSeq] {.async.} =
proc recvStringSeq*(session: Session): Future[StringSeq] {.async.} =
let count = int(await recvWord(session.socket))
var strings = newSeq[string](count)
for i in 0..<count: strings[i] = await recvString(session)
return strings
proc recvStringSet(session: Session): Future[StringSet] {.async.} =
proc recvStringSet*(session: Session): Future[StringSet] {.async.} =
let count = int(await recvWord(session.socket))
var strings = initHashSet[string](count)
for i in 0..<count: incl(strings, await recvString(session))
return strings
proc recvError(session: Session) {.async.} =
discard #[typ]# await recvString(session)
discard #[lvl]# await recvWord(session)
discard #[name]# await recvString(session)
discard #[msg]# await recvString(session)
discard #[havePos]# await recvWord(session)
let nrTraces = await recvWord(session)
for i in 1..nrTraces:
discard #[havPos]# await recvWord(session)
discard #[msg]# await recvString(session)
proc recvFields(session: Session) {.async.} =
let count = await recvWord(session)
for i in 0..<count:
let typ = await recvWord(session)
case typ
of 0: discard await recvWord(session)
of 1: discard await recvString(session)
else: raiseAssert "unknown field type " & $typ
proc recvWork(session: Session) {.async.} =
while true:
let word = await recvWord(session)
case word
of STDERR_WRITE:
discard await recvString(session)
of STDERR_READ:
await send(session, "")
of STDERR_ERROR:
await recvError(session)
of STDERR_NEXT:
discard await recvString(session)
of STDERR_START_ACTIVITY:
discard await recvWord(session) # id
discard await recvWord(session) # level
discard await recvWord(session) # type
discard await recvString(session) # text
await recvFields(session) # fields
discard await recvWord(session) # parent
of STDERR_STOP_ACTIVITY:
discard await recvWord(session) # id
of STDERR_RESULT:
var act: ActionResult
discard await recvWord(session) # id
discard await recvWord(session) # type
await recvFields(session) # fields
of STDERR_LAST:
break
else:
raise newException(ProtocolError, "unknown work verb " & $word)
proc daemonSocketPath: string =
getEnv(
"NIX_DAEMON_SOCKET_PATH",
"/nix/var/nix/daemon-socket/socket")
proc newSession(socket: AsyncSocket): Session =
Session(socket: socket, buffer: newSeq[Word](512))
proc newSession(): Session =
newSession(newAsyncSocket(
domain = AF_UNIX,
sockType = SOCK_STREAM,
protocol = cast[Protocol](0),
buffered = false))
proc send(session: Session; miss: Missing) {.async.} =
await send(session, STDERR_LAST)
await send(session, miss.willBuild)
await send(session, miss.willSubstitute)
await send(session, miss.unknown)
await send(session, Word miss.downloadSize)
await send(session, Word miss.narSize)
proc send(session: Session; info: PathInfo) {.async.} =
await send(session, STDERR_LAST)
await send(session, 1)
if info.path != "":
await send(session, info.path)
await send(session, info.deriver)
await send(session, info.narHash)
await send(session, info.references)
await send(session, Word info.registrationTime)
await send(session, Word info.narSize)
await send(session, Word info.ultimate)
await send(session, info.sigs)
await send(session, info.ca)
proc serveClient(facet: Facet; ds: Ref; session: Session) {.async.} =
block:
let clientMagic = await recvWord(session)
if clientMagic != WORKER_MAGIC_1:
raise newException(ProtocolError, "invalid protocol magic")
await send(session, WORKER_MAGIC_2, PROTOCOL_VERSION)
let clientVersion = Version(await recvWord(session))
if clientVersion < 0x1_21:
raise newException(ProtocolError, "obsolete protocol version")
assert clientVersion.minor >= 14
discard await(recvWord(session))
# obsolete CPU affinity
assert clientVersion.minor >= 11
discard await(recvWord(session))
# obsolete reserveSpace
assert clientVersion.minor >= 33
await send(session, "0.0.0")
await send(session, STDERR_LAST)
while not session.socket.isClosed:
let wop = await recvWord(session.socket)
case wop
of wopQueryPathInfo:
let
path = await recvString(session)
pat = inject(?PathInfo, { 0: ?path })
await send(session, STDERR_NEXT)
await send(session, $pat)
run(facet) do (turn: var Turn):
onPublish(turn, ds, pat) do (
deriver: string,
narHash: string,
references: StringSet,
registrationTime: BiggestInt,
narSize: BiggestInt,
ultimate: bool,
sigs: StringSet,
ca: string
):
var info = PathInfo(
deriver: deriver,
narHash: narHash,
references: references,
registrationTime: registrationTime,
narSize: narSize,
ultimate: ultimate,
sigs: sigs,
ca: ca,
)
asyncCheck(turn, send(session, info))
of wopQueryMissing:
var targets = toPreserve(await recvStringSeq(session))
sort(targets.sequence)
# would prefer to use a set but that doesn't translate into a pattern
let pat = inject(?Missing, { 0: ?targets })
# TODO send the pattern to the client as a log line
await send(session, STDERR_NEXT)
await send(session, $pat)
run(facet) do (turn: var Turn):
onPublish(turn, ds, pat) do (
willBuild: StringSet,
willSubstitute: StringSet,
unknown: StringSet,
downloadSize: BiggestInt,
narSize: BiggestInt
):
let miss = Missing(
willBuild: willBuild,
willSubstitute: willSubstitute,
unknown: unknown,
downloadSize: downloadSize,
narSize: narSize,
)
asyncCheck(turn, send(session, miss))
of wopSetOptions:
await discardWords(session, 12)
# 01 keepFailed
# 02 keepGoing
# 03 tryFallback
# 04 verbosity
# 05 maxBuildJobs
# 06 maxSilentTime
# 07 useBuildHook
# 08 verboseBuild
# 09 logType
# 10 printBuildTrace
# 11 buildCores
# 12 useSubstitutes
let overridePairCount = await recvWord(session)
for _ in 1..overridePairCount:
discard await (recvString(session))
discard await (recvString(session))
await send(session, STDERR_LAST)
# all options from the client are ingored
else:
let msg = "unhandled worker op " & $wop.int
await send(session, STDERR_NEXT)
await send(session, msg)
await send(session, STDERR_LAST)
close(session.socket)
proc serveClientSide*(facet: Facet; ds: Ref; listener: AsyncSocket) {.async.} =
while not listener.isClosed:
let
client = await accept(listener)
fut = serveClient(facet, ds, newSession(client))
addCallback(fut) do ():
if not client.isClosed:
close(client)
proc bootClientSide*(facet: Facet; ds: Ref; socketPath: string) =
let listener = newAsyncSocket(
proc newUnixSocket*(): AsyncSocket =
newAsyncSocket(
domain = AF_UNIX,
sockType = SOCK_STREAM,
protocol = cast[Protocol](0),
buffered = false)
onStop(facet) do (turn: var Turn):
close(listener)
removeFile(socketPath)
removeFile(socketPath)
bindUnix(listener, socketPath)
listen(listener)
asyncCheck(facet, serveClientSide(facet, ds, listener))
buffered = false,
)
proc connectDaemon(session: Session; socketPath: string) {.async.} =
await connectUnix(session.socket, socketPath)
await send(session, WORKER_MAGIC_1)
let daemonMagic = await recvWord(session)
if daemonMagic != WORKER_MAGIC_2:
raise newException(ProtocolError, "bad magic from daemon")
let daemonVersion = await recvWord(session)
session.version = min(Version daemonVersion, PROTOCOL_VERSION)
await send(session, Word session.version)
await send(session, 0) # CPU affinity
await send(session, 0) # reserve space
if session.version.minor >= 33:
discard await recvString(session) # version
if session.version.minor >= 35:
discard await recvWord(session) # remoteTrustsUs
await recvWork(session)
proc newSession*(socket: AsyncSocket): Session =
Session(socket: socket, buffer: newSeq[Word](512))
proc queryMissing(session: Session; targets: StringSeq): Future[Missing] {.async.} =
var miss = Missing(targets: targets)
await send(session, wopQueryMissing)
await send(session, miss.targets)
await recvWork(session)
miss.willBuild = await recvStringSet(session)
miss.willSubstitute = await recvStringSet(session)
miss.unknown = await recvStringSet(session)
miss.downloadSize = BiggestInt await recvWord(session)
miss.narSize = BiggestInt await recvWord(session)
return miss
proc newSession*(): Session =
newUnixSocket().newSession()
proc queryPathInfo(session: Session; path: string): Future[PathInfo] {.async.} =
var info = PathInfo(path: path)
await send(session, wopQueryPathInfo)
await send(session, info.path)
await recvWork(session)
let valid = await recvWord(session)
if valid != 0:
info.deriver = await recvString(session)
info.narHash = await recvString(session)
info.references = await recvStringSet(session)
info.registrationTime = BiggestInt await recvWord(session)
info.narSize = BiggestInt await recvWord(session)
info.ultimate = (await recvWord(session)) != 0
info.sigs = await recvStringSet(session)
info.ca = await recvString(session)
return info
proc ingestChunks*(session: Session; store: ErisStore): Future[ErisCap] {.async.} =
var ingest: ErisIngest
while true:
let chunkLen = int await recvWord(session)
if ingest.isNil:
ingest = newErisIngest(
store, recommendedChunkSize(chunkLen), convergentMode)
if chunkLen == 0:
break
else:
let wordLen = (chunkLen + 7) shr 3
if session.buffer.len < wordLen: setLen(session.buffer, wordLen)
let recvLen = await recvInto(session.socket, addr session.buffer[0], chunkLen)
# each chunk must be received contiguously
if recvLen != chunkLen:
raise newException(ProtocolError, "invalid chunk read")
await append(ingest, addr session.buffer[0], chunkLen)
var cap = await cap(ingest)
return cap
proc bootDaemonSide*(turn: var Turn; ds: Ref; socketPath: string) =
during(turn, ds, ?Observe(pattern: !Missing) ?? {0: grab()}) do (a: Preserve[Ref]):
# cannot use `grabLit` here because an array is a compound
let
session = newSession()
fut = connectDaemon(session, socketPath)
addCallback(fut, turn) do (turn: var Turn):
read(fut)
var targets: StringSeq
doAssert targets.fromPreserve(unpackLiterals(a))
# unpack <arr [<lit " …">]>
let missFut = queryMissing(session, targets)
addCallback(missFut, turn) do (turn: var Turn):
var miss = read(missFut)
discard publish(turn, ds, miss)
do:
close(session)
during(turn, ds, ?Observe(pattern: !PathInfo) ?? {0: grabLit()}) do (path: string):
let
session = newSession()
fut = connectDaemon(session, socketPath)
addCallback(fut, turn) do (turn: var Turn):
read(fut)
let infoFut = queryPathInfo(session, path)
addCallback(infoFut, turn) do (turn: var Turn):
var info = read(infoFut)
discard publish(turn, ds, info)
do:
close(session)
proc recoverChunks*(session: Session; store: ErisStore; cap: ErisCap) {.async.} =
let stream = newErisStream(store, cap)
session.buffer.setLen(succ(cap.chunkSize.int shr 3))
while true:
let n = await stream.readBuffer(addr session.buffer[1], cap.chunkSize.int)
session.buffer[0] = Word n
await send(session.socket, addr session.buffer[0], 8+n)
if n == 0: break
close(stream)