1
0
Fork 0
mirror of https://github.com/NixOS/nix synced 2025-07-07 14:21:48 +02:00

Merge remote-tracking branch 'upstream/master' into trustless-remote-builder-simple

This commit is contained in:
John Ericson 2022-02-28 18:04:39 +00:00
commit c863e5f338
387 changed files with 21068 additions and 35287 deletions

View file

@ -4,7 +4,6 @@
#include "store-api.hh"
#include "path-with-outputs.hh"
#include "finally.hh"
#include "affinity.hh"
#include "archive.hh"
#include "derivations.hh"
#include "args.hh"
@ -70,7 +69,7 @@ struct TunnelLogger : public Logger
StringSink buf;
buf << STDERR_NEXT << (fs.s + "\n");
enqueueMsg(*buf.s);
enqueueMsg(buf.s);
}
void logEI(const ErrorInfo & ei) override
@ -82,7 +81,7 @@ struct TunnelLogger : public Logger
StringSink buf;
buf << STDERR_NEXT << oss.str();
enqueueMsg(*buf.s);
enqueueMsg(buf.s);
}
/* startWork() means that we're starting an operation for which we
@ -130,7 +129,7 @@ struct TunnelLogger : public Logger
StringSink buf;
buf << STDERR_START_ACTIVITY << act << lvl << type << s << fields << parent;
enqueueMsg(*buf.s);
enqueueMsg(buf.s);
}
void stopActivity(ActivityId act) override
@ -138,7 +137,7 @@ struct TunnelLogger : public Logger
if (GET_PROTOCOL_MINOR(clientVersion) < 20) return;
StringSink buf;
buf << STDERR_STOP_ACTIVITY << act;
enqueueMsg(*buf.s);
enqueueMsg(buf.s);
}
void result(ActivityId act, ResultType type, const Fields & fields) override
@ -146,7 +145,7 @@ struct TunnelLogger : public Logger
if (GET_PROTOCOL_MINOR(clientVersion) < 20) return;
StringSink buf;
buf << STDERR_RESULT << act << type << fields;
enqueueMsg(*buf.s);
enqueueMsg(buf.s);
}
};
@ -230,11 +229,12 @@ struct ClientSettings
else if (name == settings.experimentalFeatures.name) {
// We dont want to forward the experimental features to
// the daemon, as that could cause some pretty weird stuff
if (tokenizeString<Strings>(value) != settings.experimentalFeatures.get())
if (parseFeatures(tokenizeString<StringSet>(value)) != settings.experimentalFeatures.get())
debug("Ignoring the client-specified experimental features");
}
else if (trusted
|| name == settings.buildTimeout.name
|| name == settings.buildRepeat.name
|| name == "connect-timeout"
|| (name == "builders" && value == ""))
settings.set(name, value);
@ -402,9 +402,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
return store->queryPathInfo(path);
},
[&](FixedOutputHashMethod & fohm) {
if (!refs.empty())
throw UnimplementedError("cannot yet have refs with flat or nar-hashed data");
auto path = store->addToStoreFromDump(source, name, fohm.fileIngestionMethod, fohm.hashType, repair);
auto path = store->addToStoreFromDump(source, name, fohm.fileIngestionMethod, fohm.hashType, repair, refs);
return store->queryPathInfo(path);
},
}, contentAddressMethod);
@ -432,25 +430,30 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
hashAlgo = parseHashType(hashAlgoRaw);
}
StringSink saved;
TeeSource savedNARSource(from, saved);
RetrieveRegularNARSink savedRegular { saved };
if (method == FileIngestionMethod::Recursive) {
/* Get the entire NAR dump from the client and save it to
a string so that we can pass it to
addToStoreFromDump(). */
ParseSink sink; /* null sink; just parse the NAR */
parseDump(sink, savedNARSource);
} else
parseDump(savedRegular, from);
auto dumpSource = sinkToSource([&](Sink & saved) {
if (method == FileIngestionMethod::Recursive) {
/* We parse the NAR dump through into `saved` unmodified,
so why all this extra work? We still parse the NAR so
that we aren't sending arbitrary data to `saved`
unwittingly`, and we know when the NAR ends so we don't
consume the rest of `from` and can't parse another
command. (We don't trust `addToStoreFromDump` to not
eagerly consume the entire stream it's given, past the
length of the Nar. */
TeeSource savedNARSource(from, saved);
ParseSink sink; /* null sink; just parse the NAR */
parseDump(sink, savedNARSource);
} else {
/* Incrementally parse the NAR file, stripping the
metadata, and streaming the sole file we expect into
`saved`. */
RetrieveRegularNARSink savedRegular { saved };
parseDump(savedRegular, from);
if (!savedRegular.regular) throw Error("regular file expected");
}
});
logger->startWork();
if (!savedRegular.regular) throw Error("regular file expected");
// FIXME: try to stream directly from `from`.
StringSource dumpSource { *saved.s };
auto path = store->addToStoreFromDump(dumpSource, baseName, method, hashAlgo);
auto path = store->addToStoreFromDump(*dumpSource, baseName, method, hashAlgo);
logger->stopWork();
to << store->printStorePath(path);
@ -465,17 +468,19 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
dontCheckSigs = false;
logger->startWork();
FramedSource source(from);
store->addMultipleToStore(source,
RepairFlag{repair},
dontCheckSigs ? NoCheckSigs : CheckSigs);
{
FramedSource source(from);
store->addMultipleToStore(source,
RepairFlag{repair},
dontCheckSigs ? NoCheckSigs : CheckSigs);
}
logger->stopWork();
break;
}
case wopAddTextToStore: {
string suffix = readString(from);
string s = readString(from);
std::string suffix = readString(from);
std::string s = readString(from);
auto refs = worker_proto::read(*store, from, Phantom<StorePathSet> {});
logger->startWork();
auto path = store->addTextToStore(suffix, s, refs, NoRepair);
@ -624,9 +629,9 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break;
}
// Obsolete.
case wopSyncWithGC: {
logger->startWork();
store->syncWithGC();
logger->stopWork();
to << 1;
break;
@ -693,8 +698,8 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
if (GET_PROTOCOL_MINOR(clientVersion) >= 12) {
unsigned int n = readInt(from);
for (unsigned int i = 0; i < n; i++) {
string name = readString(from);
string value = readString(from);
auto name = readString(from);
auto value = readString(from);
clientSettings.overrides.emplace(name, value);
}
}
@ -849,14 +854,14 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
else {
std::unique_ptr<Source> source;
StringSink saved;
if (GET_PROTOCOL_MINOR(clientVersion) >= 21)
source = std::make_unique<TunnelSource>(from, to);
else {
StringSink saved;
TeeSource tee { from, saved };
ParseSink ether;
parseDump(ether, tee);
source = std::make_unique<StringSource>(std::move(*saved.s));
source = std::make_unique<StringSource>(saved.s);
}
logger->startWork();
@ -917,6 +922,22 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
break;
}
case wopAddBuildLog: {
StorePath path{readString(from)};
logger->startWork();
if (!trusted)
throw Error("you are not privileged to add logs");
{
FramedSource source(from);
StringSink sink;
source.drainInto(sink);
store->addBuildLog(path, sink.s);
}
logger->stopWork();
to << 1;
break;
}
default:
throw Error("invalid operation %1%", op);
}
@ -952,15 +973,19 @@ void processConnection(
Finally finally([&]() {
_isInterrupted = false;
prevLogger->log(lvlDebug, fmt("%d operations", opCount));
printMsgUsing(prevLogger, lvlDebug, "%d operations", opCount);
});
if (GET_PROTOCOL_MINOR(clientVersion) >= 14 && readInt(from)) {
auto affinity = readInt(from);
setAffinityTo(affinity);
// Obsolete CPU affinity.
readInt(from);
}
readInt(from); // obsolete reserveSpace
if (GET_PROTOCOL_MINOR(clientVersion) >= 11)
readInt(from); // obsolete reserveSpace
if (GET_PROTOCOL_MINOR(clientVersion) >= 33)
to << nixVersion;
/* Send startup error messages to the client. */
tunnelLogger->startWork();
@ -985,6 +1010,8 @@ void processConnection(
break;
}
printMsgUsing(prevLogger, lvlDebug, "received daemon op %d", op);
opCount++;
debug("performing daemon worker op: %d", op);