1
0
Fork 0
mirror of https://github.com/NixOS/nix synced 2025-06-27 00:11:17 +02:00

Merge pull request #12697 from NixOS/worker-abstraction

Make Goal code use abstractions over interations with Worker
This commit is contained in:
mergify[bot] 2025-03-21 10:38:29 +00:00 committed by GitHub
commit f0b7b37425
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 110 additions and 75 deletions

View file

@ -142,8 +142,8 @@ Goal::Co DerivationGoal::init() {
substitute. */ substitute. */
if (buildMode != bmNormal || !worker.evalStore.isValidPath(drvPath)) { if (buildMode != bmNormal || !worker.evalStore.isValidPath(drvPath)) {
addWaitee(upcast_goal(worker.makePathSubstitutionGoal(drvPath))); Goals waitees{upcast_goal(worker.makePathSubstitutionGoal(drvPath))};
co_await Suspend{}; co_await await(std::move(waitees));
} }
trace("loading derivation"); trace("loading derivation");
@ -235,6 +235,8 @@ Goal::Co DerivationGoal::haveDerivation()
} }
} }
Goals waitees;
/* We are first going to try to create the invalid output paths /* We are first going to try to create the invalid output paths
through substitutes. If that doesn't work, we'll build through substitutes. If that doesn't work, we'll build
them. */ them. */
@ -242,7 +244,7 @@ Goal::Co DerivationGoal::haveDerivation()
for (auto & [outputName, status] : initialOutputs) { for (auto & [outputName, status] : initialOutputs) {
if (!status.wanted) continue; if (!status.wanted) continue;
if (!status.known) if (!status.known)
addWaitee( waitees.insert(
upcast_goal( upcast_goal(
worker.makeDrvOutputSubstitutionGoal( worker.makeDrvOutputSubstitutionGoal(
DrvOutput{status.outputHash, outputName}, DrvOutput{status.outputHash, outputName},
@ -252,14 +254,14 @@ Goal::Co DerivationGoal::haveDerivation()
); );
else { else {
auto * cap = getDerivationCA(*drv); auto * cap = getDerivationCA(*drv);
addWaitee(upcast_goal(worker.makePathSubstitutionGoal( waitees.insert(upcast_goal(worker.makePathSubstitutionGoal(
status.known->path, status.known->path,
buildMode == bmRepair ? Repair : NoRepair, buildMode == bmRepair ? Repair : NoRepair,
cap ? std::optional { *cap } : std::nullopt))); cap ? std::optional { *cap } : std::nullopt)));
} }
} }
if (!waitees.empty()) co_await Suspend{}; /* to prevent hang (no wake-up event) */ co_await await(std::move(waitees));
trace("all outputs substituted (maybe)"); trace("all outputs substituted (maybe)");
@ -342,6 +344,8 @@ Goal::Co DerivationGoal::gaveUpOnSubstitution()
is no need to restart. */ is no need to restart. */
needRestart = NeedRestartForMoreOutputs::BuildInProgressWillNotNeed; needRestart = NeedRestartForMoreOutputs::BuildInProgressWillNotNeed;
Goals waitees;
std::map<ref<const SingleDerivedPath>, GoalPtr, value_comparison> inputGoals; std::map<ref<const SingleDerivedPath>, GoalPtr, value_comparison> inputGoals;
if (useDerivation) { if (useDerivation) {
@ -356,7 +360,7 @@ Goal::Co DerivationGoal::gaveUpOnSubstitution()
}, },
buildMode == bmRepair ? bmRepair : bmNormal); buildMode == bmRepair ? bmRepair : bmNormal);
inputGoals.insert_or_assign(inputDrv, g); inputGoals.insert_or_assign(inputDrv, g);
addWaitee(std::move(g)); waitees.insert(std::move(g));
} }
for (const auto & [outputName, childNode] : inputNode.childMap) for (const auto & [outputName, childNode] : inputNode.childMap)
addWaiteeDerivedPath( addWaiteeDerivedPath(
@ -397,10 +401,11 @@ Goal::Co DerivationGoal::gaveUpOnSubstitution()
if (!settings.useSubstitutes) if (!settings.useSubstitutes)
throw Error("dependency '%s' of '%s' does not exist, and substitution is disabled", throw Error("dependency '%s' of '%s' does not exist, and substitution is disabled",
worker.store.printStorePath(i), worker.store.printStorePath(drvPath)); worker.store.printStorePath(i), worker.store.printStorePath(drvPath));
addWaitee(upcast_goal(worker.makePathSubstitutionGoal(i))); waitees.insert(upcast_goal(worker.makePathSubstitutionGoal(i)));
} }
if (!waitees.empty()) co_await Suspend{}; /* to prevent hang (no wake-up event) */ co_await await(std::move(waitees));
trace("all inputs realised"); trace("all inputs realised");
@ -495,9 +500,11 @@ Goal::Co DerivationGoal::gaveUpOnSubstitution()
resolvedDrvGoal = worker.makeDerivationGoal( resolvedDrvGoal = worker.makeDerivationGoal(
pathResolved, wantedOutputs, buildMode); pathResolved, wantedOutputs, buildMode);
addWaitee(resolvedDrvGoal); {
Goals waitees{resolvedDrvGoal};
co_await await(std::move(waitees));
}
co_await Suspend{};
co_return resolvedFinished(); co_return resolvedFinished();
} }
@ -536,8 +543,7 @@ Goal::Co DerivationGoal::gaveUpOnSubstitution()
/* Okay, try to build. Note that here we don't wait for a build /* Okay, try to build. Note that here we don't wait for a build
slot to become available, since we don't need one if there is a slot to become available, since we don't need one if there is a
build hook. */ build hook. */
worker.wakeUp(shared_from_this()); co_await yield();
co_await Suspend{};
co_return tryToBuild(); co_return tryToBuild();
} }
@ -602,8 +608,7 @@ Goal::Co DerivationGoal::tryToBuild()
/* Wait then try locking again, repeat until success (returned /* Wait then try locking again, repeat until success (returned
boolean is true). */ boolean is true). */
do { do {
worker.waitForAWhile(shared_from_this()); co_await waitForAWhile();
co_await Suspend{};
} while (!outputLocks.lockPaths(lockFiles, "", false)); } while (!outputLocks.lockPaths(lockFiles, "", false));
} }
@ -667,8 +672,7 @@ Goal::Co DerivationGoal::tryToBuild()
actLock.reset(); actLock.reset();
worker.wakeUp(shared_from_this()); co_await yield();
co_await Suspend{};
co_return tryLocalBuild(); co_return tryLocalBuild();
} }
@ -719,6 +723,8 @@ Goal::Co DerivationGoal::repairClosure()
outputsToDrv.insert_or_assign(*j.second, i); outputsToDrv.insert_or_assign(*j.second, i);
} }
Goals waitees;
/* Check each path (slow!). */ /* Check each path (slow!). */
for (auto & i : outputClosure) { for (auto & i : outputClosure) {
if (worker.pathContentsGood(i)) continue; if (worker.pathContentsGood(i)) continue;
@ -727,9 +733,9 @@ Goal::Co DerivationGoal::repairClosure()
worker.store.printStorePath(i), worker.store.printStorePath(drvPath)); worker.store.printStorePath(i), worker.store.printStorePath(drvPath));
auto drvPath2 = outputsToDrv.find(i); auto drvPath2 = outputsToDrv.find(i);
if (drvPath2 == outputsToDrv.end()) if (drvPath2 == outputsToDrv.end())
addWaitee(upcast_goal(worker.makePathSubstitutionGoal(i, Repair))); waitees.insert(upcast_goal(worker.makePathSubstitutionGoal(i, Repair)));
else else
addWaitee(worker.makeGoal( waitees.insert(worker.makeGoal(
DerivedPath::Built { DerivedPath::Built {
.drvPath = makeConstantStorePathRef(drvPath2->second), .drvPath = makeConstantStorePathRef(drvPath2->second),
.outputs = OutputsSpec::All { }, .outputs = OutputsSpec::All { },
@ -737,17 +743,15 @@ Goal::Co DerivationGoal::repairClosure()
bmRepair)); bmRepair));
} }
if (waitees.empty()) { co_await await(std::move(waitees));
co_return done(BuildResult::AlreadyValid, assertPathValidity());
} else {
co_await Suspend{};
if (!waitees.empty()) {
trace("closure repaired"); trace("closure repaired");
if (nrFailed > 0) if (nrFailed > 0)
throw Error("some paths in the output closure of derivation '%s' could not be repaired", throw Error("some paths in the output closure of derivation '%s' could not be repaired",
worker.store.printStorePath(drvPath)); worker.store.printStorePath(drvPath));
co_return done(BuildResult::AlreadyValid, assertPathValidity());
} }
co_return done(BuildResult::AlreadyValid, assertPathValidity());
} }

View file

@ -87,6 +87,8 @@ Goal::Co DrvOutputSubstitutionGoal::init()
bool failed = false; bool failed = false;
Goals waitees;
for (const auto & [depId, depPath] : outputInfo->dependentRealisations) { for (const auto & [depId, depPath] : outputInfo->dependentRealisations) {
if (depId != id) { if (depId != id) {
if (auto localOutputInfo = worker.store.queryRealisation(depId); if (auto localOutputInfo = worker.store.queryRealisation(depId);
@ -103,13 +105,13 @@ Goal::Co DrvOutputSubstitutionGoal::init()
failed = true; failed = true;
break; break;
} }
addWaitee(worker.makeDrvOutputSubstitutionGoal(depId)); waitees.insert(worker.makeDrvOutputSubstitutionGoal(depId));
} }
} }
if (failed) continue; if (failed) continue;
co_return realisationFetched(outputInfo, sub); co_return realisationFetched(std::move(waitees), outputInfo, sub);
} }
/* None left. Terminate this goal and let someone else deal /* None left. Terminate this goal and let someone else deal
@ -127,10 +129,10 @@ Goal::Co DrvOutputSubstitutionGoal::init()
co_return amDone(substituterFailed ? ecFailed : ecNoSubstituters); co_return amDone(substituterFailed ? ecFailed : ecNoSubstituters);
} }
Goal::Co DrvOutputSubstitutionGoal::realisationFetched(std::shared_ptr<const Realisation> outputInfo, nix::ref<nix::Store> sub) { Goal::Co DrvOutputSubstitutionGoal::realisationFetched(Goals waitees, std::shared_ptr<const Realisation> outputInfo, nix::ref<nix::Store> sub) {
addWaitee(worker.makePathSubstitutionGoal(outputInfo->outPath)); waitees.insert(worker.makePathSubstitutionGoal(outputInfo->outPath));
if (!waitees.empty()) co_await Suspend{}; co_await await(std::move(waitees));
trace("output path substituted"); trace("output path substituted");

View file

@ -34,7 +34,7 @@ public:
GoalState state; GoalState state;
Co init() override; Co init() override;
Co realisationFetched(std::shared_ptr<const Realisation> outputInfo, nix::ref<nix::Store> sub); Co realisationFetched(Goals waitees, std::shared_ptr<const Realisation> outputInfo, nix::ref<nix::Store> sub);
void timedOut(Error && ex) override { unreachable(); }; void timedOut(Error && ex) override { unreachable(); };

View file

@ -132,38 +132,18 @@ void addToWeakGoals(WeakGoals & goals, GoalPtr p)
goals.insert(p); goals.insert(p);
} }
Co Goal::await(Goals new_waitees)
void Goal::addWaitee(GoalPtr waitee)
{ {
waitees.insert(waitee); assert(waitees.empty());
if (!new_waitees.empty()) {
waitees = std::move(new_waitees);
for (auto waitee : waitees) {
addToWeakGoals(waitee->waiters, shared_from_this()); addToWeakGoals(waitee->waiters, shared_from_this());
} }
co_await Suspend{};
assert(waitees.empty());
void Goal::waiteeDone(GoalPtr waitee, ExitCode result)
{
assert(waitees.count(waitee));
waitees.erase(waitee);
trace(fmt("waitee '%s' done; %d left", waitee->name, waitees.size()));
if (result == ecFailed || result == ecNoSubstituters || result == ecIncompleteClosure) ++nrFailed;
if (result == ecNoSubstituters) ++nrNoSubstituters;
if (result == ecIncompleteClosure) ++nrIncompleteClosure;
if (waitees.empty() || (result == ecFailed && !settings.keepGoing)) {
/* If we failed and keepGoing is not set, we remove all
remaining waitees. */
for (auto & goal : waitees) {
goal->waiters.extract(shared_from_this());
}
waitees.clear();
worker.wakeUp(shared_from_this());
} }
co_return Return{};
} }
Goal::Done Goal::amDone(ExitCode result, std::optional<Error> ex) Goal::Done Goal::amDone(ExitCode result, std::optional<Error> ex)
@ -183,7 +163,32 @@ Goal::Done Goal::amDone(ExitCode result, std::optional<Error> ex)
for (auto & i : waiters) { for (auto & i : waiters) {
GoalPtr goal = i.lock(); GoalPtr goal = i.lock();
if (goal) goal->waiteeDone(shared_from_this(), result); if (goal) {
auto me = shared_from_this();
assert(goal->waitees.count(me));
goal->waitees.erase(me);
goal->trace(fmt("waitee '%s' done; %d left", name, goal->waitees.size()));
if (result == ecFailed || result == ecNoSubstituters || result == ecIncompleteClosure) ++goal->nrFailed;
if (result == ecNoSubstituters) ++goal->nrNoSubstituters;
if (result == ecIncompleteClosure) ++goal->nrIncompleteClosure;
if (goal->waitees.empty()) {
worker.wakeUp(goal);
} else if (result == ecFailed && !settings.keepGoing) {
/* If we failed and keepGoing is not set, we remove all
remaining waitees. */
for (auto & g : goal->waitees) {
g->waiters.extract(goal);
}
goal->waitees.clear();
worker.wakeUp(goal);
}
}
} }
waiters.clear(); waiters.clear();
worker.removeGoal(shared_from_this()); worker.removeGoal(shared_from_this());
@ -215,5 +220,22 @@ void Goal::work()
assert(top_co || exitCode != ecBusy); assert(top_co || exitCode != ecBusy);
} }
Goal::Co Goal::yield() {
worker.wakeUp(shared_from_this());
co_await Suspend{};
co_return Return{};
}
Goal::Co Goal::waitForAWhile() {
worker.waitForAWhile(shared_from_this());
co_await Suspend{};
co_return Return{};
}
Goal::Co Goal::waitForBuildSlot() {
worker.waitForBuildSlot(shared_from_this());
co_await Suspend{};
co_return Return{};
}
} }

View file

@ -54,6 +54,13 @@ enum struct JobCategory {
struct Goal : public std::enable_shared_from_this<Goal> struct Goal : public std::enable_shared_from_this<Goal>
{ {
private:
/**
* Goals that this goal is waiting for.
*/
Goals waitees;
public:
typedef enum {ecBusy, ecSuccess, ecFailed, ecNoSubstituters, ecIncompleteClosure} ExitCode; typedef enum {ecBusy, ecSuccess, ecFailed, ecNoSubstituters, ecIncompleteClosure} ExitCode;
/** /**
@ -61,11 +68,6 @@ struct Goal : public std::enable_shared_from_this<Goal>
*/ */
Worker & worker; Worker & worker;
/**
* Goals that this goal is waiting for.
*/
Goals waitees;
/** /**
* Goals waiting for this one to finish. Must use weak pointers * Goals waiting for this one to finish. Must use weak pointers
* here to prevent cycles. * here to prevent cycles.
@ -104,8 +106,8 @@ protected:
* Build result. * Build result.
*/ */
BuildResult buildResult; BuildResult buildResult;
public:
public:
/** /**
* Suspend our goal and wait until we get `work`-ed again. * Suspend our goal and wait until we get `work`-ed again.
* `co_await`-able by @ref Co. * `co_await`-able by @ref Co.
@ -332,6 +334,7 @@ public:
std::suspend_always await_transform(Suspend) { return {}; }; std::suspend_always await_transform(Suspend) { return {}; };
}; };
protected:
/** /**
* The coroutine being currently executed. * The coroutine being currently executed.
* MUST be updated when switching the coroutine being executed. * MUST be updated when switching the coroutine being executed.
@ -359,6 +362,7 @@ public:
*/ */
Done amDone(ExitCode result, std::optional<Error> ex = {}); Done amDone(ExitCode result, std::optional<Error> ex = {});
public:
virtual void cleanup() { } virtual void cleanup() { }
/** /**
@ -394,10 +398,6 @@ public:
void work(); void work();
void addWaitee(GoalPtr waitee);
void waiteeDone(GoalPtr waitee, ExitCode result);
virtual void handleChildOutput(Descriptor fd, std::string_view data) virtual void handleChildOutput(Descriptor fd, std::string_view data)
{ {
unreachable(); unreachable();
@ -429,6 +429,13 @@ public:
* @see JobCategory * @see JobCategory
*/ */
virtual JobCategory jobCategory() const = 0; virtual JobCategory jobCategory() const = 0;
protected:
Co await(Goals waitees);
Co waitForAWhile();
Co waitForBuildSlot();
Co yield();
}; };
void addToWeakGoals(WeakGoals & goals, GoalPtr p); void addToWeakGoals(WeakGoals & goals, GoalPtr p);

View file

@ -128,13 +128,15 @@ Goal::Co PathSubstitutionGoal::init()
continue; continue;
} }
Goals waitees;
/* To maintain the closure invariant, we first have to realise the /* To maintain the closure invariant, we first have to realise the
paths referenced by this one. */ paths referenced by this one. */
for (auto & i : info->references) for (auto & i : info->references)
if (i != storePath) /* ignore self-references */ if (i != storePath) /* ignore self-references */
addWaitee(worker.makePathSubstitutionGoal(i)); waitees.insert(worker.makePathSubstitutionGoal(i));
if (!waitees.empty()) co_await Suspend{}; co_await await(std::move(waitees));
// FIXME: consider returning boolean instead of passing in reference // FIXME: consider returning boolean instead of passing in reference
bool out = false; // is mutated by tryToRun bool out = false; // is mutated by tryToRun
@ -175,8 +177,7 @@ Goal::Co PathSubstitutionGoal::tryToRun(StorePath subPath, nix::ref<Store> sub,
if (i != storePath) /* ignore self-references */ if (i != storePath) /* ignore self-references */
assert(worker.store.isValidPath(i)); assert(worker.store.isValidPath(i));
worker.wakeUp(shared_from_this()); co_await yield();
co_await Suspend{};
trace("trying to run"); trace("trying to run");
@ -184,8 +185,7 @@ Goal::Co PathSubstitutionGoal::tryToRun(StorePath subPath, nix::ref<Store> sub,
if maxSubstitutionJobs == 0, we still allow a substituter to run. This if maxSubstitutionJobs == 0, we still allow a substituter to run. This
prevents infinite waiting. */ prevents infinite waiting. */
while (worker.getNrSubstitutions() >= std::max(1U, (unsigned int) settings.maxSubstitutionJobs)) { while (worker.getNrSubstitutions() >= std::max(1U, (unsigned int) settings.maxSubstitutionJobs)) {
worker.waitForBuildSlot(shared_from_this()); co_await waitForBuildSlot();
co_await Suspend{};
} }
auto maintainRunningSubstitutions = std::make_unique<MaintainCount<uint64_t>>(worker.runningSubstitutions); auto maintainRunningSubstitutions = std::make_unique<MaintainCount<uint64_t>>(worker.runningSubstitutions);