Do the right stuff

This commit is contained in:
Emery Hemingway 2023-06-10 01:01:14 +01:00
parent ff56152bbd
commit cb885ef642
4 changed files with 281 additions and 370 deletions

View File

@ -22,7 +22,7 @@ ActionStop = <stop @id int> .
ActionResult = <result @id int @type int @fields Fields> .
; TODO: why not make target a singleton?
Missing = <missing @targets #{string} @willBuild #{string} @willSubstitute #{string} @unknown #{string} @downloadSize int @narSize int> .
Missing = <missing @targets [string ...] @willBuild #{string} @willSubstitute #{string} @unknown #{string} @downloadSize int @narSize int> .
; TODO keep a few critical fields and move the rest into a dictionary
PathInfo = <path-info

View File

@ -1,7 +1,7 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[asyncdispatch, httpclient, json, os, osproc, parseutils, strutils, tables]
import std/[asyncdispatch, httpclient, json, osproc, parseutils, strutils, tables]
import preserves, preserves/jsonhooks
import syndicate
from syndicate/protocols/dataspace import Observe
@ -76,7 +76,7 @@ proc eval(eval: Eval): Value =
var js = parseJson(execOutput)
result = js.toPreserve
proc bootNixFacet(ds: Ref; turn: var Turn): Facet =
proc bootNixFacet(turn: var Turn; ds: Ref): Facet =
# let store = openStore()
result = inFacet(turn) do (turn: var Turn):
@ -112,15 +112,20 @@ type
dataspace: Ref
ClientSideArgs {.preservesDictionary.} = object
`listen-socket`: string
DaemonSideArgs {.preservesDictionary.} = object
`daemon-socket`: string
proc bootNixActor(root: Ref; turn: var Turn) =
connectStdio(root, turn)
during(turn, root, ?RefArgs) do (ds: Ref):
discard bootNixFacet(ds, turn)
discard bootNixFacet(turn, ds)
during(turn, root, ?ClientSideArgs) do (`listen-socket`: string):
serveClientside(turn.facet, ds, `listen-socket`)
during(turn, root, ?ClientSideArgs) do (socketPath: string):
bootClientSide(turn.facet, ds, socketPath)
during(turn, root, ?DaemonSideArgs) do (socketPath: string):
bootDaemonSide(turn, ds, socketPath)
initNix() # Nix lib isn't actually being used but it's nice to know that it links.
runActor("main", bootNixActor)

View File

@ -13,7 +13,7 @@ type
`outputs`*: seq[string]
Missing* {.preservesRecord: "missing".} = object
`targets`*: HashSet[string]
`targets`*: seq[string]
`willBuild`*: HashSet[string]
`willSubstitute`*: HashSet[string]
`unknown`*: HashSet[string]

View File

@ -1,11 +1,12 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense
import std/[asyncdispatch, asyncnet, os, sets, strtabs, strutils]
import std/[algorithm, asyncdispatch, asyncnet, os, sets, strtabs, strutils]
from std/nativesockets import AF_INET, AF_UNIX, SOCK_STREAM, Protocol
import preserves, syndicate
import ./protocol, ./store
from syndicate/protocols/dataspace import Observe
import ./protocol
{.pragma: workerProtocol, importc, header: "worker-protocol.hh".}
@ -15,7 +16,7 @@ proc `$`(w: Word): string = toHex(w)
const
WORKER_MAGIC_1 = 0x6E697863
WORKER_MAGIC_2 = 0x6478696F
PROTOCOL_VERSION = 256 or 35
PROTOCOL_VERSION = 0x100 or 35
STDERR_NEXT = 0x6F6C6d67
STDERR_READ = 0x64617461
@ -65,28 +66,53 @@ const
type
ProtocolError = object of IOError
Version = uint16
StringSeq = seq[string]
StringSet = HashSet[string]
Session = ref object
socket: AsyncSocket
buffer: seq[Word]
version: Version
Observe = dataspace.Observe[Ref]
func major(version: Version): uint16 = version and 0xff00
func minor(version: Version): uint16 = version and 0x00ff
proc sendWord(session: Session; words: varargs[Word]): Future[void] =
proc close(session: Session) =
close(session.socket)
reset(session.buffer)
proc send(session: Session; words: varargs[Word]): Future[void] =
if session.buffer.len < words.len:
session.buffer.setLen(words.len)
for i, word in words: session.buffer[i] = word
send(session.socket, addr session.buffer[0], words.len shl 3)
proc sendString(session: Session; s: string): Future[void] =
proc send(session: Session; s: string): Future[void] =
let wordCount = 1 + ((s.len + 7) shr 3)
if session.buffer.len < wordCount: setLen(session.buffer, wordCount)
session.buffer[0] = Word s.len
if wordCount > 0:
if s != "":
session.buffer[pred wordCount] = 0x00
copyMem(addr session.buffer[1], unsafeAddr s[0], s.len)
send(session.socket, addr session.buffer[0], wordCount shl 3)
proc send(session: Session; ss: StringSeq|StringSet): Future[void] =
## Send a set of strings. The set is sent as a contiguous buffer.
session.buffer[0] = Word ss.len
var off = 1
for s in ss:
let
stringWordLen = (s.len + 7) shr 3
bufferWordLen = off+1+stringWordLen
if session.buffer.len < bufferWordLen:
setLen(session.buffer, bufferWordLen)
session.buffer[off] = Word s.len
session.buffer[off+stringWordLen] = 0 # clear the aligning bits
inc(off)
copyMem(addr session.buffer[off], unsafeAddr s[0], s.len)
inc(off, stringWordLen)
send(session.socket, addr session.buffer[0], off shl 3)
proc recvWord(sock: AsyncSocket): Future[Word] {.async.} =
var w: Word
let n = await recvInto(sock, addr w, sizeof(Word))
@ -117,385 +143,119 @@ proc recvString(socket: AsyncSocket): Future[string] {.async.} =
proc recvString(session: Session): Future[string] =
recvString(session.socket)
type
Snoop = ref object
client, daemon: AsyncSocket
buffer: seq[Word]
version: Version
proc send(session: Snoop; sock: AsyncSocket; words: varargs[Word]): Future[void] =
for i, word in words: session.buffer[i] = word
send(sock, addr session.buffer[0], words.len shl 3)
proc send(session: Snoop; sock: AsyncSocket; s: string): Future[void] =
let wordCount = (s.len + 7) shr 3
if wordCount > session.buffer.len: setLen(session.buffer, wordCount)
session.buffer[0] = Word s.len
if wordCount > 0:
session.buffer[wordCount] = 0x00
copyMem(addr session.buffer[1], unsafeAddr s[0], s.len)
send(sock, addr session.buffer[0], (succ wordCount) shl 3)
proc passWord(a, b: AsyncSocket): Future[Word] {.async.} =
var w = await recvWord(a)
await send(b, addr w, sizeof(Word))
return w
proc passString(session: Snoop; a, b: AsyncSocket): Future[string] {.async.} =
var s = await recvString(a)
await send(session, b, s)
return s
proc passStringSeq(session: Snoop; a, b: AsyncSocket): Future[seq[string]] {.async.} =
let count = int(await passWord(a, b))
proc recvStringSeq(session: Session): Future[StringSeq] {.async.} =
let count = int(await recvWord(session.socket))
var strings = newSeq[string](count)
for i in 0..<count: strings[i] = await passString(session, a, b)
for i in 0..<count: strings[i] = await recvString(session)
return strings
proc passStringSet(session: Snoop; a, b: AsyncSocket): Future[HashSet[string]] {.async.} =
let count = int(await passWord(a, b))
proc recvStringSet(session: Session): Future[StringSet] {.async.} =
let count = int(await recvWord(session.socket))
var strings = initHashSet[string](count)
for i in 0..<count: incl(strings, await passString(session, a, b))
for i in 0..<count: incl(strings, await recvString(session))
return strings
proc passStringMap(session: Snoop; a, b: AsyncSocket): Future[StringTableRef] {.async.} =
var table = newStringTable(modeCaseSensitive)
let n = await passWord(a, b)
for i in 1..n:
var
key = await passString(session, a, b)
val = await passString(session, a, b)
table[key] = val
return table
proc passClientWord(session: Snoop): Future[Word] =
passWord(session.client, session.daemon)
proc passDaemonWord(session: Snoop): Future[Word] =
passWord(session.daemon, session.client)
proc passClientString(session: Snoop): Future[string] =
passString(session, session.client, session.daemon)
proc passDaemonString(session: Snoop): Future[string] =
passString(session, session.daemon, session.client)
proc passClientStringSeq(session: Snoop): Future[seq[string]] =
passStringSeq(session, session.client, session.daemon)
proc passDaemonStringSeq(session: Snoop): Future[seq[string]] =
passStringSeq(session, session.daemon, session.client)
proc passClientStringSet(session: Snoop): Future[HashSet[string]] =
passStringSet(session, session.client, session.daemon)
proc passDaemonStringSet(session: Snoop): Future[HashSet[string]] =
passStringSet(session, session.daemon, session.client)
proc passClientStringMap(session: Snoop): Future[StringTableRef] =
passStringMap(session, session.client, session.daemon)
proc passDaemonStringMap(session: Snoop): Future[StringTableRef] =
passStringMap(session, session.daemon, session.client)
type ValidPathInfo = object
path: string
deriver: string
narHash: string
references: HashSet[string]
registrationTime, narSize: BiggestInt
ultimate: bool
sigs: HashSet[string]
ca: string
proc passDaemonValidPathInfo(session: Snoop; includePath: bool): Future[PathInfo] {.async.} =
var info: PathInfo
if includePath:
info.path = await passDaemonString(session)
info.deriver = await passDaemonString(session)
info.narHash = await passDaemonString(session)
info.references = await passDaemonStringSet(session)
info.registrationTime = BiggestInt(await passDaemonWord(session))
info.narSize = BiggestInt(await passDaemonWord(session))
assert session.version.minor >= 16
info.ultimate = (await passDaemonWord(session)) != 0
info.sigs = await passDaemonStringSet(session)
info.ca = await passDaemonString(session)
return info
proc passChunks(session: Snoop; a, b: AsyncSocket): Future[int] {.async.} =
var total: int
while true:
let chunkLen = int(await passWord(a, b))
if chunkLen == 0:
break
else:
let wordLen = (chunkLen + 7) shr 3
if session.buffer.len < wordLen: setLen(session.buffer, wordLen)
let recvLen = await recvInto(a, addr session.buffer[0], chunkLen)
# each chunk must be recved contiguously
if recvLen != chunkLen:
raise newException(ProtocolError, "invalid chunk read")
await send(b, addr session.buffer[0], recvLen)
inc(total, recvLen)
return total
proc passClientChunks(session: Snoop): Future[int] =
passChunks(session, session.client, session.daemon)
proc passErrorDaemonError(session: Snoop) {.async.} =
let
typ = await passDaemonString(session)
assert typ == "Error"
let
lvl = await passDaemonWord(session)
name = await passDaemonString(session)
msg = passDaemonString(session)
havePos = await passDaemonWord(session)
assert havePos == 0
let
nrTraces = await passDaemonWord(session)
proc recvError(session: Session) {.async.} =
discard #[typ]# await recvString(session)
discard #[lvl]# await recvWord(session)
discard #[name]# await recvString(session)
discard #[msg]# await recvString(session)
discard #[havePos]# await recvWord(session)
let nrTraces = await recvWord(session)
for i in 1..nrTraces:
let havPos = await passDaemonWord(session)
assert havPos == 0
let msg = await passDaemonString(session)
discard #[havPos]# await recvWord(session)
discard #[msg]# await recvString(session)
proc passDaemonFields(session: Snoop): Future[Fields] {.async.} =
let count = await passDaemonWord(session)
var fields = newSeq[Field](count)
proc recvFields(session: Session) {.async.} =
let count = await recvWord(session)
for i in 0..<count:
let typ = await passDaemonWord(session)
let typ = await recvWord(session)
case typ
of 0:
let num = await passDaemonWord(session)
fields[i] = Field(orKind: FieldKind.int, int: int num)
of 1:
let str = await passDaemonString(session)
fields[i] = Field(orKind: FieldKind.string, string: str)
else:
raiseAssert "unknown field type " & $typ
return fields
of 0: discard await recvWord(session)
of 1: discard await recvString(session)
else: raiseAssert "unknown field type " & $typ
proc passWork(session: Snoop) {.async.} =
proc recvWork(session: Session) {.async.} =
while true:
let word = await passDaemonWord(session)
let word = await recvWord(session)
case word
of STDERR_WRITE:
discard await passDaemonString(session)
let s = await recvString(session)
stderr.writeLine "STDERR_WRITE ", s
of STDERR_READ:
discard await passClientString(session)
await send(session, "")
of STDERR_ERROR:
assert session.version.minor >= 26
await passErrorDaemonError(session)
stderr.writeLine "STDERR_ERROR"
await recvError(session)
of STDERR_NEXT:
let s = await passDaemonString(session)
let s = await recvString(session)
stderr.writeLine "STDERR_NEXT ", s
of STDERR_START_ACTIVITY:
var act: ActionStart
act.id = BiggestInt(await passDaemonWord(session))
act.level = BiggestInt(await passDaemonWord(session))
act.`type` = BiggestInt(await passDaemonWord(session))
act.text = await passDaemonString(session)
act.fields = await passDaemonFields(session)
act.parent = BiggestInt(await passDaemonWord(session))
let id = await recvWord(session) # id
stderr.writeLine "STDERR_START_ACTIVITY ", id
discard await recvWord(session) # level
discard await recvWord(session) # type
discard await recvString(session) # text
await recvFields(session) # fields
discard await recvWord(session) # parent
of STDERR_STOP_ACTIVITY:
var act: ActionStop
act.id = BiggestInt(await passDaemonWord(session))
let id = await recvWord(session) # id
stderr.writeLine "STDERR_STOP_ACTIVITY ", id
of STDERR_RESULT:
var act: ActionResult
act.id = BiggestInt(await passDaemonWord(session))
act.`type` = BiggestInt(await passDaemonWord(session))
act.fields = await passDaemonFields(session)
let id = await recvWord(session) # id
stderr.writeLine "STDERR_RESULT ", id
discard await recvWord(session) # type
await recvFields(session) # fields
of STDERR_LAST:
stderr.writeLine "STDERR_LAST"
break
else:
raise newException(ProtocolError, "unknown work verb " & $word)
#[
proc fromClient(miss: var Missing; socket: AsyncSocket) {.async.} =
result.targets = await passClientStringSet(session)
proc fromDaemon(miss: var Missing; socket: AsyncSocket) {.async.} =
miss.willBuild = await passDaemonStringSet(session)
miss.willSubstitute = await passDaemonStringSet(session)
miss.unknown = await passDaemonStringSet(session)
miss.downloadSize = BiggestInt await passDaemonWord(session)
miss.narSize = BiggestInt await passDaemonWord(session)
]#
proc loop(session: Snoop) {.async.} =
var chunksTotal: int
try:
while not session.client.isClosed:
let wop = await passClientWord(session)
case wop
of wopIsValidPath:
let path = await passClientString(session)
stderr.writeLine "wopIsValidPath ", path
await passWork(session)
let word = await passDaemonWord(session)
of wopAddToStore:
assert session.version.minor >= 25
let
name = await passClientString(session)
caMethod = await passClientString(session)
refs = await passClientStringSet(session)
repairBool = await passClientWord(session)
stderr.writeLine "wopAddToStore ", name
let n = await passClientChunks(session)
inc(chunksTotal, n)
await passWork(session)
let info = await passDaemonValidPathInfo(session, true)
of wopAddTempRoot:
let path = await passClientString(session)
stderr.writeLine "wopAddTempRoot ", path
await passWork(session)
discard await passDaemonWord(session)
of wopAddIndirectRoot:
let path = await passClientString(session)
stderr.writeLine "wopAddIndirectRoot ", path
await passWork(session)
discard await passDaemonWord(session)
of wopSetOptions:
discard passClientWord(session) # keepFailed
discard passClientWord(session) # keepGoing
discard passClientWord(session) # tryFallback
discard passClientWord(session) # verbosity
discard passClientWord(session) # maxBuildJobs
discard passClientWord(session) # maxSilentTime
discard passClientWord(session) # useBuildHook
discard passClientWord(session) # verboseBuild
discard passClientWord(session) # logType
discard passClientWord(session) # printBuildTrace
discard passClientWord(session) # buildCores
discard passClientWord(session) # useSubstitutes
assert session.version.minor >= 12
let overrides = await passClientStringMap(session)
await passWork(session)
of wopQueryPathInfo:
assert session.version >= 17
let path = await passClientString(session)
stderr.writeLine "wopQueryPathInfo ", path
await passWork(session)
let valid = await passDaemonWord(session)
if valid != 0:
var info = await passDaemonValidPathInfo(session, false)
info.path = path
stderr.writeLine "wopQueryPathInfo ", $info
of wopQueryMissing:
assert session.version >= 30
var miss: Missing
miss.targets = await passClientStringSet(session)
await passWork(session)
miss.willBuild = await passDaemonStringSet(session)
miss.willSubstitute = await passDaemonStringSet(session)
miss.unknown = await passDaemonStringSet(session)
miss.downloadSize = BiggestInt await passDaemonWord(session)
miss.narSize = BiggestInt await passDaemonWord(session)
stderr.writeLine "wopQueryMissing ", $miss
of wopBuildPathsWithResults:
assert session.version >= 34
let
drvs = await passClientStringSeq(session)
buildMode = await passClientWord(session)
stderr.writeLine "wopBuildPathsWithResults drvs ", $drvs
await passWork(session)
let count = await passDaemonWord(session)
for _ in 1..count:
let
path = await passDaemonString(session)
status = await passDaemonWord(session)
errorMsg = await passDaemonString(session)
timesBUild = await passDaemonWord(session)
isNonDeterministic = await passDaemonWord(session)
startTime = await passDaemonWord(session)
stopTime = await passDaemonWord(session)
outputs = await passDaemonStringMap(session)
else:
stderr.writeLine "unknown worker op ", wop.int
break
except ProtocolError as err:
stderr.writeLine "connection terminated"
stderr.writeLine "chunk bytes transfered: ", formatSize(chunksTotal)
finally:
close(session.daemon)
close(session.client)
proc daemonSocketPath: string =
getEnv(
"NIX_DAEMON_SOCKET_PATH",
"/nix/var/nix/daemon-socket/socket")
proc handshake(listener: AsyncSocket): Future[Snoop] {.async.} =
## Take the next connection from `listener` and return a `Session`.
let session = Snoop(buffer: newSeq[Word](1024)) # 8KiB
session.client = await listener.accept()
session.daemon = newAsyncSocket(
domain = AF_UNIX,
sockType = SOCK_STREAM,
protocol = cast[Protocol](0),
buffered = false)
await connectUnix(session.daemon, daemonSocketPath())
let clientMagic = await passClientWord(session)
if clientMagic != WORKER_MAGIC_1:
raise newException(ProtocolError, "invalid protocol magic")
let daemonMagic = await passDaemonWord(session)
let daemonVersion = await passDaemonWord(session)
session.version = Version(await passClientWord(session))
if session.version < 0x1_0a:
raise newException(ProtocolError, "obsolete protocol version")
assert session.version.minor >= 14
discard await(passClientWord(session))
# obsolete CPU affinity
assert session.version.minor >= 11
discard await(passClientWord(session))
# obsolete reserveSpace
assert session.version.minor >= 33
let daemonVersionString = await passDaemonString(session)
assert daemonVersionString == $store.nixVersion
await passWork(session)
return session
proc newSession(socket: AsyncSocket): Session =
Session(socket: socket, buffer: newSeq[Word](512))
proc emulateSocket*(path: string) {.async, gcsafe.} =
let listener = newAsyncSocket(
domain = AF_UNIX,
sockType = SOCK_STREAM,
protocol = cast[Protocol](0),
buffered = false)
bindUnix(listener, path)
listen(listener)
stderr.writeLine "listening on ", path
while not listener.isClosed:
try:
let session = await handshake(listener)
assert not session.isNil
asyncCheck loop(session)
except ProtocolError as err:
stderr.writeLine "failed to service client, ", err.msg
proc newSession(): Session =
newSession(newAsyncSocket(
domain = AF_UNIX,
sockType = SOCK_STREAM,
protocol = cast[Protocol](0),
buffered = false))
proc serveClient(facet: Facet; ds: Ref; client: AsyncSocket) {.async.} =
let session = Session(socket: client, buffer: newSeq[Word](512))
proc send(session: Session; miss: Missing) {.async.} =
await send(session, STDERR_LAST)
await send(session, miss.willBuild)
await send(session, miss.willSubstitute)
await send(session, miss.unknown)
await send(session, Word miss.downloadSize)
await send(session, Word miss.narSize)
proc send(session: Session; info: PathInfo) {.async.} =
await send(session, STDERR_LAST)
await send(session, 1)
if info.path != "":
await send(session, info.path)
await send(session, info.deriver)
await send(session, info.narHash)
await send(session, info.references)
await send(session, Word info.registrationTime)
await send(session, Word info.narSize)
await send(session, Word info.ultimate)
await send(session, info.sigs)
await send(session, info.ca)
proc serveClient(facet: Facet; ds: Ref; session: Session) {.async.} =
block:
let clientMagic = await recvWord(session)
if clientMagic != WORKER_MAGIC_1:
raise newException(ProtocolError, "invalid protocol magic")
await sendWord(session, WORKER_MAGIC_2, PROTOCOL_VERSION)
await send(session, WORKER_MAGIC_2, PROTOCOL_VERSION)
let clientVersion = Version(await recvWord(session))
if clientVersion < 0x1_21:
raise newException(ProtocolError, "obsolete protocol version")
@ -506,13 +266,69 @@ proc serveClient(facet: Facet; ds: Ref; client: AsyncSocket) {.async.} =
discard await(recvWord(session))
# obsolete reserveSpace
assert clientVersion.minor >= 33
await sendString(session, "0.0.0")
await sendWord(session, STDERR_LAST)
await send(session, "0.0.0")
await send(session, STDERR_LAST)
while not session.socket.isClosed:
let wop = await recvWord(session.socket)
case wop
of wopQueryPathInfo:
let
path = await recvString(session)
pat = inject(?PathInfo, { 0: ?path })
await send(session, STDERR_NEXT)
await send(session, $pat)
stderr.writeLine "await ", pat
run(facet) do (turn: var Turn):
onPublish(turn, ds, pat) do (
deriver: string,
narHash: string,
references: StringSet,
registrationTime: BiggestInt,
narSize: BiggestInt,
ultimate: bool,
sigs: StringSet,
ca: string
):
var info = PathInfo(
deriver: deriver,
narHash: narHash,
references: references,
registrationTime: registrationTime,
narSize: narSize,
ultimate: ultimate,
sigs: sigs,
ca: ca,
)
asyncCheck(turn, send(session, info))
of wopQueryMissing:
var targets = toPreserve(await recvStringSeq(session))
sort(targets.sequence)
# would prefer to use a set but that doesn't translate into a pattern
let pat = inject(?Missing, { 0: ?targets })
# TODO send the pattern to the client as a log line
await send(session, STDERR_NEXT)
await send(session, $pat)
run(facet) do (turn: var Turn):
onPublish(turn, ds, pat) do (
willBuild: StringSet,
willSubstitute: StringSet,
unknown: StringSet,
downloadSize: BiggestInt,
narSize: BiggestInt
):
let miss = Missing(
willBuild: willBuild,
willSubstitute: willSubstitute,
unknown: unknown,
downloadSize: downloadSize,
narSize: narSize,
)
asyncCheck(turn, send(session, miss))
of wopSetOptions:
await discardWords(session, 12) # don't care
await discardWords(session, 12)
# 01 keepFailed
# 02 keepGoing
# 03 tryFallback
@ -529,37 +345,127 @@ proc serveClient(facet: Facet; ds: Ref; client: AsyncSocket) {.async.} =
for _ in 1..overridePairCount:
discard await (recvString(session))
discard await (recvString(session))
await sendWord(session, STDERR_LAST)
await send(session, STDERR_LAST)
# all options from the client are ingored
else:
stderr.writeLine "client sends wop ", $wop.int
let msg = "unhandled worker op " & $wop.int
stderr.writeLine msg
await send(session, STDERR_NEXT)
await send(session, msg)
await send(session, STDERR_LAST)
close(session.socket)
proc serveClientside*(facet: Facet; ds: Ref; listener: AsyncSocket) {.async.} =
proc serveClientSide*(facet: Facet; ds: Ref; listener: AsyncSocket) {.async.} =
while not listener.isClosed:
let
client = await accept(listener)
stderr.writeLine "accepted client from listener"
let
fut = serveClient(facet, ds, client)
fut = serveClient(facet, ds, newSession(client))
addCallback(fut) do ():
if not client.isClosed:
close(client)
if fut.failed:
stderr.writeLine "failed to service client, ", fut.error.msg
proc serveClientside*(facet: Facet; ds: Ref; socketPath: string) =
stderr.writeLine "serve ", socketPath
proc bootClientSide*(facet: Facet; ds: Ref; socketPath: string) =
let listener = newAsyncSocket(
domain = AF_UNIX,
sockType = SOCK_STREAM,
protocol = cast[Protocol](0),
buffered = false)
onStop(facet) do (turn: var Turn):
stderr.writeLine "close and remove ", socketPath
stderr.writeLine "facet stopped, close and remove ", socketPath
close(listener)
removeFile(socketPath)
removeFile(socketPath)
bindUnix(listener, socketPath)
listen(listener)
stderr.writeLine "listening on ", socketPath
asyncCheck(facet, serveClientSide(facet, ds, listener))
stderr.writeLine "serve clients at ", socketPath
proc connectDaemon(session: Session; socketPath: string) {.async.} =
await connectUnix(session.socket, socketPath)
await send(session, WORKER_MAGIC_1)
let daemonMagic = await recvWord(session)
if daemonMagic != WORKER_MAGIC_2:
raise newException(ProtocolError, "bad magic from daemon")
let daemonVersion = await recvWord(session)
session.version = min(Version daemonVersion, PROTOCOL_VERSION)
await send(session, Word session.version)
await send(session, 0) # CPU affinity
await send(session, 0) # reserve space
if session.version.minor >= 33:
discard await recvString(session) # version
if session.version.minor >= 35:
discard await recvWord(session) # remoteTrustsUs
await recvWork(session)
proc queryMissing(session: Session; targets: StringSeq): Future[Missing] {.async.} =
var miss = Missing(targets: targets)
await send(session, wopQueryMissing)
await send(session, miss.targets)
await recvWork(session)
miss.willBuild = await recvStringSet(session)
miss.willSubstitute = await recvStringSet(session)
miss.unknown = await recvStringSet(session)
miss.downloadSize = BiggestInt await recvWord(session)
miss.narSize = BiggestInt await recvWord(session)
return miss
proc queryPathInfo(session: Session; path: string): Future[PathInfo] {.async.} =
echo "sending daemon wopQueryPathInfo"
var info = PathInfo(path: path)
await send(session, wopQueryPathInfo)
await send(session, info.path)
await recvWork(session)
let valid = await recvWord(session)
if valid == 0:
stderr.writeLine "not a valid path, client will probably deadlock - ", path
else:
info.deriver = await recvString(session)
info.narHash = await recvString(session)
info.references = await recvStringSet(session)
info.registrationTime = BiggestInt await recvWord(session)
info.narSize = BiggestInt await recvWord(session)
info.ultimate = (await recvWord(session)) != 0
info.sigs = await recvStringSet(session)
info.ca = await recvString(session)
stderr.writeLine "got from daemon path info ", info
return info
proc bootDaemonSide*(turn: var Turn; ds: Ref; socketPath: string) =
during(turn, ds, ?Observe(pattern: !Missing) ?? {0: grab()}) do (a: Preserve[Ref]):
# cannot use `grabLit` here because an array is a compound
let
session = newSession()
fut = connectDaemon(session, socketPath)
addCallback(fut, turn) do (turn: var Turn):
read(fut)
var targets: StringSeq
doAssert targets.fromPreserve(unpackLiterals(a))
# unpack <arr [<lit " …">]>
let missFut = queryMissing(session, targets)
addCallback(missFut, turn) do (turn: var Turn):
var miss = read(missFut)
stderr.writeLine "publishing ", miss
discard publish(turn, ds, miss)
do:
close(session)
let pathInfoPat = ?Observe(pattern: !PathInfo) ?? {0: grabLit()}
stderr.writeLine "looking for path info requests with ", pathInfoPat
during(turn, ds, pathInfoPat) do (a: Preserve[Ref]):
stderr.writeLine "capured ", a
during(turn, ds, pathInfoPat) do (path: string):
stderr.writeLine "something looking for path info for ", path
let
session = newSession()
fut = connectDaemon(session, socketPath)
addCallback(fut, turn) do (turn: var Turn):
read(fut)
let infoFut = queryPathInfo(session, path)
addCallback(infoFut, turn) do (turn: var Turn):
var info = read(infoFut)
stderr.writeLine "publishing ", info
discard publish(turn, ds, info)
do:
close(session)