1
0
Fork 0
mirror of https://github.com/NixOS/nix synced 2025-07-04 15:31:47 +02:00

Merge pull request #10678 from nix-windows/windows-substitution-goal

Start building the scheduler for Windows
This commit is contained in:
John Ericson 2024-05-27 17:47:29 -04:00 committed by GitHub
commit d0c7da131f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 285 additions and 94 deletions

View file

@ -1280,7 +1280,7 @@ bool DerivationGoal::isReadDesc(int fd)
return fd == hook->builderOut.readSide.get();
}
void DerivationGoal::handleChildOutput(int fd, std::string_view data)
void DerivationGoal::handleChildOutput(Descriptor fd, std::string_view data)
{
// local & `ssh://`-builds are dealt with here.
auto isWrittenToLog = isReadDesc(fd);
@ -1347,7 +1347,7 @@ void DerivationGoal::handleChildOutput(int fd, std::string_view data)
}
void DerivationGoal::handleEOF(int fd)
void DerivationGoal::handleEOF(Descriptor fd)
{
if (!currentLogLine.empty()) flushLine();
worker.wakeUp(shared_from_this());

View file

@ -2,7 +2,7 @@
///@file
#include "parsed-derivations.hh"
#include "lock.hh"
#include "user-lock.hh"
#include "outputs-spec.hh"
#include "store-api.hh"
#include "pathlocks.hh"
@ -292,8 +292,8 @@ struct DerivationGoal : public Goal
/**
* Callback used by the worker to write to the log.
*/
void handleChildOutput(int fd, std::string_view data) override;
void handleEOF(int fd) override;
void handleChildOutput(Descriptor fd, std::string_view data) override;
void handleEOF(Descriptor fd) override;
void flushLine();
/**

View file

@ -1,167 +0,0 @@
#include "drv-output-substitution-goal.hh"
#include "finally.hh"
#include "worker.hh"
#include "substitution-goal.hh"
#include "callback.hh"
namespace nix {
DrvOutputSubstitutionGoal::DrvOutputSubstitutionGoal(
const DrvOutput & id,
Worker & worker,
RepairFlag repair,
std::optional<ContentAddress> ca)
: Goal(worker, DerivedPath::Opaque { StorePath::dummy })
, id(id)
{
state = &DrvOutputSubstitutionGoal::init;
name = fmt("substitution of '%s'", id.to_string());
trace("created");
}
void DrvOutputSubstitutionGoal::init()
{
trace("init");
/* If the derivation already exists, were done */
if (worker.store.queryRealisation(id)) {
amDone(ecSuccess);
return;
}
subs = settings.useSubstitutes ? getDefaultSubstituters() : std::list<ref<Store>>();
tryNext();
}
void DrvOutputSubstitutionGoal::tryNext()
{
trace("trying next substituter");
if (subs.size() == 0) {
/* None left. Terminate this goal and let someone else deal
with it. */
debug("derivation output '%s' is required, but there is no substituter that can provide it", id.to_string());
/* Hack: don't indicate failure if there were no substituters.
In that case the calling derivation should just do a
build. */
amDone(substituterFailed ? ecFailed : ecNoSubstituters);
if (substituterFailed) {
worker.failedSubstitutions++;
worker.updateProgress();
}
return;
}
sub = subs.front();
subs.pop_front();
// FIXME: Make async
// outputInfo = sub->queryRealisation(id);
/* The callback of the curl download below can outlive `this` (if
some other error occurs), so it must not touch `this`. So put
the shared state in a separate refcounted object. */
downloadState = std::make_shared<DownloadState>();
downloadState->outPipe.create();
sub->queryRealisation(
id,
{ [downloadState(downloadState)](std::future<std::shared_ptr<const Realisation>> res) {
try {
Finally updateStats([&]() { downloadState->outPipe.writeSide.close(); });
downloadState->promise.set_value(res.get());
} catch (...) {
downloadState->promise.set_exception(std::current_exception());
}
} });
worker.childStarted(shared_from_this(), {downloadState->outPipe.readSide.get()}, true, false);
state = &DrvOutputSubstitutionGoal::realisationFetched;
}
void DrvOutputSubstitutionGoal::realisationFetched()
{
worker.childTerminated(this);
try {
outputInfo = downloadState->promise.get_future().get();
} catch (std::exception & e) {
printError(e.what());
substituterFailed = true;
}
if (!outputInfo) {
return tryNext();
}
for (const auto & [depId, depPath] : outputInfo->dependentRealisations) {
if (depId != id) {
if (auto localOutputInfo = worker.store.queryRealisation(depId);
localOutputInfo && localOutputInfo->outPath != depPath) {
warn(
"substituter '%s' has an incompatible realisation for '%s', ignoring.\n"
"Local: %s\n"
"Remote: %s",
sub->getUri(),
depId.to_string(),
worker.store.printStorePath(localOutputInfo->outPath),
worker.store.printStorePath(depPath)
);
tryNext();
return;
}
addWaitee(worker.makeDrvOutputSubstitutionGoal(depId));
}
}
addWaitee(worker.makePathSubstitutionGoal(outputInfo->outPath));
if (waitees.empty()) outPathValid();
else state = &DrvOutputSubstitutionGoal::outPathValid;
}
void DrvOutputSubstitutionGoal::outPathValid()
{
assert(outputInfo);
trace("output path substituted");
if (nrFailed > 0) {
debug("The output path of the derivation output '%s' could not be substituted", id.to_string());
amDone(nrNoSubstituters > 0 || nrIncompleteClosure > 0 ? ecIncompleteClosure : ecFailed);
return;
}
worker.store.registerDrvOutput(*outputInfo);
finished();
}
void DrvOutputSubstitutionGoal::finished()
{
trace("finished");
amDone(ecSuccess);
}
std::string DrvOutputSubstitutionGoal::key()
{
/* "a$" ensures substitution goals happen before derivation
goals. */
return "a$" + std::string(id.to_string());
}
void DrvOutputSubstitutionGoal::work()
{
(this->*state)();
}
void DrvOutputSubstitutionGoal::handleEOF(int fd)
{
if (fd == downloadState->outPipe.readSide.get()) worker.wakeUp(shared_from_this());
}
}

View file

@ -1,81 +0,0 @@
#pragma once
///@file
#include "store-api.hh"
#include "goal.hh"
#include "realisation.hh"
#include <thread>
#include <future>
namespace nix {
class Worker;
/**
* Substitution of a derivation output.
* This is done in three steps:
* 1. Fetch the output info from a substituter
* 2. Substitute the corresponding output path
* 3. Register the output info
*/
class DrvOutputSubstitutionGoal : public Goal {
/**
* The drv output we're trying to substitute
*/
DrvOutput id;
/**
* The realisation corresponding to the given output id.
* Will be filled once we can get it.
*/
std::shared_ptr<const Realisation> outputInfo;
/**
* The remaining substituters.
*/
std::list<ref<Store>> subs;
/**
* The current substituter.
*/
std::shared_ptr<Store> sub;
struct DownloadState
{
Pipe outPipe;
std::promise<std::shared_ptr<const Realisation>> promise;
};
std::shared_ptr<DownloadState> downloadState;
/**
* Whether a substituter failed.
*/
bool substituterFailed = false;
public:
DrvOutputSubstitutionGoal(const DrvOutput& id, Worker & worker, RepairFlag repair = NoRepair, std::optional<ContentAddress> ca = std::nullopt);
typedef void (DrvOutputSubstitutionGoal::*GoalState)();
GoalState state;
void init();
void tryNext();
void realisationFetched();
void outPathValid();
void finished();
void timedOut(Error && ex) override { abort(); };
std::string key() override;
void work() override;
void handleEOF(int fd) override;
JobCategory jobCategory() const override {
return JobCategory::Substitution;
};
};
}

View file

@ -1,140 +0,0 @@
#include "worker.hh"
#include "substitution-goal.hh"
#include "derivation-goal.hh"
#include "local-store.hh"
namespace nix {
void Store::buildPaths(const std::vector<DerivedPath> & reqs, BuildMode buildMode, std::shared_ptr<Store> evalStore)
{
Worker worker(*this, evalStore ? *evalStore : *this);
Goals goals;
for (auto & br : reqs)
goals.insert(worker.makeGoal(br, buildMode));
worker.run(goals);
StringSet failed;
std::optional<Error> ex;
for (auto & i : goals) {
if (i->ex) {
if (ex)
logError(i->ex->info());
else
ex = std::move(i->ex);
}
if (i->exitCode != Goal::ecSuccess) {
if (auto i2 = dynamic_cast<DerivationGoal *>(i.get()))
failed.insert(printStorePath(i2->drvPath));
else if (auto i2 = dynamic_cast<PathSubstitutionGoal *>(i.get()))
failed.insert(printStorePath(i2->storePath));
}
}
if (failed.size() == 1 && ex) {
ex->withExitStatus(worker.failingExitStatus());
throw std::move(*ex);
} else if (!failed.empty()) {
if (ex) logError(ex->info());
throw Error(worker.failingExitStatus(), "build of %s failed", concatStringsSep(", ", quoteStrings(failed)));
}
}
std::vector<KeyedBuildResult> Store::buildPathsWithResults(
const std::vector<DerivedPath> & reqs,
BuildMode buildMode,
std::shared_ptr<Store> evalStore)
{
Worker worker(*this, evalStore ? *evalStore : *this);
Goals goals;
std::vector<std::pair<const DerivedPath &, GoalPtr>> state;
for (const auto & req : reqs) {
auto goal = worker.makeGoal(req, buildMode);
goals.insert(goal);
state.push_back({req, goal});
}
worker.run(goals);
std::vector<KeyedBuildResult> results;
for (auto & [req, goalPtr] : state)
results.emplace_back(KeyedBuildResult {
goalPtr->getBuildResult(req),
/* .path = */ req,
});
return results;
}
BuildResult Store::buildDerivation(const StorePath & drvPath, const BasicDerivation & drv,
BuildMode buildMode)
{
Worker worker(*this, *this);
auto goal = worker.makeBasicDerivationGoal(drvPath, drv, OutputsSpec::All {}, buildMode);
try {
worker.run(Goals{goal});
return goal->getBuildResult(DerivedPath::Built {
.drvPath = makeConstantStorePathRef(drvPath),
.outputs = OutputsSpec::All {},
});
} catch (Error & e) {
return BuildResult {
.status = BuildResult::MiscFailure,
.errorMsg = e.msg(),
};
};
}
void Store::ensurePath(const StorePath & path)
{
/* If the path is already valid, we're done. */
if (isValidPath(path)) return;
Worker worker(*this, *this);
GoalPtr goal = worker.makePathSubstitutionGoal(path);
Goals goals = {goal};
worker.run(goals);
if (goal->exitCode != Goal::ecSuccess) {
if (goal->ex) {
goal->ex->withExitStatus(worker.failingExitStatus());
throw std::move(*goal->ex);
} else
throw Error(worker.failingExitStatus(), "path '%s' does not exist and cannot be created", printStorePath(path));
}
}
void Store::repairPath(const StorePath & path)
{
Worker worker(*this, *this);
GoalPtr goal = worker.makePathSubstitutionGoal(path, Repair);
Goals goals = {goal};
worker.run(goals);
if (goal->exitCode != Goal::ecSuccess) {
/* Since substituting the path didn't work, if we have a valid
deriver, then rebuild the deriver. */
auto info = queryPathInfo(path);
if (info->deriver && isValidPath(*info->deriver)) {
goals.clear();
goals.insert(worker.makeGoal(DerivedPath::Built {
.drvPath = makeConstantStorePathRef(*info->deriver),
// FIXME: Should just build the specific output we need.
.outputs = OutputsSpec::All { },
}, bmRepair));
worker.run(goals);
} else
throw Error(worker.failingExitStatus(), "cannot repair path '%s'", printStorePath(path));
}
}
}

View file

@ -1,109 +0,0 @@
#include "goal.hh"
#include "worker.hh"
namespace nix {
bool CompareGoalPtrs::operator() (const GoalPtr & a, const GoalPtr & b) const {
std::string s1 = a->key();
std::string s2 = b->key();
return s1 < s2;
}
BuildResult Goal::getBuildResult(const DerivedPath & req) const {
BuildResult res { buildResult };
if (auto pbp = std::get_if<DerivedPath::Built>(&req)) {
auto & bp = *pbp;
/* Because goals are in general shared between derived paths
that share the same derivation, we need to filter their
results to get back just the results we care about.
*/
for (auto it = res.builtOutputs.begin(); it != res.builtOutputs.end();) {
if (bp.outputs.contains(it->first))
++it;
else
it = res.builtOutputs.erase(it);
}
}
return res;
}
void addToWeakGoals(WeakGoals & goals, GoalPtr p)
{
if (goals.find(p) != goals.end())
return;
goals.insert(p);
}
void Goal::addWaitee(GoalPtr waitee)
{
waitees.insert(waitee);
addToWeakGoals(waitee->waiters, shared_from_this());
}
void Goal::waiteeDone(GoalPtr waitee, ExitCode result)
{
assert(waitees.count(waitee));
waitees.erase(waitee);
trace(fmt("waitee '%s' done; %d left", waitee->name, waitees.size()));
if (result == ecFailed || result == ecNoSubstituters || result == ecIncompleteClosure) ++nrFailed;
if (result == ecNoSubstituters) ++nrNoSubstituters;
if (result == ecIncompleteClosure) ++nrIncompleteClosure;
if (waitees.empty() || (result == ecFailed && !settings.keepGoing)) {
/* If we failed and keepGoing is not set, we remove all
remaining waitees. */
for (auto & goal : waitees) {
goal->waiters.extract(shared_from_this());
}
waitees.clear();
worker.wakeUp(shared_from_this());
}
}
void Goal::amDone(ExitCode result, std::optional<Error> ex)
{
trace("done");
assert(exitCode == ecBusy);
assert(result == ecSuccess || result == ecFailed || result == ecNoSubstituters || result == ecIncompleteClosure);
exitCode = result;
if (ex) {
if (!waiters.empty())
logError(ex->info());
else
this->ex = std::move(*ex);
}
for (auto & i : waiters) {
GoalPtr goal = i.lock();
if (goal) goal->waiteeDone(shared_from_this(), result);
}
waiters.clear();
worker.removeGoal(shared_from_this());
cleanup();
}
void Goal::trace(std::string_view s)
{
debug("%1%: %2%", name, s);
}
}

View file

@ -1,180 +0,0 @@
#pragma once
///@file
#include "types.hh"
#include "store-api.hh"
#include "build-result.hh"
namespace nix {
/**
* Forward definition.
*/
struct Goal;
class Worker;
/**
* A pointer to a goal.
*/
typedef std::shared_ptr<Goal> GoalPtr;
typedef std::weak_ptr<Goal> WeakGoalPtr;
struct CompareGoalPtrs {
bool operator() (const GoalPtr & a, const GoalPtr & b) const;
};
/**
* Set of goals.
*/
typedef std::set<GoalPtr, CompareGoalPtrs> Goals;
typedef std::set<WeakGoalPtr, std::owner_less<WeakGoalPtr>> WeakGoals;
/**
* A map of paths to goals (and the other way around).
*/
typedef std::map<StorePath, WeakGoalPtr> WeakGoalMap;
/**
* Used as a hint to the worker on how to schedule a particular goal. For example,
* builds are typically CPU- and memory-bound, while substitutions are I/O bound.
* Using this information, the worker might decide to schedule more or fewer goals
* of each category in parallel.
*/
enum struct JobCategory {
/**
* A build of a derivation; it will use CPU and disk resources.
*/
Build,
/**
* A substitution an arbitrary store object; it will use network resources.
*/
Substitution,
};
struct Goal : public std::enable_shared_from_this<Goal>
{
typedef enum {ecBusy, ecSuccess, ecFailed, ecNoSubstituters, ecIncompleteClosure} ExitCode;
/**
* Backlink to the worker.
*/
Worker & worker;
/**
* Goals that this goal is waiting for.
*/
Goals waitees;
/**
* Goals waiting for this one to finish. Must use weak pointers
* here to prevent cycles.
*/
WeakGoals waiters;
/**
* Number of goals we are/were waiting for that have failed.
*/
size_t nrFailed = 0;
/**
* Number of substitution goals we are/were waiting for that
* failed because there are no substituters.
*/
size_t nrNoSubstituters = 0;
/**
* Number of substitution goals we are/were waiting for that
* failed because they had unsubstitutable references.
*/
size_t nrIncompleteClosure = 0;
/**
* Name of this goal for debugging purposes.
*/
std::string name;
/**
* Whether the goal is finished.
*/
ExitCode exitCode = ecBusy;
protected:
/**
* Build result.
*/
BuildResult buildResult;
public:
/**
* Project a `BuildResult` with just the information that pertains
* to the given request.
*
* In general, goals may be aliased between multiple requests, and
* the stored `BuildResult` has information for the union of all
* requests. We don't want to leak what the other request are for
* sake of both privacy and determinism, and this "safe accessor"
* ensures we don't.
*/
BuildResult getBuildResult(const DerivedPath &) const;
/**
* Exception containing an error message, if any.
*/
std::optional<Error> ex;
Goal(Worker & worker, DerivedPath path)
: worker(worker)
{ }
virtual ~Goal()
{
trace("goal destroyed");
}
virtual void work() = 0;
void addWaitee(GoalPtr waitee);
virtual void waiteeDone(GoalPtr waitee, ExitCode result);
virtual void handleChildOutput(int fd, std::string_view data)
{
abort();
}
virtual void handleEOF(int fd)
{
abort();
}
void trace(std::string_view s);
std::string getName() const
{
return name;
}
/**
* Callback in case of a timeout. It should wake up its waiters,
* get rid of any running child processes that are being monitored
* by the worker (important!), etc.
*/
virtual void timedOut(Error && ex) = 0;
virtual std::string key() = 0;
void amDone(ExitCode result, std::optional<Error> ex = {});
virtual void cleanup() { }
/**
* @brief Hint for the scheduler, which concurrency limit applies.
* @see JobCategory
*/
virtual JobCategory jobCategory() const = 0;
};
void addToWeakGoals(WeakGoals & goals, GoalPtr p);
}

View file

@ -1,324 +0,0 @@
#include "worker.hh"
#include "substitution-goal.hh"
#include "nar-info.hh"
#include "finally.hh"
#include "signals.hh"
namespace nix {
PathSubstitutionGoal::PathSubstitutionGoal(const StorePath & storePath, Worker & worker, RepairFlag repair, std::optional<ContentAddress> ca)
: Goal(worker, DerivedPath::Opaque { storePath })
, storePath(storePath)
, repair(repair)
, ca(ca)
{
state = &PathSubstitutionGoal::init;
name = fmt("substitution of '%s'", worker.store.printStorePath(this->storePath));
trace("created");
maintainExpectedSubstitutions = std::make_unique<MaintainCount<uint64_t>>(worker.expectedSubstitutions);
}
PathSubstitutionGoal::~PathSubstitutionGoal()
{
cleanup();
}
void PathSubstitutionGoal::done(
ExitCode result,
BuildResult::Status status,
std::optional<std::string> errorMsg)
{
buildResult.status = status;
if (errorMsg) {
debug(*errorMsg);
buildResult.errorMsg = *errorMsg;
}
amDone(result);
}
void PathSubstitutionGoal::work()
{
(this->*state)();
}
void PathSubstitutionGoal::init()
{
trace("init");
worker.store.addTempRoot(storePath);
/* If the path already exists we're done. */
if (!repair && worker.store.isValidPath(storePath)) {
done(ecSuccess, BuildResult::AlreadyValid);
return;
}
if (settings.readOnlyMode)
throw Error("cannot substitute path '%s' - no write access to the Nix store", worker.store.printStorePath(storePath));
subs = settings.useSubstitutes ? getDefaultSubstituters() : std::list<ref<Store>>();
tryNext();
}
void PathSubstitutionGoal::tryNext()
{
trace("trying next substituter");
cleanup();
if (subs.size() == 0) {
/* None left. Terminate this goal and let someone else deal
with it. */
/* Hack: don't indicate failure if there were no substituters.
In that case the calling derivation should just do a
build. */
done(
substituterFailed ? ecFailed : ecNoSubstituters,
BuildResult::NoSubstituters,
fmt("path '%s' is required, but there is no substituter that can build it", worker.store.printStorePath(storePath)));
if (substituterFailed) {
worker.failedSubstitutions++;
worker.updateProgress();
}
return;
}
sub = subs.front();
subs.pop_front();
if (ca) {
subPath = sub->makeFixedOutputPathFromCA(
std::string { storePath.name() },
ContentAddressWithReferences::withoutRefs(*ca));
if (sub->storeDir == worker.store.storeDir)
assert(subPath == storePath);
} else if (sub->storeDir != worker.store.storeDir) {
tryNext();
return;
}
try {
// FIXME: make async
info = sub->queryPathInfo(subPath ? *subPath : storePath);
} catch (InvalidPath &) {
tryNext();
return;
} catch (SubstituterDisabled &) {
if (settings.tryFallback) {
tryNext();
return;
}
throw;
} catch (Error & e) {
if (settings.tryFallback) {
logError(e.info());
tryNext();
return;
}
throw;
}
if (info->path != storePath) {
if (info->isContentAddressed(*sub) && info->references.empty()) {
auto info2 = std::make_shared<ValidPathInfo>(*info);
info2->path = storePath;
info = info2;
} else {
printError("asked '%s' for '%s' but got '%s'",
sub->getUri(), worker.store.printStorePath(storePath), sub->printStorePath(info->path));
tryNext();
return;
}
}
/* Update the total expected download size. */
auto narInfo = std::dynamic_pointer_cast<const NarInfo>(info);
maintainExpectedNar = std::make_unique<MaintainCount<uint64_t>>(worker.expectedNarSize, info->narSize);
maintainExpectedDownload =
narInfo && narInfo->fileSize
? std::make_unique<MaintainCount<uint64_t>>(worker.expectedDownloadSize, narInfo->fileSize)
: nullptr;
worker.updateProgress();
/* Bail out early if this substituter lacks a valid
signature. LocalStore::addToStore() also checks for this, but
only after we've downloaded the path. */
if (!sub->isTrusted && worker.store.pathInfoIsUntrusted(*info))
{
warn("ignoring substitute for '%s' from '%s', as it's not signed by any of the keys in 'trusted-public-keys'",
worker.store.printStorePath(storePath), sub->getUri());
tryNext();
return;
}
/* To maintain the closure invariant, we first have to realise the
paths referenced by this one. */
for (auto & i : info->references)
if (i != storePath) /* ignore self-references */
addWaitee(worker.makePathSubstitutionGoal(i));
if (waitees.empty()) /* to prevent hang (no wake-up event) */
referencesValid();
else
state = &PathSubstitutionGoal::referencesValid;
}
void PathSubstitutionGoal::referencesValid()
{
trace("all references realised");
if (nrFailed > 0) {
done(
nrNoSubstituters > 0 || nrIncompleteClosure > 0 ? ecIncompleteClosure : ecFailed,
BuildResult::DependencyFailed,
fmt("some references of path '%s' could not be realised", worker.store.printStorePath(storePath)));
return;
}
for (auto & i : info->references)
if (i != storePath) /* ignore self-references */
assert(worker.store.isValidPath(i));
state = &PathSubstitutionGoal::tryToRun;
worker.wakeUp(shared_from_this());
}
void PathSubstitutionGoal::tryToRun()
{
trace("trying to run");
/* Make sure that we are allowed to start a substitution. Note that even
if maxSubstitutionJobs == 0, we still allow a substituter to run. This
prevents infinite waiting. */
if (worker.getNrSubstitutions() >= std::max(1U, (unsigned int) settings.maxSubstitutionJobs)) {
worker.waitForBuildSlot(shared_from_this());
return;
}
maintainRunningSubstitutions = std::make_unique<MaintainCount<uint64_t>>(worker.runningSubstitutions);
worker.updateProgress();
outPipe.create();
promise = std::promise<void>();
thr = std::thread([this]() {
try {
ReceiveInterrupts receiveInterrupts;
/* Wake up the worker loop when we're done. */
Finally updateStats([this]() { outPipe.writeSide.close(); });
Activity act(*logger, actSubstitute, Logger::Fields{worker.store.printStorePath(storePath), sub->getUri()});
PushActivity pact(act.id);
copyStorePath(*sub, worker.store,
subPath ? *subPath : storePath, repair, sub->isTrusted ? NoCheckSigs : CheckSigs);
promise.set_value();
} catch (...) {
promise.set_exception(std::current_exception());
}
});
worker.childStarted(shared_from_this(), {outPipe.readSide.get()}, true, false);
state = &PathSubstitutionGoal::finished;
}
void PathSubstitutionGoal::finished()
{
trace("substitute finished");
thr.join();
worker.childTerminated(this);
try {
promise.get_future().get();
} catch (std::exception & e) {
printError(e.what());
/* Cause the parent build to fail unless --fallback is given,
or the substitute has disappeared. The latter case behaves
the same as the substitute never having existed in the
first place. */
try {
throw;
} catch (SubstituteGone &) {
} catch (...) {
substituterFailed = true;
}
/* Try the next substitute. */
state = &PathSubstitutionGoal::tryNext;
worker.wakeUp(shared_from_this());
return;
}
worker.markContentsGood(storePath);
printMsg(lvlChatty, "substitution of path '%s' succeeded", worker.store.printStorePath(storePath));
maintainRunningSubstitutions.reset();
maintainExpectedSubstitutions.reset();
worker.doneSubstitutions++;
if (maintainExpectedDownload) {
auto fileSize = maintainExpectedDownload->delta;
maintainExpectedDownload.reset();
worker.doneDownloadSize += fileSize;
}
worker.doneNarSize += maintainExpectedNar->delta;
maintainExpectedNar.reset();
worker.updateProgress();
done(ecSuccess, BuildResult::Substituted);
}
void PathSubstitutionGoal::handleChildOutput(int fd, std::string_view data)
{
}
void PathSubstitutionGoal::handleEOF(int fd)
{
if (fd == outPipe.readSide.get()) worker.wakeUp(shared_from_this());
}
void PathSubstitutionGoal::cleanup()
{
try {
if (thr.joinable()) {
// FIXME: signal worker thread to quit.
thr.join();
worker.childTerminated(this);
}
outPipe.close();
} catch (...) {
ignoreException();
}
}
}

View file

@ -1,125 +0,0 @@
#pragma once
///@file
#include "lock.hh"
#include "store-api.hh"
#include "goal.hh"
namespace nix {
class Worker;
struct PathSubstitutionGoal : public Goal
{
/**
* The store path that should be realised through a substitute.
*/
StorePath storePath;
/**
* The path the substituter refers to the path as. This will be
* different when the stores have different names.
*/
std::optional<StorePath> subPath;
/**
* The remaining substituters.
*/
std::list<ref<Store>> subs;
/**
* The current substituter.
*/
std::shared_ptr<Store> sub;
/**
* Whether a substituter failed.
*/
bool substituterFailed = false;
/**
* Path info returned by the substituter's query info operation.
*/
std::shared_ptr<const ValidPathInfo> info;
/**
* Pipe for the substituter's standard output.
*/
Pipe outPipe;
/**
* The substituter thread.
*/
std::thread thr;
std::promise<void> promise;
/**
* Whether to try to repair a valid path.
*/
RepairFlag repair;
/**
* Location where we're downloading the substitute. Differs from
* storePath when doing a repair.
*/
Path destPath;
std::unique_ptr<MaintainCount<uint64_t>> maintainExpectedSubstitutions,
maintainRunningSubstitutions, maintainExpectedNar, maintainExpectedDownload;
typedef void (PathSubstitutionGoal::*GoalState)();
GoalState state;
/**
* Content address for recomputing store path
*/
std::optional<ContentAddress> ca;
void done(
ExitCode result,
BuildResult::Status status,
std::optional<std::string> errorMsg = {});
public:
PathSubstitutionGoal(const StorePath & storePath, Worker & worker, RepairFlag repair = NoRepair, std::optional<ContentAddress> ca = std::nullopt);
~PathSubstitutionGoal();
void timedOut(Error && ex) override { abort(); };
/**
* We prepend "a$" to the key name to ensure substitution goals
* happen before derivation goals.
*/
std::string key() override
{
return "a$" + std::string(storePath.name()) + "$" + worker.store.printStorePath(storePath);
}
void work() override;
/**
* The states.
*/
void init();
void tryNext();
void gotInfo();
void referencesValid();
void tryToRun();
void finished();
/**
* Callback used by the worker to write to the log.
*/
void handleChildOutput(int fd, std::string_view data) override;
void handleEOF(int fd) override;
/* Called by destructor, can't be overridden */
void cleanup() override final;
JobCategory jobCategory() const override {
return JobCategory::Substitution;
};
};
}

View file

@ -1,561 +0,0 @@
#include "machines.hh"
#include "worker.hh"
#include "substitution-goal.hh"
#include "drv-output-substitution-goal.hh"
#include "local-derivation-goal.hh"
#include "hook-instance.hh"
#include "signals.hh"
#include <poll.h>
namespace nix {
Worker::Worker(Store & store, Store & evalStore)
: act(*logger, actRealise)
, actDerivations(*logger, actBuilds)
, actSubstitutions(*logger, actCopyPaths)
, store(store)
, evalStore(evalStore)
{
/* Debugging: prevent recursive workers. */
nrLocalBuilds = 0;
nrSubstitutions = 0;
lastWokenUp = steady_time_point::min();
permanentFailure = false;
timedOut = false;
hashMismatch = false;
checkMismatch = false;
}
Worker::~Worker()
{
/* Explicitly get rid of all strong pointers now. After this all
goals that refer to this worker should be gone. (Otherwise we
are in trouble, since goals may call childTerminated() etc. in
their destructors). */
topGoals.clear();
assert(expectedSubstitutions == 0);
assert(expectedDownloadSize == 0);
assert(expectedNarSize == 0);
}
std::shared_ptr<DerivationGoal> Worker::makeDerivationGoalCommon(
const StorePath & drvPath,
const OutputsSpec & wantedOutputs,
std::function<std::shared_ptr<DerivationGoal>()> mkDrvGoal)
{
std::weak_ptr<DerivationGoal> & goal_weak = derivationGoals[drvPath];
std::shared_ptr<DerivationGoal> goal = goal_weak.lock();
if (!goal) {
goal = mkDrvGoal();
goal_weak = goal;
wakeUp(goal);
} else {
goal->addWantedOutputs(wantedOutputs);
}
return goal;
}
std::shared_ptr<DerivationGoal> Worker::makeDerivationGoal(const StorePath & drvPath,
const OutputsSpec & wantedOutputs, BuildMode buildMode)
{
return makeDerivationGoalCommon(drvPath, wantedOutputs, [&]() -> std::shared_ptr<DerivationGoal> {
return !dynamic_cast<LocalStore *>(&store)
? std::make_shared</* */DerivationGoal>(drvPath, wantedOutputs, *this, buildMode)
: std::make_shared<LocalDerivationGoal>(drvPath, wantedOutputs, *this, buildMode);
});
}
std::shared_ptr<DerivationGoal> Worker::makeBasicDerivationGoal(const StorePath & drvPath,
const BasicDerivation & drv, const OutputsSpec & wantedOutputs, BuildMode buildMode)
{
return makeDerivationGoalCommon(drvPath, wantedOutputs, [&]() -> std::shared_ptr<DerivationGoal> {
return !dynamic_cast<LocalStore *>(&store)
? std::make_shared</* */DerivationGoal>(drvPath, drv, wantedOutputs, *this, buildMode)
: std::make_shared<LocalDerivationGoal>(drvPath, drv, wantedOutputs, *this, buildMode);
});
}
std::shared_ptr<PathSubstitutionGoal> Worker::makePathSubstitutionGoal(const StorePath & path, RepairFlag repair, std::optional<ContentAddress> ca)
{
std::weak_ptr<PathSubstitutionGoal> & goal_weak = substitutionGoals[path];
auto goal = goal_weak.lock(); // FIXME
if (!goal) {
goal = std::make_shared<PathSubstitutionGoal>(path, *this, repair, ca);
goal_weak = goal;
wakeUp(goal);
}
return goal;
}
std::shared_ptr<DrvOutputSubstitutionGoal> Worker::makeDrvOutputSubstitutionGoal(const DrvOutput& id, RepairFlag repair, std::optional<ContentAddress> ca)
{
std::weak_ptr<DrvOutputSubstitutionGoal> & goal_weak = drvOutputSubstitutionGoals[id];
auto goal = goal_weak.lock(); // FIXME
if (!goal) {
goal = std::make_shared<DrvOutputSubstitutionGoal>(id, *this, repair, ca);
goal_weak = goal;
wakeUp(goal);
}
return goal;
}
GoalPtr Worker::makeGoal(const DerivedPath & req, BuildMode buildMode)
{
return std::visit(overloaded {
[&](const DerivedPath::Built & bfd) -> GoalPtr {
if (auto bop = std::get_if<DerivedPath::Opaque>(&*bfd.drvPath))
return makeDerivationGoal(bop->path, bfd.outputs, buildMode);
else
throw UnimplementedError("Building dynamic derivations in one shot is not yet implemented.");
},
[&](const DerivedPath::Opaque & bo) -> GoalPtr {
return makePathSubstitutionGoal(bo.path, buildMode == bmRepair ? Repair : NoRepair);
},
}, req.raw());
}
template<typename K, typename G>
static void removeGoal(std::shared_ptr<G> goal, std::map<K, std::weak_ptr<G>> & goalMap)
{
/* !!! inefficient */
for (auto i = goalMap.begin();
i != goalMap.end(); )
if (i->second.lock() == goal) {
auto j = i; ++j;
goalMap.erase(i);
i = j;
}
else ++i;
}
void Worker::removeGoal(GoalPtr goal)
{
if (auto drvGoal = std::dynamic_pointer_cast<DerivationGoal>(goal))
nix::removeGoal(drvGoal, derivationGoals);
else if (auto subGoal = std::dynamic_pointer_cast<PathSubstitutionGoal>(goal))
nix::removeGoal(subGoal, substitutionGoals);
else if (auto subGoal = std::dynamic_pointer_cast<DrvOutputSubstitutionGoal>(goal))
nix::removeGoal(subGoal, drvOutputSubstitutionGoals);
else
assert(false);
if (topGoals.find(goal) != topGoals.end()) {
topGoals.erase(goal);
/* If a top-level goal failed, then kill all other goals
(unless keepGoing was set). */
if (goal->exitCode == Goal::ecFailed && !settings.keepGoing)
topGoals.clear();
}
/* Wake up goals waiting for any goal to finish. */
for (auto & i : waitingForAnyGoal) {
GoalPtr goal = i.lock();
if (goal) wakeUp(goal);
}
waitingForAnyGoal.clear();
}
void Worker::wakeUp(GoalPtr goal)
{
goal->trace("woken up");
addToWeakGoals(awake, goal);
}
unsigned Worker::getNrLocalBuilds()
{
return nrLocalBuilds;
}
unsigned Worker::getNrSubstitutions()
{
return nrSubstitutions;
}
void Worker::childStarted(GoalPtr goal, const std::set<int> & fds,
bool inBuildSlot, bool respectTimeouts)
{
Child child;
child.goal = goal;
child.goal2 = goal.get();
child.fds = fds;
child.timeStarted = child.lastOutput = steady_time_point::clock::now();
child.inBuildSlot = inBuildSlot;
child.respectTimeouts = respectTimeouts;
children.emplace_back(child);
if (inBuildSlot) {
switch (goal->jobCategory()) {
case JobCategory::Substitution:
nrSubstitutions++;
break;
case JobCategory::Build:
nrLocalBuilds++;
break;
default:
abort();
}
}
}
void Worker::childTerminated(Goal * goal, bool wakeSleepers)
{
auto i = std::find_if(children.begin(), children.end(),
[&](const Child & child) { return child.goal2 == goal; });
if (i == children.end()) return;
if (i->inBuildSlot) {
switch (goal->jobCategory()) {
case JobCategory::Substitution:
assert(nrSubstitutions > 0);
nrSubstitutions--;
break;
case JobCategory::Build:
assert(nrLocalBuilds > 0);
nrLocalBuilds--;
break;
default:
abort();
}
}
children.erase(i);
if (wakeSleepers) {
/* Wake up goals waiting for a build slot. */
for (auto & j : wantingToBuild) {
GoalPtr goal = j.lock();
if (goal) wakeUp(goal);
}
wantingToBuild.clear();
}
}
void Worker::waitForBuildSlot(GoalPtr goal)
{
goal->trace("wait for build slot");
bool isSubstitutionGoal = goal->jobCategory() == JobCategory::Substitution;
if ((!isSubstitutionGoal && getNrLocalBuilds() < settings.maxBuildJobs) ||
(isSubstitutionGoal && getNrSubstitutions() < settings.maxSubstitutionJobs))
wakeUp(goal); /* we can do it right away */
else
addToWeakGoals(wantingToBuild, goal);
}
void Worker::waitForAnyGoal(GoalPtr goal)
{
debug("wait for any goal");
addToWeakGoals(waitingForAnyGoal, goal);
}
void Worker::waitForAWhile(GoalPtr goal)
{
debug("wait for a while");
addToWeakGoals(waitingForAWhile, goal);
}
void Worker::run(const Goals & _topGoals)
{
std::vector<nix::DerivedPath> topPaths;
for (auto & i : _topGoals) {
topGoals.insert(i);
if (auto goal = dynamic_cast<DerivationGoal *>(i.get())) {
topPaths.push_back(DerivedPath::Built {
.drvPath = makeConstantStorePathRef(goal->drvPath),
.outputs = goal->wantedOutputs,
});
} else if (auto goal = dynamic_cast<PathSubstitutionGoal *>(i.get())) {
topPaths.push_back(DerivedPath::Opaque{goal->storePath});
}
}
/* Call queryMissing() to efficiently query substitutes. */
StorePathSet willBuild, willSubstitute, unknown;
uint64_t downloadSize, narSize;
store.queryMissing(topPaths, willBuild, willSubstitute, unknown, downloadSize, narSize);
debug("entered goal loop");
while (1) {
checkInterrupt();
// TODO GC interface?
if (auto localStore = dynamic_cast<LocalStore *>(&store))
localStore->autoGC(false);
/* Call every wake goal (in the ordering established by
CompareGoalPtrs). */
while (!awake.empty() && !topGoals.empty()) {
Goals awake2;
for (auto & i : awake) {
GoalPtr goal = i.lock();
if (goal) awake2.insert(goal);
}
awake.clear();
for (auto & goal : awake2) {
checkInterrupt();
goal->work();
if (topGoals.empty()) break; // stuff may have been cancelled
}
}
if (topGoals.empty()) break;
/* Wait for input. */
if (!children.empty() || !waitingForAWhile.empty())
waitForInput();
else {
if (awake.empty() && 0U == settings.maxBuildJobs)
{
if (getMachines().empty())
throw Error(
R"(
Unable to start any build;
either increase '--max-jobs' or enable remote builds.
For more information run 'man nix.conf' and search for '/machines'.
)"
);
else
throw Error(
R"(
Unable to start any build;
remote machines may not have all required system features.
For more information run 'man nix.conf' and search for '/machines'.
)"
);
}
assert(!awake.empty());
}
}
/* If --keep-going is not set, it's possible that the main goal
exited while some of its subgoals were still active. But if
--keep-going *is* set, then they must all be finished now. */
assert(!settings.keepGoing || awake.empty());
assert(!settings.keepGoing || wantingToBuild.empty());
assert(!settings.keepGoing || children.empty());
}
void Worker::waitForInput()
{
printMsg(lvlVomit, "waiting for children");
/* Process output from the file descriptors attached to the
children, namely log output and output path creation commands.
We also use this to detect child termination: if we get EOF on
the logger pipe of a build, we assume that the builder has
terminated. */
bool useTimeout = false;
long timeout = 0;
auto before = steady_time_point::clock::now();
/* If we're monitoring for silence on stdout/stderr, or if there
is a build timeout, then wait for input until the first
deadline for any child. */
auto nearest = steady_time_point::max(); // nearest deadline
if (settings.minFree.get() != 0)
// Periodicallty wake up to see if we need to run the garbage collector.
nearest = before + std::chrono::seconds(10);
for (auto & i : children) {
if (!i.respectTimeouts) continue;
if (0 != settings.maxSilentTime)
nearest = std::min(nearest, i.lastOutput + std::chrono::seconds(settings.maxSilentTime));
if (0 != settings.buildTimeout)
nearest = std::min(nearest, i.timeStarted + std::chrono::seconds(settings.buildTimeout));
}
if (nearest != steady_time_point::max()) {
timeout = std::max(1L, (long) std::chrono::duration_cast<std::chrono::seconds>(nearest - before).count());
useTimeout = true;
}
/* If we are polling goals that are waiting for a lock, then wake
up after a few seconds at most. */
if (!waitingForAWhile.empty()) {
useTimeout = true;
if (lastWokenUp == steady_time_point::min() || lastWokenUp > before) lastWokenUp = before;
timeout = std::max(1L,
(long) std::chrono::duration_cast<std::chrono::seconds>(
lastWokenUp + std::chrono::seconds(settings.pollInterval) - before).count());
} else lastWokenUp = steady_time_point::min();
if (useTimeout)
vomit("sleeping %d seconds", timeout);
/* Use select() to wait for the input side of any logger pipe to
become `available'. Note that `available' (i.e., non-blocking)
includes EOF. */
std::vector<struct pollfd> pollStatus;
std::map<int, size_t> fdToPollStatus;
for (auto & i : children) {
for (auto & j : i.fds) {
pollStatus.push_back((struct pollfd) { .fd = j, .events = POLLIN });
fdToPollStatus[j] = pollStatus.size() - 1;
}
}
if (poll(pollStatus.data(), pollStatus.size(),
useTimeout ? timeout * 1000 : -1) == -1) {
if (errno == EINTR) return;
throw SysError("waiting for input");
}
auto after = steady_time_point::clock::now();
/* Process all available file descriptors. FIXME: this is
O(children * fds). */
decltype(children)::iterator i;
for (auto j = children.begin(); j != children.end(); j = i) {
i = std::next(j);
checkInterrupt();
GoalPtr goal = j->goal.lock();
assert(goal);
std::set<int> fds2(j->fds);
std::vector<unsigned char> buffer(4096);
for (auto & k : fds2) {
const auto fdPollStatusId = get(fdToPollStatus, k);
assert(fdPollStatusId);
assert(*fdPollStatusId < pollStatus.size());
if (pollStatus.at(*fdPollStatusId).revents) {
ssize_t rd = ::read(k, buffer.data(), buffer.size());
// FIXME: is there a cleaner way to handle pt close
// than EIO? Is this even standard?
if (rd == 0 || (rd == -1 && errno == EIO)) {
debug("%1%: got EOF", goal->getName());
goal->handleEOF(k);
j->fds.erase(k);
} else if (rd == -1) {
if (errno != EINTR)
throw SysError("%s: read failed", goal->getName());
} else {
printMsg(lvlVomit, "%1%: read %2% bytes",
goal->getName(), rd);
std::string_view data((char *) buffer.data(), rd);
j->lastOutput = after;
goal->handleChildOutput(k, data);
}
}
}
if (goal->exitCode == Goal::ecBusy &&
0 != settings.maxSilentTime &&
j->respectTimeouts &&
after - j->lastOutput >= std::chrono::seconds(settings.maxSilentTime))
{
goal->timedOut(Error(
"%1% timed out after %2% seconds of silence",
goal->getName(), settings.maxSilentTime));
}
else if (goal->exitCode == Goal::ecBusy &&
0 != settings.buildTimeout &&
j->respectTimeouts &&
after - j->timeStarted >= std::chrono::seconds(settings.buildTimeout))
{
goal->timedOut(Error(
"%1% timed out after %2% seconds",
goal->getName(), settings.buildTimeout));
}
}
if (!waitingForAWhile.empty() && lastWokenUp + std::chrono::seconds(settings.pollInterval) <= after) {
lastWokenUp = after;
for (auto & i : waitingForAWhile) {
GoalPtr goal = i.lock();
if (goal) wakeUp(goal);
}
waitingForAWhile.clear();
}
}
unsigned int Worker::failingExitStatus()
{
// See API docs in header for explanation
unsigned int mask = 0;
bool buildFailure = permanentFailure || timedOut || hashMismatch;
if (buildFailure)
mask |= 0x04; // 100
if (timedOut)
mask |= 0x01; // 101
if (hashMismatch)
mask |= 0x02; // 102
if (checkMismatch) {
mask |= 0x08; // 104
}
if (mask)
mask |= 0x60;
return mask ? mask : 1;
}
bool Worker::pathContentsGood(const StorePath & path)
{
auto i = pathContentsGoodCache.find(path);
if (i != pathContentsGoodCache.end()) return i->second;
printInfo("checking path '%s'...", store.printStorePath(path));
auto info = store.queryPathInfo(path);
bool res;
if (!pathExists(store.printStorePath(path)))
res = false;
else {
Hash current = hashPath(
{store.getFSAccessor(), CanonPath(store.printStorePath(path))},
FileIngestionMethod::Recursive, info->narHash.algo);
Hash nullHash(HashAlgorithm::SHA256);
res = info->narHash == nullHash || info->narHash == current;
}
pathContentsGoodCache.insert_or_assign(path, res);
if (!res)
printError("path '%s' is corrupted or missing!", store.printStorePath(path));
return res;
}
void Worker::markContentsGood(const StorePath & path)
{
pathContentsGoodCache.insert_or_assign(path, true);
}
GoalPtr upcast_goal(std::shared_ptr<PathSubstitutionGoal> subGoal)
{
return subGoal;
}
GoalPtr upcast_goal(std::shared_ptr<DrvOutputSubstitutionGoal> subGoal)
{
return subGoal;
}
}

View file

@ -1,322 +0,0 @@
#pragma once
///@file
#include "types.hh"
#include "lock.hh"
#include "store-api.hh"
#include "goal.hh"
#include "realisation.hh"
#include <future>
#include <thread>
namespace nix {
/* Forward definition. */
struct DerivationGoal;
struct PathSubstitutionGoal;
class DrvOutputSubstitutionGoal;
/**
* Workaround for not being able to declare a something like
*
* ```c++
* class PathSubstitutionGoal : public Goal;
* ```
* even when Goal is a complete type.
*
* This is still a static cast. The purpose of exporting it is to define it in
* a place where `PathSubstitutionGoal` is concrete, and use it in a place where it
* is opaque.
*/
GoalPtr upcast_goal(std::shared_ptr<PathSubstitutionGoal> subGoal);
GoalPtr upcast_goal(std::shared_ptr<DrvOutputSubstitutionGoal> subGoal);
typedef std::chrono::time_point<std::chrono::steady_clock> steady_time_point;
/**
* A mapping used to remember for each child process to what goal it
* belongs, and file descriptors for receiving log data and output
* path creation commands.
*/
struct Child
{
WeakGoalPtr goal;
Goal * goal2; // ugly hackery
std::set<int> fds;
bool respectTimeouts;
bool inBuildSlot;
/**
* Time we last got output on stdout/stderr
*/
steady_time_point lastOutput;
steady_time_point timeStarted;
};
/* Forward definition. */
struct HookInstance;
/**
* The worker class.
*/
class Worker
{
private:
/* Note: the worker should only have strong pointers to the
top-level goals. */
/**
* The top-level goals of the worker.
*/
Goals topGoals;
/**
* Goals that are ready to do some work.
*/
WeakGoals awake;
/**
* Goals waiting for a build slot.
*/
WeakGoals wantingToBuild;
/**
* Child processes currently running.
*/
std::list<Child> children;
/**
* Number of build slots occupied. This includes local builds but does not
* include substitutions or remote builds via the build hook.
*/
unsigned int nrLocalBuilds;
/**
* Number of substitution slots occupied.
*/
unsigned int nrSubstitutions;
/**
* Maps used to prevent multiple instantiations of a goal for the
* same derivation / path.
*/
std::map<StorePath, std::weak_ptr<DerivationGoal>> derivationGoals;
std::map<StorePath, std::weak_ptr<PathSubstitutionGoal>> substitutionGoals;
std::map<DrvOutput, std::weak_ptr<DrvOutputSubstitutionGoal>> drvOutputSubstitutionGoals;
/**
* Goals waiting for busy paths to be unlocked.
*/
WeakGoals waitingForAnyGoal;
/**
* Goals sleeping for a few seconds (polling a lock).
*/
WeakGoals waitingForAWhile;
/**
* Last time the goals in `waitingForAWhile` were woken up.
*/
steady_time_point lastWokenUp;
/**
* Cache for pathContentsGood().
*/
std::map<StorePath, bool> pathContentsGoodCache;
public:
const Activity act;
const Activity actDerivations;
const Activity actSubstitutions;
/**
* Set if at least one derivation had a BuildError (i.e. permanent
* failure).
*/
bool permanentFailure;
/**
* Set if at least one derivation had a timeout.
*/
bool timedOut;
/**
* Set if at least one derivation fails with a hash mismatch.
*/
bool hashMismatch;
/**
* Set if at least one derivation is not deterministic in check mode.
*/
bool checkMismatch;
Store & store;
Store & evalStore;
std::unique_ptr<HookInstance> hook;
uint64_t expectedBuilds = 0;
uint64_t doneBuilds = 0;
uint64_t failedBuilds = 0;
uint64_t runningBuilds = 0;
uint64_t expectedSubstitutions = 0;
uint64_t doneSubstitutions = 0;
uint64_t failedSubstitutions = 0;
uint64_t runningSubstitutions = 0;
uint64_t expectedDownloadSize = 0;
uint64_t doneDownloadSize = 0;
uint64_t expectedNarSize = 0;
uint64_t doneNarSize = 0;
/**
* Whether to ask the build hook if it can build a derivation. If
* it answers with "decline-permanently", we don't try again.
*/
bool tryBuildHook = true;
Worker(Store & store, Store & evalStore);
~Worker();
/**
* Make a goal (with caching).
*/
/**
* @ref DerivationGoal "derivation goal"
*/
private:
std::shared_ptr<DerivationGoal> makeDerivationGoalCommon(
const StorePath & drvPath, const OutputsSpec & wantedOutputs,
std::function<std::shared_ptr<DerivationGoal>()> mkDrvGoal);
public:
std::shared_ptr<DerivationGoal> makeDerivationGoal(
const StorePath & drvPath,
const OutputsSpec & wantedOutputs, BuildMode buildMode = bmNormal);
std::shared_ptr<DerivationGoal> makeBasicDerivationGoal(
const StorePath & drvPath, const BasicDerivation & drv,
const OutputsSpec & wantedOutputs, BuildMode buildMode = bmNormal);
/**
* @ref SubstitutionGoal "substitution goal"
*/
std::shared_ptr<PathSubstitutionGoal> makePathSubstitutionGoal(const StorePath & storePath, RepairFlag repair = NoRepair, std::optional<ContentAddress> ca = std::nullopt);
std::shared_ptr<DrvOutputSubstitutionGoal> makeDrvOutputSubstitutionGoal(const DrvOutput & id, RepairFlag repair = NoRepair, std::optional<ContentAddress> ca = std::nullopt);
/**
* Make a goal corresponding to the `DerivedPath`.
*
* It will be a `DerivationGoal` for a `DerivedPath::Built` or
* a `SubstitutionGoal` for a `DerivedPath::Opaque`.
*/
GoalPtr makeGoal(const DerivedPath & req, BuildMode buildMode = bmNormal);
/**
* Remove a dead goal.
*/
void removeGoal(GoalPtr goal);
/**
* Wake up a goal (i.e., there is something for it to do).
*/
void wakeUp(GoalPtr goal);
/**
* Return the number of local build processes currently running (but not
* remote builds via the build hook).
*/
unsigned int getNrLocalBuilds();
/**
* Return the number of substitution processes currently running.
*/
unsigned int getNrSubstitutions();
/**
* Registers a running child process. `inBuildSlot` means that
* the process counts towards the jobs limit.
*/
void childStarted(GoalPtr goal, const std::set<int> & fds,
bool inBuildSlot, bool respectTimeouts);
/**
* Unregisters a running child process. `wakeSleepers` should be
* false if there is no sense in waking up goals that are sleeping
* because they can't run yet (e.g., there is no free build slot,
* or the hook would still say `postpone`).
*/
void childTerminated(Goal * goal, bool wakeSleepers = true);
/**
* Put `goal` to sleep until a build slot becomes available (which
* might be right away).
*/
void waitForBuildSlot(GoalPtr goal);
/**
* Wait for any goal to finish. Pretty indiscriminate way to
* wait for some resource that some other goal is holding.
*/
void waitForAnyGoal(GoalPtr goal);
/**
* Wait for a few seconds and then retry this goal. Used when
* waiting for a lock held by another process. This kind of
* polling is inefficient, but POSIX doesn't really provide a way
* to wait for multiple locks in the main select() loop.
*/
void waitForAWhile(GoalPtr goal);
/**
* Loop until the specified top-level goals have finished.
*/
void run(const Goals & topGoals);
/**
* Wait for input to become available.
*/
void waitForInput();
/***
* The exit status in case of failure.
*
* In the case of a build failure, returned value follows this
* bitmask:
*
* ```
* 0b1100100
* ^^^^
* |||`- timeout
* ||`-- output hash mismatch
* |`--- build failure
* `---- not deterministic
* ```
*
* In other words, the failure code is at least 100 (0b1100100), but
* might also be greater.
*
* Otherwise (no build failure, but some other sort of failure by
* assumption), this returned value is 1.
*/
unsigned int failingExitStatus();
/**
* Check whether the given valid path exists and has the right
* contents.
*/
bool pathContentsGood(const StorePath & path);
void markContentsGood(const StorePath & path);
void updateProgress()
{
actDerivations.progress(doneBuilds, expectedBuilds + doneBuilds, runningBuilds, failedBuilds);
actSubstitutions.progress(doneSubstitutions, expectedSubstitutions + doneSubstitutions, runningSubstitutions, failedSubstitutions);
act.setExpected(actFileTransfer, expectedDownloadSize + doneDownloadSize);
act.setExpected(actCopyPath, expectedNarSize + doneNarSize);
}
};
}