From 36f3fb72e9126b0e9b9d4faa5ef2e25375305adc Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Mon, 19 Aug 2024 17:49:26 +0200 Subject: [PATCH 1/4] withFramedSink(): Don't use a thread to monitor the other side Since withFramedSink() is now used a lot more than in the past (for every addToStore() variant), we were creating a lot of threads, e.g. nix flake show --no-eval-cache --all-systems github:NixOS/nix/afdd12be5e19c0001ff3297dea544301108d298 would create 46418 threads. While threads on Linux are cheap, this is still substantial overhead. So instead, just poll from FramedSink before every write whether there are pending messages from the daemon. This could slightly increase the latency on log messages from the daemon, but not on exceptions (which were only synchronously checked from FramedSink anyway). This speeds up the command above from 19.2s to 17.5s on my machine (a 9% speedup). (cherry picked from commit 39daa4a0d3e7451680e070d0bff7998ec0aea787) --- src/libstore/remote-store-connection.hh | 2 +- src/libstore/remote-store.cc | 42 +++++----------------- src/libstore/worker-protocol-connection.cc | 15 +++++--- src/libstore/worker-protocol-connection.hh | 5 +-- src/libutil/serialise.cc | 20 +++++++++++ src/libutil/serialise.hh | 29 +++++++++------ 6 files changed, 61 insertions(+), 52 deletions(-) diff --git a/src/libstore/remote-store-connection.hh b/src/libstore/remote-store-connection.hh index 405120ee9..513bd6838 100644 --- a/src/libstore/remote-store-connection.hh +++ b/src/libstore/remote-store-connection.hh @@ -49,7 +49,7 @@ struct RemoteStore::ConnectionHandle RemoteStore::Connection & operator * () { return *handle; } RemoteStore::Connection * operator -> () { return &*handle; } - void processStderr(Sink * sink = 0, Source * source = 0, bool flush = true); + void processStderr(Sink * sink = 0, Source * source = 0, bool flush = true, bool block = true); void withFramedSink(std::function fun); }; diff --git a/src/libstore/remote-store.cc 
b/src/libstore/remote-store.cc index 555936c18..69bbc64fc 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -153,9 +153,9 @@ RemoteStore::ConnectionHandle::~ConnectionHandle() } } -void RemoteStore::ConnectionHandle::processStderr(Sink * sink, Source * source, bool flush) +void RemoteStore::ConnectionHandle::processStderr(Sink * sink, Source * source, bool flush, bool block) { - handle->processStderr(&daemonException, sink, source, flush); + handle->processStderr(&daemonException, sink, source, flush, block); } @@ -926,43 +926,17 @@ void RemoteStore::ConnectionHandle::withFramedSink(std::functionto.flush(); - std::exception_ptr ex; - - /* Handle log messages / exceptions from the remote on a separate - thread. */ - std::thread stderrThread([&]() { - try { - ReceiveInterrupts receiveInterrupts; - processStderr(nullptr, nullptr, false); - } catch (...) { - ex = std::current_exception(); - } - }); - - Finally joinStderrThread([&]() - { - if (stderrThread.joinable()) { - stderrThread.join(); - if (ex) { - try { - std::rethrow_exception(ex); - } catch (...) { - ignoreException(); - } - } - } - }); - - { - FramedSink sink((*this)->to, ex); + FramedSink sink((*this)->to, [&]() { + /* Periodically process stderr messages and exceptions + from the daemon. 
*/ + processStderr(nullptr, nullptr, false, false); + }); fun(sink); sink.flush(); } - stderrThread.join(); - if (ex) - std::rethrow_exception(ex); + processStderr(nullptr, nullptr, false); } } diff --git a/src/libstore/worker-protocol-connection.cc b/src/libstore/worker-protocol-connection.cc index a47dbb689..ae434c7f0 100644 --- a/src/libstore/worker-protocol-connection.cc +++ b/src/libstore/worker-protocol-connection.cc @@ -32,7 +32,8 @@ static Logger::Fields readFields(Source & from) return fields; } -std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink * sink, Source * source, bool flush) +std::exception_ptr +WorkerProto::BasicClientConnection::processStderrReturn(Sink * sink, Source * source, bool flush, bool block) { if (flush) to.flush(); @@ -41,6 +42,9 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink while (true) { + if (!block && !from.hasData()) + break; + auto msg = readNum(from); if (msg == STDERR_WRITE) { @@ -95,8 +99,10 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink logger->result(act, type, fields); } - else if (msg == STDERR_LAST) + else if (msg == STDERR_LAST) { + assert(block); break; + } else throw Error("got unknown message type %x from Nix daemon", msg); @@ -130,9 +136,10 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink } } -void WorkerProto::BasicClientConnection::processStderr(bool * daemonException, Sink * sink, Source * source, bool flush) +void WorkerProto::BasicClientConnection::processStderr( + bool * daemonException, Sink * sink, Source * source, bool flush, bool block) { - auto ex = processStderrReturn(sink, source, flush); + auto ex = processStderrReturn(sink, source, flush, block); if (ex) { *daemonException = true; std::rethrow_exception(ex); diff --git a/src/libstore/worker-protocol-connection.hh b/src/libstore/worker-protocol-connection.hh index 9c96195b5..9665067dd 100644 --- 
a/src/libstore/worker-protocol-connection.hh +++ b/src/libstore/worker-protocol-connection.hh @@ -70,9 +70,10 @@ struct WorkerProto::BasicClientConnection : WorkerProto::BasicConnection virtual void closeWrite() = 0; - std::exception_ptr processStderrReturn(Sink * sink = 0, Source * source = 0, bool flush = true); + std::exception_ptr processStderrReturn(Sink * sink = 0, Source * source = 0, bool flush = true, bool block = true); - void processStderr(bool * daemonException, Sink * sink = 0, Source * source = 0, bool flush = true); + void + processStderr(bool * daemonException, Sink * sink = 0, Source * source = 0, bool flush = true, bool block = true); /** * Establishes connection, negotiating version. diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc index 5352a436b..a7312c01c 100644 --- a/src/libutil/serialise.cc +++ b/src/libutil/serialise.cc @@ -10,6 +10,8 @@ #ifdef _WIN32 # include # include "windows-error.hh" +#else +# include #endif @@ -158,6 +160,24 @@ bool FdSource::good() } +bool FdSource::hasData() +{ + if (BufferedSource::hasData()) return true; + + while (true) { + struct pollfd fds[1]; + fds[0].fd = fd; + fds[0].events = POLLIN; + auto n = poll(fds, 1, 0); + if (n < 0) { + if (errno == EINTR) continue; + throw SysError("polling file descriptor"); + } + return n == 1 && (fds[0].events & POLLIN); + } +} + + size_t StringSource::read(char * data, size_t len) { if (pos == s.size()) throw EndOfFile("end of string reached"); diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh index e9f3e3a4a..026306665 100644 --- a/src/libutil/serialise.hh +++ b/src/libutil/serialise.hh @@ -104,6 +104,9 @@ struct BufferedSource : Source size_t read(char * data, size_t len) override; + /** + * Return true if the buffer is not empty. 
+ */ bool hasData(); protected: @@ -162,6 +165,13 @@ struct FdSource : BufferedSource FdSource & operator=(FdSource && s) = default; bool good() override; + + /** + * Return true if the buffer is not empty after a non-blocking + * read. + */ + bool hasData(); + protected: size_t readUnbuffered(char * data, size_t len) override; private: @@ -522,15 +532,16 @@ struct FramedSource : Source /** * Write as chunks in the format expected by FramedSource. * - * The exception_ptr reference can be used to terminate the stream when you - * detect that an error has occurred on the remote end. + * The `checkError` function can be used to terminate the stream when you + * detect that an error has occurred. */ struct FramedSink : nix::BufferedSink { BufferedSink & to; - std::exception_ptr & ex; + std::function checkError; - FramedSink(BufferedSink & to, std::exception_ptr & ex) : to(to), ex(ex) + FramedSink(BufferedSink & to, std::function && checkError) + : to(to), checkError(checkError) { } ~FramedSink() @@ -545,13 +556,9 @@ struct FramedSink : nix::BufferedSink void writeUnbuffered(std::string_view data) override { - /* Don't send more data if the remote has - encountered an error. */ - if (ex) { - auto ex2 = ex; - ex = nullptr; - std::rethrow_exception(ex2); - } + /* Don't send more data if an error has occurred.
*/ + checkError(); + to << data.size(); to(data); }; From 77a71c518fb50e05cf3652c630c904bd7564aaf6 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Wed, 21 Aug 2024 19:50:24 +0200 Subject: [PATCH 2/4] Update src/libutil/serialise.hh Co-authored-by: John Ericson (cherry picked from commit ce7cf4a2d32d3221eed50f087fe53f17f5c5ca12) --- src/libutil/serialise.hh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh index 026306665..4bb1a3e4b 100644 --- a/src/libutil/serialise.hh +++ b/src/libutil/serialise.hh @@ -533,7 +533,7 @@ struct FramedSource : Source * Write as chunks in the format expected by FramedSource. * * The `checkError` function can be used to terminate the stream when you - * detect that an error has occurred. + * detect that an error has occurred. It does so by throwing an exception. */ struct FramedSink : nix::BufferedSink { From a9020404cfc0b5e6f531d4ea8d1a10f25646783d Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Wed, 21 Aug 2024 20:54:02 +0200 Subject: [PATCH 3/4] select() -> poll() for Windows compat (cherry picked from commit 270c8469d712a232915e736eeecacca696f77a01) --- src/libutil/serialise.cc | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc index a7312c01c..8a57858f5 100644 --- a/src/libutil/serialise.cc +++ b/src/libutil/serialise.cc @@ -165,15 +165,20 @@ bool FdSource::hasData() if (BufferedSource::hasData()) return true; while (true) { - struct pollfd fds[1]; - fds[0].fd = fd; - fds[0].events = POLLIN; - auto n = poll(fds, 1, 0); + fd_set fds; + FD_ZERO(&fds); + FD_SET(fd, &fds); + + struct timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = 0; + + auto n = select(fd + 1, &fds, nullptr, nullptr, &timeout); if (n < 0) { if (errno == EINTR) continue; throw SysError("polling file descriptor"); } - return n == 1 && (fds[0].events & POLLIN); + return FD_ISSET(fd, &fds); } } From 
931eb85f509ef9ffb62a768edd3ef17fee6cbc68 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Wed, 21 Aug 2024 21:08:26 +0200 Subject: [PATCH 4/4] Add FIXME (cherry picked from commit fac756fed4c0c5c4c78924ed8ecc900809f8c596) --- src/libstore/daemon.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/libstore/daemon.cc b/src/libstore/daemon.cc index ef3326cd6..6079eae7b 100644 --- a/src/libstore/daemon.cc +++ b/src/libstore/daemon.cc @@ -402,6 +402,9 @@ static void performOp(TunnelLogger * logger, ref store, logger->startWork(); auto pathInfo = [&]() { // NB: FramedSource must be out of scope before logger->stopWork(); + // FIXME: this means that if there is an error + // half-way through, the client will keep sending + // data, since we haven't sent it the error yet. auto [contentAddressMethod, hashAlgo] = ContentAddressMethod::parseWithAlgo(camStr); FramedSource source(conn.from); FileSerialisationMethod dumpMethod;