Compare commits
2 Commits
a18cdd16b4
...
b54417bb58
Author | SHA1 | Date |
---|---|---|
Emery Hemingway | b54417bb58 | |
Emery Hemingway | 9b843905f3 |
|
@ -1,4 +1,4 @@
|
||||||
version = "20230606"
|
version = "20230607"
|
||||||
author = "Emery Hemingway"
|
author = "Emery Hemingway"
|
||||||
description = "Syndicated Nix Actor"
|
description = "Syndicated Nix Actor"
|
||||||
license = "Unlicense"
|
license = "Unlicense"
|
||||||
|
|
17
protocol.prs
17
protocol.prs
|
@ -1,6 +1,5 @@
|
||||||
version 1 .
|
version 1 .
|
||||||
|
|
||||||
|
|
||||||
Build = <nix-build @input string @output any> .
|
Build = <nix-build @input string @output any> .
|
||||||
|
|
||||||
Realise = <realise @drv string @outputs [string ...]> .
|
Realise = <realise @drv string @outputs [string ...]> .
|
||||||
|
@ -15,9 +14,23 @@ Dict = {symbol: any ...:...} .
|
||||||
|
|
||||||
FieldInt = int .
|
FieldInt = int .
|
||||||
FieldString = string .
|
FieldString = string .
|
||||||
Field = FieldInt / FieldString .
|
Field = int / string .
|
||||||
Fields = [Field ...] .
|
Fields = [Field ...] .
|
||||||
|
|
||||||
ActionStart = <start @id int @level int @type int @text string @fields Fields @parent int> .
|
ActionStart = <start @id int @level int @type int @text string @fields Fields @parent int> .
|
||||||
ActionStop = <stop @id int> .
|
ActionStop = <stop @id int> .
|
||||||
ActionResult = <result @id int @type int @fields Fields> .
|
ActionResult = <result @id int @type int @fields Fields> .
|
||||||
|
|
||||||
|
; TODO: why not make target a singleton?
|
||||||
|
Missing = <missing @targets #{string} @willBuild #{string} @willSubstitute #{string} @unknown #{string} @downloadSize int @narSize int> .
|
||||||
|
|
||||||
|
PathInfo = <path-info
|
||||||
|
@path string
|
||||||
|
@deriver string
|
||||||
|
@narHash string
|
||||||
|
@references #{string}
|
||||||
|
@registrationTime int
|
||||||
|
@narSize int
|
||||||
|
@ultimate bool
|
||||||
|
@sigs #{string}
|
||||||
|
@ca string> .
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
let
|
||||||
|
flake = builtins.getFlake "syndicate";
|
||||||
|
pkgs = import <nixpkgs> { overlays = [ flake.overlays.default ]; };
|
||||||
|
in pkgs.nix_actor
|
|
@ -1,6 +1,6 @@
|
||||||
|
|
||||||
import
|
import
|
||||||
preserves, std/tables
|
preserves, std/sets, std/tables
|
||||||
|
|
||||||
type
|
type
|
||||||
Eval* {.preservesRecord: "eval".} = object
|
Eval* {.preservesRecord: "eval".} = object
|
||||||
|
@ -12,21 +12,40 @@ type
|
||||||
`drv`*: string
|
`drv`*: string
|
||||||
`outputs`*: seq[string]
|
`outputs`*: seq[string]
|
||||||
|
|
||||||
|
Missing* {.preservesRecord: "missing".} = object
|
||||||
|
`targets`*: HashSet[string]
|
||||||
|
`willBuild`*: HashSet[string]
|
||||||
|
`willSubstitute`*: HashSet[string]
|
||||||
|
`unknown`*: HashSet[string]
|
||||||
|
`downloadSize`*: BiggestInt
|
||||||
|
`narSize`*: BiggestInt
|
||||||
|
|
||||||
Narinfo* {.preservesRecord: "narinfo".} = object
|
Narinfo* {.preservesRecord: "narinfo".} = object
|
||||||
`path`*: string
|
`path`*: string
|
||||||
`info`*: Dict
|
`info`*: Dict
|
||||||
|
|
||||||
FieldKind* {.pure.} = enum
|
FieldKind* {.pure.} = enum
|
||||||
`FieldInt`, `FieldString`
|
`int`, `string`
|
||||||
`Field`* {.preservesOr.} = object
|
`Field`* {.preservesOr.} = object
|
||||||
case orKind*: FieldKind
|
case orKind*: FieldKind
|
||||||
of FieldKind.`FieldInt`:
|
of FieldKind.`int`:
|
||||||
`fieldint`*: FieldInt
|
`int`*: int
|
||||||
|
|
||||||
of FieldKind.`FieldString`:
|
of FieldKind.`string`:
|
||||||
`fieldstring`*: FieldString
|
`string`*: string
|
||||||
|
|
||||||
|
|
||||||
|
PathInfo* {.preservesRecord: "path-info".} = object
|
||||||
|
`path`*: string
|
||||||
|
`deriver`*: string
|
||||||
|
`narHash`*: string
|
||||||
|
`references`*: HashSet[string]
|
||||||
|
`registrationTime`*: BiggestInt
|
||||||
|
`narSize`*: BiggestInt
|
||||||
|
`ultimate`*: bool
|
||||||
|
`sigs`*: HashSet[string]
|
||||||
|
`ca`*: string
|
||||||
|
|
||||||
Dict* = Table[Symbol, Preserve[void]]
|
Dict* = Table[Symbol, Preserve[void]]
|
||||||
Build* {.preservesRecord: "nix-build".} = object
|
Build* {.preservesRecord: "nix-build".} = object
|
||||||
`input`*: string
|
`input`*: string
|
||||||
|
@ -56,7 +75,9 @@ type
|
||||||
`type`*: BiggestInt
|
`type`*: BiggestInt
|
||||||
`fields`*: Fields
|
`fields`*: Fields
|
||||||
|
|
||||||
proc `$`*(x: Eval | Realise | Narinfo | Field | Dict | Build | Fields |
|
proc `$`*(x: Eval | Realise | Missing | Narinfo | Field | PathInfo | Dict |
|
||||||
|
Build |
|
||||||
|
Fields |
|
||||||
ActionStart |
|
ActionStart |
|
||||||
FieldString |
|
FieldString |
|
||||||
Instantiate |
|
Instantiate |
|
||||||
|
@ -65,7 +86,9 @@ proc `$`*(x: Eval | Realise | Narinfo | Field | Dict | Build | Fields |
|
||||||
ActionResult): string =
|
ActionResult): string =
|
||||||
`$`(toPreserve(x))
|
`$`(toPreserve(x))
|
||||||
|
|
||||||
proc encode*(x: Eval | Realise | Narinfo | Field | Dict | Build | Fields |
|
proc encode*(x: Eval | Realise | Missing | Narinfo | Field | PathInfo | Dict |
|
||||||
|
Build |
|
||||||
|
Fields |
|
||||||
ActionStart |
|
ActionStart |
|
||||||
FieldString |
|
FieldString |
|
||||||
Instantiate |
|
Instantiate |
|
||||||
|
|
|
@ -4,6 +4,7 @@
|
||||||
import std/[asyncdispatch, asyncnet, os, sets, strtabs, strutils]
|
import std/[asyncdispatch, asyncnet, os, sets, strtabs, strutils]
|
||||||
from std/nativesockets import AF_INET, AF_UNIX, SOCK_STREAM, Protocol
|
from std/nativesockets import AF_INET, AF_UNIX, SOCK_STREAM, Protocol
|
||||||
|
|
||||||
|
import preserves
|
||||||
import ./protocol, ./store
|
import ./protocol, ./store
|
||||||
|
|
||||||
{.pragma: workerProtocol, importc, header: "worker-protocol.hh".}
|
{.pragma: workerProtocol, importc, header: "worker-protocol.hh".}
|
||||||
|
@ -180,8 +181,8 @@ type ValidPathInfo = object
|
||||||
sigs: HashSet[string]
|
sigs: HashSet[string]
|
||||||
ca: string
|
ca: string
|
||||||
|
|
||||||
proc passDaemonValidPathInfo(session: Session; includePath: bool): Future[ValidPathInfo] {.async.} =
|
proc passDaemonValidPathInfo(session: Session; includePath: bool): Future[PathInfo] {.async.} =
|
||||||
var info: ValidPathInfo
|
var info: PathInfo
|
||||||
if includePath:
|
if includePath:
|
||||||
info.path = await passDaemonString(session)
|
info.path = await passDaemonString(session)
|
||||||
info.deriver = await passDaemonString(session)
|
info.deriver = await passDaemonString(session)
|
||||||
|
@ -195,7 +196,8 @@ proc passDaemonValidPathInfo(session: Session; includePath: bool): Future[ValidP
|
||||||
info.ca = await passDaemonString(session)
|
info.ca = await passDaemonString(session)
|
||||||
return info
|
return info
|
||||||
|
|
||||||
proc passChunks(session: Session; a, b: AsyncSocket) {.async.} =
|
proc passChunks(session: Session; a, b: AsyncSocket): Future[int] {.async.} =
|
||||||
|
var total: int
|
||||||
while true:
|
while true:
|
||||||
let chunkLen = int(await passWord(a, b))
|
let chunkLen = int(await passWord(a, b))
|
||||||
if chunkLen == 0:
|
if chunkLen == 0:
|
||||||
|
@ -208,8 +210,10 @@ proc passChunks(session: Session; a, b: AsyncSocket) {.async.} =
|
||||||
if recvLen != chunkLen:
|
if recvLen != chunkLen:
|
||||||
raise newException(ProtocolError, "invalid chunk read")
|
raise newException(ProtocolError, "invalid chunk read")
|
||||||
await send(b, addr session.buffer[0], recvLen)
|
await send(b, addr session.buffer[0], recvLen)
|
||||||
|
inc(total, recvLen)
|
||||||
|
return total
|
||||||
|
|
||||||
proc passClientChunks(session: Session): Future[void] =
|
proc passClientChunks(session: Session): Future[int] =
|
||||||
passChunks(session, session.client, session.daemon)
|
passChunks(session, session.client, session.daemon)
|
||||||
|
|
||||||
proc passErrorDaemonError(session: Session) {.async.} =
|
proc passErrorDaemonError(session: Session) {.async.} =
|
||||||
|
@ -237,10 +241,10 @@ proc passDaemonFields(session: Session): Future[Fields] {.async.} =
|
||||||
case typ
|
case typ
|
||||||
of 0:
|
of 0:
|
||||||
let num = await passDaemonWord(session)
|
let num = await passDaemonWord(session)
|
||||||
fields[i] = Field(orKind: FieldKind.FieldInt, fieldint: BiggestInt num)
|
fields[i] = Field(orKind: FieldKind.int, int: int num)
|
||||||
of 1:
|
of 1:
|
||||||
let str = await passDaemonString(session)
|
let str = await passDaemonString(session)
|
||||||
fields[i] = Field(orKind: FieldKind.FieldString, fieldstring: str)
|
fields[i] = Field(orKind: FieldKind.string, string: str)
|
||||||
else:
|
else:
|
||||||
raiseAssert "unknown field type " & $typ
|
raiseAssert "unknown field type " & $typ
|
||||||
return fields
|
return fields
|
||||||
|
@ -262,7 +266,6 @@ proc passWork(session: Session) {.async.} =
|
||||||
|
|
||||||
of STDERR_NEXT:
|
of STDERR_NEXT:
|
||||||
let s = await passDaemonString(session)
|
let s = await passDaemonString(session)
|
||||||
echo s
|
|
||||||
|
|
||||||
of STDERR_START_ACTIVITY:
|
of STDERR_START_ACTIVITY:
|
||||||
var act: ActionStart
|
var act: ActionStart
|
||||||
|
@ -289,48 +292,56 @@ proc passWork(session: Session) {.async.} =
|
||||||
else:
|
else:
|
||||||
raise newException(ProtocolError, "unknown work verb " & $word)
|
raise newException(ProtocolError, "unknown work verb " & $word)
|
||||||
|
|
||||||
|
#[
|
||||||
|
proc fromClient(miss: var Missing; socket: AsyncSocket) {.async.} =
|
||||||
|
result.targets = await passClientStringSet(session)
|
||||||
|
|
||||||
|
proc fromDaemon(miss: var Missing; socket: AsyncSocket) {.async.} =
|
||||||
|
miss.willBuild = await passDaemonStringSet(session)
|
||||||
|
miss.willSubstitute = await passDaemonStringSet(session)
|
||||||
|
miss.unknown = await passDaemonStringSet(session)
|
||||||
|
miss.downloadSize = BiggestInt await passDaemonWord(session)
|
||||||
|
miss.narSize = BiggestInt await passDaemonWord(session)
|
||||||
|
]#
|
||||||
|
|
||||||
proc loop(session: Session) {.async.} =
|
proc loop(session: Session) {.async.} =
|
||||||
|
var chunksTotal: int
|
||||||
try:
|
try:
|
||||||
while not session.client.isClosed:
|
while not session.client.isClosed:
|
||||||
let wop = await passClientWord(session)
|
let wop = await passClientWord(session)
|
||||||
case wop
|
case wop
|
||||||
of wopIsValidPath:
|
of wopIsValidPath:
|
||||||
echo "wopIsValidPath"
|
|
||||||
let path = await passClientString(session)
|
let path = await passClientString(session)
|
||||||
|
stderr.writeLine "wopIsValidPath ", path
|
||||||
await passWork(session)
|
await passWork(session)
|
||||||
let word = await passDaemonWord(session)
|
let word = await passDaemonWord(session)
|
||||||
echo "wopIsValidPath: ", path, " - ", word != 0
|
|
||||||
|
|
||||||
of wopAddToStore:
|
of wopAddToStore:
|
||||||
echo "wopAddToStore"
|
|
||||||
assert session.version.minor >= 25
|
assert session.version.minor >= 25
|
||||||
let
|
let
|
||||||
name = await passClientString(session)
|
name = await passClientString(session)
|
||||||
caMethod = await passClientString(session)
|
caMethod = await passClientString(session)
|
||||||
refs = await passClientStringSet(session)
|
refs = await passClientStringSet(session)
|
||||||
repairBool = await passClientWord(session)
|
repairBool = await passClientWord(session)
|
||||||
echo "wopAddToStore: ", name, " refs: ", refs
|
stderr.writeLine "wopAddToStore ", name
|
||||||
await passClientChunks(session)
|
let n = await passClientChunks(session)
|
||||||
|
inc(chunksTotal, n)
|
||||||
await passWork(session)
|
await passWork(session)
|
||||||
let info = await passDaemonValidPathInfo(session, true)
|
let info = await passDaemonValidPathInfo(session, true)
|
||||||
echo "finshed wopAddToStore ", name
|
|
||||||
|
|
||||||
of wopAddTempRoot:
|
of wopAddTempRoot:
|
||||||
echo "wopAddTempRoot"
|
|
||||||
let path = await passClientString(session)
|
let path = await passClientString(session)
|
||||||
echo "wopAddTempRoot: ", path
|
stderr.writeLine "wopAddTempRoot ", path
|
||||||
await passWork(session)
|
await passWork(session)
|
||||||
discard await passDaemonWord(session)
|
discard await passDaemonWord(session)
|
||||||
|
|
||||||
of wopAddIndirectRoot:
|
of wopAddIndirectRoot:
|
||||||
echo "wopAddIndirectRoot"
|
|
||||||
let path = await passClientString(session)
|
let path = await passClientString(session)
|
||||||
echo "wopAddIndirectRoot: ", path
|
stderr.writeLine "wopAddIndirectRoot ", path
|
||||||
await passWork(session)
|
await passWork(session)
|
||||||
discard await passDaemonWord(session)
|
discard await passDaemonWord(session)
|
||||||
|
|
||||||
of wopSetOptions:
|
of wopSetOptions:
|
||||||
echo "wopSetOptions"
|
|
||||||
discard passClientWord(session) # keepFailed
|
discard passClientWord(session) # keepFailed
|
||||||
discard passClientWord(session) # keepGoing
|
discard passClientWord(session) # keepGoing
|
||||||
discard passClientWord(session) # tryFallback
|
discard passClientWord(session) # tryFallback
|
||||||
|
@ -345,45 +356,37 @@ proc loop(session: Session) {.async.} =
|
||||||
discard passClientWord(session) # useSubstitutes
|
discard passClientWord(session) # useSubstitutes
|
||||||
assert session.version.minor >= 12
|
assert session.version.minor >= 12
|
||||||
let overrides = await passClientStringMap(session)
|
let overrides = await passClientStringMap(session)
|
||||||
echo "got overrides ", overrides
|
|
||||||
await passWork(session)
|
await passWork(session)
|
||||||
|
|
||||||
of wopQueryPathInfo:
|
of wopQueryPathInfo:
|
||||||
echo "wopQueryPathInfo"
|
|
||||||
assert session.version >= 17
|
assert session.version >= 17
|
||||||
let path = await passClientString(session)
|
let path = await passClientString(session)
|
||||||
|
stderr.writeLine "wopQueryPathInfo ", path
|
||||||
await passWork(session)
|
await passWork(session)
|
||||||
let valid = await passDaemonWord(session)
|
let valid = await passDaemonWord(session)
|
||||||
echo "daemon says valid is ", valid
|
|
||||||
if valid != 0:
|
if valid != 0:
|
||||||
echo "get path info from daemon"
|
var info = await passDaemonValidPathInfo(session, false)
|
||||||
let info = await passDaemonValidPathInfo(session, false)
|
info.path = path
|
||||||
echo "wopQueryPathInfo ", path, " ", info
|
stderr.writeLine "wopQueryPathInfo ", $info
|
||||||
|
|
||||||
of wopQueryMissing:
|
of wopQueryMissing:
|
||||||
echo "wopQueryMissing"
|
|
||||||
assert session.version >= 30
|
assert session.version >= 30
|
||||||
let targets = await passClientStringSeq(session)
|
var miss: Missing
|
||||||
echo "wopQueryMissing ", targets
|
miss.targets = await passClientStringSet(session)
|
||||||
|
|
||||||
await passWork(session)
|
await passWork(session)
|
||||||
let
|
miss.willBuild = await passDaemonStringSet(session)
|
||||||
willBuild = await passDaemonStringSet(session)
|
miss.willSubstitute = await passDaemonStringSet(session)
|
||||||
echo "willBuild is ", willBuild
|
miss.unknown = await passDaemonStringSet(session)
|
||||||
let
|
miss.downloadSize = BiggestInt await passDaemonWord(session)
|
||||||
willSubstitute = await passDaemonStringSet(session)
|
miss.narSize = BiggestInt await passDaemonWord(session)
|
||||||
unknown = await passDaemonStringSet(session)
|
stderr.writeLine "wopQueryMissing ", $miss
|
||||||
downloadSize = await passDaemonWord(session)
|
|
||||||
narSize = await passDaemonWord(session)
|
|
||||||
echo "downloadSize: ", downloadSize, " narSize: ", narSize
|
|
||||||
|
|
||||||
of wopBuildPathsWithResults:
|
of wopBuildPathsWithResults:
|
||||||
echo "wopBuildPathsWithResults"
|
|
||||||
assert session.version >= 34
|
assert session.version >= 34
|
||||||
let
|
let
|
||||||
drvs = await passClientStringSeq(session)
|
drvs = await passClientStringSeq(session)
|
||||||
buildMode = await passClientWord(session)
|
buildMode = await passClientWord(session)
|
||||||
echo "wopBuildPathsWithResults drvs: ", drvs, " mode: ", buildMode
|
stderr.writeLine "wopBuildPathsWithResults drvs ", $drvs
|
||||||
await passWork(session)
|
await passWork(session)
|
||||||
let count = await passDaemonWord(session)
|
let count = await passDaemonWord(session)
|
||||||
for _ in 1..count:
|
for _ in 1..count:
|
||||||
|
@ -396,14 +399,13 @@ proc loop(session: Session) {.async.} =
|
||||||
startTime = await passDaemonWord(session)
|
startTime = await passDaemonWord(session)
|
||||||
stopTime = await passDaemonWord(session)
|
stopTime = await passDaemonWord(session)
|
||||||
outputs = await passDaemonStringMap(session)
|
outputs = await passDaemonStringMap(session)
|
||||||
echo "wopBuildPathsWithResults ", path, " ", outputs
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
echo "unknown worker op ", wop.int
|
stderr.writeLine "unknown worker op ", wop.int
|
||||||
break
|
break
|
||||||
except ProtocolError as err:
|
except ProtocolError as err:
|
||||||
stderr.writeLine "connection terminated"
|
stderr.writeLine "connection terminated"
|
||||||
stderr.writeLine err.msg
|
stderr.writeLine "chunk bytes transfered: ", formatSize(chunksTotal)
|
||||||
finally:
|
finally:
|
||||||
close(session.daemon)
|
close(session.daemon)
|
||||||
close(session.client)
|
close(session.client)
|
||||||
|
@ -446,14 +448,14 @@ proc emulateSocket*(path: string) {.async, gcsafe.} =
|
||||||
buffered = false)
|
buffered = false)
|
||||||
bindUnix(listener, path)
|
bindUnix(listener, path)
|
||||||
listen(listener)
|
listen(listener)
|
||||||
echo "listening on ", path
|
stderr.writeLine "listening on ", path
|
||||||
while not listener.isClosed:
|
while not listener.isClosed:
|
||||||
try:
|
try:
|
||||||
let session = await handshake(listener)
|
let session = await handshake(listener)
|
||||||
assert not session.isNil
|
assert not session.isNil
|
||||||
asyncCheck loop(session)
|
asyncCheck loop(session)
|
||||||
except ProtocolError as err:
|
except ProtocolError as err:
|
||||||
echo "failed to service client, ", err.msg
|
stderr.writeLine "failed to service client, ", err.msg
|
||||||
|
|
||||||
when isMainModule:
|
when isMainModule:
|
||||||
const path = "/tmp/worker.nix.socket"
|
const path = "/tmp/worker.nix.socket"
|
||||||
|
|
Loading…
Reference in New Issue