1
0
Fork 0
mirror of https://github.com/NixOS/nix synced 2025-06-25 14:51:16 +02:00

Merge pull request #13006 from NixOS/mergify/bp/2.28-maintenance/pr-12538

libstore S3: fix progress bar and make file transfers interruptible (backport #12538)
This commit is contained in:
mergify[bot] 2025-04-11 22:54:15 +00:00 committed by GitHub
commit 011f6e06a2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 119 additions and 42 deletions

View file

@ -95,7 +95,7 @@ struct curlFileTransfer : public FileTransfer
: fileTransfer(fileTransfer) : fileTransfer(fileTransfer)
, request(request) , request(request)
, act(*logger, lvlTalkative, actFileTransfer, , act(*logger, lvlTalkative, actFileTransfer,
request.post ? "" : fmt(request.data ? "uploading '%s'" : "downloading '%s'", request.uri), fmt("%sing '%s'", request.verb(), request.uri),
{request.uri}, request.parentAct) {request.uri}, request.parentAct)
, callback(std::move(callback)) , callback(std::move(callback))
, finalSink([this](std::string_view data) { , finalSink([this](std::string_view data) {
@ -272,19 +272,11 @@ struct curlFileTransfer : public FileTransfer
return getInterrupted(); return getInterrupted();
} }
int silentProgressCallback(curl_off_t dltotal, curl_off_t dlnow)
{
return getInterrupted();
}
static int progressCallbackWrapper(void * userp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) static int progressCallbackWrapper(void * userp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow)
{ {
return ((TransferItem *) userp)->progressCallback(dltotal, dlnow); auto & item = *static_cast<TransferItem *>(userp);
} auto isUpload = bool(item.request.data);
return item.progressCallback(isUpload ? ultotal : dltotal, isUpload ? ulnow : dlnow);
static int silentProgressCallbackWrapper(void * userp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow)
{
return ((TransferItem *) userp)->silentProgressCallback(dltotal, dlnow);
} }
static int debugCallback(CURL * handle, curl_infotype type, char * data, size_t size, void * userptr) static int debugCallback(CURL * handle, curl_infotype type, char * data, size_t size, void * userptr)
@ -351,9 +343,6 @@ struct curlFileTransfer : public FileTransfer
curl_easy_setopt(req, CURLOPT_HEADERFUNCTION, TransferItem::headerCallbackWrapper); curl_easy_setopt(req, CURLOPT_HEADERFUNCTION, TransferItem::headerCallbackWrapper);
curl_easy_setopt(req, CURLOPT_HEADERDATA, this); curl_easy_setopt(req, CURLOPT_HEADERDATA, this);
if (request.post)
curl_easy_setopt(req, CURLOPT_XFERINFOFUNCTION, silentProgressCallbackWrapper);
else
curl_easy_setopt(req, CURLOPT_XFERINFOFUNCTION, progressCallbackWrapper); curl_easy_setopt(req, CURLOPT_XFERINFOFUNCTION, progressCallbackWrapper);
curl_easy_setopt(req, CURLOPT_XFERINFODATA, this); curl_easy_setopt(req, CURLOPT_XFERINFODATA, this);
curl_easy_setopt(req, CURLOPT_NOPROGRESS, 0); curl_easy_setopt(req, CURLOPT_NOPROGRESS, 0);
@ -447,7 +436,6 @@ struct curlFileTransfer : public FileTransfer
if (httpStatus == 304 && result.etag == "") if (httpStatus == 304 && result.etag == "")
result.etag = request.expectedETag; result.etag = request.expectedETag;
if (!request.post)
act.progress(result.bodySize, result.bodySize); act.progress(result.bodySize, result.bodySize);
done = true; done = true;
callback(std::move(result)); callback(std::move(result));
@ -789,10 +777,6 @@ struct curlFileTransfer : public FileTransfer
S3Helper s3Helper(profile, region, scheme, endpoint); S3Helper s3Helper(profile, region, scheme, endpoint);
Activity act(*logger, lvlTalkative, actFileTransfer,
fmt("downloading '%s'", request.uri),
{request.uri}, request.parentAct);
// FIXME: implement ETag // FIXME: implement ETag
auto s3Res = s3Helper.getObject(bucketName, key); auto s3Res = s3Helper.getObject(bucketName, key);
FileTransferResult res; FileTransferResult res;

View file

@ -77,7 +77,7 @@ struct FileTransferRequest
FileTransferRequest(std::string_view uri) FileTransferRequest(std::string_view uri)
: uri(uri), parentAct(getCurActivity()) { } : uri(uri), parentAct(getCurActivity()) { }
std::string verb() std::string verb() const
{ {
return data ? "upload" : "download"; return data ? "upload" : "download";
} }

View file

@ -160,7 +160,10 @@ ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(
S3Helper::FileTransferResult S3Helper::getObject( S3Helper::FileTransferResult S3Helper::getObject(
const std::string & bucketName, const std::string & key) const std::string & bucketName, const std::string & key)
{ {
debug("fetching 's3://%s/%s'...", bucketName, key); std::string uri = "s3://" + bucketName + "/" + key;
Activity act(*logger, lvlTalkative, actFileTransfer,
fmt("downloading '%s'", uri),
Logger::Fields{uri}, getCurActivity());
auto request = auto request =
Aws::S3::Model::GetObjectRequest() Aws::S3::Model::GetObjectRequest()
@ -171,6 +174,22 @@ S3Helper::FileTransferResult S3Helper::getObject(
return Aws::New<std::stringstream>("STRINGSTREAM"); return Aws::New<std::stringstream>("STRINGSTREAM");
}); });
size_t bytesDone = 0;
size_t bytesExpected = 0;
request.SetDataReceivedEventHandler([&](const Aws::Http::HttpRequest * req, Aws::Http::HttpResponse * resp, long long l) {
if (!bytesExpected && resp->HasHeader("Content-Length")) {
if (auto length = string2Int<size_t>(resp->GetHeader("Content-Length"))) {
bytesExpected = *length;
}
}
bytesDone += l;
act.progress(bytesDone, bytesExpected);
});
request.SetContinueRequestHandler([](const Aws::Http::HttpRequest*) {
return !isInterrupted();
});
FileTransferResult res; FileTransferResult res;
auto now1 = std::chrono::steady_clock::now(); auto now1 = std::chrono::steady_clock::now();
@ -180,6 +199,8 @@ S3Helper::FileTransferResult S3Helper::getObject(
auto result = checkAws(fmt("AWS error fetching '%s'", key), auto result = checkAws(fmt("AWS error fetching '%s'", key),
client->GetObject(request)); client->GetObject(request));
act.progress(result.GetContentLength(), result.GetContentLength());
res.data = decompress(result.GetContentEncoding(), res.data = decompress(result.GetContentEncoding(),
dynamic_cast<std::stringstream &>(result.GetBody()).str()); dynamic_cast<std::stringstream &>(result.GetBody()).str());
@ -307,11 +328,35 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
std::shared_ptr<TransferManager> transferManager; std::shared_ptr<TransferManager> transferManager;
std::once_flag transferManagerCreated; std::once_flag transferManagerCreated;
struct AsyncContext : public Aws::Client::AsyncCallerContext
{
mutable std::mutex mutex;
mutable std::condition_variable cv;
const Activity & act;
void notify() const
{
cv.notify_one();
}
void wait() const
{
std::unique_lock<std::mutex> lk(mutex);
cv.wait(lk);
}
AsyncContext(const Activity & act) : act(act) {}
};
void uploadFile(const std::string & path, void uploadFile(const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream, std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType, const std::string & mimeType,
const std::string & contentEncoding) const std::string & contentEncoding)
{ {
std::string uri = "s3://" + bucketName + "/" + path;
Activity act(*logger, lvlTalkative, actFileTransfer,
fmt("uploading '%s'", uri),
Logger::Fields{uri}, getCurActivity());
istream->seekg(0, istream->end); istream->seekg(0, istream->end);
auto size = istream->tellg(); auto size = istream->tellg();
istream->seekg(0, istream->beg); istream->seekg(0, istream->beg);
@ -330,16 +375,25 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
transferConfig.bufferSize = bufferSize; transferConfig.bufferSize = bufferSize;
transferConfig.uploadProgressCallback = transferConfig.uploadProgressCallback =
[](const TransferManager *transferManager, [](const TransferManager * transferManager,
const std::shared_ptr<const TransferHandle> const std::shared_ptr<const TransferHandle> & transferHandle)
&transferHandle)
{ {
//FIXME: find a way to properly abort the multipart upload. auto context = std::dynamic_pointer_cast<const AsyncContext>(transferHandle->GetContext());
//checkInterrupt(); size_t bytesDone = transferHandle->GetBytesTransferred();
debug("upload progress ('%s'): '%d' of '%d' bytes", size_t bytesTotal = transferHandle->GetBytesTotalSize();
transferHandle->GetKey(), try {
transferHandle->GetBytesTransferred(), checkInterrupt();
transferHandle->GetBytesTotalSize()); context->act.progress(bytesDone, bytesTotal);
} catch (...) {
context->notify();
}
};
transferConfig.transferStatusUpdatedCallback =
[](const TransferManager * transferManager,
const std::shared_ptr<const TransferHandle> & transferHandle)
{
auto context = std::dynamic_pointer_cast<const AsyncContext>(transferHandle->GetContext());
context->notify();
}; };
transferManager = TransferManager::Create(transferConfig); transferManager = TransferManager::Create(transferConfig);
@ -353,29 +407,51 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
if (contentEncoding != "") if (contentEncoding != "")
throw Error("setting a content encoding is not supported with S3 multi-part uploads"); throw Error("setting a content encoding is not supported with S3 multi-part uploads");
auto context = std::make_shared<AsyncContext>(act);
std::shared_ptr<TransferHandle> transferHandle = std::shared_ptr<TransferHandle> transferHandle =
transferManager->UploadFile( transferManager->UploadFile(
istream, bucketName, path, mimeType, istream, bucketName, path, mimeType,
Aws::Map<Aws::String, Aws::String>(), Aws::Map<Aws::String, Aws::String>(),
nullptr /*, contentEncoding */); context /*, contentEncoding */);
TransferStatus status = transferHandle->GetStatus();
while (status == TransferStatus::IN_PROGRESS || status == TransferStatus::NOT_STARTED) {
if (!isInterrupted()) {
context->wait();
} else {
transferHandle->Cancel();
transferHandle->WaitUntilFinished(); transferHandle->WaitUntilFinished();
}
status = transferHandle->GetStatus();
}
act.progress(transferHandle->GetBytesTransferred(), transferHandle->GetBytesTotalSize());
if (transferHandle->GetStatus() == TransferStatus::FAILED) if (status == TransferStatus::FAILED)
throw Error("AWS error: failed to upload 's3://%s/%s': %s", throw Error("AWS error: failed to upload 's3://%s/%s': %s",
bucketName, path, transferHandle->GetLastError().GetMessage()); bucketName, path, transferHandle->GetLastError().GetMessage());
if (transferHandle->GetStatus() != TransferStatus::COMPLETED) if (status != TransferStatus::COMPLETED)
throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state", throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state",
bucketName, path); bucketName, path);
} else { } else {
act.progress(0, size);
auto request = auto request =
Aws::S3::Model::PutObjectRequest() Aws::S3::Model::PutObjectRequest()
.WithBucket(bucketName) .WithBucket(bucketName)
.WithKey(path); .WithKey(path);
size_t bytesSent = 0;
request.SetDataSentEventHandler([&](const Aws::Http::HttpRequest * req, long long l) {
bytesSent += l;
act.progress(bytesSent, size);
});
request.SetContinueRequestHandler([](const Aws::Http::HttpRequest*) {
return !isInterrupted();
});
request.SetContentType(mimeType); request.SetContentType(mimeType);
if (contentEncoding != "") if (contentEncoding != "")
@ -385,6 +461,8 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
auto result = checkAws(fmt("AWS error uploading '%s'", path), auto result = checkAws(fmt("AWS error uploading '%s'", path),
s3Helper.client->PutObject(request)); s3Helper.client->PutObject(request));
act.progress(size, size);
} }
auto now2 = std::chrono::steady_clock::now(); auto now2 = std::chrono::steady_clock::now();

View file

@ -26,6 +26,11 @@ static inline bool getInterrupted();
*/ */
void setInterruptThrown(); void setInterruptThrown();
/**
* @note Does nothing on Windows
*/
static inline bool isInterrupted();
/** /**
* @note Does nothing on Windows * @note Does nothing on Windows
*/ */

View file

@ -85,17 +85,22 @@ static inline bool getInterrupted()
return unix::_isInterrupted; return unix::_isInterrupted;
} }
static inline bool isInterrupted()
{
using namespace unix;
return _isInterrupted || (interruptCheck && interruptCheck());
}
/** /**
* Throw `Interrupted` exception if the process has been interrupted. * Throw `Interrupted` exception if the process has been interrupted.
* *
* Call this in long-running loops and between slow operations to terminate * Call this in long-running loops and between slow operations to terminate
* them as needed. * them as needed.
*/ */
void inline checkInterrupt() inline void checkInterrupt()
{ {
using namespace unix; if (isInterrupted())
if (_isInterrupted || (interruptCheck && interruptCheck())) unix::_interrupted();
_interrupted();
} }
/** /**

View file

@ -22,7 +22,12 @@ inline void setInterruptThrown()
/* Do nothing for now */ /* Do nothing for now */
} }
void inline checkInterrupt() static inline bool isInterrupted()
{
/* Do nothing for now */
}
inline void checkInterrupt()
{ {
/* Do nothing for now */ /* Do nothing for now */
} }