Compare commits

...

2 Commits

Author SHA1 Message Date
Emery Hemingway b54417bb58 Better build-system 2023-06-07 18:06:11 +01:00
Emery Hemingway 9b843905f3 Better stuff 2023-06-07 13:14:47 +01:00
6 changed files with 98 additions and 56 deletions

2
.envrc
View File

@ -1,2 +1,2 @@
source_env ..
use flake syndicate#nix_actor
use nix

View File

@ -1,4 +1,4 @@
version = "20230606"
version = "20230607"
author = "Emery Hemingway"
description = "Syndicated Nix Actor"
license = "Unlicense"

View File

@ -1,6 +1,5 @@
version 1 .
Build = <nix-build @input string @output any> .
Realise = <realise @drv string @outputs [string ...]> .
@ -15,9 +14,23 @@ Dict = {symbol: any ...:...} .
FieldInt = int .
FieldString = string .
Field = FieldInt / FieldString .
Field = int / string .
Fields = [Field ...] .
ActionStart = <start @id int @level int @type int @text string @fields Fields @parent int> .
ActionStop = <stop @id int> .
ActionResult = <result @id int @type int @fields Fields> .
; TODO: why not make target a singleton?
Missing = <missing @targets #{string} @willBuild #{string} @willSubstitute #{string} @unknown #{string} @downloadSize int @narSize int> .
PathInfo = <path-info
@path string
@deriver string
@narHash string
@references #{string}
@registrationTime int
@narSize int
@ultimate bool
@sigs #{string}
@ca string> .

4
shell.nix Normal file
View File

@ -0,0 +1,4 @@
let
flake = builtins.getFlake "syndicate";
pkgs = import <nixpkgs> { overlays = [ flake.overlays.default ]; };
in pkgs.nix_actor

View File

@ -1,6 +1,6 @@
import
preserves, std/tables
preserves, std/sets, std/tables
type
Eval* {.preservesRecord: "eval".} = object
@ -12,21 +12,40 @@ type
`drv`*: string
`outputs`*: seq[string]
Missing* {.preservesRecord: "missing".} = object
`targets`*: HashSet[string]
`willBuild`*: HashSet[string]
`willSubstitute`*: HashSet[string]
`unknown`*: HashSet[string]
`downloadSize`*: BiggestInt
`narSize`*: BiggestInt
Narinfo* {.preservesRecord: "narinfo".} = object
`path`*: string
`info`*: Dict
FieldKind* {.pure.} = enum
`FieldInt`, `FieldString`
`int`, `string`
`Field`* {.preservesOr.} = object
case orKind*: FieldKind
of FieldKind.`FieldInt`:
`fieldint`*: FieldInt
of FieldKind.`int`:
`int`*: int
of FieldKind.`FieldString`:
`fieldstring`*: FieldString
of FieldKind.`string`:
`string`*: string
PathInfo* {.preservesRecord: "path-info".} = object
`path`*: string
`deriver`*: string
`narHash`*: string
`references`*: HashSet[string]
`registrationTime`*: BiggestInt
`narSize`*: BiggestInt
`ultimate`*: bool
`sigs`*: HashSet[string]
`ca`*: string
Dict* = Table[Symbol, Preserve[void]]
Build* {.preservesRecord: "nix-build".} = object
`input`*: string
@ -56,7 +75,9 @@ type
`type`*: BiggestInt
`fields`*: Fields
proc `$`*(x: Eval | Realise | Narinfo | Field | Dict | Build | Fields |
proc `$`*(x: Eval | Realise | Missing | Narinfo | Field | PathInfo | Dict |
Build |
Fields |
ActionStart |
FieldString |
Instantiate |
@ -65,7 +86,9 @@ proc `$`*(x: Eval | Realise | Narinfo | Field | Dict | Build | Fields |
ActionResult): string =
`$`(toPreserve(x))
proc encode*(x: Eval | Realise | Narinfo | Field | Dict | Build | Fields |
proc encode*(x: Eval | Realise | Missing | Narinfo | Field | PathInfo | Dict |
Build |
Fields |
ActionStart |
FieldString |
Instantiate |

View File

@ -4,6 +4,7 @@
import std/[asyncdispatch, asyncnet, os, sets, strtabs, strutils]
from std/nativesockets import AF_INET, AF_UNIX, SOCK_STREAM, Protocol
import preserves
import ./protocol, ./store
{.pragma: workerProtocol, importc, header: "worker-protocol.hh".}
@ -180,8 +181,8 @@ type ValidPathInfo = object
sigs: HashSet[string]
ca: string
proc passDaemonValidPathInfo(session: Session; includePath: bool): Future[ValidPathInfo] {.async.} =
var info: ValidPathInfo
proc passDaemonValidPathInfo(session: Session; includePath: bool): Future[PathInfo] {.async.} =
var info: PathInfo
if includePath:
info.path = await passDaemonString(session)
info.deriver = await passDaemonString(session)
@ -195,7 +196,8 @@ proc passDaemonValidPathInfo(session: Session; includePath: bool): Future[ValidP
info.ca = await passDaemonString(session)
return info
proc passChunks(session: Session; a, b: AsyncSocket) {.async.} =
proc passChunks(session: Session; a, b: AsyncSocket): Future[int] {.async.} =
var total: int
while true:
let chunkLen = int(await passWord(a, b))
if chunkLen == 0:
@ -208,8 +210,10 @@ proc passChunks(session: Session; a, b: AsyncSocket) {.async.} =
if recvLen != chunkLen:
raise newException(ProtocolError, "invalid chunk read")
await send(b, addr session.buffer[0], recvLen)
inc(total, recvLen)
return total
proc passClientChunks(session: Session): Future[void] =
proc passClientChunks(session: Session): Future[int] =
passChunks(session, session.client, session.daemon)
proc passErrorDaemonError(session: Session) {.async.} =
@ -237,10 +241,10 @@ proc passDaemonFields(session: Session): Future[Fields] {.async.} =
case typ
of 0:
let num = await passDaemonWord(session)
fields[i] = Field(orKind: FieldKind.FieldInt, fieldint: BiggestInt num)
fields[i] = Field(orKind: FieldKind.int, int: int num)
of 1:
let str = await passDaemonString(session)
fields[i] = Field(orKind: FieldKind.FieldString, fieldstring: str)
fields[i] = Field(orKind: FieldKind.string, string: str)
else:
raiseAssert "unknown field type " & $typ
return fields
@ -262,7 +266,6 @@ proc passWork(session: Session) {.async.} =
of STDERR_NEXT:
let s = await passDaemonString(session)
echo s
of STDERR_START_ACTIVITY:
var act: ActionStart
@ -289,48 +292,56 @@ proc passWork(session: Session) {.async.} =
else:
raise newException(ProtocolError, "unknown work verb " & $word)
#[
proc fromClient(miss: var Missing; socket: AsyncSocket) {.async.} =
result.targets = await passClientStringSet(session)
proc fromDaemon(miss: var Missing; socket: AsyncSocket) {.async.} =
miss.willBuild = await passDaemonStringSet(session)
miss.willSubstitute = await passDaemonStringSet(session)
miss.unknown = await passDaemonStringSet(session)
miss.downloadSize = BiggestInt await passDaemonWord(session)
miss.narSize = BiggestInt await passDaemonWord(session)
]#
proc loop(session: Session) {.async.} =
var chunksTotal: int
try:
while not session.client.isClosed:
let wop = await passClientWord(session)
case wop
of wopIsValidPath:
echo "wopIsValidPath"
let path = await passClientString(session)
stderr.writeLine "wopIsValidPath ", path
await passWork(session)
let word = await passDaemonWord(session)
echo "wopIsValidPath: ", path, " - ", word != 0
of wopAddToStore:
echo "wopAddToStore"
assert session.version.minor >= 25
let
name = await passClientString(session)
caMethod = await passClientString(session)
refs = await passClientStringSet(session)
repairBool = await passClientWord(session)
echo "wopAddToStore: ", name, " refs: ", refs
await passClientChunks(session)
stderr.writeLine "wopAddToStore ", name
let n = await passClientChunks(session)
inc(chunksTotal, n)
await passWork(session)
let info = await passDaemonValidPathInfo(session, true)
echo "finshed wopAddToStore ", name
of wopAddTempRoot:
echo "wopAddTempRoot"
let path = await passClientString(session)
echo "wopAddTempRoot: ", path
stderr.writeLine "wopAddTempRoot ", path
await passWork(session)
discard await passDaemonWord(session)
of wopAddIndirectRoot:
echo "wopAddIndirectRoot"
let path = await passClientString(session)
echo "wopAddIndirectRoot: ", path
stderr.writeLine "wopAddIndirectRoot ", path
await passWork(session)
discard await passDaemonWord(session)
of wopSetOptions:
echo "wopSetOptions"
discard passClientWord(session) # keepFailed
discard passClientWord(session) # keepGoing
discard passClientWord(session) # tryFallback
@ -345,45 +356,37 @@ proc loop(session: Session) {.async.} =
discard passClientWord(session) # useSubstitutes
assert session.version.minor >= 12
let overrides = await passClientStringMap(session)
echo "got overrides ", overrides
await passWork(session)
of wopQueryPathInfo:
echo "wopQueryPathInfo"
assert session.version >= 17
let path = await passClientString(session)
stderr.writeLine "wopQueryPathInfo ", path
await passWork(session)
let valid = await passDaemonWord(session)
echo "daemon says valid is ", valid
if valid != 0:
echo "get path info from daemon"
let info = await passDaemonValidPathInfo(session, false)
echo "wopQueryPathInfo ", path, " ", info
var info = await passDaemonValidPathInfo(session, false)
info.path = path
stderr.writeLine "wopQueryPathInfo ", $info
of wopQueryMissing:
echo "wopQueryMissing"
assert session.version >= 30
let targets = await passClientStringSeq(session)
echo "wopQueryMissing ", targets
var miss: Missing
miss.targets = await passClientStringSet(session)
await passWork(session)
let
willBuild = await passDaemonStringSet(session)
echo "willBuild is ", willBuild
let
willSubstitute = await passDaemonStringSet(session)
unknown = await passDaemonStringSet(session)
downloadSize = await passDaemonWord(session)
narSize = await passDaemonWord(session)
echo "downloadSize: ", downloadSize, " narSize: ", narSize
miss.willBuild = await passDaemonStringSet(session)
miss.willSubstitute = await passDaemonStringSet(session)
miss.unknown = await passDaemonStringSet(session)
miss.downloadSize = BiggestInt await passDaemonWord(session)
miss.narSize = BiggestInt await passDaemonWord(session)
stderr.writeLine "wopQueryMissing ", $miss
of wopBuildPathsWithResults:
echo "wopBuildPathsWithResults"
assert session.version >= 34
let
drvs = await passClientStringSeq(session)
buildMode = await passClientWord(session)
echo "wopBuildPathsWithResults drvs: ", drvs, " mode: ", buildMode
stderr.writeLine "wopBuildPathsWithResults drvs ", $drvs
await passWork(session)
let count = await passDaemonWord(session)
for _ in 1..count:
@ -396,14 +399,13 @@ proc loop(session: Session) {.async.} =
startTime = await passDaemonWord(session)
stopTime = await passDaemonWord(session)
outputs = await passDaemonStringMap(session)
echo "wopBuildPathsWithResults ", path, " ", outputs
else:
echo "unknown worker op ", wop.int
stderr.writeLine "unknown worker op ", wop.int
break
except ProtocolError as err:
stderr.writeLine "connection terminated"
stderr.writeLine err.msg
stderr.writeLine "chunk bytes transfered: ", formatSize(chunksTotal)
finally:
close(session.daemon)
close(session.client)
@ -446,14 +448,14 @@ proc emulateSocket*(path: string) {.async, gcsafe.} =
buffered = false)
bindUnix(listener, path)
listen(listener)
echo "listening on ", path
stderr.writeLine "listening on ", path
while not listener.isClosed:
try:
let session = await handshake(listener)
assert not session.isNil
asyncCheck loop(session)
except ProtocolError as err:
echo "failed to service client, ", err.msg
stderr.writeLine "failed to service client, ", err.msg
when isMainModule:
const path = "/tmp/worker.nix.socket"