diff --git a/bolt/connectors/hive/storage_adapters/s3fs/CMakeLists.txt b/bolt/connectors/hive/storage_adapters/s3fs/CMakeLists.txt index 9ca8673a5..1aa24388b 100644 --- a/bolt/connectors/hive/storage_adapters/s3fs/CMakeLists.txt +++ b/bolt/connectors/hive/storage_adapters/s3fs/CMakeLists.txt @@ -39,4 +39,7 @@ if(BOLT_ENABLE_S3) if(${BOLT_BUILD_TESTING}) add_subdirectory(tests) endif() + if(${BOLT_BUILD_BENCHMARKS}) + add_subdirectory(benchmark) + endif() endif() diff --git a/bolt/connectors/hive/storage_adapters/s3fs/S3Config.h b/bolt/connectors/hive/storage_adapters/s3fs/S3Config.h index 888897623..8aff97aeb 100644 --- a/bolt/connectors/hive/storage_adapters/s3fs/S3Config.h +++ b/bolt/connectors/hive/storage_adapters/s3fs/S3Config.h @@ -91,6 +91,10 @@ class S3Config { kRetryMode, kUseProxyFromEnv, kCredentialsProvider, + kPartUploadAsync, + kPartUploadSize, + kMaxConcurrentUploadNum, + kUploadThreads, kEnd }; @@ -129,6 +133,13 @@ class S3Config { std::make_pair("use-proxy-from-env", "false")}, {Keys::kCredentialsProvider, std::make_pair("aws-credentials-provider", std::nullopt)}, + {Keys::kPartUploadAsync, + std::make_pair("part-upload-async", "false")}, + {Keys::kPartUploadSize, + std::make_pair("part-upload-size", "10485760")}, + {Keys::kMaxConcurrentUploadNum, + std::make_pair("max-concurrent-upload-num", "4")}, + {Keys::kUploadThreads, std::make_pair("upload-threads", "16")}, }; return config; } @@ -258,6 +269,33 @@ class S3Config { return config_.find(Keys::kCredentialsProvider)->second; } + /// If true, enables asynchronous upload of parts for S3 multipart uploads, + /// false otherwise. + bool partUploadAsync() const { + auto value = config_.find(Keys::kPartUploadAsync)->second.value(); + return folly::to<bool>(value); + } + + /// Return the size (in bytes) of each part for S3 multipart uploads. 
+ int32_t partUploadSize() const { + auto value = config_.find(Keys::kPartUploadSize)->second.value(); + return folly::to<int32_t>(value); + } + + /// Return the maximum number of concurrent uploads for S3 multipart uploads, + /// applicable only when asynchronous uploads are enabled. + int32_t maxConcurrentUploadNum() const { + auto value = config_.find(Keys::kMaxConcurrentUploadNum)->second.value(); + return folly::to<int32_t>(value); + } + + /// Return the number of threads to use for S3 multipart uploads, + /// applicable only when asynchronous uploads are enabled. + int32_t uploadThreads() const { + auto value = config_.find(Keys::kUploadThreads)->second.value(); + return folly::to<int32_t>(value); + } + private: std::unordered_map<Keys, std::optional<std::string>> config_; std::string payloadSigningPolicy_; diff --git a/bolt/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp b/bolt/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp index fc15391e7..ea999ca17 100644 --- a/bolt/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp +++ b/bolt/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp @@ -316,6 +316,7 @@ class S3FileSystem::Impl { client_ = std::make_shared<Aws::S3::S3Client>( credentialsProvider, nullptr /* endpointProvider */, clientConfig); + s3Config_ = std::make_shared<S3Config>(s3Config); ++fileSystemCount; } @@ -453,6 +454,10 @@ class S3FileSystem::Impl { return client_.get(); } + S3Config* s3Config() const { + return s3Config_.get(); + } + std::string getLogLevelName() const { return getAwsInstance()->getLogLevelName(); } @@ -463,6 +468,7 @@ class S3FileSystem::Impl { private: std::shared_ptr<Aws::S3::S3Client> client_; + std::shared_ptr<S3Config> s3Config_; }; S3FileSystem::S3FileSystem( @@ -494,8 +500,8 @@ std::unique_ptr<WriteFile> S3FileSystem::openFileForWrite( std::string_view s3Path, const FileOptions& options) { const auto path = getPath(s3Path); - auto s3file = - std::make_unique<S3WriteFile>(path, impl_->s3Client(), options.pool); + auto s3file = std::make_unique<S3WriteFile>( + path, impl_->s3Client(), options.pool, impl_->s3Config()); return s3file; } diff --git 
a/bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp b/bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp index 1e1cef1dc..0fc9d9c33 100644 --- a/bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp +++ b/bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp @@ -29,6 +29,9 @@ */ #include "bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.h" +#include +#include +#include #include "bolt/common/base/StatsReporter.h" #include "bolt/connectors/hive/storage_adapters/s3fs/S3Counters.h" #include "bolt/connectors/hive/storage_adapters/s3fs/S3Util.h" @@ -52,14 +55,29 @@ class S3WriteFile::Impl { explicit Impl( std::string_view path, Aws::S3::S3Client* client, - memory::MemoryPool* pool) + memory::MemoryPool* pool, + S3Config* s3Config) : client_(client), pool_(pool) { BOLT_CHECK_NOT_NULL(client); BOLT_CHECK_NOT_NULL(pool); + BOLT_CHECK_NOT_NULL(s3Config); + partUploadSize_ = s3Config->partUploadSize(); + if (s3Config->partUploadAsync()) { + maxConcurrentUploadNum_ = std::make_unique( + s3Config->maxConcurrentUploadNum()); + if (!uploadThreadPool_) { + uploadThreadPool_ = std::make_shared( + s3Config->uploadThreads(), + std::make_shared("upload-thread")); + } + } else { + uploadThreadPool_ = nullptr; + } + getBucketAndKeyFromPath(path, bucket_, key_); currentPart_ = std::make_unique>(*pool_); - currentPart_->reserve(kPartUploadSize); // Check that the object doesn't exist, if it does throw an error. + currentPart_->reserve(partUploadSize_); { Aws::S3::Model::HeadObjectRequest request; request.SetBucket(awsString(bucket_)); @@ -103,6 +121,7 @@ class S3WriteFile::Impl { /// (https://github.com/apache/arrow/issues/11934). So we instead default /// to application/octet-stream which is less misleading. 
request.SetContentType(kApplicationOctetStream); + request.SetChecksumAlgorithm(Aws::S3::Model::ChecksumAlgorithm::CRC32); auto outcome = client_->CreateMultipartUpload(request); BOLT_CHECK_AWS_OUTCOME( outcome, "Failed initiating multiple part upload", bucket_, key_); @@ -115,7 +134,7 @@ class S3WriteFile::Impl { // Appends data to the end of the file. void append(std::string_view data) { BOLT_CHECK(!closed(), "File is closed"); - if (data.size() + currentPart_->size() >= kPartUploadSize) { + if (data.size() + currentPart_->size() >= partUploadSize_) { upload(data); } else { // Append to current part. @@ -129,7 +148,7 @@ class S3WriteFile::Impl { BOLT_CHECK(!closed(), "File is closed"); /// currentPartSize must be less than kPartUploadSize since /// append() would have already flushed after reaching kUploadPartSize. - BOLT_CHECK_LT(currentPart_->size(), kPartUploadSize); + BOLT_CHECK_LT(currentPart_->size(), partUploadSize_); } // Complete the multipart upload and close the file. @@ -139,6 +158,20 @@ class S3WriteFile::Impl { } RECORD_METRIC_VALUE(kMetricS3StartedUploads); uploadPart({currentPart_->data(), currentPart_->size()}, true); + if (uploadThreadPool_) { + if (!futures_.empty()) { + folly::collectAll(std::move(futures_)).get(); + } + // The list of parts should be in ascending order. + std::sort( + uploadState_.completedParts.begin(), + uploadState_.completedParts.end(), + [](const Aws::S3::Model::CompletedPart& a, + const Aws::S3::Model::CompletedPart& b) { + return a.GetPartNumber() < b.GetPartNumber(); + }); + } + BOLT_CHECK_EQ(uploadState_.partNumber, uploadState_.completedParts.size()); // Complete the multipart upload. 
{ @@ -172,7 +205,6 @@ class S3WriteFile::Impl { } private: - static constexpr int64_t kPartUploadSize = 10 * 1024 * 1024; static constexpr const char* kApplicationOctetStream = "application/octet-stream"; @@ -186,10 +218,9 @@ class S3WriteFile::Impl { int64_t partNumber = 0; Aws::String id; }; - UploadState uploadState_; - // Data can be smaller or larger than the kPartUploadSize. - // Complete the currentPart_ and upload kPartUploadSize chunks of data. + // Data can be smaller or larger than the partUploadSize_. + // Complete the currentPart_ and upload partUploadSize_ chunks of data. // Save the remaining into currentPart_. void upload(const std::string_view data) { auto dataPtr = data.data(); @@ -200,44 +231,90 @@ class S3WriteFile::Impl { uploadPart({currentPart_->data(), currentPart_->size()}); dataPtr += remainingBufferSize; dataSize -= remainingBufferSize; - while (dataSize > kPartUploadSize) { - uploadPart({dataPtr, kPartUploadSize}); - dataPtr += kPartUploadSize; - dataSize -= kPartUploadSize; + while (dataSize > partUploadSize_) { + uploadPart({dataPtr, partUploadSize_}); + dataPtr += partUploadSize_; + dataSize -= partUploadSize_; } // Stash the remaining at the beginning of currentPart. currentPart_->unsafeAppend(0, dataPtr, dataSize); } void uploadPart(const std::string_view part, bool isLast = false) { - // Only the last part can be less than kPartUploadSize. - BOLT_CHECK(isLast || (!isLast && (part.size() == kPartUploadSize))); - // Upload the part. - { - Aws::S3::Model::UploadPartRequest request; - request.SetBucket(bucket_); - request.SetKey(key_); - request.SetUploadId(uploadState_.id); - request.SetPartNumber(++uploadState_.partNumber); - request.SetContentLength(part.size()); - request.SetBody( - std::make_shared(part.data(), part.size())); - auto outcome = client_->UploadPart(request); - BOLT_CHECK_AWS_OUTCOME(outcome, "Failed to upload", bucket_, key_); - // Append ETag and part number for this uploaded part. 
- // This will be needed for upload completion in Close(). - auto result = outcome.GetResult(); - Aws::S3::Model::CompletedPart part; + // Only the last part can be less than partUploadSize_. + BOLT_CHECK(isLast || part.size() == partUploadSize_); + auto uploadPartSync = [&](const std::string_view partData) { + Aws::S3::Model::CompletedPart completedPart = + uploadPartSeq(uploadState_.id, ++uploadState_.partNumber, partData); + uploadState_.completedParts.push_back(std::move(completedPart)); + }; + // If this is the last part and no parts have been uploaded yet, + // use the synchronous upload method. + bool useSyncUpload = + !uploadThreadPool_ || (isLast && uploadState_.partNumber == 0); + if (useSyncUpload) { + uploadPartSync(part); + } else { + uploadPartAsync(part); + } + } - part.SetPartNumber(uploadState_.partNumber); - part.SetETag(result.GetETag()); - // Don't add the checksum to the part if the checksum is empty. - // Some filesystems such as IBM COS require this to be not set. - if (!result.GetChecksumCRC32().empty()) { - part.SetChecksumCRC32(result.GetChecksumCRC32()); - } - uploadState_.completedParts.push_back(std::move(part)); + // Common logic for uploading a part. + Aws::S3::Model::CompletedPart uploadPartSeq( + const Aws::String& uploadId, + const int64_t partNumber, + const std::string_view part) { + Aws::S3::Model::UploadPartRequest request; + request.SetBucket(bucket_); + request.SetKey(key_); + request.SetUploadId(uploadId); + request.SetPartNumber(partNumber); + request.SetContentLength(part.size()); + request.SetBody( + std::make_shared(part.data(), part.size())); + // The default algorithm used is MD5. However, MD5 is not supported with + // fips and can cause a SIGSEGV. Set CRC32 instead which is a standard for + // checksum computation and is not restricted by fips. 
+ request.SetChecksumAlgorithm(Aws::S3::Model::ChecksumAlgorithm::CRC32); + auto outcome = client_->UploadPart(request); + BOLT_CHECK_AWS_OUTCOME(outcome, "Failed to upload", bucket_, key_); + // Append ETag and part number for this uploaded part. + // This will be needed for upload completion in Close(). + auto result = outcome.GetResult(); + Aws::S3::Model::CompletedPart completedPart; + completedPart.SetPartNumber(partNumber); + completedPart.SetETag(result.GetETag()); + // Don't add the checksum to the part if the checksum is empty. + // Some filesystems such as IBM COS require this to be not set. + if (!result.GetChecksumCRC32().empty()) { + completedPart.SetChecksumCRC32(result.GetChecksumCRC32()); } + return completedPart; + } + + // Upload the part asynchronously. + void uploadPartAsync(const std::string_view part) { + // NOLINT(readability-convert-member-functions-to-static) + maxConcurrentUploadNum_->wait(); + const int64_t partNumber = ++uploadState_.partNumber; + std::shared_ptr partStr = + std::make_shared(part.data(), part.size()); + futures_.emplace_back( + folly::via(uploadThreadPool_.get(), [this, partNumber, partStr]() { + SCOPE_EXIT { + maxConcurrentUploadNum_->post(); + }; + try { + Aws::S3::Model::CompletedPart completedPart = + uploadPartSeq(uploadState_.id, partNumber, *partStr); + std::lock_guard lock(uploadStateMutex_); + uploadState_.completedParts.push_back(std::move(completedPart)); + } catch (const std::exception& e) { + LOG(ERROR) << "Exception during async upload: " << e.what(); + } catch (...) 
{ + LOG(ERROR) << "Unknown exception during async upload."; + } + })); } Aws::S3::S3Client* client_; @@ -246,13 +323,22 @@ class S3WriteFile::Impl { std::string bucket_; std::string key_; size_t fileSize_ = -1; + UploadState uploadState_; + std::mutex uploadStateMutex_; + std::vector> futures_; + size_t partUploadSize_; + // maxConcurrentUploadNum_ controls the concurrency of asynchronous uploads to + // S3 for each S3WriteFile, preventing excessive memory usage. + std::unique_ptr maxConcurrentUploadNum_; + inline static std::shared_ptr uploadThreadPool_; }; S3WriteFile::S3WriteFile( std::string_view path, Aws::S3::S3Client* client, - memory::MemoryPool* pool) { - impl_ = std::make_shared(path, client, pool); + memory::MemoryPool* pool, + S3Config* s3Config) { + impl_ = std::make_shared(path, client, pool, s3Config); } void S3WriteFile::append(std::string_view data) { diff --git a/bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.h b/bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.h index a98f1676e..c9fabfcaf 100644 --- a/bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.h +++ b/bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.h @@ -32,6 +32,7 @@ #include "bolt/common/file/File.h" #include "bolt/common/memory/MemoryPool.h" +#include "bolt/connectors/hive/storage_adapters/s3fs/S3Config.h" namespace Aws::S3 { class S3Client; @@ -64,7 +65,8 @@ class S3WriteFile : public WriteFile { S3WriteFile( std::string_view path, Aws::S3::S3Client* client, - memory::MemoryPool* pool); + memory::MemoryPool* pool, + S3Config* s3Config); /// Appends data to the end of the file. /// Uploads a part on reaching part size limit. diff --git a/bolt/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt b/bolt/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt new file mode 100644 index 000000000..e1f78ae4c --- /dev/null +++ b/bolt/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt @@ -0,0 +1,24 @@ +# Copyright (c) ByteDance Ltd. 
and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_executable(bolt_s3asyncupload_benchmark S3AsyncUploadBenchmark.cpp) + +add_test(bolt_s3asyncupload_benchmark bolt_s3asyncupload_benchmark) +target_link_libraries( + bolt_s3asyncupload_benchmark + bolt_s3fs + bolt_testutils + GTest::gtest + ${FOLLY_BENCHMARK} +) diff --git a/bolt/connectors/hive/storage_adapters/s3fs/benchmark/S3AsyncUploadBenchmark.cpp b/bolt/connectors/hive/storage_adapters/s3fs/benchmark/S3AsyncUploadBenchmark.cpp new file mode 100644 index 000000000..388bcc449 --- /dev/null +++ b/bolt/connectors/hive/storage_adapters/s3fs/benchmark/S3AsyncUploadBenchmark.cpp @@ -0,0 +1,128 @@ +/* + * Copyright (c) ByteDance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include + +#include "bolt/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h" +#include "bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.h" +#include "bolt/connectors/hive/storage_adapters/s3fs/tests/S3Test.h" +#include "bolt/functions/lib/benchmarks/FunctionBenchmarkBase.h" + +#include + +namespace { +using namespace bytedance::bolt::filesystems; + +class S3AsyncUploadBenchmark { + public: + S3AsyncUploadBenchmark() { + minioServer_ = std::make_unique(); + minioServer_->start(); + ioExecutor_ = std::make_unique(3); + filesystems::initializeS3("Info", kLogLocation_); + } + + ~S3AsyncUploadBenchmark() { + minioServer_->stop(); + filesystems::finalizeS3(); + } + + std::unique_ptr minioServer_; + std::unique_ptr ioExecutor_; + std::string_view kLogLocation_ = "/tmp/foobar/"; + + std::string localPath(const char* directory) { + return minioServer_->path() + "/" + directory; + } + + void addBucket(const char* bucket) { + minioServer_->addBucket(bucket); + } + + void + run(const std::string& name, bool enableUploadPartAsync, int32_t sizeMiB) { + folly::BenchmarkSuspender suspender; + const auto bucketName = "writedata"; + const auto file = fmt::format("test_{}_{}.txt", name, sizeMiB); + const auto filename = localPath(bucketName) + "/" + file.c_str(); + addBucket(bucketName); + const auto s3File = s3URI(bucketName, file.c_str()); + auto hiveConfig = minioServer_->hiveConfig( + {{"hive.s3.part-upload-async", + enableUploadPartAsync ? "true" : "false"}}); + filesystems::S3FileSystem s3fs(bucketName, hiveConfig); + + suspender.dismiss(); + auto pool = memory::memoryManager()->addLeafPool("S3AsyncUploadBenchmark"); + auto writeFile = + s3fs.openFileForWrite(s3File, {{}, pool.get(), std::nullopt}); + auto s3WriteFile = dynamic_cast(writeFile.get()); + std::string dataContent(1024, 'a'); + + EXPECT_EQ(writeFile->size(), 0); + std::int64_t contentSize = dataContent.length(); + // dataContent length is 1024. 
+ EXPECT_EQ(contentSize, 1024); + + // Append and flush a small batch of data. + writeFile->append(dataContent.substr(0, 10)); + EXPECT_EQ(writeFile->size(), 10); + writeFile->append(dataContent.substr(10, contentSize - 10)); + EXPECT_EQ(writeFile->size(), contentSize); + writeFile->flush(); + // No parts must have been uploaded. + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 0); + + // Append data + for (int i = 0; i < 1024 * sizeMiB - 1; ++i) { + writeFile->append(dataContent); + } + writeFile->close(); + EXPECT_EQ( + writeFile->size(), static_cast(1024) * 1024 * sizeMiB); + } +}; + +auto benchmark = S3AsyncUploadBenchmark(); + +#define DEFINE_BENCHMARKS(size) \ + BENCHMARK(sync_upload_##size##M) { \ + benchmark.run("sync_upload", false, size); \ + } \ + BENCHMARK_RELATIVE(async_upload_##size##M) { \ + benchmark.run("async_upload", true, size); \ + } + +DEFINE_BENCHMARKS(4) +DEFINE_BENCHMARKS(8) +DEFINE_BENCHMARKS(16) +DEFINE_BENCHMARKS(32) +DEFINE_BENCHMARKS(64) +DEFINE_BENCHMARKS(128) +DEFINE_BENCHMARKS(256) +DEFINE_BENCHMARKS(512) +DEFINE_BENCHMARKS(1024) +DEFINE_BENCHMARKS(2048) +} // namespace + +int main(int argc, char** argv) { + folly::Init init{&argc, &argv}; + bytedance::bolt::memory::MemoryManager::initialize( + bytedance::bolt::memory::MemoryManager::Options{}); + folly::runBenchmarks(); + return 0; +} diff --git a/bolt/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp b/bolt/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp index 46d972880..93e903f1c 100644 --- a/bolt/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp +++ b/bolt/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp @@ -255,80 +255,86 @@ TEST_F(S3FileSystemTest, writeFileAndRead) { const auto file = "test.txt"; const auto filename = localPath(bucketName) + "/" + file; const auto s3File = s3URI(bucketName, file); - - auto hiveConfig = minioServer_->hiveConfig(); - filesystems::S3FileSystem s3fs(bucketName, hiveConfig); auto pool = 
memory::memoryManager()->addLeafPool("S3FileSystemTest"); - auto writeFile = s3fs.openFileForWrite( - s3File, filesystems::FileOptions{.pool = pool.get()}); - auto s3WriteFile = dynamic_cast(writeFile.get()); - std::string dataContent = - "Dance me to your beauty with a burning violin" - "Dance me through the panic till I'm gathered safely in" - "Lift me like an olive branch and be my homeward dove" - "Dance me to the end of love"; - - EXPECT_EQ(writeFile->size(), 0); - std::int64_t contentSize = dataContent.length(); - // dataContent length is 178. - EXPECT_EQ(contentSize, 178); - - // Append and flush a small batch of data. - writeFile->append(dataContent.substr(0, 10)); - EXPECT_EQ(writeFile->size(), 10); - writeFile->append(dataContent.substr(10, contentSize - 10)); - EXPECT_EQ(writeFile->size(), contentSize); - writeFile->flush(); - // No parts must have been uploaded. - EXPECT_EQ(s3WriteFile->numPartsUploaded(), 0); - - // Append data 178 * 100'000 ~ 16MiB. - // Should have 1 part in total with kUploadPartSize = 10MiB. - for (int i = 0; i < 100'000; ++i) { - writeFile->append(dataContent); - } - EXPECT_EQ(s3WriteFile->numPartsUploaded(), 1); - EXPECT_EQ(writeFile->size(), 100'001 * contentSize); - - // Append a large data buffer 178 * 150'000 ~ 25MiB (2 parts). - std::vector largeBuffer(contentSize * 150'000); - for (int i = 0; i < 150'000; ++i) { - memcpy( - largeBuffer.data() + (i * contentSize), - dataContent.data(), - contentSize); - } - - writeFile->append({largeBuffer.data(), largeBuffer.size()}); - EXPECT_EQ(writeFile->size(), 250'001 * contentSize); - // Total data = ~41 MB = 5 parts. - // But parts uploaded will be 4. - EXPECT_EQ(s3WriteFile->numPartsUploaded(), 4); - - // Upload the last part. - writeFile->close(); - EXPECT_EQ(s3WriteFile->numPartsUploaded(), 5); + auto uploadPart = [&](bool uploadPartAsync) { + auto hiveConfig = minioServer_->hiveConfig( + {{"hive.s3.part-upload-async", uploadPartAsync ? 
"true" : "false"}}); + filesystems::S3FileSystem s3fs(bucketName, hiveConfig); + auto writeFile = s3fs.openFileForWrite( + s3File, filesystems::FileOptions{.pool = pool.get()}); + auto s3WriteFile = dynamic_cast(writeFile.get()); + std::string dataContent = + "Dance me to your beauty with a burning violin" + "Dance me through the panic till I'm gathered safely in" + "Lift me like an olive branch and be my homeward dove" + "Dance me to the end of love"; + + EXPECT_EQ(writeFile->size(), 0); + std::int64_t contentSize = dataContent.length(); + // dataContent length is 178. + EXPECT_EQ(contentSize, 178); + + // Append and flush a small batch of data. + writeFile->append(dataContent.substr(0, 10)); + EXPECT_EQ(writeFile->size(), 10); + writeFile->append(dataContent.substr(10, contentSize - 10)); + EXPECT_EQ(writeFile->size(), contentSize); + writeFile->flush(); + // No parts must have been uploaded. + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 0); + + // Append data 178 * 100'000 ~ 16MiB. + // Should have 1 part in total with kUploadPartSize = 10MiB. + for (int i = 0; i < 100'000; ++i) { + writeFile->append(dataContent); + } + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 1); + EXPECT_EQ(writeFile->size(), 100'001 * contentSize); + + // Append a large data buffer 178 * 150'000 ~ 25MiB (2 parts). + std::vector largeBuffer(contentSize * 150'000); + for (int i = 0; i < 150'000; ++i) { + memcpy( + largeBuffer.data() + (i * contentSize), + dataContent.data(), + contentSize); + } + + writeFile->append({largeBuffer.data(), largeBuffer.size()}); + EXPECT_EQ(writeFile->size(), 250'001 * contentSize); + // Total data = ~41 MB = 5 parts. + // But parts uploaded will be 4. + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 4); + + // Upload the last part. 
+ writeFile->close(); + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 5); - BOLT_ASSERT_THROW( - writeFile->append(dataContent.substr(0, 10)), "File is closed"); + BOLT_ASSERT_THROW( + writeFile->append(dataContent.substr(0, 10)), "File is closed"); - auto readFile = s3fs.openFileForRead(s3File); - ASSERT_EQ(readFile->size(), contentSize * 250'001); - // Sample and verify every 1'000 dataContent chunks. - for (int i = 0; i < 250; ++i) { - ASSERT_EQ( - readFile->pread(i * (1'000 * contentSize), contentSize), dataContent); - } - // Verify the last chunk. - ASSERT_EQ(readFile->pread(contentSize * 250'000, contentSize), dataContent); + auto readFile = s3fs.openFileForRead(s3File); + ASSERT_EQ(readFile->size(), contentSize * 250'001); + // Sample and verify every 1'000 dataContent chunks. + for (int i = 0; i < 250; ++i) { + ASSERT_EQ( + readFile->pread(i * (1'000 * contentSize), contentSize), dataContent); + } + // Verify the last chunk. + ASSERT_EQ(readFile->pread(contentSize * 250'000, contentSize), dataContent); - // Verify the S3 list function. - auto result = s3fs.list(s3File); + // Verify the S3 list function. + auto result = s3fs.list(s3File); - ASSERT_EQ(result.size(), 1); - ASSERT_TRUE(result[0] == file); + ASSERT_EQ(result.size(), 1); + ASSERT_TRUE(result[0] == file); - ASSERT_TRUE(s3fs.exists(s3File)); + ASSERT_TRUE(s3fs.exists(s3File)); + }; + // Upload parts synchronously. + uploadPart(false); + // Upload parts asynchronously. + uploadPart(true); } TEST_F(S3FileSystemTest, invalidConnectionSettings) {