Compare commits

...

2 Commits

Author SHA1 Message Date
Emery Hemingway b54417bb58 Better build-system 2023-06-07 18:06:11 +01:00
Emery Hemingway 9b843905f3 Better stuff 2023-06-07 13:14:47 +01:00
6 changed files with 98 additions and 56 deletions

2
.envrc
View File

@ -1,2 +1,2 @@
source_env .. source_env ..
use flake syndicate#nix_actor use nix

View File

@ -1,4 +1,4 @@
version = "20230606" version = "20230607"
author = "Emery Hemingway" author = "Emery Hemingway"
description = "Syndicated Nix Actor" description = "Syndicated Nix Actor"
license = "Unlicense" license = "Unlicense"

View File

@ -1,6 +1,5 @@
version 1 . version 1 .
Build = <nix-build @input string @output any> . Build = <nix-build @input string @output any> .
Realise = <realise @drv string @outputs [string ...]> . Realise = <realise @drv string @outputs [string ...]> .
@ -15,9 +14,23 @@ Dict = {symbol: any ...:...} .
FieldInt = int . FieldInt = int .
FieldString = string . FieldString = string .
Field = FieldInt / FieldString . Field = int / string .
Fields = [Field ...] . Fields = [Field ...] .
ActionStart = <start @id int @level int @type int @text string @fields Fields @parent int> . ActionStart = <start @id int @level int @type int @text string @fields Fields @parent int> .
ActionStop = <stop @id int> . ActionStop = <stop @id int> .
ActionResult = <result @id int @type int @fields Fields> . ActionResult = <result @id int @type int @fields Fields> .
; TODO: why not make target a singleton?
Missing = <missing @targets #{string} @willBuild #{string} @willSubstitute #{string} @unknown #{string} @downloadSize int @narSize int> .
PathInfo = <path-info
@path string
@deriver string
@narHash string
@references #{string}
@registrationTime int
@narSize int
@ultimate bool
@sigs #{string}
@ca string> .

4
shell.nix Normal file
View File

@ -0,0 +1,4 @@
let
flake = builtins.getFlake "syndicate";
pkgs = import <nixpkgs> { overlays = [ flake.overlays.default ]; };
in pkgs.nix_actor

View File

@ -1,6 +1,6 @@
import import
preserves, std/tables preserves, std/sets, std/tables
type type
Eval* {.preservesRecord: "eval".} = object Eval* {.preservesRecord: "eval".} = object
@ -12,21 +12,40 @@ type
`drv`*: string `drv`*: string
`outputs`*: seq[string] `outputs`*: seq[string]
Missing* {.preservesRecord: "missing".} = object
`targets`*: HashSet[string]
`willBuild`*: HashSet[string]
`willSubstitute`*: HashSet[string]
`unknown`*: HashSet[string]
`downloadSize`*: BiggestInt
`narSize`*: BiggestInt
Narinfo* {.preservesRecord: "narinfo".} = object Narinfo* {.preservesRecord: "narinfo".} = object
`path`*: string `path`*: string
`info`*: Dict `info`*: Dict
FieldKind* {.pure.} = enum FieldKind* {.pure.} = enum
`FieldInt`, `FieldString` `int`, `string`
`Field`* {.preservesOr.} = object `Field`* {.preservesOr.} = object
case orKind*: FieldKind case orKind*: FieldKind
of FieldKind.`FieldInt`: of FieldKind.`int`:
`fieldint`*: FieldInt `int`*: int
of FieldKind.`FieldString`: of FieldKind.`string`:
`fieldstring`*: FieldString `string`*: string
PathInfo* {.preservesRecord: "path-info".} = object
`path`*: string
`deriver`*: string
`narHash`*: string
`references`*: HashSet[string]
`registrationTime`*: BiggestInt
`narSize`*: BiggestInt
`ultimate`*: bool
`sigs`*: HashSet[string]
`ca`*: string
Dict* = Table[Symbol, Preserve[void]] Dict* = Table[Symbol, Preserve[void]]
Build* {.preservesRecord: "nix-build".} = object Build* {.preservesRecord: "nix-build".} = object
`input`*: string `input`*: string
@ -56,7 +75,9 @@ type
`type`*: BiggestInt `type`*: BiggestInt
`fields`*: Fields `fields`*: Fields
proc `$`*(x: Eval | Realise | Narinfo | Field | Dict | Build | Fields | proc `$`*(x: Eval | Realise | Missing | Narinfo | Field | PathInfo | Dict |
Build |
Fields |
ActionStart | ActionStart |
FieldString | FieldString |
Instantiate | Instantiate |
@ -65,7 +86,9 @@ proc `$`*(x: Eval | Realise | Narinfo | Field | Dict | Build | Fields |
ActionResult): string = ActionResult): string =
`$`(toPreserve(x)) `$`(toPreserve(x))
proc encode*(x: Eval | Realise | Narinfo | Field | Dict | Build | Fields | proc encode*(x: Eval | Realise | Missing | Narinfo | Field | PathInfo | Dict |
Build |
Fields |
ActionStart | ActionStart |
FieldString | FieldString |
Instantiate | Instantiate |

View File

@ -4,6 +4,7 @@
import std/[asyncdispatch, asyncnet, os, sets, strtabs, strutils] import std/[asyncdispatch, asyncnet, os, sets, strtabs, strutils]
from std/nativesockets import AF_INET, AF_UNIX, SOCK_STREAM, Protocol from std/nativesockets import AF_INET, AF_UNIX, SOCK_STREAM, Protocol
import preserves
import ./protocol, ./store import ./protocol, ./store
{.pragma: workerProtocol, importc, header: "worker-protocol.hh".} {.pragma: workerProtocol, importc, header: "worker-protocol.hh".}
@ -180,8 +181,8 @@ type ValidPathInfo = object
sigs: HashSet[string] sigs: HashSet[string]
ca: string ca: string
proc passDaemonValidPathInfo(session: Session; includePath: bool): Future[ValidPathInfo] {.async.} = proc passDaemonValidPathInfo(session: Session; includePath: bool): Future[PathInfo] {.async.} =
var info: ValidPathInfo var info: PathInfo
if includePath: if includePath:
info.path = await passDaemonString(session) info.path = await passDaemonString(session)
info.deriver = await passDaemonString(session) info.deriver = await passDaemonString(session)
@ -195,7 +196,8 @@ proc passDaemonValidPathInfo(session: Session; includePath: bool): Future[ValidP
info.ca = await passDaemonString(session) info.ca = await passDaemonString(session)
return info return info
proc passChunks(session: Session; a, b: AsyncSocket) {.async.} = proc passChunks(session: Session; a, b: AsyncSocket): Future[int] {.async.} =
var total: int
while true: while true:
let chunkLen = int(await passWord(a, b)) let chunkLen = int(await passWord(a, b))
if chunkLen == 0: if chunkLen == 0:
@ -208,8 +210,10 @@ proc passChunks(session: Session; a, b: AsyncSocket) {.async.} =
if recvLen != chunkLen: if recvLen != chunkLen:
raise newException(ProtocolError, "invalid chunk read") raise newException(ProtocolError, "invalid chunk read")
await send(b, addr session.buffer[0], recvLen) await send(b, addr session.buffer[0], recvLen)
inc(total, recvLen)
return total
proc passClientChunks(session: Session): Future[void] = proc passClientChunks(session: Session): Future[int] =
passChunks(session, session.client, session.daemon) passChunks(session, session.client, session.daemon)
proc passErrorDaemonError(session: Session) {.async.} = proc passErrorDaemonError(session: Session) {.async.} =
@ -237,10 +241,10 @@ proc passDaemonFields(session: Session): Future[Fields] {.async.} =
case typ case typ
of 0: of 0:
let num = await passDaemonWord(session) let num = await passDaemonWord(session)
fields[i] = Field(orKind: FieldKind.FieldInt, fieldint: BiggestInt num) fields[i] = Field(orKind: FieldKind.int, int: int num)
of 1: of 1:
let str = await passDaemonString(session) let str = await passDaemonString(session)
fields[i] = Field(orKind: FieldKind.FieldString, fieldstring: str) fields[i] = Field(orKind: FieldKind.string, string: str)
else: else:
raiseAssert "unknown field type " & $typ raiseAssert "unknown field type " & $typ
return fields return fields
@ -262,7 +266,6 @@ proc passWork(session: Session) {.async.} =
of STDERR_NEXT: of STDERR_NEXT:
let s = await passDaemonString(session) let s = await passDaemonString(session)
echo s
of STDERR_START_ACTIVITY: of STDERR_START_ACTIVITY:
var act: ActionStart var act: ActionStart
@ -289,48 +292,56 @@ proc passWork(session: Session) {.async.} =
else: else:
raise newException(ProtocolError, "unknown work verb " & $word) raise newException(ProtocolError, "unknown work verb " & $word)
#[
proc fromClient(miss: var Missing; socket: AsyncSocket) {.async.} =
result.targets = await passClientStringSet(session)
proc fromDaemon(miss: var Missing; socket: AsyncSocket) {.async.} =
miss.willBuild = await passDaemonStringSet(session)
miss.willSubstitute = await passDaemonStringSet(session)
miss.unknown = await passDaemonStringSet(session)
miss.downloadSize = BiggestInt await passDaemonWord(session)
miss.narSize = BiggestInt await passDaemonWord(session)
]#
proc loop(session: Session) {.async.} = proc loop(session: Session) {.async.} =
var chunksTotal: int
try: try:
while not session.client.isClosed: while not session.client.isClosed:
let wop = await passClientWord(session) let wop = await passClientWord(session)
case wop case wop
of wopIsValidPath: of wopIsValidPath:
echo "wopIsValidPath"
let path = await passClientString(session) let path = await passClientString(session)
stderr.writeLine "wopIsValidPath ", path
await passWork(session) await passWork(session)
let word = await passDaemonWord(session) let word = await passDaemonWord(session)
echo "wopIsValidPath: ", path, " - ", word != 0
of wopAddToStore: of wopAddToStore:
echo "wopAddToStore"
assert session.version.minor >= 25 assert session.version.minor >= 25
let let
name = await passClientString(session) name = await passClientString(session)
caMethod = await passClientString(session) caMethod = await passClientString(session)
refs = await passClientStringSet(session) refs = await passClientStringSet(session)
repairBool = await passClientWord(session) repairBool = await passClientWord(session)
echo "wopAddToStore: ", name, " refs: ", refs stderr.writeLine "wopAddToStore ", name
await passClientChunks(session) let n = await passClientChunks(session)
inc(chunksTotal, n)
await passWork(session) await passWork(session)
let info = await passDaemonValidPathInfo(session, true) let info = await passDaemonValidPathInfo(session, true)
echo "finshed wopAddToStore ", name
of wopAddTempRoot: of wopAddTempRoot:
echo "wopAddTempRoot"
let path = await passClientString(session) let path = await passClientString(session)
echo "wopAddTempRoot: ", path stderr.writeLine "wopAddTempRoot ", path
await passWork(session) await passWork(session)
discard await passDaemonWord(session) discard await passDaemonWord(session)
of wopAddIndirectRoot: of wopAddIndirectRoot:
echo "wopAddIndirectRoot"
let path = await passClientString(session) let path = await passClientString(session)
echo "wopAddIndirectRoot: ", path stderr.writeLine "wopAddIndirectRoot ", path
await passWork(session) await passWork(session)
discard await passDaemonWord(session) discard await passDaemonWord(session)
of wopSetOptions: of wopSetOptions:
echo "wopSetOptions"
discard passClientWord(session) # keepFailed discard passClientWord(session) # keepFailed
discard passClientWord(session) # keepGoing discard passClientWord(session) # keepGoing
discard passClientWord(session) # tryFallback discard passClientWord(session) # tryFallback
@ -345,45 +356,37 @@ proc loop(session: Session) {.async.} =
discard passClientWord(session) # useSubstitutes discard passClientWord(session) # useSubstitutes
assert session.version.minor >= 12 assert session.version.minor >= 12
let overrides = await passClientStringMap(session) let overrides = await passClientStringMap(session)
echo "got overrides ", overrides
await passWork(session) await passWork(session)
of wopQueryPathInfo: of wopQueryPathInfo:
echo "wopQueryPathInfo"
assert session.version >= 17 assert session.version >= 17
let path = await passClientString(session) let path = await passClientString(session)
stderr.writeLine "wopQueryPathInfo ", path
await passWork(session) await passWork(session)
let valid = await passDaemonWord(session) let valid = await passDaemonWord(session)
echo "daemon says valid is ", valid
if valid != 0: if valid != 0:
echo "get path info from daemon" var info = await passDaemonValidPathInfo(session, false)
let info = await passDaemonValidPathInfo(session, false) info.path = path
echo "wopQueryPathInfo ", path, " ", info stderr.writeLine "wopQueryPathInfo ", $info
of wopQueryMissing: of wopQueryMissing:
echo "wopQueryMissing"
assert session.version >= 30 assert session.version >= 30
let targets = await passClientStringSeq(session) var miss: Missing
echo "wopQueryMissing ", targets miss.targets = await passClientStringSet(session)
await passWork(session) await passWork(session)
let miss.willBuild = await passDaemonStringSet(session)
willBuild = await passDaemonStringSet(session) miss.willSubstitute = await passDaemonStringSet(session)
echo "willBuild is ", willBuild miss.unknown = await passDaemonStringSet(session)
let miss.downloadSize = BiggestInt await passDaemonWord(session)
willSubstitute = await passDaemonStringSet(session) miss.narSize = BiggestInt await passDaemonWord(session)
unknown = await passDaemonStringSet(session) stderr.writeLine "wopQueryMissing ", $miss
downloadSize = await passDaemonWord(session)
narSize = await passDaemonWord(session)
echo "downloadSize: ", downloadSize, " narSize: ", narSize
of wopBuildPathsWithResults: of wopBuildPathsWithResults:
echo "wopBuildPathsWithResults"
assert session.version >= 34 assert session.version >= 34
let let
drvs = await passClientStringSeq(session) drvs = await passClientStringSeq(session)
buildMode = await passClientWord(session) buildMode = await passClientWord(session)
echo "wopBuildPathsWithResults drvs: ", drvs, " mode: ", buildMode stderr.writeLine "wopBuildPathsWithResults drvs ", $drvs
await passWork(session) await passWork(session)
let count = await passDaemonWord(session) let count = await passDaemonWord(session)
for _ in 1..count: for _ in 1..count:
@ -396,14 +399,13 @@ proc loop(session: Session) {.async.} =
startTime = await passDaemonWord(session) startTime = await passDaemonWord(session)
stopTime = await passDaemonWord(session) stopTime = await passDaemonWord(session)
outputs = await passDaemonStringMap(session) outputs = await passDaemonStringMap(session)
echo "wopBuildPathsWithResults ", path, " ", outputs
else: else:
echo "unknown worker op ", wop.int stderr.writeLine "unknown worker op ", wop.int
break break
except ProtocolError as err: except ProtocolError as err:
stderr.writeLine "connection terminated" stderr.writeLine "connection terminated"
stderr.writeLine err.msg stderr.writeLine "chunk bytes transfered: ", formatSize(chunksTotal)
finally: finally:
close(session.daemon) close(session.daemon)
close(session.client) close(session.client)
@ -446,14 +448,14 @@ proc emulateSocket*(path: string) {.async, gcsafe.} =
buffered = false) buffered = false)
bindUnix(listener, path) bindUnix(listener, path)
listen(listener) listen(listener)
echo "listening on ", path stderr.writeLine "listening on ", path
while not listener.isClosed: while not listener.isClosed:
try: try:
let session = await handshake(listener) let session = await handshake(listener)
assert not session.isNil assert not session.isNil
asyncCheck loop(session) asyncCheck loop(session)
except ProtocolError as err: except ProtocolError as err:
echo "failed to service client, ", err.msg stderr.writeLine "failed to service client, ", err.msg
when isMainModule: when isMainModule:
const path = "/tmp/worker.nix.socket" const path = "/tmp/worker.nix.socket"