Other stuff

This commit is contained in:
Emery Hemingway 2023-06-07 22:25:42 +01:00
parent b54417bb58
commit ff56152bbd
3 changed files with 163 additions and 62 deletions

View File

@ -24,6 +24,7 @@ ActionResult = <result @id int @type int @fields Fields> .
; TODO: why not make target a singleton?
Missing = <missing @targets #{string} @willBuild #{string} @willSubstitute #{string} @unknown #{string} @downloadSize int @narSize int> .
; TODO keep a few critical fields and move the rest into a dictionary
PathInfo = <path-info
@path string
@deriver string

View File

@ -110,18 +110,17 @@ proc bootNixFacet(ds: Ref; turn: var Turn): Facet =
type
RefArgs {.preservesDictionary.} = object
dataspace: Ref
SocketArgs {.preservesDictionary.} = object
ClientSideArgs {.preservesDictionary.} = object
`listen-socket`: string
proc bootNixActor(root: Ref; turn: var Turn) =
connectStdio(root, turn)
during(turn, root, ?RefArgs) do (ds: Ref):
discard bootNixFacet(ds, turn)
during(turn, root, ?SocketArgs) do (path: string):
removeFile(path)
asyncCheck(turn, emulateSocket(path))
do:
removeFile(path)
during(turn, root, ?ClientSideArgs) do (`listen-socket`: string):
serveClientside(turn.facet, ds, `listen-socket`)
initNix() # Nix lib isn't actually being used but it's nice to know that it links.
runActor("main", bootNixActor)

View File

@ -4,7 +4,7 @@
import std/[asyncdispatch, asyncnet, os, sets, strtabs, strutils]
from std/nativesockets import AF_INET, AF_UNIX, SOCK_STREAM, Protocol
import preserves
import preserves, syndicate
import ./protocol, ./store
{.pragma: workerProtocol, importc, header: "worker-protocol.hh".}
@ -66,72 +66,99 @@ type
ProtocolError = object of IOError
Version = uint16
Session = ref object
client, daemon: AsyncSocket
socket: AsyncSocket
buffer: seq[Word]
version: Version
func major(version: Version): uint16 = version and 0xff00
func minor(version: Version): uint16 = version and 0x00ff
proc daemonSocketPath: string =
getEnv(
"NIX_DAEMON_SOCKET_PATH",
"/nix/var/nix/daemon-socket/socket")
proc sendWord(session: Session; words: varargs[Word]): Future[void] =
if session.buffer.len < words.len:
session.buffer.setLen(words.len)
for i, word in words: session.buffer[i] = word
send(session.socket, addr session.buffer[0], words.len shl 3)
proc send(session: Session; sock: AsyncSocket; words: varargs[Word]): Future[void] =
proc sendString(session: Session; s: string): Future[void] =
let wordCount = 1 + ((s.len + 7) shr 3)
if session.buffer.len < wordCount: setLen(session.buffer, wordCount)
session.buffer[0] = Word s.len
if wordCount > 0:
session.buffer[pred wordCount] = 0x00
copyMem(addr session.buffer[1], unsafeAddr s[0], s.len)
send(session.socket, addr session.buffer[0], wordCount shl 3)
proc recvWord(sock: AsyncSocket): Future[Word] {.async.} =
var w: Word
let n = await recvInto(sock, addr w, sizeof(Word))
if n != sizeof(Word): raise newException(ProtocolError, "short read")
return w
proc recvWord(session: Session): Future[Word] =
recvWord(session.socket)
proc discardWords(session: Session; n: int): Future[void] {.async.} =
if session.buffer.len < n: setLen(session.buffer, n)
let byteCount = n shl 3
let n = await recvInto(session.socket, addr session.buffer[0], byteCount)
if n != byteCount:
raise newException(ProtocolError, "short read")
proc recvString(socket: AsyncSocket): Future[string] {.async.} =
let stringLen = int (await recvWord(socket))
if stringLen > 0:
var s = newString((stringLen + 7) and (not 7))
let n = await recvInto(socket, addr s[0], s.len)
if n != s.len:
raise newException(ProtocolError, "short read")
setLen(s, stringLen)
return s
return ""
proc recvString(session: Session): Future[string] =
recvString(session.socket)
type
Snoop = ref object
client, daemon: AsyncSocket
buffer: seq[Word]
version: Version
proc send(session: Snoop; sock: AsyncSocket; words: varargs[Word]): Future[void] =
for i, word in words: session.buffer[i] = word
send(sock, addr session.buffer[0], words.len shl 3)
proc send(session: Session; sock: AsyncSocket; s: string): Future[void] =
proc send(session: Snoop; sock: AsyncSocket; s: string): Future[void] =
let wordCount = (s.len + 7) shr 3
if wordCount > session.buffer.len: setLen(session.buffer, wordCount)
session.buffer[0] = Word s.len
if wordCount > 0:
session.buffer[wordCount] = 0x00
copyMem(addr session.buffer[1], unsafeAddr s[0], s.len)
send(sock, addr session.buffer[0], (1 + wordCount) shl 3)
proc recvWord(sock: AsyncSocket): Future[Word] {.async.} =
var w: Word
let n = await recvInto(sock, addr w, sizeof(Word))
if n != sizeof(Word): raise newException(ProtocolError, "short read of word")
return w
send(sock, addr session.buffer[0], (succ wordCount) shl 3)
proc passWord(a, b: AsyncSocket): Future[Word] {.async.} =
var w = await recvWord(a)
await send(b, addr w, sizeof(Word))
return w
proc recvString(sock: AsyncSocket): Future[string] {.async.} =
let w = await recvWord(sock)
let stringLen = int w
var s: string
if stringLen > 0:
s.setLen((stringLen + 7) and (not 7))
let n = await recvInto(sock, addr s[0], s.len)
if n != s.len:
raise newException(ProtocolError, "short string read")
setLen(s, stringLen)
return s
proc passString(session: Session; a, b: AsyncSocket): Future[string] {.async.} =
proc passString(session: Snoop; a, b: AsyncSocket): Future[string] {.async.} =
var s = await recvString(a)
await send(session, b, s)
return s
proc passStringSeq(session: Session; a, b: AsyncSocket): Future[seq[string]] {.async.} =
proc passStringSeq(session: Snoop; a, b: AsyncSocket): Future[seq[string]] {.async.} =
let count = int(await passWord(a, b))
var strings = newSeq[string](count)
for i in 0..<count: strings[i] = await passString(session, a, b)
return strings
proc passStringSet(session: Session; a, b: AsyncSocket): Future[HashSet[string]] {.async.} =
proc passStringSet(session: Snoop; a, b: AsyncSocket): Future[HashSet[string]] {.async.} =
let count = int(await passWord(a, b))
var strings = initHashSet[string](count)
for i in 0..<count: incl(strings, await passString(session, a, b))
return strings
proc passStringMap(session: Session; a, b: AsyncSocket): Future[StringTableRef] {.async.} =
proc passStringMap(session: Snoop; a, b: AsyncSocket): Future[StringTableRef] {.async.} =
var table = newStringTable(modeCaseSensitive)
let n = await passWord(a, b)
for i in 1..n:
@ -141,34 +168,34 @@ proc passStringMap(session: Session; a, b: AsyncSocket): Future[StringTableRef]
table[key] = val
return table
proc passClientWord(session: Session): Future[Word] =
proc passClientWord(session: Snoop): Future[Word] =
passWord(session.client, session.daemon)
proc passDaemonWord(session: Session): Future[Word] =
proc passDaemonWord(session: Snoop): Future[Word] =
passWord(session.daemon, session.client)
proc passClientString(session: Session): Future[string] =
proc passClientString(session: Snoop): Future[string] =
passString(session, session.client, session.daemon)
proc passDaemonString(session: Session): Future[string] =
proc passDaemonString(session: Snoop): Future[string] =
passString(session, session.daemon, session.client)
proc passClientStringSeq(session: Session): Future[seq[string]] =
proc passClientStringSeq(session: Snoop): Future[seq[string]] =
passStringSeq(session, session.client, session.daemon)
proc passDaemonStringSeq(session: Session): Future[seq[string]] =
proc passDaemonStringSeq(session: Snoop): Future[seq[string]] =
passStringSeq(session, session.daemon, session.client)
proc passClientStringSet(session: Session): Future[HashSet[string]] =
proc passClientStringSet(session: Snoop): Future[HashSet[string]] =
passStringSet(session, session.client, session.daemon)
proc passDaemonStringSet(session: Session): Future[HashSet[string]] =
proc passDaemonStringSet(session: Snoop): Future[HashSet[string]] =
passStringSet(session, session.daemon, session.client)
proc passClientStringMap(session: Session): Future[StringTableRef] =
proc passClientStringMap(session: Snoop): Future[StringTableRef] =
passStringMap(session, session.client, session.daemon)
proc passDaemonStringMap(session: Session): Future[StringTableRef] =
proc passDaemonStringMap(session: Snoop): Future[StringTableRef] =
passStringMap(session, session.daemon, session.client)
type ValidPathInfo = object
@ -181,7 +208,7 @@ type ValidPathInfo = object
sigs: HashSet[string]
ca: string
proc passDaemonValidPathInfo(session: Session; includePath: bool): Future[PathInfo] {.async.} =
proc passDaemonValidPathInfo(session: Snoop; includePath: bool): Future[PathInfo] {.async.} =
var info: PathInfo
if includePath:
info.path = await passDaemonString(session)
@ -196,7 +223,7 @@ proc passDaemonValidPathInfo(session: Session; includePath: bool): Future[PathIn
info.ca = await passDaemonString(session)
return info
proc passChunks(session: Session; a, b: AsyncSocket): Future[int] {.async.} =
proc passChunks(session: Snoop; a, b: AsyncSocket): Future[int] {.async.} =
var total: int
while true:
let chunkLen = int(await passWord(a, b))
@ -213,10 +240,10 @@ proc passChunks(session: Session; a, b: AsyncSocket): Future[int] {.async.} =
inc(total, recvLen)
return total
proc passClientChunks(session: Session): Future[int] =
proc passClientChunks(session: Snoop): Future[int] =
passChunks(session, session.client, session.daemon)
proc passErrorDaemonError(session: Session) {.async.} =
proc passErrorDaemonError(session: Snoop) {.async.} =
let
typ = await passDaemonString(session)
assert typ == "Error"
@ -233,7 +260,7 @@ proc passErrorDaemonError(session: Session) {.async.} =
assert havPos == 0
let msg = await passDaemonString(session)
proc passDaemonFields(session: Session): Future[Fields] {.async.} =
proc passDaemonFields(session: Snoop): Future[Fields] {.async.} =
let count = await passDaemonWord(session)
var fields = newSeq[Field](count)
for i in 0..<count:
@ -249,7 +276,7 @@ proc passDaemonFields(session: Session): Future[Fields] {.async.} =
raiseAssert "unknown field type " & $typ
return fields
proc passWork(session: Session) {.async.} =
proc passWork(session: Snoop) {.async.} =
while true:
let word = await passDaemonWord(session)
case word
@ -304,7 +331,7 @@ proc fromDaemon(miss: var Missing; socket: AsyncSocket) {.async.} =
miss.narSize = BiggestInt await passDaemonWord(session)
]#
proc loop(session: Session) {.async.} =
proc loop(session: Snoop) {.async.} =
var chunksTotal: int
try:
while not session.client.isClosed:
@ -410,9 +437,14 @@ proc loop(session: Session) {.async.} =
close(session.daemon)
close(session.client)
proc handshake(listener: AsyncSocket): Future[Session] {.async.} =
proc daemonSocketPath: string =
getEnv(
"NIX_DAEMON_SOCKET_PATH",
"/nix/var/nix/daemon-socket/socket")
proc handshake(listener: AsyncSocket): Future[Snoop] {.async.} =
## Take the next connection from `listener` and return a `Session`.
let session = Session(buffer: newSeq[Word](1024)) # 8KiB
let session = Snoop(buffer: newSeq[Word](1024)) # 8KiB
session.client = await listener.accept()
session.daemon = newAsyncSocket(
domain = AF_UNIX,
@ -457,8 +489,77 @@ proc emulateSocket*(path: string) {.async, gcsafe.} =
except ProtocolError as err:
stderr.writeLine "failed to service client, ", err.msg
when isMainModule:
const path = "/tmp/worker.nix.socket"
if fileExists(path): removeFile(path)
try: waitFor emulateSocket(path)
finally: removeFile(path)
proc serveClient(facet: Facet; ds: Ref; client: AsyncSocket) {.async.} =
let session = Session(socket: client, buffer: newSeq[Word](512))
block:
let clientMagic = await recvWord(session)
if clientMagic != WORKER_MAGIC_1:
raise newException(ProtocolError, "invalid protocol magic")
await sendWord(session, WORKER_MAGIC_2, PROTOCOL_VERSION)
let clientVersion = Version(await recvWord(session))
if clientVersion < 0x1_21:
raise newException(ProtocolError, "obsolete protocol version")
assert clientVersion.minor >= 14
discard await(recvWord(session))
# obsolete CPU affinity
assert clientVersion.minor >= 11
discard await(recvWord(session))
# obsolete reserveSpace
assert clientVersion.minor >= 33
await sendString(session, "0.0.0")
await sendWord(session, STDERR_LAST)
while not session.socket.isClosed:
let wop = await recvWord(session.socket)
case wop
of wopSetOptions:
await discardWords(session, 12) # don't care
# 01 keepFailed
# 02 keepGoing
# 03 tryFallback
# 04 verbosity
# 05 maxBuildJobs
# 06 maxSilentTime
# 07 useBuildHook
# 08 verboseBuild
# 09 logType
# 10 printBuildTrace
# 11 buildCores
# 12 useSubstitutes
let overridePairCount = await recvWord(session)
for _ in 1..overridePairCount:
discard await (recvString(session))
discard await (recvString(session))
await sendWord(session, STDERR_LAST)
else:
stderr.writeLine "client sends wop ", $wop.int
close(session.socket)
proc serveClientside*(facet: Facet; ds: Ref; listener: AsyncSocket) {.async.} =
while not listener.isClosed:
let
client = await accept(listener)
stderr.writeLine "accepted client from listener"
let
fut = serveClient(facet, ds, client)
addCallback(fut) do ():
if not client.isClosed:
close(client)
if fut.failed:
stderr.writeLine "failed to service client, ", fut.error.msg
proc serveClientside*(facet: Facet; ds: Ref; socketPath: string) =
stderr.writeLine "serve ", socketPath
let listener = newAsyncSocket(
domain = AF_UNIX,
sockType = SOCK_STREAM,
protocol = cast[Protocol](0),
buffered = false)
onStop(facet) do (turn: var Turn):
stderr.writeLine "close and remove ", socketPath
close(listener)
removeFile(socketPath)
removeFile(socketPath)
bindUnix(listener, socketPath)
listen(listener)
stderr.writeLine "listening on ", socketPath
asyncCheck(facet, serveClientSide(facet, ds, listener))