diff --git a/velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt b/velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt
index 741f01a61b3..1fb33117903 100644
--- a/velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt
+++ b/velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt
@@ -27,4 +27,7 @@ if(VELOX_ENABLE_S3)
   if(${VELOX_BUILD_TESTING})
     add_subdirectory(tests)
   endif()
+  if(${VELOX_ENABLE_BENCHMARKS})
+    add_subdirectory(benchmark)
+  endif()
 endif()
diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3Config.h b/velox/connectors/hive/storage_adapters/s3fs/S3Config.h
index 4fad4379925..b8b823669fa 100644
--- a/velox/connectors/hive/storage_adapters/s3fs/S3Config.h
+++ b/velox/connectors/hive/storage_adapters/s3fs/S3Config.h
@@ -76,6 +76,10 @@ class S3Config {
     kRetryMode,
     kUseProxyFromEnv,
     kCredentialsProvider,
+    kPartUploadAsync,
+    kPartUploadSize,
+    kMaxConcurrentUploadNum,
+    kUploadThreads,
     kEnd
   };
 
@@ -114,6 +118,13 @@ class S3Config {
          std::make_pair("use-proxy-from-env", "false")},
         {Keys::kCredentialsProvider,
          std::make_pair("aws-credentials-provider", std::nullopt)},
+        {Keys::kPartUploadAsync,
+         std::make_pair("part-upload-async", "false")},
+        {Keys::kPartUploadSize,
+         std::make_pair("part-upload-size", "10485760")},
+        {Keys::kMaxConcurrentUploadNum,
+         std::make_pair("max-concurrent-upload-num", "4")},
+        {Keys::kUploadThreads, std::make_pair("upload-threads", "16")},
     };
     return config;
   }
@@ -243,6 +254,33 @@ class S3Config {
     return config_.find(Keys::kCredentialsProvider)->second;
   }
 
+  /// If true, enables asynchronous upload of parts for S3 multipart uploads,
+  /// false otherwise.
+  bool partUploadAsync() const {
+    auto value = config_.find(Keys::kPartUploadAsync)->second.value();
+    return folly::to<bool>(value);
+  }
+
+  /// Return the size (in bytes) of each part for S3 multipart uploads.
+  int32_t partUploadSize() const {
+    auto value = config_.find(Keys::kPartUploadSize)->second.value();
+    return folly::to<int32_t>(value);
+  }
+
+  /// Return the maximum number of concurrent uploads for S3 multipart uploads,
+  /// applicable only when asynchronous uploads are enabled.
+  int32_t maxConcurrentUploadNum() const {
+    auto value = config_.find(Keys::kMaxConcurrentUploadNum)->second.value();
+    return folly::to<int32_t>(value);
+  }
+
+  /// Return the number of threads to use for S3 multipart uploads,
+  /// applicable only when asynchronous uploads are enabled.
+  int32_t uploadThreads() const {
+    auto value = config_.find(Keys::kUploadThreads)->second.value();
+    return folly::to<int32_t>(value);
+  }
+
  private:
   std::unordered_map<Keys, std::pair<std::string, std::optional<std::string>>>
       config_;
   std::string payloadSigningPolicy_;
diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
index cdc456ee94b..02b8a63fb5c 100644
--- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
+++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
@@ -294,6 +294,7 @@ class S3FileSystem::Impl {
 
     auto credentialsProvider = getCredentialsProvider(s3Config);
 
+    s3Config_ = std::make_shared<S3Config>(s3Config);
     client_ = std::make_shared<Aws::S3::S3Client>(
         credentialsProvider, nullptr /* endpointProvider */, clientConfig);
     ++fileSystemCount;
@@ -433,6 +434,10 @@ class S3FileSystem::Impl {
     return client_.get();
   }
 
+  S3Config* s3Config() const {
+    return s3Config_.get();
+  }
+
   std::string getLogLevelName() const {
     return getAwsInstance()->getLogLevelName();
   }
@@ -443,6 +448,7 @@ class S3FileSystem::Impl {
 
  private:
   std::shared_ptr<Aws::S3::S3Client> client_;
+  std::shared_ptr<S3Config> s3Config_;
 };
 
 S3FileSystem::S3FileSystem(
@@ -474,8 +480,8 @@ std::unique_ptr<WriteFile> S3FileSystem::openFileForWrite(
     std::string_view s3Path,
     const FileOptions& options) {
   const auto path = getPath(s3Path);
-  auto s3file =
-      std::make_unique<S3WriteFile>(path, impl_->s3Client(), options.pool);
+  auto s3file = std::make_unique<S3WriteFile>(
+      path, impl_->s3Client(),
options.pool, impl_->s3Config()); return s3file; } diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp index fcccfe240ab..d7ea2b40ca2 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp @@ -15,6 +15,8 @@ */ #include "velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h" +#include +#include #include "velox/common/base/StatsReporter.h" #include "velox/connectors/hive/storage_adapters/s3fs/S3Counters.h" #include "velox/connectors/hive/storage_adapters/s3fs/S3Util.h" @@ -38,13 +40,27 @@ class S3WriteFile::Impl { explicit Impl( std::string_view path, Aws::S3::S3Client* client, - memory::MemoryPool* pool) + memory::MemoryPool* pool, + S3Config* s3Config) : client_(client), pool_(pool) { VELOX_CHECK_NOT_NULL(client); VELOX_CHECK_NOT_NULL(pool); + partUploadSize_ = s3Config->partUploadSize(); + if (s3Config->partUploadAsync()) { + maxConcurrentUploadNum_ = std::make_unique( + s3Config->maxConcurrentUploadNum()); + if (!uploadThreadPool_) { + uploadThreadPool_ = std::make_shared( + s3Config->uploadThreads(), + std::make_shared("upload-thread")); + } + } else { + uploadThreadPool_ = nullptr; + } + getBucketAndKeyFromPath(path, bucket_, key_); currentPart_ = std::make_unique>(*pool_); - currentPart_->reserve(kPartUploadSize); + currentPart_->reserve(partUploadSize_); // Check that the object doesn't exist, if it does throw an error. { Aws::S3::Model::HeadObjectRequest request; @@ -106,7 +122,7 @@ class S3WriteFile::Impl { // Appends data to the end of the file. void append(std::string_view data) { VELOX_CHECK(!closed(), "File is closed"); - if (data.size() + currentPart_->size() >= kPartUploadSize) { + if (data.size() + currentPart_->size() >= partUploadSize_) { upload(data); } else { // Append to current part. @@ -118,9 +134,9 @@ class S3WriteFile::Impl { // No-op. 
void flush() { VELOX_CHECK(!closed(), "File is closed"); - /// currentPartSize must be less than kPartUploadSize since + /// currentPartSize must be less than partUploadSize_ since /// append() would have already flushed after reaching kUploadPartSize. - VELOX_CHECK_LT(currentPart_->size(), kPartUploadSize); + VELOX_CHECK_LT(currentPart_->size(), partUploadSize_); } // Complete the multipart upload and close the file. @@ -130,6 +146,20 @@ class S3WriteFile::Impl { } RECORD_METRIC_VALUE(kMetricS3StartedUploads); uploadPart({currentPart_->data(), currentPart_->size()}, true); + if (uploadThreadPool_) { + if (futures_.size() > 0) { + folly::collectAll(std::move(futures_)).get(); + } + // The list of parts should be in ascending order. + std::sort( + uploadState_.completedParts.begin(), + uploadState_.completedParts.end(), + [](const Aws::S3::Model::CompletedPart& a, + const Aws::S3::Model::CompletedPart& b) { + return a.GetPartNumber() < b.GetPartNumber(); + }); + } + VELOX_CHECK_EQ(uploadState_.partNumber, uploadState_.completedParts.size()); // Complete the multipart upload. { @@ -163,7 +193,6 @@ class S3WriteFile::Impl { } private: - static constexpr int64_t kPartUploadSize = 10 * 1024 * 1024; static constexpr const char* kApplicationOctetStream = "application/octet-stream"; @@ -177,10 +206,9 @@ class S3WriteFile::Impl { int64_t partNumber = 0; Aws::String id; }; - UploadState uploadState_; - // Data can be smaller or larger than the kPartUploadSize. - // Complete the currentPart_ and upload kPartUploadSize chunks of data. + // Data can be smaller or larger than the partUploadSize_. + // Complete the currentPart_ and upload partUploadSize_ chunks of data. // Save the remaining into currentPart_. 
void upload(const std::string_view data) { auto dataPtr = data.data(); @@ -191,63 +219,113 @@ class S3WriteFile::Impl { uploadPart({currentPart_->data(), currentPart_->size()}); dataPtr += remainingBufferSize; dataSize -= remainingBufferSize; - while (dataSize > kPartUploadSize) { - uploadPart({dataPtr, kPartUploadSize}); - dataPtr += kPartUploadSize; - dataSize -= kPartUploadSize; + while (dataSize > partUploadSize_) { + uploadPart({dataPtr, partUploadSize_}); + dataPtr += partUploadSize_; + dataSize -= partUploadSize_; } // Stash the remaining at the beginning of currentPart. currentPart_->unsafeAppend(0, dataPtr, dataSize); } void uploadPart(const std::string_view part, bool isLast = false) { - // Only the last part can be less than kPartUploadSize. - VELOX_CHECK(isLast || (!isLast && (part.size() == kPartUploadSize))); - // Upload the part. - { - Aws::S3::Model::UploadPartRequest request; - request.SetBucket(bucket_); - request.SetKey(key_); - request.SetUploadId(uploadState_.id); - request.SetPartNumber(++uploadState_.partNumber); - request.SetContentLength(part.size()); - request.SetBody( - std::make_shared(part.data(), part.size())); - // The default algorithm used is MD5. However, MD5 is not supported with - // fips and can cause a SIGSEGV. Set CRC32 instead which is a standard for - // checksum computation and is not restricted by fips. - request.SetChecksumAlgorithm(Aws::S3::Model::ChecksumAlgorithm::CRC32); - auto outcome = client_->UploadPart(request); - VELOX_CHECK_AWS_OUTCOME(outcome, "Failed to upload", bucket_, key_); - // Append ETag and part number for this uploaded part. - // This will be needed for upload completion in Close(). - auto result = outcome.GetResult(); - Aws::S3::Model::CompletedPart part; - - part.SetPartNumber(uploadState_.partNumber); - part.SetETag(result.GetETag()); - // Don't add the checksum to the part if the checksum is empty. - // Some filesystems such as IBM COS require this to be not set. 
- if (!result.GetChecksumCRC32().empty()) { - part.SetChecksumCRC32(result.GetChecksumCRC32()); - } - uploadState_.completedParts.push_back(std::move(part)); + // Only the last part can be less than partUploadSize_. + VELOX_CHECK(isLast || (!isLast && (part.size() == partUploadSize_))); + auto uploadPartSync = [&](const std::string_view partData) { + Aws::S3::Model::CompletedPart completedPart = + uploadPartSeq(uploadState_.id, ++uploadState_.partNumber, partData); + uploadState_.completedParts.push_back(std::move(completedPart)); + }; + // If this is the last part and no parts have been uploaded yet, + // use the synchronous upload method. + bool useSyncUpload = + !uploadThreadPool_ || (isLast && uploadState_.partNumber == 0); + if (useSyncUpload) { + uploadPartSync(part); + } else { + uploadPartAsync(part); } } + // Common logic for uploading a part. + Aws::S3::Model::CompletedPart uploadPartSeq( + const Aws::String& uploadId, + const int64_t partNumber, + const std::string_view part) { + Aws::S3::Model::UploadPartRequest request; + request.SetBucket(bucket_); + request.SetKey(key_); + request.SetUploadId(uploadId); + request.SetPartNumber(partNumber); + request.SetContentLength(part.size()); + request.SetBody( + std::make_shared(part.data(), part.size())); + // The default algorithm used is MD5. However, MD5 is not supported with + // fips and can cause a SIGSEGV. Set CRC32 instead which is a standard for + // checksum computation and is not restricted by fips. + request.SetChecksumAlgorithm(Aws::S3::Model::ChecksumAlgorithm::CRC32); + auto outcome = client_->UploadPart(request); + VELOX_CHECK_AWS_OUTCOME(outcome, "Failed to upload", bucket_, key_); + // Append ETag and part number for this uploaded part. + // This will be needed for upload completion in Close(). 
+ auto result = outcome.GetResult(); + Aws::S3::Model::CompletedPart completedPart; + completedPart.SetPartNumber(partNumber); + completedPart.SetETag(result.GetETag()); + // Don't add the checksum to the part if the checksum is empty. + // Some filesystems such as IBM COS require this to be not set. + if (!result.GetChecksumCRC32().empty()) { + completedPart.SetChecksumCRC32(result.GetChecksumCRC32()); + } + return completedPart; + } + + // Upload the part asynchronously. + void uploadPartAsync(const std::string_view part) { + maxConcurrentUploadNum_->wait(); + const int64_t partNumber = ++uploadState_.partNumber; + std::shared_ptr partStr = + std::make_shared(part.data(), part.size()); + futures_.emplace_back( + folly::via(uploadThreadPool_.get(), [this, partNumber, partStr]() { + SCOPE_EXIT { + maxConcurrentUploadNum_->post(); + }; + try { + Aws::S3::Model::CompletedPart completedPart = + uploadPartSeq(uploadState_.id, partNumber, *partStr); + std::lock_guard lock(uploadStateMutex_); + uploadState_.completedParts.push_back(std::move(completedPart)); + } catch (const std::exception& e) { + LOG(ERROR) << "Exception during async upload: " << e.what(); + } catch (...) { + LOG(ERROR) << "Unknown exception during async upload."; + } + })); + } + Aws::S3::S3Client* client_; memory::MemoryPool* pool_; std::unique_ptr> currentPart_; std::string bucket_; std::string key_; size_t fileSize_ = -1; + UploadState uploadState_; + std::mutex uploadStateMutex_; + std::vector> futures_; + size_t partUploadSize_; + // maxConcurrentUploadNum_ controls the concurrency of asynchronous uploads to + // S3 for each S3WriteFile, preventing excessive memory usage. 
+ std::unique_ptr maxConcurrentUploadNum_; + inline static std::shared_ptr uploadThreadPool_; }; S3WriteFile::S3WriteFile( std::string_view path, Aws::S3::S3Client* client, - memory::MemoryPool* pool) { - impl_ = std::make_shared(path, client, pool); + memory::MemoryPool* pool, + S3Config* s3Config) { + impl_ = std::make_shared(path, client, pool, s3Config); } void S3WriteFile::append(std::string_view data) { diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h index 929eed20c37..38edb8b4750 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h @@ -18,6 +18,7 @@ #include "velox/common/file/File.h" #include "velox/common/memory/MemoryPool.h" +#include "velox/connectors/hive/storage_adapters/s3fs/S3Config.h" namespace Aws::S3 { class S3Client; @@ -50,7 +51,8 @@ class S3WriteFile : public WriteFile { S3WriteFile( std::string_view path, Aws::S3::S3Client* client, - memory::MemoryPool* pool); + memory::MemoryPool* pool, + S3Config* s3Config); /// Appends data to the end of the file. /// Uploads a part on reaching part size limit. diff --git a/velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt b/velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt new file mode 100644 index 00000000000..1bd3a7a83d7 --- /dev/null +++ b/velox/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt @@ -0,0 +1,29 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +add_executable(velox_s3asyncupload_benchmark S3AsyncUploadBenchmark.cpp) + +add_test(velox_s3asyncupload_benchmark velox_s3asyncupload_benchmark) +target_link_libraries( + velox_s3asyncupload_benchmark + velox_file + velox_s3fs + velox_core + velox_exec_test_lib + velox_dwio_common_exception + velox_exec + GTest::gtest + Folly::folly + Folly::follybenchmark + GTest::gtest_main +) diff --git a/velox/connectors/hive/storage_adapters/s3fs/benchmark/S3AsyncUploadBenchmark.cpp b/velox/connectors/hive/storage_adapters/s3fs/benchmark/S3AsyncUploadBenchmark.cpp new file mode 100644 index 00000000000..b848965057f --- /dev/null +++ b/velox/connectors/hive/storage_adapters/s3fs/benchmark/S3AsyncUploadBenchmark.cpp @@ -0,0 +1,128 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include + +#include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h" +#include "velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h" +#include "velox/connectors/hive/storage_adapters/s3fs/tests/S3Test.h" +#include "velox/functions/lib/benchmarks/FunctionBenchmarkBase.h" + +#include + +namespace { +using namespace facebook::velox::filesystems; + +class S3AsyncUploadBenchmark { + public: + S3AsyncUploadBenchmark() { + minioServer_ = std::make_unique(); + minioServer_->start(); + ioExecutor_ = std::make_unique(3); + filesystems::initializeS3("Info", kLogLocation_); + } + + ~S3AsyncUploadBenchmark() { + minioServer_->stop(); + filesystems::finalizeS3(); + } + + std::unique_ptr minioServer_; + std::unique_ptr ioExecutor_; + std::string_view kLogLocation_ = "/tmp/foobar/"; + + std::string localPath(const char* directory) { + return minioServer_->path() + "/" + directory; + } + + void addBucket(const char* bucket) { + minioServer_->addBucket(bucket); + } + + void + run(const std::string& name, bool enableUploadPartAsync, int32_t size_MiB) { + folly::BenchmarkSuspender suspender; + const auto bucketName = "writedata"; + const auto file = fmt::format("test_{}_{}.txt", name, size_MiB); + const auto filename = localPath(bucketName) + "/" + file.c_str(); + addBucket(bucketName); + const auto s3File = s3URI(bucketName, file.c_str()); + auto hiveConfig = minioServer_->hiveConfig( + {{"hive.s3.part-upload-async", + enableUploadPartAsync ? "true" : "false"}}); + filesystems::S3FileSystem s3fs(bucketName, hiveConfig); + + suspender.dismiss(); + auto pool = memory::memoryManager()->addLeafPool("S3AsyncUploadBenchmark"); + auto writeFile = + s3fs.openFileForWrite(s3File, {{}, pool.get(), std::nullopt}); + auto s3WriteFile = dynamic_cast(writeFile.get()); + std::string dataContent(1024, 'a'); + + EXPECT_EQ(writeFile->size(), 0); + std::int64_t contentSize = dataContent.length(); + // dataContent length is 1024. 
+ EXPECT_EQ(contentSize, 1024); + + // Append and flush a small batch of data. + writeFile->append(dataContent.substr(0, 10)); + EXPECT_EQ(writeFile->size(), 10); + writeFile->append(dataContent.substr(10, contentSize - 10)); + EXPECT_EQ(writeFile->size(), contentSize); + writeFile->flush(); + // No parts must have been uploaded. + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 0); + + // Append data + for (int i = 0; i < 1024 * size_MiB - 1; ++i) { + writeFile->append(dataContent); + } + writeFile->close(); + EXPECT_EQ( + writeFile->size(), static_cast(1024) * 1024 * size_MiB); + } +}; + +auto benchmark = S3AsyncUploadBenchmark(); + +#define DEFINE_BENCHMARKS(size) \ + BENCHMARK(sync_upload_##size##M) { \ + benchmark.run("sync_upload", false, size); \ + } \ + BENCHMARK_RELATIVE(async_upload_##size##M) { \ + benchmark.run("async_upload", true, size); \ + } + +DEFINE_BENCHMARKS(4) +DEFINE_BENCHMARKS(8) +DEFINE_BENCHMARKS(16) +DEFINE_BENCHMARKS(32) +DEFINE_BENCHMARKS(64) +DEFINE_BENCHMARKS(128) +DEFINE_BENCHMARKS(256) +DEFINE_BENCHMARKS(512) +DEFINE_BENCHMARKS(1024) +DEFINE_BENCHMARKS(2048) +} // namespace + +int main(int argc, char** argv) { + folly::Init init{&argc, &argv}; + facebook::velox::memory::MemoryManager::initialize( + facebook::velox::memory::MemoryManager::Options{}); + folly::runBenchmarks(); + return 0; +} diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp index 3fea62365bb..eadf010cbf9 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp @@ -245,80 +245,88 @@ TEST_F(S3FileSystemTest, writeFileAndRead) { const auto file = "test.txt"; const auto filename = localPath(bucketName) + "/" + file; const auto s3File = s3URI(bucketName, file); - - auto hiveConfig = minioServer_->hiveConfig(); - filesystems::S3FileSystem s3fs(bucketName, hiveConfig); auto 
pool = memory::memoryManager()->addLeafPool("S3FileSystemTest");
-  auto writeFile =
-      s3fs.openFileForWrite(s3File, {{}, pool.get(), std::nullopt});
-  auto s3WriteFile = dynamic_cast<S3WriteFile*>(writeFile.get());
-  std::string dataContent =
-      "Dance me to your beauty with a burning violin"
-      "Dance me through the panic till I'm gathered safely in"
-      "Lift me like an olive branch and be my homeward dove"
-      "Dance me to the end of love";
-
-  EXPECT_EQ(writeFile->size(), 0);
-  std::int64_t contentSize = dataContent.length();
-  // dataContent length is 178.
-  EXPECT_EQ(contentSize, 178);
-
-  // Append and flush a small batch of data.
-  writeFile->append(dataContent.substr(0, 10));
-  EXPECT_EQ(writeFile->size(), 10);
-  writeFile->append(dataContent.substr(10, contentSize - 10));
-  EXPECT_EQ(writeFile->size(), contentSize);
-  writeFile->flush();
-  // No parts must have been uploaded.
-  EXPECT_EQ(s3WriteFile->numPartsUploaded(), 0);
-
-  // Append data 178 * 100'000 ~ 16MiB.
-  // Should have 1 part in total with kUploadPartSize = 10MiB.
-  for (int i = 0; i < 100'000; ++i) {
-    writeFile->append(dataContent);
-  }
-  EXPECT_EQ(s3WriteFile->numPartsUploaded(), 1);
-  EXPECT_EQ(writeFile->size(), 100'001 * contentSize);
-
-  // Append a large data buffer 178 * 150'000 ~ 25MiB (2 parts).
-  std::vector<char> largeBuffer(contentSize * 150'000);
-  for (int i = 0; i < 150'000; ++i) {
-    memcpy(
-        largeBuffer.data() + (i * contentSize),
-        dataContent.data(),
-        contentSize);
-  }
-
-  writeFile->append({largeBuffer.data(), largeBuffer.size()});
-  EXPECT_EQ(writeFile->size(), 250'001 * contentSize);
-  // Total data = ~41 MB = 5 parts.
-  // But parts uploaded will be 4.
-  EXPECT_EQ(s3WriteFile->numPartsUploaded(), 4);
-
-  // Upload the last part.
-  writeFile->close();
-  EXPECT_EQ(s3WriteFile->numPartsUploaded(), 5);
+  auto uploadPart = [&](bool uploadPartAsync) {
+    auto hiveConfig = minioServer_->hiveConfig(
+        {{"hive.s3.part-upload-async", uploadPartAsync ?
"true" : "false"}}); + filesystems::S3FileSystem s3fs(bucketName, hiveConfig); + filesystems::S3FileSystem s3fs(bucketName, hiveConfig); + auto writeFile = + s3fs.openFileForWrite(s3File, {{}, pool.get(), std::nullopt}); + auto s3WriteFile = dynamic_cast(writeFile.get()); + std::string dataContent = + "Dance me to your beauty with a burning violin" + "Dance me through the panic till I'm gathered safely in" + "Lift me like an olive branch and be my homeward dove" + "Dance me to the end of love"; + + EXPECT_EQ(writeFile->size(), 0); + std::int64_t contentSize = dataContent.length(); + // dataContent length is 178. + EXPECT_EQ(contentSize, 178); + + // Append and flush a small batch of data. + writeFile->append(dataContent.substr(0, 10)); + EXPECT_EQ(writeFile->size(), 10); + writeFile->append(dataContent.substr(10, contentSize - 10)); + EXPECT_EQ(writeFile->size(), contentSize); + writeFile->flush(); + // No parts must have been uploaded. + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 0); + + // Append data 178 * 100'000 ~ 16MiB. + // Should have 1 part in total with kUploadPartSize = 10MiB. + for (int i = 0; i < 100'000; ++i) { + writeFile->append(dataContent); + } + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 1); + EXPECT_EQ(writeFile->size(), 100'001 * contentSize); + + // Append a large data buffer 178 * 150'000 ~ 25MiB (2 parts). + std::vector largeBuffer(contentSize * 150'000); + for (int i = 0; i < 150'000; ++i) { + memcpy( + largeBuffer.data() + (i * contentSize), + dataContent.data(), + contentSize); + } + + writeFile->append({largeBuffer.data(), largeBuffer.size()}); + EXPECT_EQ(writeFile->size(), 250'001 * contentSize); + // Total data = ~41 MB = 5 parts. + // But parts uploaded will be 4. + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 4); + + // Upload the last part. 
+ writeFile->close(); + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 5); - VELOX_ASSERT_THROW( - writeFile->append(dataContent.substr(0, 10)), "File is closed"); + VELOX_ASSERT_THROW( + writeFile->append(dataContent.substr(0, 10)), "File is closed"); - auto readFile = s3fs.openFileForRead(s3File); - ASSERT_EQ(readFile->size(), contentSize * 250'001); - // Sample and verify every 1'000 dataContent chunks. - for (int i = 0; i < 250; ++i) { - ASSERT_EQ( - readFile->pread(i * (1'000 * contentSize), contentSize), dataContent); - } - // Verify the last chunk. - ASSERT_EQ(readFile->pread(contentSize * 250'000, contentSize), dataContent); + auto readFile = s3fs.openFileForRead(s3File); + ASSERT_EQ(readFile->size(), contentSize * 250'001); + // Sample and verify every 1'000 dataContent chunks. + for (int i = 0; i < 250; ++i) { + ASSERT_EQ( + readFile->pread(i * (1'000 * contentSize), contentSize), dataContent); + } + // Verify the last chunk. + ASSERT_EQ(readFile->pread(contentSize * 250'000, contentSize), dataContent); - // Verify the S3 list function. - auto result = s3fs.list(s3File); + // Verify the S3 list function. + auto result = s3fs.list(s3File); - ASSERT_EQ(result.size(), 1); - ASSERT_TRUE(result[0] == file); + ASSERT_EQ(result.size(), 1); + ASSERT_TRUE(result[0] == file); - ASSERT_TRUE(s3fs.exists(s3File)); + ASSERT_TRUE(s3fs.exists(s3File)); + }; + // Upload parts synchronously. + uploadPart(false); + // Upload parts asynchronously. + uploadPart(true); } TEST_F(S3FileSystemTest, invalidConnectionSettings) { diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index 21eebecf8df..e10e0acc7b1 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -879,6 +879,22 @@ Each query can override the config by setting corresponding query session proper - - A custom credential provider, if specified, will be used to create the client in favor of other authentication mechanisms. 
The provider must be registered using "registerAWSCredentialsProvider" before it can be used. + * - hive.s3.part-upload-async + - bool + - false + - If true, enables asynchronous upload of parts for S3 multipart uploads. + * - hive.s3.part-upload-size + - integer + - 10485760 + - Specifies the size (in bytes) of each part for S3 multipart uploads. + * - hive.s3.max-concurrent-upload-num + - integer + - 4 + - Specifies the maximum number of concurrent uploads for S3 multipart uploads. + * - hive.s3.upload-threads + - integer + - 16 + - Specifies the number of threads to use for S3 multipart uploads. Bucket Level Configuration """"""""""""""""""""""""""