mirror of
https://github.com/NixOS/nix
synced 2025-06-28 01:11:15 +02:00
Eliminate the substituter mechanism
Substitution is now simply a Store -> Store copy operation, most typically from BinaryCacheStore to LocalStore.
This commit is contained in:
parent
21e9d183cc
commit
aa3bc3d5dc
16 changed files with 166 additions and 597 deletions
|
@ -8,11 +8,14 @@
|
|||
#include "archive.hh"
|
||||
#include "affinity.hh"
|
||||
#include "builtins.hh"
|
||||
#include "finally.hh"
|
||||
|
||||
#include <algorithm>
|
||||
#include <iostream>
|
||||
#include <map>
|
||||
#include <sstream>
|
||||
#include <thread>
|
||||
#include <future>
|
||||
|
||||
#include <limits.h>
|
||||
#include <time.h>
|
||||
|
@ -199,8 +202,6 @@ struct Child
|
|||
time_t timeStarted;
|
||||
};
|
||||
|
||||
typedef map<pid_t, Child> Children;
|
||||
|
||||
|
||||
/* The worker class. */
|
||||
class Worker
|
||||
|
@ -220,7 +221,7 @@ private:
|
|||
WeakGoals wantingToBuild;
|
||||
|
||||
/* Child processes currently running. */
|
||||
Children children;
|
||||
std::list<Child> children;
|
||||
|
||||
/* Number of build slots occupied. This includes local builds and
|
||||
substitutions but not remote builds via the build hook. */
|
||||
|
@ -278,14 +279,14 @@ public:
|
|||
|
||||
/* Registers a running child process. `inBuildSlot' means that
|
||||
the process counts towards the jobs limit. */
|
||||
void childStarted(GoalPtr goal, pid_t pid,
|
||||
const set<int> & fds, bool inBuildSlot, bool respectTimeouts);
|
||||
void childStarted(GoalPtr goal, const set<int> & fds,
|
||||
bool inBuildSlot, bool respectTimeouts);
|
||||
|
||||
/* Unregisters a running child process. `wakeSleepers' should be
|
||||
false if there is no sense in waking up goals that are sleeping
|
||||
because they can't run yet (e.g., there is no free build slot,
|
||||
or the hook would still say `postpone'). */
|
||||
void childTerminated(pid_t pid, bool wakeSleepers = true);
|
||||
void childTerminated(GoalPtr goal, bool wakeSleepers = true);
|
||||
|
||||
/* Put `goal' to sleep until a build slot becomes available (which
|
||||
might be right away). */
|
||||
|
@ -942,7 +943,7 @@ DerivationGoal::~DerivationGoal()
|
|||
void DerivationGoal::killChild()
|
||||
{
|
||||
if (pid != -1) {
|
||||
worker.childTerminated(pid);
|
||||
worker.childTerminated(shared_from_this());
|
||||
|
||||
if (buildUser.enabled()) {
|
||||
/* If we're using a build user, then there is a tricky
|
||||
|
@ -1403,22 +1404,14 @@ void DerivationGoal::buildDone()
|
|||
to have terminated. In fact, the builder could also have
|
||||
simply have closed its end of the pipe --- just don't do that
|
||||
:-) */
|
||||
int status;
|
||||
pid_t savedPid;
|
||||
if (hook) {
|
||||
savedPid = hook->pid;
|
||||
status = hook->pid.wait(true);
|
||||
} else {
|
||||
/* !!! this could block! security problem! solution: kill the
|
||||
child */
|
||||
savedPid = pid;
|
||||
status = pid.wait(true);
|
||||
}
|
||||
/* !!! this could block! security problem! solution: kill the
|
||||
child */
|
||||
int status = hook ? hook->pid.wait(true) : pid.wait(true);
|
||||
|
||||
debug(format("builder process for ‘%1%’ finished") % drvPath);
|
||||
|
||||
/* So the child is gone now. */
|
||||
worker.childTerminated(savedPid);
|
||||
worker.childTerminated(shared_from_this());
|
||||
|
||||
/* Close the read side of the logger pipe. */
|
||||
if (hook) {
|
||||
|
@ -1621,7 +1614,7 @@ HookReply DerivationGoal::tryBuildHook()
|
|||
set<int> fds;
|
||||
fds.insert(hook->fromHook.readSide);
|
||||
fds.insert(hook->builderOut.readSide);
|
||||
worker.childStarted(shared_from_this(), hook->pid, fds, false, false);
|
||||
worker.childStarted(shared_from_this(), fds, false, false);
|
||||
|
||||
return rpAccept;
|
||||
}
|
||||
|
@ -2155,7 +2148,7 @@ void DerivationGoal::startBuilder()
|
|||
/* parent */
|
||||
pid.setSeparatePG(true);
|
||||
builderOut.writeSide.close();
|
||||
worker.childStarted(shared_from_this(), pid,
|
||||
worker.childStarted(shared_from_this(),
|
||||
singleton<set<int> >(builderOut.readSide), true, true);
|
||||
|
||||
/* Check if setting up the build environment failed. */
|
||||
|
@ -3032,28 +3025,24 @@ private:
|
|||
Path storePath;
|
||||
|
||||
/* The remaining substituters. */
|
||||
Paths subs;
|
||||
std::list<ref<Store>> subs;
|
||||
|
||||
/* The current substituter. */
|
||||
Path sub;
|
||||
std::shared_ptr<Store> sub;
|
||||
|
||||
/* Whether any substituter can realise this path */
|
||||
/* Whether any substituter can realise this path. */
|
||||
bool hasSubstitute;
|
||||
|
||||
/* Path info returned by the substituter's query info operation. */
|
||||
SubstitutablePathInfo info;
|
||||
std::shared_ptr<const ValidPathInfo> info;
|
||||
|
||||
/* Pipe for the substituter's standard output. */
|
||||
Pipe outPipe;
|
||||
|
||||
/* Pipe for the substituter's standard error. */
|
||||
Pipe logPipe;
|
||||
/* The substituter thread. */
|
||||
std::thread thr;
|
||||
|
||||
/* The process ID of the builder. */
|
||||
Pid pid;
|
||||
|
||||
/* Lock on the store path. */
|
||||
std::shared_ptr<PathLocks> outputLock;
|
||||
std::promise<void> promise;
|
||||
|
||||
/* Whether to try to repair a valid path. */
|
||||
bool repair;
|
||||
|
@ -3069,7 +3058,7 @@ public:
|
|||
SubstitutionGoal(const Path & storePath, Worker & worker, bool repair = false);
|
||||
~SubstitutionGoal();
|
||||
|
||||
void timedOut();
|
||||
void timedOut() { abort(); };
|
||||
|
||||
string key()
|
||||
{
|
||||
|
@ -3110,18 +3099,14 @@ SubstitutionGoal::SubstitutionGoal(const Path & storePath, Worker & worker, bool
|
|||
|
||||
SubstitutionGoal::~SubstitutionGoal()
|
||||
{
|
||||
if (pid != -1) worker.childTerminated(pid);
|
||||
}
|
||||
|
||||
|
||||
void SubstitutionGoal::timedOut()
|
||||
{
|
||||
if (pid != -1) {
|
||||
pid_t savedPid = pid;
|
||||
pid.kill();
|
||||
worker.childTerminated(savedPid);
|
||||
try {
|
||||
if (thr.joinable()) {
|
||||
thr.join();
|
||||
worker.childTerminated(shared_from_this());
|
||||
}
|
||||
} catch (...) {
|
||||
ignoreException();
|
||||
}
|
||||
amDone(ecFailed);
|
||||
}
|
||||
|
||||
|
||||
|
@ -3146,7 +3131,7 @@ void SubstitutionGoal::init()
|
|||
if (settings.readOnlyMode)
|
||||
throw Error(format("cannot substitute path ‘%1%’ - no write access to the Nix store") % storePath);
|
||||
|
||||
subs = settings.substituters;
|
||||
subs = getDefaultSubstituters();
|
||||
|
||||
tryNext();
|
||||
}
|
||||
|
@ -3171,17 +3156,19 @@ void SubstitutionGoal::tryNext()
|
|||
sub = subs.front();
|
||||
subs.pop_front();
|
||||
|
||||
SubstitutablePathInfos infos;
|
||||
PathSet dummy(singleton<PathSet>(storePath));
|
||||
worker.store.querySubstitutablePathInfos(sub, dummy, infos);
|
||||
SubstitutablePathInfos::iterator k = infos.find(storePath);
|
||||
if (k == infos.end()) { tryNext(); return; }
|
||||
info = k->second;
|
||||
try {
|
||||
// FIXME: make async
|
||||
info = sub->queryPathInfo(storePath);
|
||||
} catch (InvalidPath &) {
|
||||
tryNext();
|
||||
return;
|
||||
}
|
||||
|
||||
hasSubstitute = true;
|
||||
|
||||
/* To maintain the closure invariant, we first have to realise the
|
||||
paths referenced by this one. */
|
||||
for (auto & i : info.references)
|
||||
for (auto & i : info->references)
|
||||
if (i != storePath) /* ignore self-references */
|
||||
addWaitee(worker.makeSubstitutionGoal(i));
|
||||
|
||||
|
@ -3202,7 +3189,7 @@ void SubstitutionGoal::referencesValid()
|
|||
return;
|
||||
}
|
||||
|
||||
for (auto & i : info.references)
|
||||
for (auto & i : info->references)
|
||||
if (i != storePath) /* ignore self-references */
|
||||
assert(worker.store.isValidPath(i));
|
||||
|
||||
|
@ -3224,70 +3211,30 @@ void SubstitutionGoal::tryToRun()
|
|||
return;
|
||||
}
|
||||
|
||||
/* Maybe a derivation goal has already locked this path
|
||||
(exceedingly unlikely, since it should have used a substitute
|
||||
first, but let's be defensive). */
|
||||
outputLock.reset(); // make sure this goal's lock is gone
|
||||
if (pathIsLockedByMe(storePath)) {
|
||||
debug(format("restarting substitution of ‘%1%’ because it's locked by another goal")
|
||||
% storePath);
|
||||
worker.waitForAnyGoal(shared_from_this());
|
||||
return; /* restart in the tryToRun() state when another goal finishes */
|
||||
}
|
||||
|
||||
/* Acquire a lock on the output path. */
|
||||
outputLock = std::make_shared<PathLocks>();
|
||||
if (!outputLock->lockPaths(singleton<PathSet>(storePath), "", false)) {
|
||||
worker.waitForAWhile(shared_from_this());
|
||||
return;
|
||||
}
|
||||
|
||||
/* Check again whether the path is invalid. */
|
||||
if (!repair && worker.store.isValidPath(storePath)) {
|
||||
debug(format("store path ‘%1%’ has become valid") % storePath);
|
||||
outputLock->setDeletion(true);
|
||||
amDone(ecSuccess);
|
||||
return;
|
||||
}
|
||||
|
||||
printMsg(lvlInfo, format("fetching path ‘%1%’...") % storePath);
|
||||
|
||||
outPipe.create();
|
||||
logPipe.create();
|
||||
|
||||
destPath = repair ? storePath + ".tmp" : storePath;
|
||||
promise = std::promise<void>();
|
||||
|
||||
/* Remove the (stale) output path if it exists. */
|
||||
deletePath(destPath);
|
||||
thr = std::thread([this]() {
|
||||
try {
|
||||
/* Wake up the worker loop when we're done. */
|
||||
Finally updateStats([this]() { outPipe.writeSide.close(); });
|
||||
|
||||
worker.store.setSubstituterEnv();
|
||||
StringSink sink;
|
||||
sub->exportPaths({storePath}, false, sink);
|
||||
|
||||
/* Fill in the arguments. */
|
||||
Strings args;
|
||||
args.push_back(baseNameOf(sub));
|
||||
args.push_back("--substitute");
|
||||
args.push_back(storePath);
|
||||
args.push_back(destPath);
|
||||
StringSource source(*sink.s);
|
||||
worker.store.importPaths(false, source, 0);
|
||||
|
||||
/* Fork the substitute program. */
|
||||
pid = startProcess([&]() {
|
||||
|
||||
commonChildInit(logPipe);
|
||||
|
||||
if (dup2(outPipe.writeSide, STDOUT_FILENO) == -1)
|
||||
throw SysError("cannot dup output pipe into stdout");
|
||||
|
||||
execv(sub.c_str(), stringsToCharPtrs(args).data());
|
||||
|
||||
throw SysError(format("executing ‘%1%’") % sub);
|
||||
promise.set_value();
|
||||
} catch (...) {
|
||||
promise.set_exception(std::current_exception());
|
||||
}
|
||||
});
|
||||
|
||||
pid.setSeparatePG(true);
|
||||
pid.setKillSignal(SIGTERM);
|
||||
outPipe.writeSide.close();
|
||||
logPipe.writeSide.close();
|
||||
worker.childStarted(shared_from_this(),
|
||||
pid, singleton<set<int> >(logPipe.readSide), true, true);
|
||||
worker.childStarted(shared_from_this(), {outPipe.readSide}, true, false);
|
||||
|
||||
state = &SubstitutionGoal::finished;
|
||||
}
|
||||
|
@ -3297,52 +3244,12 @@ void SubstitutionGoal::finished()
|
|||
{
|
||||
trace("substitute finished");
|
||||
|
||||
/* Since we got an EOF on the logger pipe, the substitute is
|
||||
presumed to have terminated. */
|
||||
pid_t savedPid = pid;
|
||||
int status = pid.wait(true);
|
||||
thr.join();
|
||||
worker.childTerminated(shared_from_this());
|
||||
|
||||
/* So the child is gone now. */
|
||||
worker.childTerminated(savedPid);
|
||||
|
||||
/* Close the read side of the logger pipe. */
|
||||
logPipe.readSide.close();
|
||||
|
||||
/* Get the hash info from stdout. */
|
||||
string dummy = readLine(outPipe.readSide);
|
||||
string expectedHashStr = statusOk(status) ? readLine(outPipe.readSide) : "";
|
||||
outPipe.readSide.close();
|
||||
|
||||
/* Check the exit status and the build result. */
|
||||
HashResult hash;
|
||||
try {
|
||||
|
||||
if (!statusOk(status))
|
||||
throw SubstError(format("fetching path ‘%1%’ %2%")
|
||||
% storePath % statusToString(status));
|
||||
|
||||
if (!pathExists(destPath))
|
||||
throw SubstError(format("substitute did not produce path ‘%1%’") % destPath);
|
||||
|
||||
hash = hashPath(htSHA256, destPath);
|
||||
|
||||
/* Verify the expected hash we got from the substituer. */
|
||||
if (expectedHashStr != "") {
|
||||
size_t n = expectedHashStr.find(':');
|
||||
if (n == string::npos)
|
||||
throw Error(format("bad hash from substituter: %1%") % expectedHashStr);
|
||||
HashType hashType = parseHashType(string(expectedHashStr, 0, n));
|
||||
if (hashType == htUnknown)
|
||||
throw Error(format("unknown hash algorithm in ‘%1%’") % expectedHashStr);
|
||||
Hash expectedHash = parseHash16or32(hashType, string(expectedHashStr, n + 1));
|
||||
Hash actualHash = hashType == htSHA256 ? hash.first : hashPath(hashType, destPath).first;
|
||||
if (expectedHash != actualHash)
|
||||
throw SubstError(format("hash mismatch in downloaded path ‘%1%’: expected %2%, got %3%")
|
||||
% storePath % printHash(expectedHash) % printHash(actualHash));
|
||||
}
|
||||
|
||||
} catch (SubstError & e) {
|
||||
|
||||
promise.get_future().get();
|
||||
} catch (Error & e) {
|
||||
printMsg(lvlInfo, e.msg());
|
||||
|
||||
/* Try the next substitute. */
|
||||
|
@ -3351,23 +3258,6 @@ void SubstitutionGoal::finished()
|
|||
return;
|
||||
}
|
||||
|
||||
if (repair) replaceValidPath(storePath, destPath);
|
||||
|
||||
canonicalisePathMetaData(storePath, -1);
|
||||
|
||||
worker.store.optimisePath(storePath); // FIXME: combine with hashPath()
|
||||
|
||||
ValidPathInfo info2;
|
||||
info2.path = storePath;
|
||||
info2.narHash = hash.first;
|
||||
info2.narSize = hash.second;
|
||||
info2.references = info.references;
|
||||
info2.deriver = info.deriver;
|
||||
worker.store.registerValidPath(info2);
|
||||
|
||||
outputLock->setDeletion(true);
|
||||
outputLock.reset();
|
||||
|
||||
worker.markContentsGood(storePath);
|
||||
|
||||
printMsg(lvlChatty,
|
||||
|
@ -3379,18 +3269,15 @@ void SubstitutionGoal::finished()
|
|||
|
||||
void SubstitutionGoal::handleChildOutput(int fd, const string & data)
|
||||
{
|
||||
assert(fd == logPipe.readSide);
|
||||
printMsg(lvlError, data); // FIXME
|
||||
}
|
||||
|
||||
|
||||
void SubstitutionGoal::handleEOF(int fd)
|
||||
{
|
||||
if (fd == logPipe.readSide) worker.wakeUp(shared_from_this());
|
||||
if (fd == outPipe.readSide) worker.wakeUp(shared_from_this());
|
||||
}
|
||||
|
||||
|
||||
|
||||
//////////////////////////////////////////////////////////////////////
|
||||
|
||||
|
||||
|
@ -3506,9 +3393,8 @@ unsigned Worker::getNrLocalBuilds()
|
|||
}
|
||||
|
||||
|
||||
void Worker::childStarted(GoalPtr goal,
|
||||
pid_t pid, const set<int> & fds, bool inBuildSlot,
|
||||
bool respectTimeouts)
|
||||
void Worker::childStarted(GoalPtr goal, const set<int> & fds,
|
||||
bool inBuildSlot, bool respectTimeouts)
|
||||
{
|
||||
Child child;
|
||||
child.goal = goal;
|
||||
|
@ -3516,30 +3402,29 @@ void Worker::childStarted(GoalPtr goal,
|
|||
child.timeStarted = child.lastOutput = time(0);
|
||||
child.inBuildSlot = inBuildSlot;
|
||||
child.respectTimeouts = respectTimeouts;
|
||||
children[pid] = child;
|
||||
children.emplace_back(child);
|
||||
if (inBuildSlot) nrLocalBuilds++;
|
||||
}
|
||||
|
||||
|
||||
void Worker::childTerminated(pid_t pid, bool wakeSleepers)
|
||||
void Worker::childTerminated(GoalPtr goal, bool wakeSleepers)
|
||||
{
|
||||
assert(pid != -1); /* common mistake */
|
||||
|
||||
Children::iterator i = children.find(pid);
|
||||
auto i = std::find_if(children.begin(), children.end(),
|
||||
[&](const Child & child) { return child.goal.lock() == goal; });
|
||||
assert(i != children.end());
|
||||
|
||||
if (i->second.inBuildSlot) {
|
||||
if (i->inBuildSlot) {
|
||||
assert(nrLocalBuilds > 0);
|
||||
nrLocalBuilds--;
|
||||
}
|
||||
|
||||
children.erase(pid);
|
||||
children.erase(i);
|
||||
|
||||
if (wakeSleepers) {
|
||||
|
||||
/* Wake up goals waiting for a build slot. */
|
||||
for (auto & i : wantingToBuild) {
|
||||
GoalPtr goal = i.lock();
|
||||
for (auto & j : wantingToBuild) {
|
||||
GoalPtr goal = j.lock();
|
||||
if (goal) wakeUp(goal);
|
||||
}
|
||||
|
||||
|
@ -3641,11 +3526,11 @@ void Worker::waitForInput()
|
|||
assert(sizeof(time_t) >= sizeof(long));
|
||||
time_t nearest = LONG_MAX; // nearest deadline
|
||||
for (auto & i : children) {
|
||||
if (!i.second.respectTimeouts) continue;
|
||||
if (!i.respectTimeouts) continue;
|
||||
if (settings.maxSilentTime != 0)
|
||||
nearest = std::min(nearest, i.second.lastOutput + settings.maxSilentTime);
|
||||
nearest = std::min(nearest, i.lastOutput + settings.maxSilentTime);
|
||||
if (settings.buildTimeout != 0)
|
||||
nearest = std::min(nearest, i.second.timeStarted + settings.buildTimeout);
|
||||
nearest = std::min(nearest, i.timeStarted + settings.buildTimeout);
|
||||
}
|
||||
if (nearest != LONG_MAX) {
|
||||
timeout.tv_sec = std::max((time_t) 1, nearest - before);
|
||||
|
@ -3663,7 +3548,6 @@ void Worker::waitForInput()
|
|||
timeout.tv_sec = std::max((time_t) 1, (time_t) (lastWokenUp + settings.pollInterval - before));
|
||||
} else lastWokenUp = 0;
|
||||
|
||||
using namespace std;
|
||||
/* Use select() to wait for the input side of any logger pipe to
|
||||
become `available'. Note that `available' (i.e., non-blocking)
|
||||
includes EOF. */
|
||||
|
@ -3671,7 +3555,7 @@ void Worker::waitForInput()
|
|||
FD_ZERO(&fds);
|
||||
int fdMax = 0;
|
||||
for (auto & i : children) {
|
||||
for (auto & j : i.second.fds) {
|
||||
for (auto & j : i.fds) {
|
||||
FD_SET(j, &fds);
|
||||
if (j >= fdMax) fdMax = j + 1;
|
||||
}
|
||||
|
@ -3685,22 +3569,16 @@ void Worker::waitForInput()
|
|||
time_t after = time(0);
|
||||
|
||||
/* Process all available file descriptors. */
|
||||
decltype(children)::iterator i;
|
||||
for (auto j = children.begin(); j != children.end(); j = i) {
|
||||
i = std::next(j);
|
||||
|
||||
/* Since goals may be canceled from inside the loop below (causing
|
||||
them go be erased from the `children' map), we have to be
|
||||
careful that we don't keep iterators alive across calls to
|
||||
timedOut(). */
|
||||
set<pid_t> pids;
|
||||
for (auto & i : children) pids.insert(i.first);
|
||||
|
||||
for (auto & i : pids) {
|
||||
checkInterrupt();
|
||||
Children::iterator j = children.find(i);
|
||||
if (j == children.end()) continue; // child destroyed
|
||||
GoalPtr goal = j->second.goal.lock();
|
||||
|
||||
GoalPtr goal = j->goal.lock();
|
||||
assert(goal);
|
||||
|
||||
set<int> fds2(j->second.fds);
|
||||
set<int> fds2(j->fds);
|
||||
for (auto & k : fds2) {
|
||||
if (FD_ISSET(k, &fds)) {
|
||||
unsigned char buffer[4096];
|
||||
|
@ -3712,12 +3590,12 @@ void Worker::waitForInput()
|
|||
} else if (rd == 0) {
|
||||
debug(format("%1%: got EOF") % goal->getName());
|
||||
goal->handleEOF(k);
|
||||
j->second.fds.erase(k);
|
||||
j->fds.erase(k);
|
||||
} else {
|
||||
printMsg(lvlVomit, format("%1%: read %2% bytes")
|
||||
% goal->getName() % rd);
|
||||
string data((char *) buffer, rd);
|
||||
j->second.lastOutput = after;
|
||||
j->lastOutput = after;
|
||||
goal->handleChildOutput(k, data);
|
||||
}
|
||||
}
|
||||
|
@ -3725,8 +3603,8 @@ void Worker::waitForInput()
|
|||
|
||||
if (goal->getExitCode() == Goal::ecBusy &&
|
||||
settings.maxSilentTime != 0 &&
|
||||
j->second.respectTimeouts &&
|
||||
after - j->second.lastOutput >= (time_t) settings.maxSilentTime)
|
||||
j->respectTimeouts &&
|
||||
after - j->lastOutput >= (time_t) settings.maxSilentTime)
|
||||
{
|
||||
printMsg(lvlError,
|
||||
format("%1% timed out after %2% seconds of silence")
|
||||
|
@ -3736,8 +3614,8 @@ void Worker::waitForInput()
|
|||
|
||||
else if (goal->getExitCode() == Goal::ecBusy &&
|
||||
settings.buildTimeout != 0 &&
|
||||
j->second.respectTimeouts &&
|
||||
after - j->second.timeStarted >= (time_t) settings.buildTimeout)
|
||||
j->respectTimeouts &&
|
||||
after - j->timeStarted >= (time_t) settings.buildTimeout)
|
||||
{
|
||||
printMsg(lvlError,
|
||||
format("%1% timed out after %2% seconds")
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue