diff --git a/src/libstore/build/derivation-goal.cc b/src/libstore/build/derivation-goal.cc index 4978eecd2..d2e02c6c3 100644 --- a/src/libstore/build/derivation-goal.cc +++ b/src/libstore/build/derivation-goal.cc @@ -142,8 +142,8 @@ Goal::Co DerivationGoal::init() { substitute. */ if (buildMode != bmNormal || !worker.evalStore.isValidPath(drvPath)) { - addWaitee(upcast_goal(worker.makePathSubstitutionGoal(drvPath))); - co_await Suspend{}; + Goals waitees{upcast_goal(worker.makePathSubstitutionGoal(drvPath))}; + co_await await(std::move(waitees)); } trace("loading derivation"); @@ -235,6 +235,8 @@ Goal::Co DerivationGoal::haveDerivation() } } + Goals waitees; + /* We are first going to try to create the invalid output paths through substitutes. If that doesn't work, we'll build them. */ @@ -242,7 +244,7 @@ Goal::Co DerivationGoal::haveDerivation() for (auto & [outputName, status] : initialOutputs) { if (!status.wanted) continue; if (!status.known) - addWaitee( + waitees.insert( upcast_goal( worker.makeDrvOutputSubstitutionGoal( DrvOutput{status.outputHash, outputName}, @@ -252,14 +254,14 @@ Goal::Co DerivationGoal::haveDerivation() ); else { auto * cap = getDerivationCA(*drv); - addWaitee(upcast_goal(worker.makePathSubstitutionGoal( + waitees.insert(upcast_goal(worker.makePathSubstitutionGoal( status.known->path, buildMode == bmRepair ? Repair : NoRepair, cap ? std::optional { *cap } : std::nullopt))); } } - if (!waitees.empty()) co_await Suspend{}; /* to prevent hang (no wake-up event) */ + co_await await(std::move(waitees)); trace("all outputs substituted (maybe)"); @@ -342,6 +344,8 @@ Goal::Co DerivationGoal::gaveUpOnSubstitution() is no need to restart. */ needRestart = NeedRestartForMoreOutputs::BuildInProgressWillNotNeed; + Goals waitees; + std::map, GoalPtr, value_comparison> inputGoals; if (useDerivation) { @@ -356,7 +360,7 @@ Goal::Co DerivationGoal::gaveUpOnSubstitution() }, buildMode == bmRepair ? bmRepair : bmNormal); inputGoals.insert_or_assign(inputDrv, g); - addWaitee(std::move(g)); + waitees.insert(std::move(g)); } for (const auto & [outputName, childNode] : inputNode.childMap) addWaiteeDerivedPath( @@ -397,10 +401,11 @@ Goal::Co DerivationGoal::gaveUpOnSubstitution() if (!settings.useSubstitutes) throw Error("dependency '%s' of '%s' does not exist, and substitution is disabled", worker.store.printStorePath(i), worker.store.printStorePath(drvPath)); - addWaitee(upcast_goal(worker.makePathSubstitutionGoal(i))); + waitees.insert(upcast_goal(worker.makePathSubstitutionGoal(i))); } - if (!waitees.empty()) co_await Suspend{}; /* to prevent hang (no wake-up event) */ + co_await await(std::move(waitees)); + trace("all inputs realised"); @@ -495,9 +500,11 @@ Goal::Co DerivationGoal::gaveUpOnSubstitution() resolvedDrvGoal = worker.makeDerivationGoal( pathResolved, wantedOutputs, buildMode); - addWaitee(resolvedDrvGoal); + { + Goals waitees{resolvedDrvGoal}; + co_await await(std::move(waitees)); + } - co_await Suspend{}; co_return resolvedFinished(); } @@ -536,8 +543,7 @@ Goal::Co DerivationGoal::gaveUpOnSubstitution() /* Okay, try to build. Note that here we don't wait for a build slot to become available, since we don't need one if there is a build hook. */ - worker.wakeUp(shared_from_this()); - co_await Suspend{}; + co_await yield(); co_return tryToBuild(); } @@ -602,8 +608,7 @@ Goal::Co DerivationGoal::tryToBuild() /* Wait then try locking again, repeat until success (returned boolean is true). */ do { - worker.waitForAWhile(shared_from_this()); - co_await Suspend{}; + co_await waitForAWhile(); } while (!outputLocks.lockPaths(lockFiles, "", false)); } @@ -667,8 +672,7 @@ Goal::Co DerivationGoal::tryToBuild() actLock.reset(); - worker.wakeUp(shared_from_this()); - co_await Suspend{}; + co_await yield(); co_return tryLocalBuild(); } @@ -719,6 +723,8 @@ Goal::Co DerivationGoal::repairClosure() outputsToDrv.insert_or_assign(*j.second, i); } + Goals waitees; + /* Check each path (slow!). */ for (auto & i : outputClosure) { if (worker.pathContentsGood(i)) continue; @@ -727,9 +733,9 @@ Goal::Co DerivationGoal::repairClosure() worker.store.printStorePath(i), worker.store.printStorePath(drvPath)); auto drvPath2 = outputsToDrv.find(i); if (drvPath2 == outputsToDrv.end()) - addWaitee(upcast_goal(worker.makePathSubstitutionGoal(i, Repair))); + waitees.insert(upcast_goal(worker.makePathSubstitutionGoal(i, Repair))); else - addWaitee(worker.makeGoal( + waitees.insert(worker.makeGoal( DerivedPath::Built { .drvPath = makeConstantStorePathRef(drvPath2->second), .outputs = OutputsSpec::All { }, @@ -737,17 +743,15 @@ Goal::Co DerivationGoal::repairClosure() bmRepair)); } - if (waitees.empty()) { - co_return done(BuildResult::AlreadyValid, assertPathValidity()); - } else { - co_await Suspend{}; + co_await await(std::move(waitees)); + if (!waitees.empty()) { trace("closure repaired"); if (nrFailed > 0) throw Error("some paths in the output closure of derivation '%s' could not be repaired", worker.store.printStorePath(drvPath)); - co_return done(BuildResult::AlreadyValid, assertPathValidity()); } + co_return done(BuildResult::AlreadyValid, assertPathValidity()); } diff --git a/src/libstore/build/drv-output-substitution-goal.cc b/src/libstore/build/drv-output-substitution-goal.cc index c39793a65..a5926be87 100644 --- a/src/libstore/build/drv-output-substitution-goal.cc +++ b/src/libstore/build/drv-output-substitution-goal.cc @@ -87,6 +87,8 @@ Goal::Co DrvOutputSubstitutionGoal::init() bool failed = false; + Goals waitees; + for (const auto & [depId, depPath] : outputInfo->dependentRealisations) { if (depId != id) { if (auto localOutputInfo = worker.store.queryRealisation(depId); @@ -103,13 +105,13 @@ Goal::Co DrvOutputSubstitutionGoal::init() failed = true; break; } - addWaitee(worker.makeDrvOutputSubstitutionGoal(depId)); + waitees.insert(worker.makeDrvOutputSubstitutionGoal(depId)); } } if (failed) continue; - co_return realisationFetched(outputInfo, sub); + co_return realisationFetched(std::move(waitees), outputInfo, sub); } /* None left. Terminate this goal and let someone else deal @@ -127,10 +129,10 @@ Goal::Co DrvOutputSubstitutionGoal::init() co_return amDone(substituterFailed ? ecFailed : ecNoSubstituters); } -Goal::Co DrvOutputSubstitutionGoal::realisationFetched(std::shared_ptr outputInfo, nix::ref sub) { - addWaitee(worker.makePathSubstitutionGoal(outputInfo->outPath)); +Goal::Co DrvOutputSubstitutionGoal::realisationFetched(Goals waitees, std::shared_ptr outputInfo, nix::ref sub) { + waitees.insert(worker.makePathSubstitutionGoal(outputInfo->outPath)); - if (!waitees.empty()) co_await Suspend{}; + co_await await(std::move(waitees)); trace("output path substituted"); diff --git a/src/libstore/build/drv-output-substitution-goal.hh b/src/libstore/build/drv-output-substitution-goal.hh index 8c60d0198..63d1cd2f8 100644 --- a/src/libstore/build/drv-output-substitution-goal.hh +++ b/src/libstore/build/drv-output-substitution-goal.hh @@ -34,7 +34,7 @@ public: GoalState state; Co init() override; - Co realisationFetched(std::shared_ptr outputInfo, nix::ref sub); + Co realisationFetched(Goals waitees, std::shared_ptr outputInfo, nix::ref sub); void timedOut(Error && ex) override { unreachable(); }; diff --git a/src/libstore/build/goal.cc b/src/libstore/build/goal.cc index 9a16da145..6628d7d8e 100644 --- a/src/libstore/build/goal.cc +++ b/src/libstore/build/goal.cc @@ -132,38 +132,18 @@ void addToWeakGoals(WeakGoals & goals, GoalPtr p) goals.insert(p); } - -void Goal::addWaitee(GoalPtr waitee) +Co Goal::await(Goals new_waitees) { - waitees.insert(waitee); - addToWeakGoals(waitee->waiters, shared_from_this()); -} - - -void Goal::waiteeDone(GoalPtr waitee, ExitCode result) -{ - assert(waitees.count(waitee)); - waitees.erase(waitee); - - trace(fmt("waitee '%s' done; %d left", waitee->name, waitees.size())); - - if (result == ecFailed || result == ecNoSubstituters || result == ecIncompleteClosure) ++nrFailed; - - if (result == ecNoSubstituters) ++nrNoSubstituters; - - if (result == ecIncompleteClosure) ++nrIncompleteClosure; - - if (waitees.empty() || (result == ecFailed && !settings.keepGoing)) { - - /* If we failed and keepGoing is not set, we remove all - remaining waitees. */ - for (auto & goal : waitees) { - goal->waiters.extract(shared_from_this()); + assert(waitees.empty()); + if (!new_waitees.empty()) { + waitees = std::move(new_waitees); + for (auto waitee : waitees) { + addToWeakGoals(waitee->waiters, shared_from_this()); } - waitees.clear(); - - worker.wakeUp(shared_from_this()); + co_await Suspend{}; + assert(waitees.empty()); } + co_return Return{}; } Goal::Done Goal::amDone(ExitCode result, std::optional ex) @@ -183,7 +163,32 @@ Goal::Done Goal::amDone(ExitCode result, std::optional ex) for (auto & i : waiters) { GoalPtr goal = i.lock(); - if (goal) goal->waiteeDone(shared_from_this(), result); + if (goal) { + auto me = shared_from_this(); + assert(goal->waitees.count(me)); + goal->waitees.erase(me); + + goal->trace(fmt("waitee '%s' done; %d left", name, goal->waitees.size())); + + if (result == ecFailed || result == ecNoSubstituters || result == ecIncompleteClosure) ++goal->nrFailed; + + if (result == ecNoSubstituters) ++goal->nrNoSubstituters; + + if (result == ecIncompleteClosure) ++goal->nrIncompleteClosure; + + if (goal->waitees.empty()) { + worker.wakeUp(goal); + } else if (result == ecFailed && !settings.keepGoing) { + /* If we failed and keepGoing is not set, we remove all + remaining waitees. */ + for (auto & g : goal->waitees) { + g->waiters.extract(goal); + } + goal->waitees.clear(); + + worker.wakeUp(goal); + } + } } waiters.clear(); worker.removeGoal(shared_from_this()); @@ -215,5 +220,22 @@ void Goal::work() assert(top_co || exitCode != ecBusy); } +Goal::Co Goal::yield() { + worker.wakeUp(shared_from_this()); + co_await Suspend{}; + co_return Return{}; +} + +Goal::Co Goal::waitForAWhile() { + worker.waitForAWhile(shared_from_this()); + co_await Suspend{}; + co_return Return{}; +} + +Goal::Co Goal::waitForBuildSlot() { + worker.waitForBuildSlot(shared_from_this()); + co_await Suspend{}; + co_return Return{}; +} } diff --git a/src/libstore/build/goal.hh b/src/libstore/build/goal.hh index 9893c6108..6054975d7 100644 --- a/src/libstore/build/goal.hh +++ b/src/libstore/build/goal.hh @@ -54,6 +54,13 @@ enum struct JobCategory { struct Goal : public std::enable_shared_from_this { +private: + /** + * Goals that this goal is waiting for. + */ + Goals waitees; + +public: typedef enum {ecBusy, ecSuccess, ecFailed, ecNoSubstituters, ecIncompleteClosure} ExitCode; /** @@ -61,11 +68,6 @@ struct Goal : public std::enable_shared_from_this */ Worker & worker; - /** - * Goals that this goal is waiting for. - */ - Goals waitees; - /** * Goals waiting for this one to finish. Must use weak pointers * here to prevent cycles. @@ -104,8 +106,8 @@ protected: * Build result. */ BuildResult buildResult; -public: +public: /** * Suspend our goal and wait until we get `work`-ed again. * `co_await`-able by @ref Co. @@ -332,6 +334,7 @@ public: std::suspend_always await_transform(Suspend) { return {}; }; }; +protected: /** * The coroutine being currently executed. * MUST be updated when switching the coroutine being executed. @@ -359,6 +362,7 @@ public: */ Done amDone(ExitCode result, std::optional ex = {}); +public: virtual void cleanup() { } /** @@ -394,10 +398,6 @@ public: void work(); - void addWaitee(GoalPtr waitee); - - void waiteeDone(GoalPtr waitee, ExitCode result); - virtual void handleChildOutput(Descriptor fd, std::string_view data) { unreachable(); @@ -429,6 +429,13 @@ public: * @see JobCategory */ virtual JobCategory jobCategory() const = 0; + +protected: + Co await(Goals waitees); + + Co waitForAWhile(); + Co waitForBuildSlot(); + Co yield(); }; void addToWeakGoals(WeakGoals & goals, GoalPtr p); diff --git a/src/libstore/build/substitution-goal.cc b/src/libstore/build/substitution-goal.cc index 245fc983f..524bf1f16 100644 --- a/src/libstore/build/substitution-goal.cc +++ b/src/libstore/build/substitution-goal.cc @@ -128,13 +128,15 @@ Goal::Co PathSubstitutionGoal::init() continue; } + Goals waitees; + /* To maintain the closure invariant, we first have to realise the paths referenced by this one. */ for (auto & i : info->references) if (i != storePath) /* ignore self-references */ - addWaitee(worker.makePathSubstitutionGoal(i)); + waitees.insert(worker.makePathSubstitutionGoal(i)); - if (!waitees.empty()) co_await Suspend{}; + co_await await(std::move(waitees)); // FIXME: consider returning boolean instead of passing in reference bool out = false; // is mutated by tryToRun @@ -175,8 +177,7 @@ Goal::Co PathSubstitutionGoal::tryToRun(StorePath subPath, nix::ref sub, if (i != storePath) /* ignore self-references */ assert(worker.store.isValidPath(i)); - worker.wakeUp(shared_from_this()); - co_await Suspend{}; + co_await yield(); trace("trying to run"); @@ -184,8 +185,7 @@ Goal::Co PathSubstitutionGoal::tryToRun(StorePath subPath, nix::ref sub, if maxSubstitutionJobs == 0, we still allow a substituter to run. This prevents infinite waiting. */ while (worker.getNrSubstitutions() >= std::max(1U, (unsigned int) settings.maxSubstitutionJobs)) { - worker.waitForBuildSlot(shared_from_this()); - co_await Suspend{}; + co_await waitForBuildSlot(); } auto maintainRunningSubstitutions = std::make_unique>(worker.runningSubstitutions);