diff --git a/src/libstore/daemon.cc b/src/libstore/daemon.cc index fc14eda3c..6533b2f58 100644 --- a/src/libstore/daemon.cc +++ b/src/libstore/daemon.cc @@ -269,26 +269,21 @@ struct ClientSettings }; static void performOp(TunnelLogger * logger, ref store, - TrustedFlag trusted, RecursiveFlag recursive, WorkerProto::Version clientVersion, - Source & from, BufferedSink & to, WorkerProto::Op op) + TrustedFlag trusted, RecursiveFlag recursive, + WorkerProto::BasicServerConnection & conn, + WorkerProto::Op op) { - WorkerProto::ReadConn rconn { - .from = from, - .version = clientVersion, - }; - WorkerProto::WriteConn wconn { - .to = to, - .version = clientVersion, - }; + WorkerProto::ReadConn rconn(conn); + WorkerProto::WriteConn wconn(conn); switch (op) { case WorkerProto::Op::IsValidPath: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); logger->startWork(); bool result = store->isValidPath(path); logger->stopWork(); - to << result; + conn.to << result; break; } @@ -296,8 +291,8 @@ static void performOp(TunnelLogger * logger, ref store, auto paths = WorkerProto::Serialise::read(*store, rconn); SubstituteFlag substitute = NoSubstitute; - if (GET_PROTOCOL_MINOR(clientVersion) >= 27) { - substitute = readInt(from) ? Substitute : NoSubstitute; + if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 27) { + substitute = readInt(conn.from) ? Substitute : NoSubstitute; } logger->startWork(); @@ -311,13 +306,13 @@ static void performOp(TunnelLogger * logger, ref store, } case WorkerProto::Op::HasSubstitutes: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); logger->startWork(); StorePathSet paths; // FIXME paths.insert(path); auto res = store->querySubstitutablePaths(paths); logger->stopWork(); - to << (res.count(path) != 0); + conn.to << (res.count(path) != 0); break; } @@ -331,11 +326,11 @@ static void performOp(TunnelLogger * logger, ref store, } case WorkerProto::Op::QueryPathHash: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); logger->startWork(); auto hash = store->queryPathInfo(path)->narHash; logger->stopWork(); - to << hash.to_string(HashFormat::Base16, false); + conn.to << hash.to_string(HashFormat::Base16, false); break; } @@ -343,7 +338,7 @@ static void performOp(TunnelLogger * logger, ref store, case WorkerProto::Op::QueryReferrers: case WorkerProto::Op::QueryValidDerivers: case WorkerProto::Op::QueryDerivationOutputs: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); logger->startWork(); StorePathSet paths; if (op == WorkerProto::Op::QueryReferences) @@ -360,16 +355,16 @@ static void performOp(TunnelLogger * logger, ref store, } case WorkerProto::Op::QueryDerivationOutputNames: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); logger->startWork(); auto names = store->readDerivation(path).outputNames(); logger->stopWork(); - to << names; + conn.to << names; break; } case WorkerProto::Op::QueryDerivationOutputMap: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); logger->startWork(); auto outputs = store->queryPartialDerivationOutputMap(path); logger->stopWork(); @@ -378,37 +373,37 @@ static void performOp(TunnelLogger * logger, ref store, } case WorkerProto::Op::QueryDeriver: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); logger->startWork(); auto info = store->queryPathInfo(path); logger->stopWork(); - to << (info->deriver ? store->printStorePath(*info->deriver) : ""); + conn.to << (info->deriver ? store->printStorePath(*info->deriver) : ""); break; } case WorkerProto::Op::QueryPathFromHashPart: { - auto hashPart = readString(from); + auto hashPart = readString(conn.from); logger->startWork(); auto path = store->queryPathFromHashPart(hashPart); logger->stopWork(); - to << (path ? store->printStorePath(*path) : ""); + conn.to << (path ? store->printStorePath(*path) : ""); break; } case WorkerProto::Op::AddToStore: { - if (GET_PROTOCOL_MINOR(clientVersion) >= 25) { - auto name = readString(from); - auto camStr = readString(from); + if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 25) { + auto name = readString(conn.from); + auto camStr = readString(conn.from); auto refs = WorkerProto::Serialise::read(*store, rconn); bool repairBool; - from >> repairBool; + conn.from >> repairBool; auto repair = RepairFlag{repairBool}; logger->startWork(); auto pathInfo = [&]() { // NB: FramedSource must be out of scope before logger->stopWork(); auto [contentAddressMethod, hashAlgo] = ContentAddressMethod::parseWithAlgo(camStr); - FramedSource source(from); + FramedSource source(conn.from); FileSerialisationMethod dumpMethod; switch (contentAddressMethod.getFileIngestionMethod()) { case FileIngestionMethod::Flat: @@ -439,7 +434,7 @@ static void performOp(TunnelLogger * logger, ref store, bool fixed; uint8_t recursive; std::string hashAlgoRaw; - from >> baseName >> fixed /* obsolete */ >> recursive >> hashAlgoRaw; + conn.from >> baseName >> fixed /* obsolete */ >> recursive >> hashAlgoRaw; if (recursive > true) throw Error("unsupported FileIngestionMethod with value of %i; you may need to upgrade nix-daemon", recursive); method = recursive @@ -459,11 +454,11 @@ static void performOp(TunnelLogger * logger, ref store, so why all this extra work? We still parse the NAR so that we aren't sending arbitrary data to `saved` unwittingly`, and we know when the NAR ends so we don't - consume the rest of `from` and can't parse another + consume the rest of `conn.from` and can't parse another command. (We don't trust `addToStoreFromDump` to not eagerly consume the entire stream it's given, past the length of the Nar. */ - TeeSource savedNARSource(from, saved); + TeeSource savedNARSource(conn.from, saved); NullFileSystemObjectSink sink; /* just parse the NAR */ parseDump(sink, savedNARSource); }); @@ -472,20 +467,20 @@ static void performOp(TunnelLogger * logger, ref store, *dumpSource, baseName, FileSerialisationMethod::NixArchive, method, hashAlgo); logger->stopWork(); - to << store->printStorePath(path); + conn.to << store->printStorePath(path); } break; } case WorkerProto::Op::AddMultipleToStore: { bool repair, dontCheckSigs; - from >> repair >> dontCheckSigs; + conn.from >> repair >> dontCheckSigs; if (!trusted && dontCheckSigs) dontCheckSigs = false; logger->startWork(); { - FramedSource source(from); + FramedSource source(conn.from); store->addMultipleToStore(source, RepairFlag{repair}, dontCheckSigs ? NoCheckSigs : CheckSigs); @@ -495,8 +490,8 @@ static void performOp(TunnelLogger * logger, ref store, } case WorkerProto::Op::AddTextToStore: { - std::string suffix = readString(from); - std::string s = readString(from); + std::string suffix = readString(conn.from); + std::string s = readString(conn.from); auto refs = WorkerProto::Serialise::read(*store, rconn); logger->startWork(); auto path = ({ @@ -504,37 +499,37 @@ static void performOp(TunnelLogger * logger, ref store, store->addToStoreFromDump(source, suffix, FileSerialisationMethod::Flat, ContentAddressMethod::Raw::Text, HashAlgorithm::SHA256, refs, NoRepair); }); logger->stopWork(); - to << store->printStorePath(path); + conn.to << store->printStorePath(path); break; } case WorkerProto::Op::ExportPath: { - auto path = store->parseStorePath(readString(from)); - readInt(from); // obsolete + auto path = store->parseStorePath(readString(conn.from)); + readInt(conn.from); // obsolete logger->startWork(); - TunnelSink sink(to); + TunnelSink sink(conn.to); store->exportPath(path, sink); logger->stopWork(); - to << 1; + conn.to << 1; break; } case WorkerProto::Op::ImportPaths: { logger->startWork(); - TunnelSource source(from, to); + TunnelSource source(conn.from, conn.to); auto paths = store->importPaths(source, trusted ? NoCheckSigs : CheckSigs); logger->stopWork(); Strings paths2; for (auto & i : paths) paths2.push_back(store->printStorePath(i)); - to << paths2; + conn.to << paths2; break; } case WorkerProto::Op::BuildPaths: { auto drvs = WorkerProto::Serialise::read(*store, rconn); BuildMode mode = bmNormal; - if (GET_PROTOCOL_MINOR(clientVersion) >= 15) { + if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 15) { mode = WorkerProto::Serialise::read(*store, rconn); /* Repairing is not atomic, so disallowed for "untrusted" @@ -552,7 +547,7 @@ static void performOp(TunnelLogger * logger, ref store, logger->startWork(); store->buildPaths(drvs, mode); logger->stopWork(); - to << 1; + conn.to << 1; break; } @@ -578,7 +573,7 @@ static void performOp(TunnelLogger * logger, ref store, } case WorkerProto::Op::BuildDerivation: { - auto drvPath = store->parseStorePath(readString(from)); + auto drvPath = store->parseStorePath(readString(conn.from)); BasicDerivation drv; /* * Note: unlike wopEnsurePath, this operation reads a @@ -589,7 +584,7 @@ static void performOp(TunnelLogger * logger, ref store, * it cannot be trusted that its outPath was calculated * correctly. */ - readDerivation(from, *store, drv, Derivation::nameFromPath(drvPath)); + readDerivation(conn.from, *store, drv, Derivation::nameFromPath(drvPath)); auto buildMode = WorkerProto::Serialise::read(*store, rconn); logger->startWork(); @@ -655,20 +650,20 @@ static void performOp(TunnelLogger * logger, ref store, } case WorkerProto::Op::EnsurePath: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); logger->startWork(); store->ensurePath(path); logger->stopWork(); - to << 1; + conn.to << 1; break; } case WorkerProto::Op::AddTempRoot: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); logger->startWork(); store->addTempRoot(path); logger->stopWork(); - to << 1; + conn.to << 1; break; } @@ -678,24 +673,24 @@ static void performOp(TunnelLogger * logger, ref store, "you are not privileged to create perm roots\n\n" "hint: you can just do this client-side without special privileges, and probably want to do that instead."); auto storePath = WorkerProto::Serialise::read(*store, rconn); - Path gcRoot = absPath(readString(from)); + Path gcRoot = absPath(readString(conn.from)); logger->startWork(); auto & localFSStore = require(*store); localFSStore.addPermRoot(storePath, gcRoot); logger->stopWork(); - to << gcRoot; + conn.to << gcRoot; break; } case WorkerProto::Op::AddIndirectRoot: { - Path path = absPath(readString(from)); + Path path = absPath(readString(conn.from)); logger->startWork(); auto & indirectRootStore = require(*store); indirectRootStore.addIndirectRoot(path); logger->stopWork(); - to << 1; + conn.to << 1; break; } @@ -703,7 +698,7 @@ static void performOp(TunnelLogger * logger, ref store, case WorkerProto::Op::SyncWithGC: { logger->startWork(); logger->stopWork(); - to << 1; + conn.to << 1; break; } @@ -717,24 +712,24 @@ static void performOp(TunnelLogger * logger, ref store, for (auto & i : roots) size += i.second.size(); - to << size; + conn.to << size; for (auto & [target, links] : roots) for (auto & link : links) - to << link << store->printStorePath(target); + conn.to << link << store->printStorePath(target); break; } case WorkerProto::Op::CollectGarbage: { GCOptions options; - options.action = (GCOptions::GCAction) readInt(from); + options.action = (GCOptions::GCAction) readInt(conn.from); options.pathsToDelete = WorkerProto::Serialise::read(*store, rconn); - from >> options.ignoreLiveness >> options.maxFreed; + conn.from >> options.ignoreLiveness >> options.maxFreed; // obsolete fields - readInt(from); - readInt(from); - readInt(from); + readInt(conn.from); + readInt(conn.from); + readInt(conn.from); GCResults results; @@ -745,7 +740,7 @@ static void performOp(TunnelLogger * logger, ref store, gcStore.collectGarbage(options, results); logger->stopWork(); - to << results.paths << results.bytesFreed << 0 /* obsolete */; + conn.to << results.paths << results.bytesFreed << 0 /* obsolete */; break; } @@ -754,24 +749,24 @@ static void performOp(TunnelLogger * logger, ref store, ClientSettings clientSettings; - clientSettings.keepFailed = readInt(from); - clientSettings.keepGoing = readInt(from); - clientSettings.tryFallback = readInt(from); - clientSettings.verbosity = (Verbosity) readInt(from); - clientSettings.maxBuildJobs = readInt(from); - clientSettings.maxSilentTime = readInt(from); - readInt(from); // obsolete useBuildHook - clientSettings.verboseBuild = lvlError == (Verbosity) readInt(from); - readInt(from); // obsolete logType - readInt(from); // obsolete printBuildTrace - clientSettings.buildCores = readInt(from); - clientSettings.useSubstitutes = readInt(from); + clientSettings.keepFailed = readInt(conn.from); + clientSettings.keepGoing = readInt(conn.from); + clientSettings.tryFallback = readInt(conn.from); + clientSettings.verbosity = (Verbosity) readInt(conn.from); + clientSettings.maxBuildJobs = readInt(conn.from); + clientSettings.maxSilentTime = readInt(conn.from); + readInt(conn.from); // obsolete useBuildHook + clientSettings.verboseBuild = lvlError == (Verbosity) readInt(conn.from); + readInt(conn.from); // obsolete logType + readInt(conn.from); // obsolete printBuildTrace + clientSettings.buildCores = readInt(conn.from); + clientSettings.useSubstitutes = readInt(conn.from); - if (GET_PROTOCOL_MINOR(clientVersion) >= 12) { - unsigned int n = readInt(from); + if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 12) { + unsigned int n = readInt(conn.from); for (unsigned int i = 0; i < n; i++) { - auto name = readString(from); - auto value = readString(from); + auto name = readString(conn.from); + auto value = readString(conn.from); clientSettings.overrides.emplace(name, value); } } @@ -788,20 +783,20 @@ static void performOp(TunnelLogger * logger, ref store, } case WorkerProto::Op::QuerySubstitutablePathInfo: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); logger->startWork(); SubstitutablePathInfos infos; store->querySubstitutablePathInfos({{path, std::nullopt}}, infos); logger->stopWork(); auto i = infos.find(path); if (i == infos.end()) - to << 0; + conn.to << 0; else { - to << 1 + conn.to << 1 << (i->second.deriver ? store->printStorePath(*i->second.deriver) : ""); WorkerProto::write(*store, wconn, i->second.references); - to << i->second.downloadSize - << i->second.narSize; + conn.to << i->second.downloadSize + << i->second.narSize; } break; } @@ -809,7 +804,7 @@ static void performOp(TunnelLogger * logger, ref store, case WorkerProto::Op::QuerySubstitutablePathInfos: { SubstitutablePathInfos infos; StorePathCAMap pathsMap = {}; - if (GET_PROTOCOL_MINOR(clientVersion) < 22) { + if (GET_PROTOCOL_MINOR(conn.protoVersion) < 22) { auto paths = WorkerProto::Serialise::read(*store, rconn); for (auto & path : paths) pathsMap.emplace(path, std::nullopt); @@ -818,12 +813,12 @@ static void performOp(TunnelLogger * logger, ref store, logger->startWork(); store->querySubstitutablePathInfos(pathsMap, infos); logger->stopWork(); - to << infos.size(); + conn.to << infos.size(); for (auto & i : infos) { - to << store->printStorePath(i.first) - << (i.second.deriver ? store->printStorePath(*i.second.deriver) : ""); + conn.to << store->printStorePath(i.first) + << (i.second.deriver ? store->printStorePath(*i.second.deriver) : ""); WorkerProto::write(*store, wconn, i.second.references); - to << i.second.downloadSize << i.second.narSize; + conn.to << i.second.downloadSize << i.second.narSize; } break; } @@ -837,22 +832,22 @@ static void performOp(TunnelLogger * logger, ref store, } case WorkerProto::Op::QueryPathInfo: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); std::shared_ptr info; logger->startWork(); try { info = store->queryPathInfo(path); } catch (InvalidPath &) { - if (GET_PROTOCOL_MINOR(clientVersion) < 17) throw; + if (GET_PROTOCOL_MINOR(conn.protoVersion) < 17) throw; } logger->stopWork(); if (info) { - if (GET_PROTOCOL_MINOR(clientVersion) >= 17) - to << 1; + if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 17) + conn.to << 1; WorkerProto::write(*store, wconn, static_cast(*info)); } else { - assert(GET_PROTOCOL_MINOR(clientVersion) >= 17); - to << 0; + assert(GET_PROTOCOL_MINOR(conn.protoVersion) >= 17); + conn.to << 0; } break; } @@ -861,61 +856,61 @@ static void performOp(TunnelLogger * logger, ref store, logger->startWork(); store->optimiseStore(); logger->stopWork(); - to << 1; + conn.to << 1; break; case WorkerProto::Op::VerifyStore: { bool checkContents, repair; - from >> checkContents >> repair; + conn.from >> checkContents >> repair; logger->startWork(); if (repair && !trusted) throw Error("you are not privileged to repair paths"); bool errors = store->verifyStore(checkContents, (RepairFlag) repair); logger->stopWork(); - to << errors; + conn.to << errors; break; } case WorkerProto::Op::AddSignatures: { - auto path = store->parseStorePath(readString(from)); - StringSet sigs = readStrings(from); + auto path = store->parseStorePath(readString(conn.from)); + StringSet sigs = readStrings(conn.from); logger->startWork(); store->addSignatures(path, sigs); logger->stopWork(); - to << 1; + conn.to << 1; break; } case WorkerProto::Op::NarFromPath: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); logger->startWork(); logger->stopWork(); - dumpPath(store->toRealPath(path), to); + dumpPath(store->toRealPath(path), conn.to); break; } case WorkerProto::Op::AddToStoreNar: { bool repair, dontCheckSigs; - auto path = store->parseStorePath(readString(from)); - auto deriver = readString(from); - auto narHash = Hash::parseAny(readString(from), HashAlgorithm::SHA256); + auto path = store->parseStorePath(readString(conn.from)); + auto deriver = readString(conn.from); + auto narHash = Hash::parseAny(readString(conn.from), HashAlgorithm::SHA256); ValidPathInfo info { path, narHash }; if (deriver != "") info.deriver = store->parseStorePath(deriver); info.references = WorkerProto::Serialise::read(*store, rconn); - from >> info.registrationTime >> info.narSize >> info.ultimate; - info.sigs = readStrings(from); - info.ca = ContentAddress::parseOpt(readString(from)); - from >> repair >> dontCheckSigs; + conn.from >> info.registrationTime >> info.narSize >> info.ultimate; + info.sigs = readStrings(conn.from); + info.ca = ContentAddress::parseOpt(readString(conn.from)); + conn.from >> repair >> dontCheckSigs; if (!trusted && dontCheckSigs) dontCheckSigs = false; if (!trusted) info.ultimate = false; - if (GET_PROTOCOL_MINOR(clientVersion) >= 23) { + if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 23) { logger->startWork(); { - FramedSource source(from); + FramedSource source(conn.from); store->addToStore(info, source, (RepairFlag) repair, dontCheckSigs ? NoCheckSigs : CheckSigs); } @@ -925,10 +920,10 @@ static void performOp(TunnelLogger * logger, ref store, else { std::unique_ptr source; StringSink saved; - if (GET_PROTOCOL_MINOR(clientVersion) >= 21) - source = std::make_unique(from, to); + if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 21) + source = std::make_unique(conn.from, conn.to); else { - TeeSource tee { from, saved }; + TeeSource tee { conn.from, saved }; NullFileSystemObjectSink ether; parseDump(ether, tee); source = std::make_unique(saved.s); @@ -956,15 +951,15 @@ static void performOp(TunnelLogger * logger, ref store, WorkerProto::write(*store, wconn, willBuild); WorkerProto::write(*store, wconn, willSubstitute); WorkerProto::write(*store, wconn, unknown); - to << downloadSize << narSize; + conn.to << downloadSize << narSize; break; } case WorkerProto::Op::RegisterDrvOutput: { logger->startWork(); - if (GET_PROTOCOL_MINOR(clientVersion) < 31) { - auto outputId = DrvOutput::parse(readString(from)); - auto outputPath = StorePath(readString(from)); + if (GET_PROTOCOL_MINOR(conn.protoVersion) < 31) { + auto outputId = DrvOutput::parse(readString(conn.from)); + auto outputPath = StorePath(readString(conn.from)); store->registerDrvOutput(Realisation{ .id = outputId, .outPath = outputPath}); } else { @@ -977,10 +972,10 @@ static void performOp(TunnelLogger * logger, ref store, case WorkerProto::Op::QueryRealisation: { logger->startWork(); - auto outputId = DrvOutput::parse(readString(from)); + auto outputId = DrvOutput::parse(readString(conn.from)); auto info = store->queryRealisation(outputId); logger->stopWork(); - if (GET_PROTOCOL_MINOR(clientVersion) < 31) { + if (GET_PROTOCOL_MINOR(conn.protoVersion) < 31) { std::set outPaths; if (info) outPaths.insert(info->outPath); WorkerProto::write(*store, wconn, outPaths); @@ -993,19 +988,19 @@ static void performOp(TunnelLogger * logger, ref store, } case WorkerProto::Op::AddBuildLog: { - StorePath path{readString(from)}; + StorePath path{readString(conn.from)}; logger->startWork(); if (!trusted) throw Error("you are not privileged to add logs"); auto & logStore = require(*store); { - FramedSource source(from); + FramedSource source(conn.from); StringSink sink; source.drainInto(sink); logStore.addBuildLog(path, sink.s); } logger->stopWork(); - to << 1; + conn.to << 1; break; } @@ -1090,7 +1085,7 @@ void processConnection( debug("performing daemon worker op: %d", op); try { - performOp(tunnelLogger, store, trusted, recursive, clientVersion, conn.from, conn.to, op); + performOp(tunnelLogger, store, trusted, recursive, conn, op); } catch (Error & e) { /* If we're not in a state where we can send replies, then something went wrong processing the input of the