From 54d2d1a03e82513460f9f2eb0b49d7af5aa0a291 Mon Sep 17 00:00:00 2001 From: Xiuli Wei Date: Mon, 11 Aug 2025 07:29:52 +0000 Subject: [PATCH 1/9] feat:Support multi-threaded asynchronous data upload to object storage. --- .../hive/storage_adapters/s3fs/S3Config.h | 41 +++++++ .../storage_adapters/s3fs/S3FileSystem.cpp | 20 +++ .../hive/storage_adapters/s3fs/S3FileSystem.h | 48 ++++++++ .../storage_adapters/s3fs/S3WriteFile.cpp | 115 ++++++++++++++++-- .../s3fs/tests/S3FileSystemTest.cpp | 86 ++++++++++++- 5 files changed, 300 insertions(+), 10 deletions(-) diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3Config.h b/velox/connectors/hive/storage_adapters/s3fs/S3Config.h index 4fad4379925..d3dc79beae2 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3Config.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3Config.h @@ -76,6 +76,10 @@ class S3Config { kRetryMode, kUseProxyFromEnv, kCredentialsProvider, + KS3UploadPartAsync, + kS3PartUploadSize, + KS3WriteFileSemaphoreNum, + KS3UploadThreads, kEnd }; @@ -114,6 +118,14 @@ class S3Config { std::make_pair("use-proxy-from-env", "false")}, {Keys::kCredentialsProvider, std::make_pair("aws-credentials-provider", std::nullopt)}, + {Keys::KS3UploadPartAsync, + std::make_pair("uploadPartAsync","false")}, + {Keys::kS3PartUploadSize, + std::make_pair("part-upload-size", std::nullopt)}, + {Keys::KS3WriteFileSemaphoreNum, + std::make_pair("writeFileSemaphoreNum", std::nullopt)}, + {Keys::KS3UploadThreads, + std::make_pair("uploadThreads", std::nullopt)}, }; return config; } @@ -243,6 +255,35 @@ class S3Config { return config_.find(Keys::kCredentialsProvider)->second; } + bool uploadPartAsync() const { + auto value = config_.find(Keys::KS3UploadPartAsync)->second.value(); + return folly::to(value); + } + + std::optional partUploadSize() const { + auto val = config_.find(Keys::kS3PartUploadSize)->second; + if (val.has_value()) { + return folly::to(val.value()); + } + return std::optional(); + } + + std::optional 
writeFileSemaphoreNum() const { + auto val = config_.find(Keys::KS3WriteFileSemaphoreNum)->second; + if (val.has_value()) { + return folly::to(val.value()); + } + return std::optional(); + } + + std::optional uploadThreads() const { + auto val = config_.find(Keys::KS3UploadThreads)->second; + if (val.has_value()) { + return folly::to(val.value()); + } + return std::optional(); + } + private: std::unordered_map> config_; std::string payloadSigningPolicy_; diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp index cdc456ee94b..9c84fe8d3f6 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp @@ -16,6 +16,8 @@ #include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h" #include "velox/common/base/StatsReporter.h" +#include +#include #include "velox/common/config/Config.h" #include "velox/common/file/File.h" #include "velox/connectors/hive/storage_adapters/s3fs/S3Config.h" @@ -227,6 +229,12 @@ void registerCredentialsProvider( }); } +bool S3FileSystem::uploadPartAsyncEnabled = false; +size_t S3FileSystem::kPartUploadSize = 10485760; +size_t S3FileSystem::writeFileSemaphore = 4; +std::shared_ptr S3FileSystem::uploadThreadPool_ = + nullptr; + class S3FileSystem::Impl { public: Impl(const S3Config& s3Config) { @@ -294,6 +302,18 @@ class S3FileSystem::Impl { auto credentialsProvider = getCredentialsProvider(s3Config); + S3FileSystem::setUploadPartAsyncEnabled(s3Config.uploadPartAsync()); + S3FileSystem::setPartUploadSize( + s3Config.partUploadSize().value_or(10485760)); + + S3FileSystem::setWriteFileSemaphoreNum( + s3Config.writeFileSemaphoreNum().value_or(4)); + + auto threadPoolSize = s3Config.uploadThreads().value_or(16); + S3FileSystem::setUploadThreadPool( + std::make_shared(threadPoolSize)); + LOG(INFO) << "partUploadSize : " << S3FileSystem::getPartUploadSize(); + client_ = 
std::make_shared( credentialsProvider, nullptr /* endpointProvider */, clientConfig); ++fileSystemCount; diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h index f121223f0f3..9a68057038a 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h @@ -16,6 +16,7 @@ #pragma once +#include #include "velox/common/file/FileSystems.h" namespace Aws::Auth { @@ -87,9 +88,56 @@ class S3FileSystem : public FileSystem { std::string getLogPrefix() const; + static bool isUploadPartAsyncEnabled() { + return uploadPartAsyncEnabled; + } + + static void setUploadPartAsyncEnabled(bool enabled) { + uploadPartAsyncEnabled = enabled; + } + static size_t getPartUploadSize() { + return kPartUploadSize; + } + + static void setPartUploadSize(size_t partUploadSize) { + VELOX_USER_CHECK_GT( + partUploadSize, + 0, + "Invalid configuration: 'hive.s3.part-upload-size' must be greater than 0."); + kPartUploadSize = partUploadSize; + } + static size_t getWriteFileSemaphoreNum() { + return writeFileSemaphore; + } + + static void setWriteFileSemaphoreNum(size_t value) { + VELOX_USER_CHECK_GT( + value, + 0, + "Invalid configuration: 'hive.s3.writeFileSemaphoreNum' must be greater than 0."); + writeFileSemaphore = value; + } + static std::shared_ptr getUploadThreadPool() { + return uploadThreadPool_; + } + static void setUploadThreadPool( + std::shared_ptr threadPool) { + VELOX_USER_CHECK( + threadPool != nullptr && threadPool.get() != nullptr && + threadPool->numThreads() > 0, + "Invalid configuration: 'hive.s3.uploadThreads' must be greater than 0."); + uploadThreadPool_ = threadPool; + } + protected: class Impl; std::shared_ptr impl_; + + private: + static bool uploadPartAsyncEnabled; + static size_t kPartUploadSize; + static size_t writeFileSemaphore; + static std::shared_ptr uploadThreadPool_; }; } // namespace facebook::velox::filesystems 
diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp index fcccfe240ab..921ddb0500a 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp @@ -15,6 +15,8 @@ */ #include "velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h" +#include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h" +#include #include "velox/common/base/StatsReporter.h" #include "velox/connectors/hive/storage_adapters/s3fs/S3Counters.h" #include "velox/connectors/hive/storage_adapters/s3fs/S3Util.h" @@ -44,7 +46,7 @@ class S3WriteFile::Impl { VELOX_CHECK_NOT_NULL(pool); getBucketAndKeyFromPath(path, bucket_, key_); currentPart_ = std::make_unique>(*pool_); - currentPart_->reserve(kPartUploadSize); + currentPart_->reserve(S3FileSystem::getPartUploadSize()); // Check that the object doesn't exist, if it does throw an error. { Aws::S3::Model::HeadObjectRequest request; @@ -106,7 +108,7 @@ class S3WriteFile::Impl { // Appends data to the end of the file. void append(std::string_view data) { VELOX_CHECK(!closed(), "File is closed"); - if (data.size() + currentPart_->size() >= kPartUploadSize) { + if (data.size() + currentPart_->size() >= S3FileSystem::getPartUploadSize()) { upload(data); } else { // Append to current part. @@ -120,7 +122,7 @@ class S3WriteFile::Impl { VELOX_CHECK(!closed(), "File is closed"); /// currentPartSize must be less than kPartUploadSize since /// append() would have already flushed after reaching kUploadPartSize. - VELOX_CHECK_LT(currentPart_->size(), kPartUploadSize); + VELOX_CHECK_LT(currentPart_->size(), S3FileSystem::getPartUploadSize()); } // Complete the multipart upload and close the file. 
@@ -130,6 +132,20 @@ class S3WriteFile::Impl { } RECORD_METRIC_VALUE(kMetricS3StartedUploads); uploadPart({currentPart_->data(), currentPart_->size()}, true); + if(S3FileSystem::isUploadPartAsyncEnabled()){ + if (futures.size() > 0) { + folly::collectAll(std::move(futures)).get(); + } + // The list of parts should be in ascending order. + std::sort( + uploadState_.completedParts.begin(), + uploadState_.completedParts.end(), + [](const Aws::S3::Model::CompletedPart& a, + const Aws::S3::Model::CompletedPart& b) { + return a.GetPartNumber() < b.GetPartNumber(); + }); + } + VELOX_CHECK_EQ(uploadState_.partNumber, uploadState_.completedParts.size()); // Complete the multipart upload. { @@ -163,7 +179,6 @@ class S3WriteFile::Impl { } private: - static constexpr int64_t kPartUploadSize = 10 * 1024 * 1024; static constexpr const char* kApplicationOctetStream = "application/octet-stream"; @@ -178,6 +193,10 @@ class S3WriteFile::Impl { Aws::String id; }; UploadState uploadState_; + std::mutex uploadStateMutex_; + std::vector> futures; + folly::ThrottledLifoSem semaphore{ + static_cast(S3FileSystem::getWriteFileSemaphoreNum())}; // Data can be smaller or larger than the kPartUploadSize. // Complete the currentPart_ and upload kPartUploadSize chunks of data. @@ -191,18 +210,26 @@ class S3WriteFile::Impl { uploadPart({currentPart_->data(), currentPart_->size()}); dataPtr += remainingBufferSize; dataSize -= remainingBufferSize; - while (dataSize > kPartUploadSize) { - uploadPart({dataPtr, kPartUploadSize}); - dataPtr += kPartUploadSize; - dataSize -= kPartUploadSize; + while (dataSize > S3FileSystem::getPartUploadSize()) { + uploadPart({dataPtr, S3FileSystem::getPartUploadSize()}); + dataPtr += S3FileSystem::getPartUploadSize(); + dataSize -= S3FileSystem::getPartUploadSize(); } // Stash the remaining at the beginning of currentPart. 
currentPart_->unsafeAppend(0, dataPtr, dataSize); } void uploadPart(const std::string_view part, bool isLast = false) { + if (S3FileSystem::isUploadPartAsyncEnabled()) { + uploadPartAsync(part, isLast); + } else { + uploadPartV1(part, isLast); + } + } + + void uploadPartV1(const std::string_view part, bool isLast = false) { // Only the last part can be less than kPartUploadSize. - VELOX_CHECK(isLast || (!isLast && (part.size() == kPartUploadSize))); + VELOX_CHECK(isLast || (!isLast && (part.size() == S3FileSystem::getPartUploadSize()))); // Upload the part. { Aws::S3::Model::UploadPartRequest request; @@ -234,6 +261,76 @@ class S3WriteFile::Impl { uploadState_.completedParts.push_back(std::move(part)); } } + void uploadPartAsync(const std::string_view part, const bool isLast = false) { + VELOX_CHECK( + isLast || + (!isLast && (part.size() == S3FileSystem::getPartUploadSize()))); + LOG(INFO) << "uploadPartAsync semaphore is " << semaphore.valueGuess(); + auto const startTime0 = std::chrono::high_resolution_clock::now(); + semaphore.wait(); + const int64_t partNumber = ++uploadState_.partNumber; + auto duration0 = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - startTime0) + .count(); + LOG(INFO) << "LOG_Part " << partNumber << " semaphore in " << duration0 + << " ms."; + auto const startTime1 = std::chrono::high_resolution_clock::now(); + auto const partLength = part.size(); + + std::shared_ptr partStr = + std::make_shared(part.data(), part.size()); + auto duration = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - startTime1) + .count(); + LOG(INFO) << "LOG_Part " << partNumber << " partCopy in " << duration + << " ms."; + futures.emplace_back( + folly::via( + S3FileSystem::getUploadThreadPool().get(), + [this, partNumber, partStr, partLength, startTime1]() { + SCOPE_EXIT { + semaphore.post(); + }; + try { + Aws::S3::Model::UploadPartRequest request; + request.SetBucket(bucket_); + request.SetKey(key_); + 
request.SetUploadId(uploadState_.id); + request.SetPartNumber(partNumber); + request.SetContentLength(partLength); + + request.SetBody(std::make_shared( + partStr->c_str(), partLength)); + + auto outcome = client_->UploadPart(request); + VELOX_CHECK_AWS_OUTCOME( + outcome, "Failed to upload", bucket_, key_); + + auto result = outcome.GetResult(); + Aws::S3::Model::CompletedPart completedPart; + completedPart.SetPartNumber(partNumber); + completedPart.SetETag(result.GetETag()); + + // Use a mutex to ensure thread safety for completedParts update + { + std::lock_guard lock(uploadStateMutex_); + uploadState_.completedParts.push_back( + std::move(completedPart)); + } + auto duration = + std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - startTime1) + .count(); + + LOG(INFO) << "LOG_Part " << partNumber << " uploaded in " + << duration << " ms."; + } catch (const std::exception& e) { + LOG(ERROR) << "Exception during async upload: " << e.what(); + } catch (...) { + LOG(ERROR) << "Unknown exception during async upload."; + } + })); + } Aws::S3::S3Client* client_; memory::MemoryPool* pool_; diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp index 3fea62365bb..b543f9c0c40 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp @@ -240,13 +240,97 @@ TEST_F(S3FileSystemTest, mkdirAndRename) { ASSERT_FALSE(s3fs.exists(s3File)); } +TEST_F(S3FileSystemTest, writeFileAsync) { + const auto bucketName = "writedata"; + const auto file = "test.txt"; + const auto filename = localPath(bucketName) + "/" + file; + const auto s3File = s3URI(bucketName, file); + + auto hiveConfig = + minioServer_->hiveConfig({{"hive.s3.uploadPartAsync", "true"}}); + filesystems::S3FileSystem s3fs(bucketName, hiveConfig); + + auto pool = 
memory::memoryManager()->addLeafPool("S3FileSystemTest"); + auto writeFile = + s3fs.openFileForWrite(s3File, {{}, pool.get(), std::nullopt}); + auto s3WriteFile = dynamic_cast(writeFile.get()); + std::string dataContent = + "Dance me to your beauty with a burning violin" + "Dance me through the panic till I'm gathered safely in" + "Lift me like an olive branch and be my homeward dove" + "Dance me to the end of love"; + + EXPECT_EQ(writeFile->size(), 0); + std::int64_t contentSize = dataContent.length(); + // dataContent length is 178. + EXPECT_EQ(contentSize, 178); + + // Append and flush a small batch of data. + writeFile->append(dataContent.substr(0, 10)); + EXPECT_EQ(writeFile->size(), 10); + writeFile->append(dataContent.substr(10, contentSize - 10)); + EXPECT_EQ(writeFile->size(), contentSize); + writeFile->flush(); + // No parts must have been uploaded. + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 0); + + // Append data 178 * 100'000 ~ 16MiB. + // Should have 1 part in total with kUploadPartSize = 10MiB. + for (int i = 0; i < 100'000; ++i) { + writeFile->append(dataContent); + } + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 1); + EXPECT_EQ(writeFile->size(), 100'001 * contentSize); + + // Append a large data buffer 178 * 150'000 ~ 25MiB (2 parts). + std::vector largeBuffer(contentSize * 150'000); + for (int i = 0; i < 150'000; ++i) { + memcpy( + largeBuffer.data() + (i * contentSize), + dataContent.data(), + contentSize); + } + + writeFile->append({largeBuffer.data(), largeBuffer.size()}); + EXPECT_EQ(writeFile->size(), 250'001 * contentSize); + // Total data = ~41 MB = 5 parts. + // But parts uploaded will be 4. + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 4); + + // Upload the last part. 
+ writeFile->close(); + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 5); + + VELOX_ASSERT_THROW( + writeFile->append(dataContent.substr(0, 10)), "File is closed"); + + auto readFile = s3fs.openFileForRead(s3File); + ASSERT_EQ(readFile->size(), contentSize * 250'001); + // Sample and verify every 1'000 dataContent chunks. + for (int i = 0; i < 250; ++i) { + ASSERT_EQ( + readFile->pread(i * (1'000 * contentSize), contentSize), dataContent); + } + // Verify the last chunk. + ASSERT_EQ(readFile->pread(contentSize * 250'000, contentSize), dataContent); + + // Verify the S3 list function. + auto result = s3fs.list(s3File); + + ASSERT_EQ(result.size(), 1); + ASSERT_TRUE(result[0] == file); + + ASSERT_TRUE(s3fs.exists(s3File)); +} + TEST_F(S3FileSystemTest, writeFileAndRead) { const auto bucketName = "writedata"; const auto file = "test.txt"; const auto filename = localPath(bucketName) + "/" + file; const auto s3File = s3URI(bucketName, file); - auto hiveConfig = minioServer_->hiveConfig(); + auto hiveConfig = + minioServer_->hiveConfig({{"hive.s3.uploadPartAsync", "false"}}); filesystems::S3FileSystem s3fs(bucketName, hiveConfig); auto pool = memory::memoryManager()->addLeafPool("S3FileSystemTest"); auto writeFile = From 44204c604ecf6b748d8aca9eb0a0e914137ae521 Mon Sep 17 00:00:00 2001 From: Xiuli Wei Date: Mon, 25 Aug 2025 08:06:27 +0000 Subject: [PATCH 2/9] Add S3FileSystemBenchmark --- .../hive/storage_adapters/s3fs/CMakeLists.txt | 3 + .../hive/storage_adapters/s3fs/S3Config.h | 6 +- .../storage_adapters/s3fs/S3FileSystem.cpp | 7 +- .../hive/storage_adapters/s3fs/S3FileSystem.h | 6 +- .../storage_adapters/s3fs/S3WriteFile.cpp | 119 ++++++++--------- .../s3fs/benchmark/CMakeLists.txt | 29 +++++ .../s3fs/benchmark/S3FileSystemBenchmark.cpp | 123 ++++++++++++++++++ 7 files changed, 216 insertions(+), 77 deletions(-) create mode 100644 velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt create mode 100644 
velox/connectors/hive/storage_adapters/s3fs/benchmark/S3FileSystemBenchmark.cpp diff --git a/velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt b/velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt index 741f01a61b3..1fb33117903 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt @@ -27,4 +27,7 @@ if(VELOX_ENABLE_S3) if(${VELOX_BUILD_TESTING}) add_subdirectory(tests) endif() + if(${VELOX_ENABLE_BENCHMARKS}) + add_subdirectory(benchmark) + endif() endif() diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3Config.h b/velox/connectors/hive/storage_adapters/s3fs/S3Config.h index d3dc79beae2..d944f6b91b3 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3Config.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3Config.h @@ -119,13 +119,13 @@ class S3Config { {Keys::kCredentialsProvider, std::make_pair("aws-credentials-provider", std::nullopt)}, {Keys::KS3UploadPartAsync, - std::make_pair("uploadPartAsync","false")}, + std::make_pair("uploadPartAsync", "false")}, {Keys::kS3PartUploadSize, std::make_pair("part-upload-size", std::nullopt)}, {Keys::KS3WriteFileSemaphoreNum, - std::make_pair("writeFileSemaphoreNum", std::nullopt)}, + std::make_pair("writeFileSemaphoreNum", std::nullopt)}, {Keys::KS3UploadThreads, - std::make_pair("uploadThreads", std::nullopt)}, + std::make_pair("uploadThreads", std::nullopt)}, }; return config; } diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp index 9c84fe8d3f6..3806e115151 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp @@ -15,9 +15,9 @@ */ #include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h" -#include "velox/common/base/StatsReporter.h" #include #include +#include "velox/common/base/StatsReporter.h" #include 
"velox/common/config/Config.h" #include "velox/common/file/File.h" #include "velox/connectors/hive/storage_adapters/s3fs/S3Config.h" @@ -304,15 +304,14 @@ class S3FileSystem::Impl { S3FileSystem::setUploadPartAsyncEnabled(s3Config.uploadPartAsync()); S3FileSystem::setPartUploadSize( - s3Config.partUploadSize().value_or(10485760)); + s3Config.partUploadSize().value_or(10485760)); S3FileSystem::setWriteFileSemaphoreNum( s3Config.writeFileSemaphoreNum().value_or(4)); - auto threadPoolSize = s3Config.uploadThreads().value_or(16); + auto threadPoolSize = s3Config.uploadThreads().value_or(16); S3FileSystem::setUploadThreadPool( std::make_shared(threadPoolSize)); - LOG(INFO) << "partUploadSize : " << S3FileSystem::getPartUploadSize(); client_ = std::make_shared( credentialsProvider, nullptr /* endpointProvider */, clientConfig); diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h index 9a68057038a..a80a01f5159 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h @@ -123,9 +123,9 @@ class S3FileSystem : public FileSystem { static void setUploadThreadPool( std::shared_ptr threadPool) { VELOX_USER_CHECK( - threadPool != nullptr && threadPool.get() != nullptr && - threadPool->numThreads() > 0, - "Invalid configuration: 'hive.s3.uploadThreads' must be greater than 0."); + threadPool != nullptr && threadPool.get() != nullptr && + threadPool->numThreads() > 0, + "Invalid configuration: 'hive.s3.uploadThreads' must be greater than 0."); uploadThreadPool_ = threadPool; } diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp index 921ddb0500a..e4ed12cb973 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp @@ -15,10 +15,10 @@ */ #include 
"velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h" -#include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h" #include #include "velox/common/base/StatsReporter.h" #include "velox/connectors/hive/storage_adapters/s3fs/S3Counters.h" +#include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h" #include "velox/connectors/hive/storage_adapters/s3fs/S3Util.h" #include "velox/dwio/common/DataBuffer.h" @@ -108,7 +108,8 @@ class S3WriteFile::Impl { // Appends data to the end of the file. void append(std::string_view data) { VELOX_CHECK(!closed(), "File is closed"); - if (data.size() + currentPart_->size() >= S3FileSystem::getPartUploadSize()) { + if (data.size() + currentPart_->size() >= + S3FileSystem::getPartUploadSize()) { upload(data); } else { // Append to current part. @@ -132,7 +133,7 @@ class S3WriteFile::Impl { } RECORD_METRIC_VALUE(kMetricS3StartedUploads); uploadPart({currentPart_->data(), currentPart_->size()}, true); - if(S3FileSystem::isUploadPartAsyncEnabled()){ + if (S3FileSystem::isUploadPartAsyncEnabled()) { if (futures.size() > 0) { folly::collectAll(std::move(futures)).get(); } @@ -196,7 +197,7 @@ class S3WriteFile::Impl { std::mutex uploadStateMutex_; std::vector> futures; folly::ThrottledLifoSem semaphore{ - static_cast(S3FileSystem::getWriteFileSemaphoreNum())}; + static_cast(S3FileSystem::getWriteFileSemaphoreNum())}; // Data can be smaller or larger than the kPartUploadSize. // Complete the currentPart_ and upload kPartUploadSize chunks of data. @@ -221,7 +222,13 @@ class S3WriteFile::Impl { void uploadPart(const std::string_view part, bool isLast = false) { if (S3FileSystem::isUploadPartAsyncEnabled()) { - uploadPartAsync(part, isLast); + // If this is the last part and no parts have been uploaded yet, + // use the synchronous upload method. 
+ if (isLast && uploadState_.partNumber == 0) { + uploadPartV1(part, isLast); + } else { + uploadPartAsync(part, isLast); + } } else { uploadPartV1(part, isLast); } @@ -229,7 +236,9 @@ class S3WriteFile::Impl { void uploadPartV1(const std::string_view part, bool isLast = false) { // Only the last part can be less than kPartUploadSize. - VELOX_CHECK(isLast || (!isLast && (part.size() == S3FileSystem::getPartUploadSize()))); + VELOX_CHECK( + isLast || + (!isLast && (part.size() == S3FileSystem::getPartUploadSize()))); // Upload the part. { Aws::S3::Model::UploadPartRequest request; @@ -265,71 +274,47 @@ class S3WriteFile::Impl { VELOX_CHECK( isLast || (!isLast && (part.size() == S3FileSystem::getPartUploadSize()))); - LOG(INFO) << "uploadPartAsync semaphore is " << semaphore.valueGuess(); - auto const startTime0 = std::chrono::high_resolution_clock::now(); semaphore.wait(); const int64_t partNumber = ++uploadState_.partNumber; - auto duration0 = std::chrono::duration_cast( - std::chrono::high_resolution_clock::now() - startTime0) - .count(); - LOG(INFO) << "LOG_Part " << partNumber << " semaphore in " << duration0 - << " ms."; - auto const startTime1 = std::chrono::high_resolution_clock::now(); auto const partLength = part.size(); - std::shared_ptr partStr = - std::make_shared(part.data(), part.size()); - auto duration = std::chrono::duration_cast( - std::chrono::high_resolution_clock::now() - startTime1) - .count(); - LOG(INFO) << "LOG_Part " << partNumber << " partCopy in " << duration - << " ms."; - futures.emplace_back( - folly::via( - S3FileSystem::getUploadThreadPool().get(), - [this, partNumber, partStr, partLength, startTime1]() { - SCOPE_EXIT { - semaphore.post(); - }; - try { - Aws::S3::Model::UploadPartRequest request; - request.SetBucket(bucket_); - request.SetKey(key_); - request.SetUploadId(uploadState_.id); - request.SetPartNumber(partNumber); - request.SetContentLength(partLength); - - request.SetBody(std::make_shared( - partStr->c_str(), 
partLength)); - - auto outcome = client_->UploadPart(request); - VELOX_CHECK_AWS_OUTCOME( - outcome, "Failed to upload", bucket_, key_); - - auto result = outcome.GetResult(); - Aws::S3::Model::CompletedPart completedPart; - completedPart.SetPartNumber(partNumber); - completedPart.SetETag(result.GetETag()); - - // Use a mutex to ensure thread safety for completedParts update - { - std::lock_guard lock(uploadStateMutex_); - uploadState_.completedParts.push_back( - std::move(completedPart)); - } - auto duration = - std::chrono::duration_cast( - std::chrono::high_resolution_clock::now() - startTime1) - .count(); - - LOG(INFO) << "LOG_Part " << partNumber << " uploaded in " - << duration << " ms."; - } catch (const std::exception& e) { - LOG(ERROR) << "Exception during async upload: " << e.what(); - } catch (...) { - LOG(ERROR) << "Unknown exception during async upload."; - } - })); + std::make_shared(part.data(), part.size()); + futures.emplace_back(folly::via( + S3FileSystem::getUploadThreadPool().get(), + [this, partNumber, partStr, partLength]() { + SCOPE_EXIT { + semaphore.post(); + }; + try { + Aws::S3::Model::UploadPartRequest request; + request.SetBucket(bucket_); + request.SetKey(key_); + request.SetUploadId(uploadState_.id); + request.SetPartNumber(partNumber); + request.SetContentLength(partLength); + + request.SetBody(std::make_shared( + partStr->c_str(), partLength)); + + auto outcome = client_->UploadPart(request); + VELOX_CHECK_AWS_OUTCOME(outcome, "Failed to upload", bucket_, key_); + + auto result = outcome.GetResult(); + Aws::S3::Model::CompletedPart completedPart; + completedPart.SetPartNumber(partNumber); + completedPart.SetETag(result.GetETag()); + + // Use a mutex to ensure thread safety for completedParts update + { + std::lock_guard lock(uploadStateMutex_); + uploadState_.completedParts.push_back(std::move(completedPart)); + } + } catch (const std::exception& e) { + LOG(ERROR) << "Exception during async upload: " << e.what(); + } catch (...) 
{ + LOG(ERROR) << "Unknown exception during async upload."; + } + })); } Aws::S3::S3Client* client_; diff --git a/velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt b/velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt new file mode 100644 index 00000000000..e3611ba621e --- /dev/null +++ b/velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt @@ -0,0 +1,29 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +add_executable(velox_s3FileSystem_benchmark + S3FileSystemBenchmark.cpp) + +add_test(velox_s3FileSystem_benchmark velox_s3FileSystem_benchmark) +target_link_libraries( + velox_s3FileSystem_benchmark + velox_file + velox_s3fs + velox_core + velox_exec_test_lib + velox_dwio_common_exception + velox_exec + GTest::gtest + Folly::folly + Folly::follybenchmark + GTest::gtest_main) diff --git a/velox/connectors/hive/storage_adapters/s3fs/benchmark/S3FileSystemBenchmark.cpp b/velox/connectors/hive/storage_adapters/s3fs/benchmark/S3FileSystemBenchmark.cpp new file mode 100644 index 00000000000..91593078fc7 --- /dev/null +++ b/velox/connectors/hive/storage_adapters/s3fs/benchmark/S3FileSystemBenchmark.cpp @@ -0,0 +1,123 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h" +#include "velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h" +#include "velox/connectors/hive/storage_adapters/s3fs/tests/S3Test.h" +#include "velox/functions/lib/benchmarks/FunctionBenchmarkBase.h" + +#include + +namespace { +using namespace facebook::velox::filesystems; +class S3FileSystemBenchmark { + public: + S3FileSystemBenchmark() { + minioServer_ = std::make_unique(); + minioServer_->start(); + ioExecutor_ = std::make_unique(3); + filesystems::initializeS3("Info", kLogLocation_); + } + ~S3FileSystemBenchmark() { + minioServer_->stop(); + filesystems::finalizeS3(); + } + std::unique_ptr minioServer_; + std::unique_ptr ioExecutor_; + std::string_view kLogLocation_ = "/tmp/foobar/"; + + std::string localPath(const char* directory) { + return minioServer_->path() + "/" + directory; + } + void addBucket(const char* bucket) { + minioServer_->addBucket(bucket); + } + void + run(const std::string& name, bool enableUploadPartAsync, int32_t size_MiB) { + folly::BenchmarkSuspender suspender; + const auto bucketName = "writedata"; + const auto file = fmt::format("test_{}_{}.txt", name, size_MiB); + const auto filename = localPath(bucketName) + "/" + file.c_str(); + addBucket(bucketName); + const auto s3File = s3URI(bucketName, file.c_str()); + auto hiveConfig = minioServer_->hiveConfig( + {{"hive.s3.uploadPartAsync", + enableUploadPartAsync ? 
"true" : "false"}}); + filesystems::S3FileSystem s3fs(bucketName, hiveConfig); + suspender.dismiss(); + auto pool = memory::memoryManager()->addLeafPool("S3FileSystemBenchmark"); + auto writeFile = + s3fs.openFileForWrite(s3File, {{}, pool.get(), std::nullopt}); + auto s3WriteFile = dynamic_cast(writeFile.get()); + // 1024 + std::string dataContent(1024, 'a'); + + EXPECT_EQ(writeFile->size(), 0); + std::int64_t contentSize = dataContent.length(); + // dataContent length is 1024. + EXPECT_EQ(contentSize, 1024); + + // Append and flush a small batch of data. + writeFile->append(dataContent.substr(0, 10)); + EXPECT_EQ(writeFile->size(), 10); + writeFile->append(dataContent.substr(10, contentSize - 10)); + EXPECT_EQ(writeFile->size(), contentSize); + writeFile->flush(); + // No parts must have been uploaded. + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 0); + + // Append data + for (int i = 0; i < 1024 * size_MiB - 1; ++i) { + writeFile->append(dataContent); + } + writeFile->close(); + EXPECT_EQ( + writeFile->size(), static_cast(1024) * 1024 * size_MiB); + } +}; + +auto benchmark = S3FileSystemBenchmark(); + +#define DEFINE_BENCHMARKS(size) \ + BENCHMARK(non_async_upload_##size##M) { \ + benchmark.run("non_async_upload", false, size); \ + } \ + BENCHMARK_RELATIVE(async_upload_##size##M) { \ + benchmark.run("async_upload", true, size); \ + } + +DEFINE_BENCHMARKS(4) +DEFINE_BENCHMARKS(8) +DEFINE_BENCHMARKS(16) +DEFINE_BENCHMARKS(32) +DEFINE_BENCHMARKS(64) +DEFINE_BENCHMARKS(128) +DEFINE_BENCHMARKS(256) +DEFINE_BENCHMARKS(512) +DEFINE_BENCHMARKS(1024) +DEFINE_BENCHMARKS(2048) +} // namespace + +int main(int argc, char** argv) { + folly::Init init{&argc, &argv}; + facebook::velox::memory::MemoryManager::initialize( + facebook::velox::memory::MemoryManager::Options{}); + folly::runBenchmarks(); + return 0; +} From 904b0eaefca5ccd29d13c07b32f7b4b33886aacf Mon Sep 17 00:00:00 2001 From: Xiuli Wei Date: Wed, 27 Aug 2025 09:32:48 +0000 Subject: [PATCH 3/9] Address comments --- 
.../hive/storage_adapters/s3fs/S3Config.h | 6 +- .../storage_adapters/s3fs/S3FileSystem.cpp | 91 +++++++++++++++---- .../hive/storage_adapters/s3fs/S3FileSystem.h | 74 ++++++--------- .../storage_adapters/s3fs/S3WriteFile.cpp | 61 +++++++------ .../hive/storage_adapters/s3fs/S3WriteFile.h | 4 +- .../s3fs/benchmark/CMakeLists.txt | 25 +++-- .../s3fs/benchmark/S3FileSystemBenchmark.cpp | 2 +- .../s3fs/tests/S3FileSystemTest.cpp | 5 +- 8 files changed, 155 insertions(+), 113 deletions(-) diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3Config.h b/velox/connectors/hive/storage_adapters/s3fs/S3Config.h index d944f6b91b3..57563eaed10 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3Config.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3Config.h @@ -119,13 +119,13 @@ class S3Config { {Keys::kCredentialsProvider, std::make_pair("aws-credentials-provider", std::nullopt)}, {Keys::KS3UploadPartAsync, - std::make_pair("uploadPartAsync", "false")}, + std::make_pair("upload-part-async", "false")}, {Keys::kS3PartUploadSize, std::make_pair("part-upload-size", std::nullopt)}, {Keys::KS3WriteFileSemaphoreNum, - std::make_pair("writeFileSemaphoreNum", std::nullopt)}, + std::make_pair("write-file-semaphore-num", std::nullopt)}, {Keys::KS3UploadThreads, - std::make_pair("uploadThreads", std::nullopt)}, + std::make_pair("upload-threads", std::nullopt)}, }; return config; } diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp index 3806e115151..ce650ace07f 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp @@ -229,11 +229,71 @@ void registerCredentialsProvider( }); } -bool S3FileSystem::uploadPartAsyncEnabled = false; -size_t S3FileSystem::kPartUploadSize = 10485760; -size_t S3FileSystem::writeFileSemaphore = 4; -std::shared_ptr S3FileSystem::uploadThreadPool_ = - nullptr; +std::shared_ptr 
S3UploadManager::instance_ = nullptr; +S3UploadManager::S3UploadManager(const S3Config& s3Config) { + uploadPartAsyncEnabled = s3Config.uploadPartAsync(); + setPartUploadSize(s3Config.partUploadSize().value_or(kDefaultPartUploadSize)); + setWriteFileSemaphoreNum( + s3Config.writeFileSemaphoreNum().value_or(kDefaultWriteFileSemaphore)); + setUploadThreadPool(s3Config.uploadThreads().value_or(kDefaultUploadThreads)); +} + +std::shared_ptr S3UploadManager::getInstance( + const S3Config& s3Config) { +#ifndef NDEBUG + // In debug mode, always create a new S3UploadManager instance for each + // s3Config. + instance_ = std::make_shared(s3Config); +#else + // In no debug mode, create a new instance only if it doesn't already exist. + if (!instance_) { + instance_ = std::make_shared(s3Config); + } +#endif + return instance_; +} + +bool S3UploadManager::isUploadPartAsyncEnabled() const { + return uploadPartAsyncEnabled; +} + +size_t S3UploadManager::getPartUploadSize() const { + return kPartUploadSize; +} + +size_t S3UploadManager::getWriteFileSemaphoreNum() const { + return writeFileSemaphore; +} + +std::shared_ptr +S3UploadManager::getUploadThreadPool() const { + return uploadThreadPool_; +} + +void S3UploadManager::setPartUploadSize(size_t partUploadSize) { + kPartUploadSize = + validatePositiveValue(partUploadSize, "hive.s3.part-upload-size"); +} + +void S3UploadManager::setWriteFileSemaphoreNum(size_t value) { + writeFileSemaphore = + validatePositiveValue(value, "hive.s3.write-file-semaphore-num"); +} + +void S3UploadManager::setUploadThreadPool(size_t value) { + auto threadPool = std::make_shared( + validatePositiveValue(value, "hive.s3.upload-threads")); + uploadThreadPool_ = threadPool; +} + +size_t S3UploadManager::validatePositiveValue( + size_t value, + const std::string& name) { + VELOX_USER_CHECK( + value > 0, + fmt::format("Invalid configuration: '{}' must be greater than 0.", name)); + return value; +} class S3FileSystem::Impl { public: @@ -302,17 +362,7 @@ 
class S3FileSystem::Impl { auto credentialsProvider = getCredentialsProvider(s3Config); - S3FileSystem::setUploadPartAsyncEnabled(s3Config.uploadPartAsync()); - S3FileSystem::setPartUploadSize( - s3Config.partUploadSize().value_or(10485760)); - - S3FileSystem::setWriteFileSemaphoreNum( - s3Config.writeFileSemaphoreNum().value_or(4)); - - auto threadPoolSize = s3Config.uploadThreads().value_or(16); - S3FileSystem::setUploadThreadPool( - std::make_shared(threadPoolSize)); - + uploadManager_ = S3UploadManager::getInstance(s3Config); client_ = std::make_shared( credentialsProvider, nullptr /* endpointProvider */, clientConfig); ++fileSystemCount; @@ -452,6 +502,10 @@ class S3FileSystem::Impl { return client_.get(); } + std::shared_ptr s3UploadManager() const { + return uploadManager_; + } + std::string getLogLevelName() const { return getAwsInstance()->getLogLevelName(); } @@ -462,6 +516,7 @@ class S3FileSystem::Impl { private: std::shared_ptr client_; + std::shared_ptr uploadManager_; }; S3FileSystem::S3FileSystem( @@ -493,8 +548,8 @@ std::unique_ptr S3FileSystem::openFileForWrite( std::string_view s3Path, const FileOptions& options) { const auto path = getPath(s3Path); - auto s3file = - std::make_unique(path, impl_->s3Client(), options.pool); + auto s3file = std::make_unique( + path, impl_->s3Client(), options.pool, impl_->s3UploadManager()); return s3file; } diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h index a80a01f5159..34f3cf0b23d 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h @@ -34,6 +34,33 @@ void finalizeS3(); class S3Config; +struct S3UploadManager { + explicit S3UploadManager(const S3Config& s3Config); + static std::shared_ptr getInstance(const S3Config& s3Config); + + bool isUploadPartAsyncEnabled() const; + size_t getPartUploadSize() const; + size_t getWriteFileSemaphoreNum() 
const; + std::shared_ptr getUploadThreadPool() const; + + private: + static constexpr bool kDefaultUploadPartAsyncEnabled = false; + static constexpr size_t kDefaultPartUploadSize = 10485760; + static constexpr size_t kDefaultWriteFileSemaphore = 4; + static constexpr size_t kDefaultUploadThreads = 16; + + bool uploadPartAsyncEnabled; + size_t kPartUploadSize; + size_t writeFileSemaphore; + std::shared_ptr uploadThreadPool_; + static std::shared_ptr instance_; + + void setPartUploadSize(size_t partUploadSize); + void setWriteFileSemaphoreNum(size_t value); + void setUploadThreadPool(size_t value); + static size_t validatePositiveValue(size_t value, const std::string& name); +}; + using AWSCredentialsProviderFactory = std::function( const S3Config& config)>; @@ -88,56 +115,9 @@ class S3FileSystem : public FileSystem { std::string getLogPrefix() const; - static bool isUploadPartAsyncEnabled() { - return uploadPartAsyncEnabled; - } - - static void setUploadPartAsyncEnabled(bool enabled) { - uploadPartAsyncEnabled = enabled; - } - static size_t getPartUploadSize() { - return kPartUploadSize; - } - - static void setPartUploadSize(size_t partUploadSize) { - VELOX_USER_CHECK_GT( - partUploadSize, - 0, - "Invalid configuration: 'hive.s3.part-upload-size' must be greater than 0."); - kPartUploadSize = partUploadSize; - } - static size_t getWriteFileSemaphoreNum() { - return writeFileSemaphore; - } - - static void setWriteFileSemaphoreNum(size_t value) { - VELOX_USER_CHECK_GT( - value, - 0, - "Invalid configuration: 'hive.s3.writeFileSemaphoreNum' must be greater than 0."); - writeFileSemaphore = value; - } - static std::shared_ptr getUploadThreadPool() { - return uploadThreadPool_; - } - static void setUploadThreadPool( - std::shared_ptr threadPool) { - VELOX_USER_CHECK( - threadPool != nullptr && threadPool.get() != nullptr && - threadPool->numThreads() > 0, - "Invalid configuration: 'hive.s3.uploadThreads' must be greater than 0."); - uploadThreadPool_ = threadPool; - } - 
protected: class Impl; std::shared_ptr impl_; - - private: - static bool uploadPartAsyncEnabled; - static size_t kPartUploadSize; - static size_t writeFileSemaphore; - static std::shared_ptr uploadThreadPool_; }; } // namespace facebook::velox::filesystems diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp index e4ed12cb973..ad937d88068 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp @@ -18,7 +18,6 @@ #include #include "velox/common/base/StatsReporter.h" #include "velox/connectors/hive/storage_adapters/s3fs/S3Counters.h" -#include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h" #include "velox/connectors/hive/storage_adapters/s3fs/S3Util.h" #include "velox/dwio/common/DataBuffer.h" @@ -40,13 +39,17 @@ class S3WriteFile::Impl { explicit Impl( std::string_view path, Aws::S3::S3Client* client, - memory::MemoryPool* pool) - : client_(client), pool_(pool) { + memory::MemoryPool* pool, + std::shared_ptr uploadManager) + : client_(client), pool_(pool), uploadManager_(uploadManager) { VELOX_CHECK_NOT_NULL(client); VELOX_CHECK_NOT_NULL(pool); + VELOX_CHECK_NOT_NULL(uploadManager); + semaphore_ = std::make_unique( + uploadManager_->getWriteFileSemaphoreNum()); getBucketAndKeyFromPath(path, bucket_, key_); currentPart_ = std::make_unique>(*pool_); - currentPart_->reserve(S3FileSystem::getPartUploadSize()); + currentPart_->reserve(uploadManager_->getPartUploadSize()); // Check that the object doesn't exist, if it does throw an error. { Aws::S3::Model::HeadObjectRequest request; @@ -109,7 +112,7 @@ class S3WriteFile::Impl { void append(std::string_view data) { VELOX_CHECK(!closed(), "File is closed"); if (data.size() + currentPart_->size() >= - S3FileSystem::getPartUploadSize()) { + uploadManager_->getPartUploadSize()) { upload(data); } else { // Append to current part. 
@@ -123,7 +126,7 @@ class S3WriteFile::Impl { VELOX_CHECK(!closed(), "File is closed"); /// currentPartSize must be less than kPartUploadSize since /// append() would have already flushed after reaching kUploadPartSize. - VELOX_CHECK_LT(currentPart_->size(), S3FileSystem::getPartUploadSize()); + VELOX_CHECK_LT(currentPart_->size(), uploadManager_->getPartUploadSize()); } // Complete the multipart upload and close the file. @@ -133,9 +136,9 @@ class S3WriteFile::Impl { } RECORD_METRIC_VALUE(kMetricS3StartedUploads); uploadPart({currentPart_->data(), currentPart_->size()}, true); - if (S3FileSystem::isUploadPartAsyncEnabled()) { - if (futures.size() > 0) { - folly::collectAll(std::move(futures)).get(); + if (uploadManager_->isUploadPartAsyncEnabled()) { + if (futures_.size() > 0) { + folly::collectAll(std::move(futures_)).get(); } // The list of parts should be in ascending order. std::sort( @@ -193,11 +196,6 @@ class S3WriteFile::Impl { int64_t partNumber = 0; Aws::String id; }; - UploadState uploadState_; - std::mutex uploadStateMutex_; - std::vector> futures; - folly::ThrottledLifoSem semaphore{ - static_cast(S3FileSystem::getWriteFileSemaphoreNum())}; // Data can be smaller or larger than the kPartUploadSize. // Complete the currentPart_ and upload kPartUploadSize chunks of data. @@ -211,17 +209,17 @@ class S3WriteFile::Impl { uploadPart({currentPart_->data(), currentPart_->size()}); dataPtr += remainingBufferSize; dataSize -= remainingBufferSize; - while (dataSize > S3FileSystem::getPartUploadSize()) { - uploadPart({dataPtr, S3FileSystem::getPartUploadSize()}); - dataPtr += S3FileSystem::getPartUploadSize(); - dataSize -= S3FileSystem::getPartUploadSize(); + while (dataSize > uploadManager_->getPartUploadSize()) { + uploadPart({dataPtr, uploadManager_->getPartUploadSize()}); + dataPtr += uploadManager_->getPartUploadSize(); + dataSize -= uploadManager_->getPartUploadSize(); } // Stash the remaining at the beginning of currentPart. 
currentPart_->unsafeAppend(0, dataPtr, dataSize); } void uploadPart(const std::string_view part, bool isLast = false) { - if (S3FileSystem::isUploadPartAsyncEnabled()) { + if (uploadManager_->isUploadPartAsyncEnabled()) { // If this is the last part and no parts have been uploaded yet, // use the synchronous upload method. if (isLast && uploadState_.partNumber == 0) { @@ -234,11 +232,12 @@ class S3WriteFile::Impl { } } + // Upload the part synchronously. void uploadPartV1(const std::string_view part, bool isLast = false) { // Only the last part can be less than kPartUploadSize. VELOX_CHECK( isLast || - (!isLast && (part.size() == S3FileSystem::getPartUploadSize()))); + (!isLast && (part.size() == uploadManager_->getPartUploadSize()))); // Upload the part. { Aws::S3::Model::UploadPartRequest request; @@ -270,20 +269,22 @@ class S3WriteFile::Impl { uploadState_.completedParts.push_back(std::move(part)); } } + + // Upload the part asynchronously. void uploadPartAsync(const std::string_view part, const bool isLast = false) { VELOX_CHECK( isLast || - (!isLast && (part.size() == S3FileSystem::getPartUploadSize()))); - semaphore.wait(); + (!isLast && (part.size() == uploadManager_->getPartUploadSize()))); + semaphore_->wait(); const int64_t partNumber = ++uploadState_.partNumber; auto const partLength = part.size(); std::shared_ptr partStr = std::make_shared(part.data(), part.size()); - futures.emplace_back(folly::via( - S3FileSystem::getUploadThreadPool().get(), + futures_.emplace_back(folly::via( + uploadManager_->getUploadThreadPool().get(), [this, partNumber, partStr, partLength]() { SCOPE_EXIT { - semaphore.post(); + semaphore_->post(); }; try { Aws::S3::Model::UploadPartRequest request; @@ -323,13 +324,19 @@ class S3WriteFile::Impl { std::string bucket_; std::string key_; size_t fileSize_ = -1; + UploadState uploadState_; + std::mutex uploadStateMutex_; + std::vector> futures_; + std::unique_ptr semaphore_; + std::shared_ptr uploadManager_; }; 
S3WriteFile::S3WriteFile( std::string_view path, Aws::S3::S3Client* client, - memory::MemoryPool* pool) { - impl_ = std::make_shared(path, client, pool); + memory::MemoryPool* pool, + std::shared_ptr uploadManager) { + impl_ = std::make_shared(path, client, pool, uploadManager); } void S3WriteFile::append(std::string_view data) { diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h index 929eed20c37..3ccbf6cb3f9 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h @@ -18,6 +18,7 @@ #include "velox/common/file/File.h" #include "velox/common/memory/MemoryPool.h" +#include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h" namespace Aws::S3 { class S3Client; @@ -50,7 +51,8 @@ class S3WriteFile : public WriteFile { S3WriteFile( std::string_view path, Aws::S3::S3Client* client, - memory::MemoryPool* pool); + memory::MemoryPool* pool, + std::shared_ptr uploadManager); /// Appends data to the end of the file. /// Uploads a part on reaching part size limit. diff --git a/velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt b/velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt index e3611ba621e..6d84665bb40 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt @@ -11,19 +11,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-add_executable(velox_s3FileSystem_benchmark - S3FileSystemBenchmark.cpp) +add_executable(velox_s3FileSystem_benchmark S3FileSystemBenchmark.cpp) add_test(velox_s3FileSystem_benchmark velox_s3FileSystem_benchmark) target_link_libraries( - velox_s3FileSystem_benchmark - velox_file - velox_s3fs - velox_core - velox_exec_test_lib - velox_dwio_common_exception - velox_exec - GTest::gtest - Folly::folly - Folly::follybenchmark - GTest::gtest_main) + velox_s3FileSystem_benchmark + velox_file + velox_s3fs + velox_core + velox_exec_test_lib + velox_dwio_common_exception + velox_exec + GTest::gtest + Folly::folly + Folly::follybenchmark + GTest::gtest_main) diff --git a/velox/connectors/hive/storage_adapters/s3fs/benchmark/S3FileSystemBenchmark.cpp b/velox/connectors/hive/storage_adapters/s3fs/benchmark/S3FileSystemBenchmark.cpp index 91593078fc7..bb361ab54a3 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/benchmark/S3FileSystemBenchmark.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/benchmark/S3FileSystemBenchmark.cpp @@ -57,7 +57,7 @@ class S3FileSystemBenchmark { addBucket(bucketName); const auto s3File = s3URI(bucketName, file.c_str()); auto hiveConfig = minioServer_->hiveConfig( - {{"hive.s3.uploadPartAsync", + {{"hive.s3.upload-part-async", enableUploadPartAsync ? 
"true" : "false"}}); filesystems::S3FileSystem s3fs(bucketName, hiveConfig); suspender.dismiss(); diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp index b543f9c0c40..45dc86d9f5f 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp @@ -247,7 +247,7 @@ TEST_F(S3FileSystemTest, writeFileAsync) { const auto s3File = s3URI(bucketName, file); auto hiveConfig = - minioServer_->hiveConfig({{"hive.s3.uploadPartAsync", "true"}}); + minioServer_->hiveConfig({{"hive.s3.upload-part-async", "true"}}); filesystems::S3FileSystem s3fs(bucketName, hiveConfig); auto pool = memory::memoryManager()->addLeafPool("S3FileSystemTest"); @@ -329,8 +329,7 @@ TEST_F(S3FileSystemTest, writeFileAndRead) { const auto filename = localPath(bucketName) + "/" + file; const auto s3File = s3URI(bucketName, file); - auto hiveConfig = - minioServer_->hiveConfig({{"hive.s3.uploadPartAsync", "false"}}); + auto hiveConfig = minioServer_->hiveConfig(); filesystems::S3FileSystem s3fs(bucketName, hiveConfig); auto pool = memory::memoryManager()->addLeafPool("S3FileSystemTest"); auto writeFile = From c4daa540b001dfeb1bf7de755d5471f0f63e335b Mon Sep 17 00:00:00 2001 From: Xiuli Wei Date: Thu, 11 Sep 2025 06:56:44 +0000 Subject: [PATCH 4/9] Address comments --- .../hive/storage_adapters/s3fs/S3Config.h | 28 +-- .../storage_adapters/s3fs/S3FileSystem.cpp | 97 +++------- .../hive/storage_adapters/s3fs/S3FileSystem.h | 30 +-- .../storage_adapters/s3fs/S3WriteFile.cpp | 174 +++++++++--------- .../hive/storage_adapters/s3fs/S3WriteFile.h | 4 +- velox/docs/configs.rst | 16 ++ 6 files changed, 144 insertions(+), 205 deletions(-) diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3Config.h b/velox/connectors/hive/storage_adapters/s3fs/S3Config.h index 57563eaed10..3202c19c958 100644 --- 
a/velox/connectors/hive/storage_adapters/s3fs/S3Config.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3Config.h @@ -76,10 +76,10 @@ class S3Config { kRetryMode, kUseProxyFromEnv, kCredentialsProvider, - KS3UploadPartAsync, - kS3PartUploadSize, - KS3WriteFileSemaphoreNum, - KS3UploadThreads, + KUploadPartAsync, + kPartUploadSize, + KMaxConcurrentUploadNum, + KUploadThreads, kEnd }; @@ -118,13 +118,13 @@ class S3Config { std::make_pair("use-proxy-from-env", "false")}, {Keys::kCredentialsProvider, std::make_pair("aws-credentials-provider", std::nullopt)}, - {Keys::KS3UploadPartAsync, + {Keys::KUploadPartAsync, std::make_pair("upload-part-async", "false")}, - {Keys::kS3PartUploadSize, + {Keys::kPartUploadSize, std::make_pair("part-upload-size", std::nullopt)}, - {Keys::KS3WriteFileSemaphoreNum, - std::make_pair("write-file-semaphore-num", std::nullopt)}, - {Keys::KS3UploadThreads, + {Keys::KMaxConcurrentUploadNum, + std::make_pair("max-concurrent-upload-num", std::nullopt)}, + {Keys::KUploadThreads, std::make_pair("upload-threads", std::nullopt)}, }; return config; @@ -256,20 +256,20 @@ class S3Config { } bool uploadPartAsync() const { - auto value = config_.find(Keys::KS3UploadPartAsync)->second.value(); + auto value = config_.find(Keys::KUploadPartAsync)->second.value(); return folly::to(value); } std::optional partUploadSize() const { - auto val = config_.find(Keys::kS3PartUploadSize)->second; + auto val = config_.find(Keys::kPartUploadSize)->second; if (val.has_value()) { return folly::to(val.value()); } return std::optional(); } - std::optional writeFileSemaphoreNum() const { - auto val = config_.find(Keys::KS3WriteFileSemaphoreNum)->second; + std::optional maxConcurrentUploadNum() const { + auto val = config_.find(Keys::KMaxConcurrentUploadNum)->second; if (val.has_value()) { return folly::to(val.value()); } @@ -277,7 +277,7 @@ class S3Config { } std::optional uploadThreads() const { - auto val = config_.find(Keys::KS3UploadThreads)->second; + auto val = 
config_.find(Keys::KUploadThreads)->second; if (val.has_value()) { return folly::to(val.value()); } diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp index ce650ace07f..1a19e219d1b 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp @@ -15,8 +15,6 @@ */ #include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h" -#include -#include #include "velox/common/base/StatsReporter.h" #include "velox/common/config/Config.h" #include "velox/common/file/File.h" @@ -229,72 +227,6 @@ void registerCredentialsProvider( }); } -std::shared_ptr S3UploadManager::instance_ = nullptr; -S3UploadManager::S3UploadManager(const S3Config& s3Config) { - uploadPartAsyncEnabled = s3Config.uploadPartAsync(); - setPartUploadSize(s3Config.partUploadSize().value_or(kDefaultPartUploadSize)); - setWriteFileSemaphoreNum( - s3Config.writeFileSemaphoreNum().value_or(kDefaultWriteFileSemaphore)); - setUploadThreadPool(s3Config.uploadThreads().value_or(kDefaultUploadThreads)); -} - -std::shared_ptr S3UploadManager::getInstance( - const S3Config& s3Config) { -#ifndef NDEBUG - // In debug mode, always create a new S3UploadManager instance for each - // s3Config. - instance_ = std::make_shared(s3Config); -#else - // In no debug mode, create a new instance only if it doesn't already exist. 
- if (!instance_) { - instance_ = std::make_shared(s3Config); - } -#endif - return instance_; -} - -bool S3UploadManager::isUploadPartAsyncEnabled() const { - return uploadPartAsyncEnabled; -} - -size_t S3UploadManager::getPartUploadSize() const { - return kPartUploadSize; -} - -size_t S3UploadManager::getWriteFileSemaphoreNum() const { - return writeFileSemaphore; -} - -std::shared_ptr -S3UploadManager::getUploadThreadPool() const { - return uploadThreadPool_; -} - -void S3UploadManager::setPartUploadSize(size_t partUploadSize) { - kPartUploadSize = - validatePositiveValue(partUploadSize, "hive.s3.part-upload-size"); -} - -void S3UploadManager::setWriteFileSemaphoreNum(size_t value) { - writeFileSemaphore = - validatePositiveValue(value, "hive.s3.write-file-semaphore-num"); -} - -void S3UploadManager::setUploadThreadPool(size_t value) { - auto threadPool = std::make_shared( - validatePositiveValue(value, "hive.s3.upload-threads")); - uploadThreadPool_ = threadPool; -} - -size_t S3UploadManager::validatePositiveValue( - size_t value, - const std::string& name) { - VELOX_USER_CHECK( - value > 0, - fmt::format("Invalid configuration: '{}' must be greater than 0.", name)); - return value; -} - class S3FileSystem::Impl { public: Impl(const S3Config& s3Config) { @@ -362,7 +294,26 @@ class S3FileSystem::Impl { auto credentialsProvider = getCredentialsProvider(s3Config); - uploadManager_ = S3UploadManager::getInstance(s3Config); + if (s3Config.uploadPartAsync() && !asyncUploadInfo_) { + asyncUploadInfo_ = std::make_shared(); + + auto partUploadSize = s3Config.partUploadSize(); + if (partUploadSize.has_value()) { + asyncUploadInfo_->partUploadSize = partUploadSize.value(); + } + + auto maxConcurrentUploadNum = s3Config.maxConcurrentUploadNum(); + if (maxConcurrentUploadNum.has_value()) { + asyncUploadInfo_->maxConcurrentUploadNum = + maxConcurrentUploadNum.value(); + } + + auto uploadThreads = s3Config.uploadThreads(); + if (uploadThreads.has_value()) { + 
asyncUploadInfo_->uploadThreads = uploadThreads.value(); + } + } + client_ = std::make_shared( credentialsProvider, nullptr /* endpointProvider */, clientConfig); ++fileSystemCount; @@ -502,8 +453,8 @@ class S3FileSystem::Impl { return client_.get(); } - std::shared_ptr s3UploadManager() const { - return uploadManager_; + std::shared_ptr getAsyncUploadInfo() const { + return asyncUploadInfo_; } std::string getLogLevelName() const { @@ -516,7 +467,7 @@ class S3FileSystem::Impl { private: std::shared_ptr client_; - std::shared_ptr uploadManager_; + std::shared_ptr asyncUploadInfo_; }; S3FileSystem::S3FileSystem( @@ -549,7 +500,7 @@ std::unique_ptr S3FileSystem::openFileForWrite( const FileOptions& options) { const auto path = getPath(s3Path); auto s3file = std::make_unique( - path, impl_->s3Client(), options.pool, impl_->s3UploadManager()); + path, impl_->s3Client(), options.pool, impl_->getAsyncUploadInfo()); return s3file; } diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h index 34f3cf0b23d..ba09518cbeb 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h @@ -16,7 +16,6 @@ #pragma once -#include #include "velox/common/file/FileSystems.h" namespace Aws::Auth { @@ -34,31 +33,10 @@ void finalizeS3(); class S3Config; -struct S3UploadManager { - explicit S3UploadManager(const S3Config& s3Config); - static std::shared_ptr getInstance(const S3Config& s3Config); - - bool isUploadPartAsyncEnabled() const; - size_t getPartUploadSize() const; - size_t getWriteFileSemaphoreNum() const; - std::shared_ptr getUploadThreadPool() const; - - private: - static constexpr bool kDefaultUploadPartAsyncEnabled = false; - static constexpr size_t kDefaultPartUploadSize = 10485760; - static constexpr size_t kDefaultWriteFileSemaphore = 4; - static constexpr size_t kDefaultUploadThreads = 16; - - bool uploadPartAsyncEnabled; - 
size_t kPartUploadSize; - size_t writeFileSemaphore; - std::shared_ptr uploadThreadPool_; - static std::shared_ptr instance_; - - void setPartUploadSize(size_t partUploadSize); - void setWriteFileSemaphoreNum(size_t value); - void setUploadThreadPool(size_t value); - static size_t validatePositiveValue(size_t value, const std::string& name); +struct AsyncUploadInfo { + std::optional partUploadSize{10485760}; + std::optional maxConcurrentUploadNum{4}; + std::optional uploadThreads{16}; }; using AWSCredentialsProviderFactory = diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp index ad937d88068..b170cd60540 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp @@ -15,6 +15,7 @@ */ #include "velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h" +#include #include #include "velox/common/base/StatsReporter.h" #include "velox/connectors/hive/storage_adapters/s3fs/S3Counters.h" @@ -40,16 +41,27 @@ class S3WriteFile::Impl { std::string_view path, Aws::S3::S3Client* client, memory::MemoryPool* pool, - std::shared_ptr uploadManager) - : client_(client), pool_(pool), uploadManager_(uploadManager) { + std::shared_ptr asyncUploadInfo) + : client_(client), pool_(pool) { VELOX_CHECK_NOT_NULL(client); VELOX_CHECK_NOT_NULL(pool); - VELOX_CHECK_NOT_NULL(uploadManager); - semaphore_ = std::make_unique( - uploadManager_->getWriteFileSemaphoreNum()); + if (asyncUploadInfo) { + kPartUploadSize = asyncUploadInfo->partUploadSize.value(); + maxConcurrentUploadNum_ = std::make_unique( + asyncUploadInfo->maxConcurrentUploadNum.value()); + if (!uploadThreadPool_) { + uploadThreadPool_ = std::make_shared( + asyncUploadInfo->uploadThreads.value(), + std::make_shared("upload-thread")); + } + } else { + uploadThreadPool_ = nullptr; + maxConcurrentUploadNum_ = nullptr; + } + getBucketAndKeyFromPath(path, bucket_, key_); 
currentPart_ = std::make_unique>(*pool_); - currentPart_->reserve(uploadManager_->getPartUploadSize()); + currentPart_->reserve(kPartUploadSize); // Check that the object doesn't exist, if it does throw an error. { Aws::S3::Model::HeadObjectRequest request; @@ -111,8 +123,7 @@ class S3WriteFile::Impl { // Appends data to the end of the file. void append(std::string_view data) { VELOX_CHECK(!closed(), "File is closed"); - if (data.size() + currentPart_->size() >= - uploadManager_->getPartUploadSize()) { + if (data.size() + currentPart_->size() >= kPartUploadSize) { upload(data); } else { // Append to current part. @@ -126,7 +137,7 @@ class S3WriteFile::Impl { VELOX_CHECK(!closed(), "File is closed"); /// currentPartSize must be less than kPartUploadSize since /// append() would have already flushed after reaching kUploadPartSize. - VELOX_CHECK_LT(currentPart_->size(), uploadManager_->getPartUploadSize()); + VELOX_CHECK_LT(currentPart_->size(), kPartUploadSize); } // Complete the multipart upload and close the file. @@ -136,7 +147,7 @@ class S3WriteFile::Impl { } RECORD_METRIC_VALUE(kMetricS3StartedUploads); uploadPart({currentPart_->data(), currentPart_->size()}, true); - if (uploadManager_->isUploadPartAsyncEnabled()) { + if (uploadThreadPool_) { if (futures_.size() > 0) { folly::collectAll(std::move(futures_)).get(); } @@ -209,107 +220,83 @@ class S3WriteFile::Impl { uploadPart({currentPart_->data(), currentPart_->size()}); dataPtr += remainingBufferSize; dataSize -= remainingBufferSize; - while (dataSize > uploadManager_->getPartUploadSize()) { - uploadPart({dataPtr, uploadManager_->getPartUploadSize()}); - dataPtr += uploadManager_->getPartUploadSize(); - dataSize -= uploadManager_->getPartUploadSize(); + while (dataSize > kPartUploadSize) { + uploadPart({dataPtr, kPartUploadSize}); + dataPtr += kPartUploadSize; + dataSize -= kPartUploadSize; } // Stash the remaining at the beginning of currentPart. 
currentPart_->unsafeAppend(0, dataPtr, dataSize); } void uploadPart(const std::string_view part, bool isLast = false) { - if (uploadManager_->isUploadPartAsyncEnabled()) { + // Only the last part can be less than kPartUploadSize. + VELOX_CHECK(isLast || (!isLast && (part.size() == kPartUploadSize))); + if (uploadThreadPool_) { // If this is the last part and no parts have been uploaded yet, // use the synchronous upload method. if (isLast && uploadState_.partNumber == 0) { - uploadPartV1(part, isLast); + uploadPartSeq(uploadState_.id, ++uploadState_.partNumber, part); } else { - uploadPartAsync(part, isLast); + uploadPartAsync(part); } } else { - uploadPartV1(part, isLast); + uploadPartSeq(uploadState_.id, ++uploadState_.partNumber, part); } } - // Upload the part synchronously. - void uploadPartV1(const std::string_view part, bool isLast = false) { - // Only the last part can be less than kPartUploadSize. - VELOX_CHECK( - isLast || - (!isLast && (part.size() == uploadManager_->getPartUploadSize()))); - // Upload the part. - { - Aws::S3::Model::UploadPartRequest request; - request.SetBucket(bucket_); - request.SetKey(key_); - request.SetUploadId(uploadState_.id); - request.SetPartNumber(++uploadState_.partNumber); - request.SetContentLength(part.size()); - request.SetBody( - std::make_shared(part.data(), part.size())); - // The default algorithm used is MD5. However, MD5 is not supported with - // fips and can cause a SIGSEGV. Set CRC32 instead which is a standard for - // checksum computation and is not restricted by fips. - request.SetChecksumAlgorithm(Aws::S3::Model::ChecksumAlgorithm::CRC32); - auto outcome = client_->UploadPart(request); - VELOX_CHECK_AWS_OUTCOME(outcome, "Failed to upload", bucket_, key_); - // Append ETag and part number for this uploaded part. - // This will be needed for upload completion in Close(). 
- auto result = outcome.GetResult(); - Aws::S3::Model::CompletedPart part; - - part.SetPartNumber(uploadState_.partNumber); - part.SetETag(result.GetETag()); - // Don't add the checksum to the part if the checksum is empty. - // Some filesystems such as IBM COS require this to be not set. - if (!result.GetChecksumCRC32().empty()) { - part.SetChecksumCRC32(result.GetChecksumCRC32()); - } - uploadState_.completedParts.push_back(std::move(part)); + // Common logic for uploading a part. + void uploadPartSeq( + const Aws::String& uploadId, + const int64_t partNumber, + const std::string_view part, + bool async = false) { + Aws::S3::Model::UploadPartRequest request; + request.SetBucket(bucket_); + request.SetKey(key_); + request.SetUploadId(uploadId); + request.SetPartNumber(partNumber); + request.SetContentLength(part.size()); + request.SetBody( + std::make_shared(part.data(), part.size())); + // The default algorithm used is MD5. However, MD5 is not supported with + // fips and can cause a SIGSEGV. Set CRC32 instead which is a standard for + // checksum computation and is not restricted by fips. + request.SetChecksumAlgorithm(Aws::S3::Model::ChecksumAlgorithm::CRC32); + auto outcome = client_->UploadPart(request); + VELOX_CHECK_AWS_OUTCOME(outcome, "Failed to upload", bucket_, key_); + // Append ETag and part number for this uploaded part. + // This will be needed for upload completion in Close(). + auto result = outcome.GetResult(); + Aws::S3::Model::CompletedPart completedPart; + completedPart.SetPartNumber(partNumber); + completedPart.SetETag(result.GetETag()); + // Don't add the checksum to the part if the checksum is empty. + // Some filesystems such as IBM COS require this to be not set. 
+ if (!result.GetChecksumCRC32().empty()) { + completedPart.SetChecksumCRC32(result.GetChecksumCRC32()); + } + if (async) { + std::lock_guard lock(uploadStateMutex_); + uploadState_.completedParts.push_back(std::move(completedPart)); + } else { + uploadState_.completedParts.push_back(std::move(completedPart)); } } // Upload the part asynchronously. - void uploadPartAsync(const std::string_view part, const bool isLast = false) { - VELOX_CHECK( - isLast || - (!isLast && (part.size() == uploadManager_->getPartUploadSize()))); - semaphore_->wait(); + void uploadPartAsync(const std::string_view part) { + maxConcurrentUploadNum_->wait(); const int64_t partNumber = ++uploadState_.partNumber; - auto const partLength = part.size(); std::shared_ptr partStr = std::make_shared(part.data(), part.size()); - futures_.emplace_back(folly::via( - uploadManager_->getUploadThreadPool().get(), - [this, partNumber, partStr, partLength]() { + futures_.emplace_back( + folly::via(uploadThreadPool_.get(), [this, partNumber, partStr]() { SCOPE_EXIT { - semaphore_->post(); + maxConcurrentUploadNum_->post(); }; try { - Aws::S3::Model::UploadPartRequest request; - request.SetBucket(bucket_); - request.SetKey(key_); - request.SetUploadId(uploadState_.id); - request.SetPartNumber(partNumber); - request.SetContentLength(partLength); - - request.SetBody(std::make_shared( - partStr->c_str(), partLength)); - - auto outcome = client_->UploadPart(request); - VELOX_CHECK_AWS_OUTCOME(outcome, "Failed to upload", bucket_, key_); - - auto result = outcome.GetResult(); - Aws::S3::Model::CompletedPart completedPart; - completedPart.SetPartNumber(partNumber); - completedPart.SetETag(result.GetETag()); - - // Use a mutex to ensure thread safety for completedParts update - { - std::lock_guard lock(uploadStateMutex_); - uploadState_.completedParts.push_back(std::move(completedPart)); - } + uploadPartSeq(uploadState_.id, partNumber, *partStr, true); } catch (const std::exception& e) { LOG(ERROR) << "Exception 
during async upload: " << e.what(); } catch (...) { @@ -327,16 +314,23 @@ class S3WriteFile::Impl { UploadState uploadState_; std::mutex uploadStateMutex_; std::vector> futures_; - std::unique_ptr semaphore_; - std::shared_ptr uploadManager_; + // maxConcurrentUploadNum_ controls the concurrency of asynchronous uploads to + // S3 for each S3WriteFile, preventing excessive memory usage. + std::unique_ptr maxConcurrentUploadNum_; + static std::shared_ptr uploadThreadPool_; + static size_t kPartUploadSize; }; +std::shared_ptr + S3WriteFile::Impl::uploadThreadPool_ = nullptr; +size_t S3WriteFile::Impl::kPartUploadSize = 10 * 1024 * 1024; + S3WriteFile::S3WriteFile( std::string_view path, Aws::S3::S3Client* client, memory::MemoryPool* pool, - std::shared_ptr uploadManager) { - impl_ = std::make_shared(path, client, pool, uploadManager); + std::shared_ptr asyncUploadInfo) { + impl_ = std::make_shared(path, client, pool, asyncUploadInfo); } void S3WriteFile::append(std::string_view data) { diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h index 3ccbf6cb3f9..3f00eb9a1da 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h @@ -52,7 +52,7 @@ class S3WriteFile : public WriteFile { std::string_view path, Aws::S3::S3Client* client, memory::MemoryPool* pool, - std::shared_ptr uploadManager); + std::shared_ptr asyncUploadInfo); /// Appends data to the end of the file. /// Uploads a part on reaching part size limit. @@ -69,9 +69,9 @@ class S3WriteFile : public WriteFile { /// Return the number of parts uploaded so far. 
int numPartsUploaded() const; + class Impl; protected: - class Impl; std::shared_ptr impl_; }; diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index 21eebecf8df..859762a4da3 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -879,6 +879,22 @@ Each query can override the config by setting corresponding query session proper - - A custom credential provider, if specified, will be used to create the client in favor of other authentication mechanisms. The provider must be registered using "registerAWSCredentialsProvider" before it can be used. + * - hive.s3.upload-part-async + - bool + - false + - If true, enables asynchronous upload of parts for S3 multipart uploads. + * - hive.s3.part-upload-size + - integer + - + - Specifies the size (in bytes) of each part for S3 multipart uploads. + * - hive.s3.max-concurrent-upload-num + - integer + - + - Specifies the maximum number of concurrent uploads for S3 multipart uploads. + * - hive.s3.upload-threads + - integer + - + - Specifies the number of threads to use for S3 multipart uploads. 
Bucket Level Configuration """""""""""""""""""""""""" From bf5b7661197f33abda8827ac033ef9ad27f5e691 Mon Sep 17 00:00:00 2001 From: Xiuli Wei Date: Fri, 12 Sep 2025 04:47:54 +0000 Subject: [PATCH 5/9] Address comments --- .../hive/storage_adapters/s3fs/S3Config.h | 34 ++++++---------- .../storage_adapters/s3fs/S3FileSystem.cpp | 29 +++----------- .../hive/storage_adapters/s3fs/S3FileSystem.h | 6 --- .../storage_adapters/s3fs/S3WriteFile.cpp | 40 ++++++++++--------- .../hive/storage_adapters/s3fs/S3WriteFile.h | 4 +- velox/docs/configs.rst | 6 +-- 6 files changed, 43 insertions(+), 76 deletions(-) diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3Config.h b/velox/connectors/hive/storage_adapters/s3fs/S3Config.h index 3202c19c958..0245b255162 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3Config.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3Config.h @@ -121,11 +121,10 @@ class S3Config { {Keys::KUploadPartAsync, std::make_pair("upload-part-async", "false")}, {Keys::kPartUploadSize, - std::make_pair("part-upload-size", std::nullopt)}, + std::make_pair("part-upload-size", "10485760")}, {Keys::KMaxConcurrentUploadNum, - std::make_pair("max-concurrent-upload-num", std::nullopt)}, - {Keys::KUploadThreads, - std::make_pair("upload-threads", std::nullopt)}, + std::make_pair("max-concurrent-upload-num", "4")}, + {Keys::KUploadThreads, std::make_pair("upload-threads", "16")}, }; return config; } @@ -260,28 +259,19 @@ class S3Config { return folly::to(value); } - std::optional partUploadSize() const { - auto val = config_.find(Keys::kPartUploadSize)->second; - if (val.has_value()) { - return folly::to(val.value()); - } - return std::optional(); + int32_t partUploadSize() const { + auto value = config_.find(Keys::kPartUploadSize)->second.value(); + return folly::to(value); } - std::optional maxConcurrentUploadNum() const { - auto val = config_.find(Keys::KMaxConcurrentUploadNum)->second; - if (val.has_value()) { - return folly::to(val.value()); - } - 
return std::optional(); + int32_t maxConcurrentUploadNum() const { + auto value = config_.find(Keys::KMaxConcurrentUploadNum)->second.value(); + return folly::to(value); } - std::optional uploadThreads() const { - auto val = config_.find(Keys::KUploadThreads)->second; - if (val.has_value()) { - return folly::to(val.value()); - } - return std::optional(); + int32_t uploadThreads() const { + auto value = config_.find(Keys::KUploadThreads)->second.value(); + return folly::to(value); } private: diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp index 1a19e219d1b..02b8a63fb5c 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp @@ -294,26 +294,7 @@ class S3FileSystem::Impl { auto credentialsProvider = getCredentialsProvider(s3Config); - if (s3Config.uploadPartAsync() && !asyncUploadInfo_) { - asyncUploadInfo_ = std::make_shared(); - - auto partUploadSize = s3Config.partUploadSize(); - if (partUploadSize.has_value()) { - asyncUploadInfo_->partUploadSize = partUploadSize.value(); - } - - auto maxConcurrentUploadNum = s3Config.maxConcurrentUploadNum(); - if (maxConcurrentUploadNum.has_value()) { - asyncUploadInfo_->maxConcurrentUploadNum = - maxConcurrentUploadNum.value(); - } - - auto uploadThreads = s3Config.uploadThreads(); - if (uploadThreads.has_value()) { - asyncUploadInfo_->uploadThreads = uploadThreads.value(); - } - } - + s3Config_ = std::make_shared(s3Config); client_ = std::make_shared( credentialsProvider, nullptr /* endpointProvider */, clientConfig); ++fileSystemCount; @@ -453,8 +434,8 @@ class S3FileSystem::Impl { return client_.get(); } - std::shared_ptr getAsyncUploadInfo() const { - return asyncUploadInfo_; + S3Config* s3Config() const { + return s3Config_.get(); } std::string getLogLevelName() const { @@ -467,7 +448,7 @@ class S3FileSystem::Impl { private: std::shared_ptr client_; - 
std::shared_ptr asyncUploadInfo_; + std::shared_ptr s3Config_; }; S3FileSystem::S3FileSystem( @@ -500,7 +481,7 @@ std::unique_ptr S3FileSystem::openFileForWrite( const FileOptions& options) { const auto path = getPath(s3Path); auto s3file = std::make_unique( - path, impl_->s3Client(), options.pool, impl_->getAsyncUploadInfo()); + path, impl_->s3Client(), options.pool, impl_->s3Config()); return s3file; } diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h index ba09518cbeb..f121223f0f3 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h @@ -33,12 +33,6 @@ void finalizeS3(); class S3Config; -struct AsyncUploadInfo { - std::optional partUploadSize{10485760}; - std::optional maxConcurrentUploadNum{4}; - std::optional uploadThreads{16}; -}; - using AWSCredentialsProviderFactory = std::function( const S3Config& config)>; diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp index b170cd60540..fb83e1af111 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp @@ -41,17 +41,17 @@ class S3WriteFile::Impl { std::string_view path, Aws::S3::S3Client* client, memory::MemoryPool* pool, - std::shared_ptr asyncUploadInfo) + S3Config* s3Config) : client_(client), pool_(pool) { VELOX_CHECK_NOT_NULL(client); VELOX_CHECK_NOT_NULL(pool); - if (asyncUploadInfo) { - kPartUploadSize = asyncUploadInfo->partUploadSize.value(); + kPartUploadSize = s3Config->partUploadSize(); + if (s3Config->uploadPartAsync()) { maxConcurrentUploadNum_ = std::make_unique( - asyncUploadInfo->maxConcurrentUploadNum.value()); + s3Config->maxConcurrentUploadNum()); if (!uploadThreadPool_) { uploadThreadPool_ = std::make_shared( - asyncUploadInfo->uploadThreads.value(), + 
s3Config->uploadThreads(), std::make_shared("upload-thread")); } } else { @@ -232,25 +232,29 @@ class S3WriteFile::Impl { void uploadPart(const std::string_view part, bool isLast = false) { // Only the last part can be less than kPartUploadSize. VELOX_CHECK(isLast || (!isLast && (part.size() == kPartUploadSize))); + auto uploadCompletedPart = [&](const std::string_view partData) { + Aws::S3::Model::CompletedPart completedPart = + uploadPartSeq(uploadState_.id, ++uploadState_.partNumber, partData); + uploadState_.completedParts.push_back(std::move(completedPart)); + }; if (uploadThreadPool_) { // If this is the last part and no parts have been uploaded yet, // use the synchronous upload method. if (isLast && uploadState_.partNumber == 0) { - uploadPartSeq(uploadState_.id, ++uploadState_.partNumber, part); + uploadCompletedPart(part); } else { uploadPartAsync(part); } } else { - uploadPartSeq(uploadState_.id, ++uploadState_.partNumber, part); + uploadCompletedPart(part); } } // Common logic for uploading a part. - void uploadPartSeq( + Aws::S3::Model::CompletedPart uploadPartSeq( const Aws::String& uploadId, const int64_t partNumber, - const std::string_view part, - bool async = false) { + const std::string_view part) { Aws::S3::Model::UploadPartRequest request; request.SetBucket(bucket_); request.SetKey(key_); @@ -276,12 +280,7 @@ class S3WriteFile::Impl { if (!result.GetChecksumCRC32().empty()) { completedPart.SetChecksumCRC32(result.GetChecksumCRC32()); } - if (async) { - std::lock_guard lock(uploadStateMutex_); - uploadState_.completedParts.push_back(std::move(completedPart)); - } else { - uploadState_.completedParts.push_back(std::move(completedPart)); - } + return completedPart; } // Upload the part asynchronously. 
@@ -296,7 +295,10 @@ class S3WriteFile::Impl { maxConcurrentUploadNum_->post(); }; try { - uploadPartSeq(uploadState_.id, partNumber, *partStr, true); + Aws::S3::Model::CompletedPart completedPart = + uploadPartSeq(uploadState_.id, partNumber, *partStr); + std::lock_guard lock(uploadStateMutex_); + uploadState_.completedParts.push_back(std::move(completedPart)); } catch (const std::exception& e) { LOG(ERROR) << "Exception during async upload: " << e.what(); } catch (...) { @@ -329,8 +331,8 @@ S3WriteFile::S3WriteFile( std::string_view path, Aws::S3::S3Client* client, memory::MemoryPool* pool, - std::shared_ptr asyncUploadInfo) { - impl_ = std::make_shared(path, client, pool, asyncUploadInfo); + S3Config* s3Config) { + impl_ = std::make_shared(path, client, pool, s3Config); } void S3WriteFile::append(std::string_view data) { diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h index 3f00eb9a1da..0d3febc57a8 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h @@ -18,7 +18,7 @@ #include "velox/common/file/File.h" #include "velox/common/memory/MemoryPool.h" -#include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h" +#include "velox/connectors/hive/storage_adapters/s3fs/S3Config.h" namespace Aws::S3 { class S3Client; @@ -52,7 +52,7 @@ class S3WriteFile : public WriteFile { std::string_view path, Aws::S3::S3Client* client, memory::MemoryPool* pool, - std::shared_ptr asyncUploadInfo); + S3Config* s3Config); /// Appends data to the end of the file. /// Uploads a part on reaching part size limit. 
diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index 859762a4da3..0e7f227c24d 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -885,15 +885,15 @@ Each query can override the config by setting corresponding query session proper - If true, enables asynchronous upload of parts for S3 multipart uploads. * - hive.s3.part-upload-size - integer - - + - 10485760 - Specifies the size (in bytes) of each part for S3 multipart uploads. * - hive.s3.max-concurrent-upload-num - integer - - + - 4 - Specifies the maximum number of concurrent uploads for S3 multipart uploads. * - hive.s3.upload-threads - integer - - + - 16 - Specifies the number of threads to use for S3 multipart uploads. Bucket Level Configuration From 2a9c10b5cbba5010fe1b36710ea4441944b522c8 Mon Sep 17 00:00:00 2001 From: Xiuli Wei Date: Mon, 15 Sep 2025 09:01:32 +0000 Subject: [PATCH 6/9] Address comments --- .../storage_adapters/s3fs/S3WriteFile.cpp | 41 ++++++++----------- .../s3fs/benchmark/CMakeLists.txt | 3 +- 2 files changed, 20 insertions(+), 24 deletions(-) diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp index fb83e1af111..329c3ed47f1 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp @@ -45,7 +45,7 @@ class S3WriteFile::Impl { : client_(client), pool_(pool) { VELOX_CHECK_NOT_NULL(client); VELOX_CHECK_NOT_NULL(pool); - kPartUploadSize = s3Config->partUploadSize(); + partUploadSize_ = s3Config->partUploadSize(); if (s3Config->uploadPartAsync()) { maxConcurrentUploadNum_ = std::make_unique( s3Config->maxConcurrentUploadNum()); @@ -56,12 +56,11 @@ class S3WriteFile::Impl { } } else { uploadThreadPool_ = nullptr; - maxConcurrentUploadNum_ = nullptr; } getBucketAndKeyFromPath(path, bucket_, key_); currentPart_ = std::make_unique>(*pool_); - currentPart_->reserve(kPartUploadSize); + 
currentPart_->reserve(partUploadSize_); // Check that the object doesn't exist, if it does throw an error. { Aws::S3::Model::HeadObjectRequest request; @@ -123,7 +122,7 @@ class S3WriteFile::Impl { // Appends data to the end of the file. void append(std::string_view data) { VELOX_CHECK(!closed(), "File is closed"); - if (data.size() + currentPart_->size() >= kPartUploadSize) { + if (data.size() + currentPart_->size() >= partUploadSize_) { upload(data); } else { // Append to current part. @@ -135,9 +134,9 @@ class S3WriteFile::Impl { // No-op. void flush() { VELOX_CHECK(!closed(), "File is closed"); - /// currentPartSize must be less than kPartUploadSize since + /// currentPartSize must be less than partUploadSize_ since /// append() would have already flushed after reaching kUploadPartSize. - VELOX_CHECK_LT(currentPart_->size(), kPartUploadSize); + VELOX_CHECK_LT(currentPart_->size(), partUploadSize_); } // Complete the multipart upload and close the file. @@ -208,8 +207,8 @@ class S3WriteFile::Impl { Aws::String id; }; - // Data can be smaller or larger than the kPartUploadSize. - // Complete the currentPart_ and upload kPartUploadSize chunks of data. + // Data can be smaller or larger than the partUploadSize_. + // Complete the currentPart_ and upload partUploadSize_ chunks of data. // Save the remaining into currentPart_. void upload(const std::string_view data) { auto dataPtr = data.data(); @@ -220,19 +219,19 @@ class S3WriteFile::Impl { uploadPart({currentPart_->data(), currentPart_->size()}); dataPtr += remainingBufferSize; dataSize -= remainingBufferSize; - while (dataSize > kPartUploadSize) { - uploadPart({dataPtr, kPartUploadSize}); - dataPtr += kPartUploadSize; - dataSize -= kPartUploadSize; + while (dataSize > partUploadSize_) { + uploadPart({dataPtr, partUploadSize_}); + dataPtr += partUploadSize_; + dataSize -= partUploadSize_; } // Stash the remaining at the beginning of currentPart. 
currentPart_->unsafeAppend(0, dataPtr, dataSize); } void uploadPart(const std::string_view part, bool isLast = false) { - // Only the last part can be less than kPartUploadSize. - VELOX_CHECK(isLast || (!isLast && (part.size() == kPartUploadSize))); - auto uploadCompletedPart = [&](const std::string_view partData) { + // Only the last part can be less than partUploadSize_. + VELOX_CHECK(isLast || (!isLast && (part.size() == partUploadSize_))); + auto uploadPartSync = [&](const std::string_view partData) { Aws::S3::Model::CompletedPart completedPart = uploadPartSeq(uploadState_.id, ++uploadState_.partNumber, partData); uploadState_.completedParts.push_back(std::move(completedPart)); @@ -241,12 +240,12 @@ class S3WriteFile::Impl { // If this is the last part and no parts have been uploaded yet, // use the synchronous upload method. if (isLast && uploadState_.partNumber == 0) { - uploadCompletedPart(part); + uploadPartSync(part); } else { uploadPartAsync(part); } } else { - uploadCompletedPart(part); + uploadPartSync(part); } } @@ -316,17 +315,13 @@ class S3WriteFile::Impl { UploadState uploadState_; std::mutex uploadStateMutex_; std::vector> futures_; + size_t partUploadSize_; // maxConcurrentUploadNum_ controls the concurrency of asynchronous uploads to // S3 for each S3WriteFile, preventing excessive memory usage. 
std::unique_ptr maxConcurrentUploadNum_; - static std::shared_ptr uploadThreadPool_; - static size_t kPartUploadSize; + inline static std::shared_ptr uploadThreadPool_; }; -std::shared_ptr - S3WriteFile::Impl::uploadThreadPool_ = nullptr; -size_t S3WriteFile::Impl::kPartUploadSize = 10 * 1024 * 1024; - S3WriteFile::S3WriteFile( std::string_view path, Aws::S3::S3Client* client, diff --git a/velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt b/velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt index 6d84665bb40..b5b2d7d65bc 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt @@ -25,4 +25,5 @@ target_link_libraries( GTest::gtest Folly::folly Folly::follybenchmark - GTest::gtest_main) + GTest::gtest_main +) From 9573e78819e67eded053e036f4f5ad44236fa9e6 Mon Sep 17 00:00:00 2001 From: Xiuli Wei Date: Mon, 15 Sep 2025 11:08:45 +0000 Subject: [PATCH 7/9] Address comments --- velox/connectors/hive/storage_adapters/s3fs/S3Config.h | 10 +++++----- .../hive/storage_adapters/s3fs/S3WriteFile.cpp | 2 +- .../hive/storage_adapters/s3fs/S3WriteFile.h | 2 ++ .../s3fs/benchmark/S3FileSystemBenchmark.cpp | 9 +++++++-- .../storage_adapters/s3fs/tests/S3FileSystemTest.cpp | 2 +- velox/docs/configs.rst | 2 +- 6 files changed, 17 insertions(+), 10 deletions(-) diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3Config.h b/velox/connectors/hive/storage_adapters/s3fs/S3Config.h index 0245b255162..4bd4f592510 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3Config.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3Config.h @@ -76,7 +76,7 @@ class S3Config { kRetryMode, kUseProxyFromEnv, kCredentialsProvider, - KUploadPartAsync, + KPartUploadAsync, kPartUploadSize, KMaxConcurrentUploadNum, KUploadThreads, kEnd }; @@ -118,8 +118,8 @@ class S3Config { std::make_pair("use-proxy-from-env", "false")}, {Keys::kCredentialsProvider,
std::make_pair("aws-credentials-provider", std::nullopt)}, - {Keys::KUploadPartAsync, - std::make_pair("upload-part-async", "false")}, + {Keys::KPartUploadAsync, + std::make_pair("part-upload-async", "false")}, {Keys::kPartUploadSize, std::make_pair("part-upload-size", "10485760")}, {Keys::KMaxConcurrentUploadNum, @@ -254,8 +254,8 @@ class S3Config { return config_.find(Keys::kCredentialsProvider)->second; } - bool uploadPartAsync() const { - auto value = config_.find(Keys::KUploadPartAsync)->second.value(); + bool partUploadAsync() const { + auto value = config_.find(Keys::KPartUploadAsync)->second.value(); return folly::to(value); } diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp index 329c3ed47f1..77cfed10554 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp @@ -46,7 +46,7 @@ class S3WriteFile::Impl { VELOX_CHECK_NOT_NULL(client); VELOX_CHECK_NOT_NULL(pool); partUploadSize_ = s3Config->partUploadSize(); - if (s3Config->uploadPartAsync()) { + if (s3Config->partUploadAsync()) { maxConcurrentUploadNum_ = std::make_unique( s3Config->maxConcurrentUploadNum()); if (!uploadThreadPool_) { diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h index 0d3febc57a8..cfd0d219b34 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h @@ -69,6 +69,8 @@ class S3WriteFile : public WriteFile { /// Return the number of parts uploaded so far. 
int numPartsUploaded() const; + + protected: class Impl; protected: diff --git a/velox/connectors/hive/storage_adapters/s3fs/benchmark/S3FileSystemBenchmark.cpp b/velox/connectors/hive/storage_adapters/s3fs/benchmark/S3FileSystemBenchmark.cpp index bb361ab54a3..d423c3b2ec2 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/benchmark/S3FileSystemBenchmark.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/benchmark/S3FileSystemBenchmark.cpp @@ -26,6 +26,7 @@ namespace { using namespace facebook::velox::filesystems; + class S3FileSystemBenchmark { public: S3FileSystemBenchmark() { @@ -34,10 +35,12 @@ class S3FileSystemBenchmark { ioExecutor_ = std::make_unique(3); filesystems::initializeS3("Info", kLogLocation_); } + ~S3FileSystemBenchmark() { minioServer_->stop(); filesystems::finalizeS3(); } + std::unique_ptr minioServer_; std::unique_ptr ioExecutor_; std::string_view kLogLocation_ = "/tmp/foobar/"; @@ -45,9 +48,11 @@ class S3FileSystemBenchmark { std::string localPath(const char* directory) { return minioServer_->path() + "/" + directory; } + void addBucket(const char* bucket) { minioServer_->addBucket(bucket); } + void run(const std::string& name, bool enableUploadPartAsync, int32_t size_MiB) { folly::BenchmarkSuspender suspender; @@ -57,15 +62,15 @@ class S3FileSystemBenchmark { addBucket(bucketName); const auto s3File = s3URI(bucketName, file.c_str()); auto hiveConfig = minioServer_->hiveConfig( - {{"hive.s3.upload-part-async", + {{"hive.s3.part-upload-async", enableUploadPartAsync ? 
"true" : "false"}}); filesystems::S3FileSystem s3fs(bucketName, hiveConfig); + suspender.dismiss(); auto pool = memory::memoryManager()->addLeafPool("S3FileSystemBenchmark"); auto writeFile = s3fs.openFileForWrite(s3File, {{}, pool.get(), std::nullopt}); auto s3WriteFile = dynamic_cast(writeFile.get()); - // 1024 std::string dataContent(1024, 'a'); EXPECT_EQ(writeFile->size(), 0); diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp index 45dc86d9f5f..9dd6c686979 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp @@ -247,7 +247,7 @@ TEST_F(S3FileSystemTest, writeFileAsync) { const auto s3File = s3URI(bucketName, file); auto hiveConfig = - minioServer_->hiveConfig({{"hive.s3.upload-part-async", "true"}}); + minioServer_->hiveConfig({{"hive.s3.part-upload-async", "true"}}); filesystems::S3FileSystem s3fs(bucketName, hiveConfig); auto pool = memory::memoryManager()->addLeafPool("S3FileSystemTest"); diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index 0e7f227c24d..e10e0acc7b1 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -879,7 +879,7 @@ Each query can override the config by setting corresponding query session proper - - A custom credential provider, if specified, will be used to create the client in favor of other authentication mechanisms. The provider must be registered using "registerAWSCredentialsProvider" before it can be used. - * - hive.s3.upload-part-async + * - hive.s3.part-upload-async - bool - false - If true, enables asynchronous upload of parts for S3 multipart uploads. 
From 2faf0f44c37d930091f9828a81a604501568eff6 Mon Sep 17 00:00:00 2001 From: Xiuli Wei Date: Tue, 16 Sep 2025 08:40:10 +0000 Subject: [PATCH 8/9] Address comments --- .../hive/storage_adapters/s3fs/S3Config.h | 7 + .../storage_adapters/s3fs/S3WriteFile.cpp | 16 +- .../hive/storage_adapters/s3fs/S3WriteFile.h | 2 - .../s3fs/benchmark/CMakeLists.txt | 6 +- ...nchmark.cpp => S3AsyncUploadBenchmark.cpp} | 22 +- .../s3fs/tests/S3FileSystemTest.cpp | 227 ++++++------------ 6 files changed, 104 insertions(+), 176 deletions(-) rename velox/connectors/hive/storage_adapters/s3fs/benchmark/{S3FileSystemBenchmark.cpp => S3AsyncUploadBenchmark.cpp} (87%) diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3Config.h b/velox/connectors/hive/storage_adapters/s3fs/S3Config.h index 4bd4f592510..b8b823669fa 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3Config.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3Config.h @@ -254,21 +254,28 @@ class S3Config { return config_.find(Keys::kCredentialsProvider)->second; } + /// If true, enables asynchronous upload of parts for S3 multipart uploads, + /// false otherwise. bool partUploadAsync() const { auto value = config_.find(Keys::KPartUploadAsync)->second.value(); return folly::to(value); } + /// Return the size (in bytes) of each part for S3 multipart uploads. int32_t partUploadSize() const { auto value = config_.find(Keys::kPartUploadSize)->second.value(); return folly::to(value); } + /// Return the maximum number of concurrent uploads for S3 multipart uploads, + /// applicable only when asynchronous uploads are enabled. int32_t maxConcurrentUploadNum() const { auto value = config_.find(Keys::KMaxConcurrentUploadNum)->second.value(); return folly::to(value); } + /// Return the number of threads to use for S3 multipart uploads, + /// applicable only when asynchronous uploads are enabled. 
int32_t uploadThreads() const { auto value = config_.find(Keys::KUploadThreads)->second.value(); return folly::to(value); diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp index 77cfed10554..d7ea2b40ca2 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp @@ -236,16 +236,14 @@ class S3WriteFile::Impl { uploadPartSeq(uploadState_.id, ++uploadState_.partNumber, partData); uploadState_.completedParts.push_back(std::move(completedPart)); }; - if (uploadThreadPool_) { - // If this is the last part and no parts have been uploaded yet, - // use the synchronous upload method. - if (isLast && uploadState_.partNumber == 0) { - uploadPartSync(part); - } else { - uploadPartAsync(part); - } - } else { + // If this is the last part and no parts have been uploaded yet, + // use the synchronous upload method. + bool useSyncUpload = + !uploadThreadPool_ || (isLast && uploadState_.partNumber == 0); + if (useSyncUpload) { uploadPartSync(part); + } else { + uploadPartAsync(part); } } diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h index cfd0d219b34..38edb8b4750 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h @@ -72,8 +72,6 @@ class S3WriteFile : public WriteFile { protected: class Impl; - - protected: std::shared_ptr impl_; }; diff --git a/velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt b/velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt index b5b2d7d65bc..bf51d7fa419 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt @@ -11,11 +11,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied. # See the License for the specific language governing permissions and # limitations under the License. -add_executable(velox_s3FileSystem_benchmark S3FileSystemBenchmark.cpp) +add_executable(velox_s3AsyncUpload_benchmark S3AsyncUploadBenchmark.cpp) -add_test(velox_s3FileSystem_benchmark velox_s3FileSystem_benchmark) +add_test(velox_s3AsyncUpload_benchmark velox_s3AsyncUpload_benchmark) target_link_libraries( - velox_s3FileSystem_benchmark + velox_s3AsyncUpload_benchmark velox_file velox_s3fs velox_core diff --git a/velox/connectors/hive/storage_adapters/s3fs/benchmark/S3FileSystemBenchmark.cpp b/velox/connectors/hive/storage_adapters/s3fs/benchmark/S3AsyncUploadBenchmark.cpp similarity index 87% rename from velox/connectors/hive/storage_adapters/s3fs/benchmark/S3FileSystemBenchmark.cpp rename to velox/connectors/hive/storage_adapters/s3fs/benchmark/S3AsyncUploadBenchmark.cpp index d423c3b2ec2..b848965057f 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/benchmark/S3FileSystemBenchmark.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/benchmark/S3AsyncUploadBenchmark.cpp @@ -27,16 +27,16 @@ namespace { using namespace facebook::velox::filesystems; -class S3FileSystemBenchmark { +class S3AsyncUploadBenchmark { public: - S3FileSystemBenchmark() { + S3AsyncUploadBenchmark() { minioServer_ = std::make_unique(); minioServer_->start(); ioExecutor_ = std::make_unique(3); filesystems::initializeS3("Info", kLogLocation_); } - ~S3FileSystemBenchmark() { + ~S3AsyncUploadBenchmark() { minioServer_->stop(); filesystems::finalizeS3(); } @@ -67,7 +67,7 @@ class S3FileSystemBenchmark { filesystems::S3FileSystem s3fs(bucketName, hiveConfig); suspender.dismiss(); - auto pool = memory::memoryManager()->addLeafPool("S3FileSystemBenchmark"); + auto pool = memory::memoryManager()->addLeafPool("S3AsyncUploadBenchmark"); auto writeFile = s3fs.openFileForWrite(s3File, {{}, pool.get(), std::nullopt}); auto s3WriteFile = dynamic_cast(writeFile.get()); @@ -97,14 +97,14 @@ 
class S3FileSystemBenchmark { } }; -auto benchmark = S3FileSystemBenchmark(); +auto benchmark = S3AsyncUploadBenchmark(); -#define DEFINE_BENCHMARKS(size) \ - BENCHMARK(non_async_upload_##size##M) { \ - benchmark.run("non_async_upload", false, size); \ - } \ - BENCHMARK_RELATIVE(async_upload_##size##M) { \ - benchmark.run("async_upload", true, size); \ +#define DEFINE_BENCHMARKS(size) \ + BENCHMARK(sync_upload_##size##M) { \ + benchmark.run("sync_upload", false, size); \ + } \ + BENCHMARK_RELATIVE(async_upload_##size##M) { \ + benchmark.run("async_upload", true, size); \ } DEFINE_BENCHMARKS(4) diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp index 9dd6c686979..eadf010cbf9 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp @@ -240,168 +240,93 @@ TEST_F(S3FileSystemTest, mkdirAndRename) { ASSERT_FALSE(s3fs.exists(s3File)); } -TEST_F(S3FileSystemTest, writeFileAsync) { - const auto bucketName = "writedata"; - const auto file = "test.txt"; - const auto filename = localPath(bucketName) + "/" + file; - const auto s3File = s3URI(bucketName, file); - - auto hiveConfig = - minioServer_->hiveConfig({{"hive.s3.part-upload-async", "true"}}); - filesystems::S3FileSystem s3fs(bucketName, hiveConfig); - - auto pool = memory::memoryManager()->addLeafPool("S3FileSystemTest"); - auto writeFile = - s3fs.openFileForWrite(s3File, {{}, pool.get(), std::nullopt}); - auto s3WriteFile = dynamic_cast(writeFile.get()); - std::string dataContent = - "Dance me to your beauty with a burning violin" - "Dance me through the panic till I'm gathered safely in" - "Lift me like an olive branch and be my homeward dove" - "Dance me to the end of love"; - - EXPECT_EQ(writeFile->size(), 0); - std::int64_t contentSize = dataContent.length(); - // dataContent length is 178. 
- EXPECT_EQ(contentSize, 178); - - // Append and flush a small batch of data. - writeFile->append(dataContent.substr(0, 10)); - EXPECT_EQ(writeFile->size(), 10); - writeFile->append(dataContent.substr(10, contentSize - 10)); - EXPECT_EQ(writeFile->size(), contentSize); - writeFile->flush(); - // No parts must have been uploaded. - EXPECT_EQ(s3WriteFile->numPartsUploaded(), 0); - - // Append data 178 * 100'000 ~ 16MiB. - // Should have 1 part in total with kUploadPartSize = 10MiB. - for (int i = 0; i < 100'000; ++i) { - writeFile->append(dataContent); - } - EXPECT_EQ(s3WriteFile->numPartsUploaded(), 1); - EXPECT_EQ(writeFile->size(), 100'001 * contentSize); - - // Append a large data buffer 178 * 150'000 ~ 25MiB (2 parts). - std::vector largeBuffer(contentSize * 150'000); - for (int i = 0; i < 150'000; ++i) { - memcpy( - largeBuffer.data() + (i * contentSize), - dataContent.data(), - contentSize); - } - - writeFile->append({largeBuffer.data(), largeBuffer.size()}); - EXPECT_EQ(writeFile->size(), 250'001 * contentSize); - // Total data = ~41 MB = 5 parts. - // But parts uploaded will be 4. - EXPECT_EQ(s3WriteFile->numPartsUploaded(), 4); - - // Upload the last part. - writeFile->close(); - EXPECT_EQ(s3WriteFile->numPartsUploaded(), 5); - - VELOX_ASSERT_THROW( - writeFile->append(dataContent.substr(0, 10)), "File is closed"); - - auto readFile = s3fs.openFileForRead(s3File); - ASSERT_EQ(readFile->size(), contentSize * 250'001); - // Sample and verify every 1'000 dataContent chunks. - for (int i = 0; i < 250; ++i) { - ASSERT_EQ( - readFile->pread(i * (1'000 * contentSize), contentSize), dataContent); - } - // Verify the last chunk. - ASSERT_EQ(readFile->pread(contentSize * 250'000, contentSize), dataContent); - - // Verify the S3 list function. 
- auto result = s3fs.list(s3File); - - ASSERT_EQ(result.size(), 1); - ASSERT_TRUE(result[0] == file); - - ASSERT_TRUE(s3fs.exists(s3File)); -} - TEST_F(S3FileSystemTest, writeFileAndRead) { const auto bucketName = "writedata"; const auto file = "test.txt"; const auto filename = localPath(bucketName) + "/" + file; const auto s3File = s3URI(bucketName, file); - - auto hiveConfig = minioServer_->hiveConfig(); - filesystems::S3FileSystem s3fs(bucketName, hiveConfig); auto pool = memory::memoryManager()->addLeafPool("S3FileSystemTest"); - auto writeFile = - s3fs.openFileForWrite(s3File, {{}, pool.get(), std::nullopt}); - auto s3WriteFile = dynamic_cast(writeFile.get()); - std::string dataContent = - "Dance me to your beauty with a burning violin" - "Dance me through the panic till I'm gathered safely in" - "Lift me like an olive branch and be my homeward dove" - "Dance me to the end of love"; - - EXPECT_EQ(writeFile->size(), 0); - std::int64_t contentSize = dataContent.length(); - // dataContent length is 178. - EXPECT_EQ(contentSize, 178); - - // Append and flush a small batch of data. - writeFile->append(dataContent.substr(0, 10)); - EXPECT_EQ(writeFile->size(), 10); - writeFile->append(dataContent.substr(10, contentSize - 10)); - EXPECT_EQ(writeFile->size(), contentSize); - writeFile->flush(); - // No parts must have been uploaded. - EXPECT_EQ(s3WriteFile->numPartsUploaded(), 0); - - // Append data 178 * 100'000 ~ 16MiB. - // Should have 1 part in total with kUploadPartSize = 10MiB. - for (int i = 0; i < 100'000; ++i) { - writeFile->append(dataContent); - } - EXPECT_EQ(s3WriteFile->numPartsUploaded(), 1); - EXPECT_EQ(writeFile->size(), 100'001 * contentSize); - - // Append a large data buffer 178 * 150'000 ~ 25MiB (2 parts). 
- std::vector largeBuffer(contentSize * 150'000); - for (int i = 0; i < 150'000; ++i) { - memcpy( - largeBuffer.data() + (i * contentSize), - dataContent.data(), - contentSize); - } + auto uploadPart = [&](bool uploadPartAsync) { + auto hiveConfig = minioServer_->hiveConfig( + {{"hive.s3.part-upload-async", uploadPartAsync ? "true" : "false"}}); + filesystems::S3FileSystem s3fs(bucketName, hiveConfig); + auto writeFile = + s3fs.openFileForWrite(s3File, {{}, pool.get(), std::nullopt}); + auto s3WriteFile = dynamic_cast(writeFile.get()); + std::string dataContent = + "Dance me to your beauty with a burning violin" + "Dance me through the panic till I'm gathered safely in" + "Lift me like an olive branch and be my homeward dove" + "Dance me to the end of love"; + + EXPECT_EQ(writeFile->size(), 0); + std::int64_t contentSize = dataContent.length(); + // dataContent length is 178. + EXPECT_EQ(contentSize, 178); + + // Append and flush a small batch of data. + writeFile->append(dataContent.substr(0, 10)); + EXPECT_EQ(writeFile->size(), 10); + writeFile->append(dataContent.substr(10, contentSize - 10)); + EXPECT_EQ(writeFile->size(), contentSize); + writeFile->flush(); + // No parts must have been uploaded. + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 0); + + // Append data 178 * 100'000 ~ 16MiB. + // Should have 1 part in total with kUploadPartSize = 10MiB. + for (int i = 0; i < 100'000; ++i) { + writeFile->append(dataContent); + } + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 1); + EXPECT_EQ(writeFile->size(), 100'001 * contentSize); + + // Append a large data buffer 178 * 150'000 ~ 25MiB (2 parts). 
+ std::vector largeBuffer(contentSize * 150'000); + for (int i = 0; i < 150'000; ++i) { + memcpy( + largeBuffer.data() + (i * contentSize), + dataContent.data(), + contentSize); + } + + writeFile->append({largeBuffer.data(), largeBuffer.size()}); + EXPECT_EQ(writeFile->size(), 250'001 * contentSize); + // Total data = ~41 MB = 5 parts. + // But parts uploaded will be 4. + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 4); + + // Upload the last part. + writeFile->close(); + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 5); - writeFile->append({largeBuffer.data(), largeBuffer.size()}); - EXPECT_EQ(writeFile->size(), 250'001 * contentSize); - // Total data = ~41 MB = 5 parts. - // But parts uploaded will be 4. - EXPECT_EQ(s3WriteFile->numPartsUploaded(), 4); + VELOX_ASSERT_THROW( + writeFile->append(dataContent.substr(0, 10)), "File is closed"); - // Upload the last part. - writeFile->close(); - EXPECT_EQ(s3WriteFile->numPartsUploaded(), 5); + auto readFile = s3fs.openFileForRead(s3File); + ASSERT_EQ(readFile->size(), contentSize * 250'001); + // Sample and verify every 1'000 dataContent chunks. + for (int i = 0; i < 250; ++i) { + ASSERT_EQ( + readFile->pread(i * (1'000 * contentSize), contentSize), dataContent); + } + // Verify the last chunk. + ASSERT_EQ(readFile->pread(contentSize * 250'000, contentSize), dataContent); - VELOX_ASSERT_THROW( - writeFile->append(dataContent.substr(0, 10)), "File is closed"); + // Verify the S3 list function. + auto result = s3fs.list(s3File); - auto readFile = s3fs.openFileForRead(s3File); - ASSERT_EQ(readFile->size(), contentSize * 250'001); - // Sample and verify every 1'000 dataContent chunks. - for (int i = 0; i < 250; ++i) { - ASSERT_EQ( - readFile->pread(i * (1'000 * contentSize), contentSize), dataContent); - } - // Verify the last chunk. - ASSERT_EQ(readFile->pread(contentSize * 250'000, contentSize), dataContent); + ASSERT_EQ(result.size(), 1); + ASSERT_TRUE(result[0] == file); - // Verify the S3 list function. 
- auto result = s3fs.list(s3File); - - ASSERT_EQ(result.size(), 1); - ASSERT_TRUE(result[0] == file); - - ASSERT_TRUE(s3fs.exists(s3File)); + ASSERT_TRUE(s3fs.exists(s3File)); + }; + // Upload parts synchronously. + uploadPart(false); + // Upload parts asynchronously. + uploadPart(true); } TEST_F(S3FileSystemTest, invalidConnectionSettings) { From 413eb736127461a0af68313091e843fb2198a6c6 Mon Sep 17 00:00:00 2001 From: Xiuli Wei Date: Wed, 17 Sep 2025 07:18:54 +0000 Subject: [PATCH 9/9] Address comments --- .../hive/storage_adapters/s3fs/benchmark/CMakeLists.txt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt b/velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt index bf51d7fa419..1bd3a7a83d7 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt @@ -11,11 +11,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -add_executable(velox_s3AsyncUpload_benchmark S3AsyncUploadBenchmark.cpp) +add_executable(velox_s3asyncupload_benchmark S3AsyncUploadBenchmark.cpp) -add_test(velox_s3AsyncUpload_benchmark velox_s3AsyncUpload_benchmark) +add_test(velox_s3asyncupload_benchmark velox_s3asyncupload_benchmark) target_link_libraries( - velox_s3AsyncUpload_benchmark + velox_s3asyncupload_benchmark velox_file velox_s3fs velox_core