1
0
Fork 0
mirror of https://github.com/NixOS/nix synced 2025-06-28 13:41:15 +02:00

Create worker_proto::{Read,Write}Conn

Pass this around instead of `Source &` and `Sink &` directly. This will
give us something to put the protocol version on once the time comes.

To do this ergonomically, we need to expose `RemoteStore::Connection`,
so do that too. Give it some more API docs while we are at it.
This commit is contained in:
John Ericson 2023-04-17 13:40:46 -04:00
parent 4e8b495ad7
commit 9f69b7dee9
16 changed files with 348 additions and 184 deletions

View file

@ -260,13 +260,13 @@ struct ClientSettings
}
};
static std::vector<DerivedPath> readDerivedPaths(Store & store, unsigned int clientVersion, Source & from)
static std::vector<DerivedPath> readDerivedPaths(Store & store, unsigned int clientVersion, WorkerProto::ReadConn conn)
{
std::vector<DerivedPath> reqs;
if (GET_PROTOCOL_MINOR(clientVersion) >= 30) {
reqs = WorkerProto::Serialise<std::vector<DerivedPath>>::read(store, from);
reqs = WorkerProto::Serialise<std::vector<DerivedPath>>::read(store, conn);
} else {
for (auto & s : readStrings<Strings>(from))
for (auto & s : readStrings<Strings>(conn.from))
reqs.push_back(parsePathWithOutputs(store, s).toDerivedPath());
}
return reqs;
@ -276,6 +276,9 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
TrustedFlag trusted, RecursiveFlag recursive, unsigned int clientVersion,
Source & from, BufferedSink & to, WorkerProto::Op op)
{
WorkerProto::ReadConn rconn { .from = from };
WorkerProto::WriteConn wconn { .to = to };
switch (op) {
case WorkerProto::Op::IsValidPath: {
@ -288,7 +291,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
}
case WorkerProto::Op::QueryValidPaths: {
auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, from);
auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
SubstituteFlag substitute = NoSubstitute;
if (GET_PROTOCOL_MINOR(clientVersion) >= 27) {
@ -301,7 +304,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
}
auto res = store->queryValidPaths(paths, substitute);
logger->stopWork();
WorkerProto::write(*store, to, res);
WorkerProto::write(*store, wconn, res);
break;
}
@ -317,11 +320,11 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
}
case WorkerProto::Op::QuerySubstitutablePaths: {
auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, from);
auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
logger->startWork();
auto res = store->querySubstitutablePaths(paths);
logger->stopWork();
WorkerProto::write(*store, to, res);
WorkerProto::write(*store, wconn, res);
break;
}
@ -350,7 +353,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
paths = store->queryValidDerivers(path);
else paths = store->queryDerivationOutputs(path);
logger->stopWork();
WorkerProto::write(*store, to, paths);
WorkerProto::write(*store, wconn, paths);
break;
}
@ -368,7 +371,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
logger->startWork();
auto outputs = store->queryPartialDerivationOutputMap(path);
logger->stopWork();
WorkerProto::write(*store, to, outputs);
WorkerProto::write(*store, wconn, outputs);
break;
}
@ -394,7 +397,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
if (GET_PROTOCOL_MINOR(clientVersion) >= 25) {
auto name = readString(from);
auto camStr = readString(from);
auto refs = WorkerProto::Serialise<StorePathSet>::read(*store, from);
auto refs = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
bool repairBool;
from >> repairBool;
auto repair = RepairFlag{repairBool};
@ -496,7 +499,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
case WorkerProto::Op::AddTextToStore: {
std::string suffix = readString(from);
std::string s = readString(from);
auto refs = WorkerProto::Serialise<StorePathSet>::read(*store, from);
auto refs = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
logger->startWork();
auto path = store->addTextToStore(suffix, s, refs, NoRepair);
logger->stopWork();
@ -528,7 +531,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
}
case WorkerProto::Op::BuildPaths: {
auto drvs = readDerivedPaths(*store, clientVersion, from);
auto drvs = readDerivedPaths(*store, clientVersion, rconn);
BuildMode mode = bmNormal;
if (GET_PROTOCOL_MINOR(clientVersion) >= 15) {
mode = (BuildMode) readInt(from);
@ -553,7 +556,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
}
case WorkerProto::Op::BuildPathsWithResults: {
auto drvs = readDerivedPaths(*store, clientVersion, from);
auto drvs = readDerivedPaths(*store, clientVersion, rconn);
BuildMode mode = bmNormal;
mode = (BuildMode) readInt(from);
@ -568,7 +571,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
auto results = store->buildPathsWithResults(drvs, mode);
logger->stopWork();
WorkerProto::write(*store, to, results);
WorkerProto::write(*store, wconn, results);
break;
}
@ -645,7 +648,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
DrvOutputs builtOutputs;
for (auto & [output, realisation] : res.builtOutputs)
builtOutputs.insert_or_assign(realisation.id, realisation);
WorkerProto::write(*store, to, builtOutputs);
WorkerProto::write(*store, wconn, builtOutputs);
}
break;
}
@ -710,7 +713,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
case WorkerProto::Op::CollectGarbage: {
GCOptions options;
options.action = (GCOptions::GCAction) readInt(from);
options.pathsToDelete = WorkerProto::Serialise<StorePathSet>::read(*store, from);
options.pathsToDelete = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
from >> options.ignoreLiveness >> options.maxFreed;
// obsolete fields
readInt(from);
@ -780,7 +783,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
else {
to << 1
<< (i->second.deriver ? store->printStorePath(*i->second.deriver) : "");
WorkerProto::write(*store, to, i->second.references);
WorkerProto::write(*store, wconn, i->second.references);
to << i->second.downloadSize
<< i->second.narSize;
}
@ -791,11 +794,11 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
SubstitutablePathInfos infos;
StorePathCAMap pathsMap = {};
if (GET_PROTOCOL_MINOR(clientVersion) < 22) {
auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, from);
auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
for (auto & path : paths)
pathsMap.emplace(path, std::nullopt);
} else
pathsMap = WorkerProto::Serialise<StorePathCAMap>::read(*store, from);
pathsMap = WorkerProto::Serialise<StorePathCAMap>::read(*store, rconn);
logger->startWork();
store->querySubstitutablePathInfos(pathsMap, infos);
logger->stopWork();
@ -803,7 +806,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
for (auto & i : infos) {
to << store->printStorePath(i.first)
<< (i.second.deriver ? store->printStorePath(*i.second.deriver) : "");
WorkerProto::write(*store, to, i.second.references);
WorkerProto::write(*store, wconn, i.second.references);
to << i.second.downloadSize << i.second.narSize;
}
break;
@ -813,7 +816,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
logger->startWork();
auto paths = store->queryAllValidPaths();
logger->stopWork();
WorkerProto::write(*store, to, paths);
WorkerProto::write(*store, wconn, paths);
break;
}
@ -885,7 +888,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
ValidPathInfo info { path, narHash };
if (deriver != "")
info.deriver = store->parseStorePath(deriver);
info.references = WorkerProto::Serialise<StorePathSet>::read(*store, from);
info.references = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
from >> info.registrationTime >> info.narSize >> info.ultimate;
info.sigs = readStrings<StringSet>(from);
info.ca = ContentAddress::parseOpt(readString(from));
@ -930,15 +933,15 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
}
case WorkerProto::Op::QueryMissing: {
auto targets = readDerivedPaths(*store, clientVersion, from);
auto targets = readDerivedPaths(*store, clientVersion, rconn);
logger->startWork();
StorePathSet willBuild, willSubstitute, unknown;
uint64_t downloadSize, narSize;
store->queryMissing(targets, willBuild, willSubstitute, unknown, downloadSize, narSize);
logger->stopWork();
WorkerProto::write(*store, to, willBuild);
WorkerProto::write(*store, to, willSubstitute);
WorkerProto::write(*store, to, unknown);
WorkerProto::write(*store, wconn, willBuild);
WorkerProto::write(*store, wconn, willSubstitute);
WorkerProto::write(*store, wconn, unknown);
to << downloadSize << narSize;
break;
}
@ -951,7 +954,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
store->registerDrvOutput(Realisation{
.id = outputId, .outPath = outputPath});
} else {
auto realisation = WorkerProto::Serialise<Realisation>::read(*store, from);
auto realisation = WorkerProto::Serialise<Realisation>::read(*store, rconn);
store->registerDrvOutput(realisation);
}
logger->stopWork();
@ -966,11 +969,11 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
if (GET_PROTOCOL_MINOR(clientVersion) < 31) {
std::set<StorePath> outPaths;
if (info) outPaths.insert(info->outPath);
WorkerProto::write(*store, to, outPaths);
WorkerProto::write(*store, wconn, outPaths);
} else {
std::set<Realisation> realisations;
if (info) realisations.insert(*info);
WorkerProto::write(*store, to, realisations);
WorkerProto::write(*store, wconn, realisations);
}
break;
}
@ -1050,7 +1053,8 @@ void processConnection(
auto temp = trusted
? store->isTrustedClient()
: std::optional { NotTrusted };
WorkerProto::write(*store, to, temp);
WorkerProto::WriteConn wconn { .to = to };
WorkerProto::write(*store, wconn, temp);
}
/* Send startup error messages to the client. */