mirror of
https://github.com/NixOS/nix
synced 2025-06-27 04:21:16 +02:00
Add "nix verify-paths" command
Unlike "nix-store --verify-path", this command verifies signatures in addition to store path contents, is multi-threaded (especially useful when verifying binary caches), and has a progress indicator. Example use: $ nix verify-paths --store https://cache.nixos.org -r $(type -p thunderbird) ... [17/132 checked] checking ‘/nix/store/rawakphadqrqxr6zri2rmnxh03gqkrl3-autogen-5.18.6’
This commit is contained in:
parent
0ebe69dc67
commit
784ee35c80
11 changed files with 432 additions and 2 deletions
81
src/libutil/thread-pool.cc
Normal file
81
src/libutil/thread-pool.cc
Normal file
|
@ -0,0 +1,81 @@
|
|||
#include "thread-pool.hh"
|
||||
|
||||
namespace nix {
|
||||
|
||||
ThreadPool::ThreadPool(size_t _nrThreads)
|
||||
: nrThreads(_nrThreads)
|
||||
{
|
||||
if (!nrThreads) {
|
||||
nrThreads = std::thread::hardware_concurrency();
|
||||
if (!nrThreads) nrThreads = 1;
|
||||
}
|
||||
}
|
||||
|
||||
void ThreadPool::enqueue(const work_t & t)
|
||||
{
|
||||
auto state_(state.lock());
|
||||
state_->left.push(t);
|
||||
wakeup.notify_one();
|
||||
}
|
||||
|
||||
void ThreadPool::process()
|
||||
{
|
||||
printMsg(lvlDebug, format("starting pool of %d threads") % nrThreads);
|
||||
|
||||
std::vector<std::thread> workers;
|
||||
|
||||
for (size_t n = 0; n < nrThreads; n++)
|
||||
workers.push_back(std::thread([&]() {
|
||||
bool first = true;
|
||||
|
||||
while (true) {
|
||||
work_t work;
|
||||
{
|
||||
auto state_(state.lock());
|
||||
if (state_->exception) return;
|
||||
if (!first) {
|
||||
assert(state_->pending);
|
||||
state_->pending--;
|
||||
}
|
||||
first = false;
|
||||
while (state_->left.empty()) {
|
||||
if (!state_->pending) {
|
||||
wakeup.notify_all();
|
||||
return;
|
||||
}
|
||||
if (state_->exception) return;
|
||||
state_.wait(wakeup);
|
||||
}
|
||||
work = state_->left.front();
|
||||
state_->left.pop();
|
||||
state_->pending++;
|
||||
}
|
||||
|
||||
try {
|
||||
work();
|
||||
} catch (std::exception & e) {
|
||||
auto state_(state.lock());
|
||||
if (state_->exception)
|
||||
printMsg(lvlError, format("error: %s") % e.what());
|
||||
else {
|
||||
state_->exception = std::current_exception();
|
||||
wakeup.notify_all();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}));
|
||||
|
||||
for (auto & thr : workers)
|
||||
thr.join();
|
||||
|
||||
{
|
||||
auto state_(state.lock());
|
||||
if (state_->exception)
|
||||
std::rethrow_exception(state_->exception);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
52
src/libutil/thread-pool.hh
Normal file
52
src/libutil/thread-pool.hh
Normal file
|
@ -0,0 +1,52 @@
|
|||
#pragma once
|
||||
|
||||
#include "sync.hh"
|
||||
#include "util.hh"
|
||||
|
||||
#include <queue>
|
||||
#include <functional>
|
||||
#include <thread>
|
||||
|
||||
namespace nix {
|
||||
|
||||
/* A simple thread pool that executes a queue of work items
|
||||
(lambdas). */
|
||||
class ThreadPool
|
||||
{
|
||||
public:
|
||||
|
||||
ThreadPool(size_t nrThreads = 0);
|
||||
|
||||
// FIXME: use std::packaged_task?
|
||||
typedef std::function<void()> work_t;
|
||||
|
||||
/* Enqueue a function to be executed by the thread pool. */
|
||||
void enqueue(const work_t & t);
|
||||
|
||||
/* Execute work items until the queue is empty. Note that work
|
||||
items are allowed to add new items to the queue; this is
|
||||
handled correctly. Queue processing stops prematurely if any
|
||||
work item throws an exception. This exception is propagated to
|
||||
the calling thread. If multiple work items throw an exception
|
||||
concurrently, only one item is propagated; the others are
|
||||
printed on stderr and otherwise ignored. */
|
||||
void process();
|
||||
|
||||
private:
|
||||
|
||||
size_t nrThreads;
|
||||
|
||||
struct State
|
||||
{
|
||||
std::queue<work_t> left;
|
||||
size_t pending = 0;
|
||||
std::exception_ptr exception;
|
||||
};
|
||||
|
||||
Sync<State> state;
|
||||
|
||||
std::condition_variable wakeup;
|
||||
|
||||
};
|
||||
|
||||
}
|
|
@ -548,7 +548,7 @@ void writeToStderr(const string & s)
|
|||
}
|
||||
|
||||
|
||||
void (*_writeToStderr) (const unsigned char * buf, size_t count) = 0;
|
||||
std::function<void(const unsigned char * buf, size_t count)> _writeToStderr;
|
||||
|
||||
|
||||
void readFull(int fd, unsigned char * buf, size_t count)
|
||||
|
|
|
@ -167,7 +167,7 @@ void warnOnce(bool & haveWarned, const FormatOrString & fs);
|
|||
|
||||
void writeToStderr(const string & s);
|
||||
|
||||
extern void (*_writeToStderr) (const unsigned char * buf, size_t count);
|
||||
extern std::function<void(const unsigned char * buf, size_t count)> _writeToStderr;
|
||||
|
||||
|
||||
/* Wrappers arount read()/write() that read/write exactly the
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue