mirror of
https://github.com/NixOS/nix
synced 2025-06-29 06:21:14 +02:00
withFramedSink(): Don't use a thread to monitor the other side
Since withFramedSink() is now used a lot more than in the past (for every addToStore() variant), we were creating a lot of threads, e.g. nix flake show --no-eval-cache --all-systems github:NixOS/nix/afdd12be5e19c0001ff3297dea544301108d298 would create 46418 threads. While threads on Linux are cheap, this is still substantial overhead. So instead, just poll from FramedSink before every write whether there are pending messages from the daemon. This could slightly increase the latency on log messages from the daemon, but not on exceptions (which were only synchronously checked from FramedSink anyway). This speeds up the command above from 19.2s to 17.5s on my machine (a 9% speedup).
This commit is contained in:
parent
b0a7edb5ab
commit
39daa4a0d3
6 changed files with 61 additions and 52 deletions
|
@ -49,7 +49,7 @@ struct RemoteStore::ConnectionHandle
|
|||
RemoteStore::Connection & operator * () { return *handle; }
|
||||
RemoteStore::Connection * operator -> () { return &*handle; }
|
||||
|
||||
void processStderr(Sink * sink = 0, Source * source = 0, bool flush = true);
|
||||
void processStderr(Sink * sink = 0, Source * source = 0, bool flush = true, bool block = true);
|
||||
|
||||
void withFramedSink(std::function<void(Sink & sink)> fun);
|
||||
};
|
||||
|
|
|
@ -153,9 +153,9 @@ RemoteStore::ConnectionHandle::~ConnectionHandle()
|
|||
}
|
||||
}
|
||||
|
||||
void RemoteStore::ConnectionHandle::processStderr(Sink * sink, Source * source, bool flush)
|
||||
void RemoteStore::ConnectionHandle::processStderr(Sink * sink, Source * source, bool flush, bool block)
|
||||
{
|
||||
handle->processStderr(&daemonException, sink, source, flush);
|
||||
handle->processStderr(&daemonException, sink, source, flush, block);
|
||||
}
|
||||
|
||||
|
||||
|
@ -926,43 +926,17 @@ void RemoteStore::ConnectionHandle::withFramedSink(std::function<void(Sink & sin
|
|||
{
|
||||
(*this)->to.flush();
|
||||
|
||||
std::exception_ptr ex;
|
||||
|
||||
/* Handle log messages / exceptions from the remote on a separate
|
||||
thread. */
|
||||
std::thread stderrThread([&]()
|
||||
{
|
||||
try {
|
||||
ReceiveInterrupts receiveInterrupts;
|
||||
processStderr(nullptr, nullptr, false);
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
});
|
||||
|
||||
Finally joinStderrThread([&]()
|
||||
{
|
||||
if (stderrThread.joinable()) {
|
||||
stderrThread.join();
|
||||
if (ex) {
|
||||
try {
|
||||
std::rethrow_exception(ex);
|
||||
} catch (...) {
|
||||
ignoreException();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
{
|
||||
FramedSink sink((*this)->to, ex);
|
||||
FramedSink sink((*this)->to, [&]() {
|
||||
/* Periodically process stderr messages and exceptions
|
||||
from the daemon. */
|
||||
processStderr(nullptr, nullptr, false, false);
|
||||
});
|
||||
fun(sink);
|
||||
sink.flush();
|
||||
}
|
||||
|
||||
stderrThread.join();
|
||||
if (ex)
|
||||
std::rethrow_exception(ex);
|
||||
processStderr(nullptr, nullptr, false);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -32,7 +32,8 @@ static Logger::Fields readFields(Source & from)
|
|||
return fields;
|
||||
}
|
||||
|
||||
std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink * sink, Source * source, bool flush)
|
||||
std::exception_ptr
|
||||
WorkerProto::BasicClientConnection::processStderrReturn(Sink * sink, Source * source, bool flush, bool block)
|
||||
{
|
||||
if (flush)
|
||||
to.flush();
|
||||
|
@ -41,6 +42,9 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink
|
|||
|
||||
while (true) {
|
||||
|
||||
if (!block && !from.hasData())
|
||||
break;
|
||||
|
||||
auto msg = readNum<uint64_t>(from);
|
||||
|
||||
if (msg == STDERR_WRITE) {
|
||||
|
@ -95,8 +99,10 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink
|
|||
logger->result(act, type, fields);
|
||||
}
|
||||
|
||||
else if (msg == STDERR_LAST)
|
||||
else if (msg == STDERR_LAST) {
|
||||
assert(block);
|
||||
break;
|
||||
}
|
||||
|
||||
else
|
||||
throw Error("got unknown message type %x from Nix daemon", msg);
|
||||
|
@ -130,9 +136,10 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink
|
|||
}
|
||||
}
|
||||
|
||||
void WorkerProto::BasicClientConnection::processStderr(bool * daemonException, Sink * sink, Source * source, bool flush)
|
||||
void WorkerProto::BasicClientConnection::processStderr(
|
||||
bool * daemonException, Sink * sink, Source * source, bool flush, bool block)
|
||||
{
|
||||
auto ex = processStderrReturn(sink, source, flush);
|
||||
auto ex = processStderrReturn(sink, source, flush, block);
|
||||
if (ex) {
|
||||
*daemonException = true;
|
||||
std::rethrow_exception(ex);
|
||||
|
|
|
@ -70,9 +70,10 @@ struct WorkerProto::BasicClientConnection : WorkerProto::BasicConnection
|
|||
|
||||
virtual void closeWrite() = 0;
|
||||
|
||||
std::exception_ptr processStderrReturn(Sink * sink = 0, Source * source = 0, bool flush = true);
|
||||
std::exception_ptr processStderrReturn(Sink * sink = 0, Source * source = 0, bool flush = true, bool block = true);
|
||||
|
||||
void processStderr(bool * daemonException, Sink * sink = 0, Source * source = 0, bool flush = true);
|
||||
void
|
||||
processStderr(bool * daemonException, Sink * sink = 0, Source * source = 0, bool flush = true, bool block = true);
|
||||
|
||||
/**
|
||||
* Establishes connection, negotiating version.
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue