From 081672c376a1a50426801bb6d0fbda8855b5d3b2 Mon Sep 17 00:00:00 2001 From: Zhang Xiaofeng Date: Fri, 27 Feb 2026 18:23:26 +0800 Subject: [PATCH 1/4] fix(shuffle): Reset outputPos after flushing output buffer in CompressInternal (#271) --- bolt/shuffle/sparksql/CompressionStream.h | 1 + .../tests/AdaptiveParallelZstdCodecTest.cpp | 48 +++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/bolt/shuffle/sparksql/CompressionStream.h b/bolt/shuffle/sparksql/CompressionStream.h index 08de4d8f..a36f6f47 100644 --- a/bolt/shuffle/sparksql/CompressionStream.h +++ b/bolt/shuffle/sparksql/CompressionStream.h @@ -317,6 +317,7 @@ class AdaptiveParallelZstdCodec { // stream bytedance::bolt::NanosecondTimer timer1(&writeTime); RETURN_NOT_OK(outputStream->Write(output, outputPos)); + outputPos = 0; } } while (inputLen > 0); } diff --git a/bolt/shuffle/sparksql/tests/AdaptiveParallelZstdCodecTest.cpp b/bolt/shuffle/sparksql/tests/AdaptiveParallelZstdCodecTest.cpp index c055ec14..db93b77d 100644 --- a/bolt/shuffle/sparksql/tests/AdaptiveParallelZstdCodecTest.cpp +++ b/bolt/shuffle/sparksql/tests/AdaptiveParallelZstdCodecTest.cpp @@ -115,6 +115,31 @@ void runRoundTrip( EXPECT_EQ(0, std::memcmp(output.data(), expected.data(), expected.size())); } +std::vector buildIncompressibleData(size_t size) { + std::vector data(size); + uint32_t state = 0x12345678u; + for (size_t i = 0; i < size; ++i) { + state = state * 1664525u + 1013904223u; + data[i] = static_cast(state >> 24); + } + return data; +} + +std::vector buildIncompressibleRow(size_t payloadSize, uint32_t seed) { + std::vector row(sizeof(int32_t) + payloadSize); + auto payloadSize32 = static_cast(payloadSize); + std::memcpy(row.data(), &payloadSize32, sizeof(int32_t)); + + auto payload = buildIncompressibleData(payloadSize); + uint32_t state = seed; + for (size_t i = 0; i < payloadSize; ++i) { + state = state * 1103515245u + 12345u; + payload[i] ^= static_cast(state >> 24); + } + std::memcpy(row.data() + sizeof(int32_t), payload.data(), payload.size()); + return row; +} + } // namespace TEST(AdaptiveParallelZstdCodecTest, RoundTripSmallPayloads) { @@ -144,4 +169,27 @@ TEST(AdaptiveParallelZstdCodecTest, RoundTripLargePayload) { runRoundTrip(rows, rawSize, RowVectorLayout::kComposite); } +TEST( + AdaptiveParallelZstdCodecTest, + CompressAndFlushStressRoundTripWithoutCorruption) { + constexpr int32_t kRounds = 6; + constexpr int32_t kRowsPerRound = 256; + const auto payloadSize = + static_cast(ZSTD_CStreamInSize() - sizeof(int32_t) - 1); + + for (int32_t round = 0; round < kRounds; ++round) { + std::vector> rows; + rows.reserve(kRowsPerRound); + + int64_t rawSize = 0; + for (int32_t i = 0; i < kRowsPerRound; ++i) { + rows.emplace_back(buildIncompressibleRow( + payloadSize, static_cast(round * kRowsPerRound + i + 1))); + rawSize += static_cast(rows.back().size()); + } + + runRoundTrip(rows, rawSize, RowVectorLayout::kComposite); + } +} + } // namespace bytedance::bolt::shuffle::sparksql::test From 9a80185c9cf28bbe9d1f641e637a2a4e917c64a4 Mon Sep 17 00:00:00 2001 From: Zhang Xiaofeng Date: Mon, 2 Mar 2026 16:14:17 +0800 Subject: [PATCH 2/4] add devcontainer postCreateCommand to chown mount volumn (#277) --- .devcontainer/devcontainer.json | 3 ++- .devcontainer/post_create_command.sh | 5 +++++ 2 files changed, 7 insertions(+), 1 deletion(-) create mode 100755 .devcontainer/post_create_command.sh diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 415b4224..aadb12a2 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -32,5 +32,6 @@ "label=disable" ], // Configure tool-specific properties. - "features": {} + "features": {}, + "postCreateCommand": "./.devcontainer/post_create_command.sh" } diff --git a/.devcontainer/post_create_command.sh b/.devcontainer/post_create_command.sh new file mode 100755 index 00000000..e130a9ab --- /dev/null +++ b/.devcontainer/post_create_command.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +# change owner for mounted volumes +sudo chown code:code /home/code/.conan2/p +sudo chown code:code /home/code/.ccache From 906a3a9394482470d7c30f0fef2641adf3bd0912 Mon Sep 17 00:00:00 2001 From: Xiuli Wei <542472913@qq.com> Date: Tue, 3 Mar 2026 02:53:26 +0800 Subject: [PATCH 3/4] feat: Support multi-threaded asynchronous data upload to object storage (#264) --- .../hive/storage_adapters/s3fs/CMakeLists.txt | 3 + .../hive/storage_adapters/s3fs/S3Config.h | 38 ++++ .../storage_adapters/s3fs/S3FileSystem.cpp | 10 +- .../storage_adapters/s3fs/S3WriteFile.cpp | 166 +++++++++++++----- .../hive/storage_adapters/s3fs/S3WriteFile.h | 4 +- .../s3fs/benchmark/CMakeLists.txt | 24 +++ .../s3fs/benchmark/S3AsyncUploadBenchmark.cpp | 128 ++++++++++++++ .../s3fs/tests/S3FileSystemTest.cpp | 142 ++++++++------- 8 files changed, 404 insertions(+), 111 deletions(-) create mode 100644 bolt/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt create mode 100644 bolt/connectors/hive/storage_adapters/s3fs/benchmark/S3AsyncUploadBenchmark.cpp diff --git a/bolt/connectors/hive/storage_adapters/s3fs/CMakeLists.txt b/bolt/connectors/hive/storage_adapters/s3fs/CMakeLists.txt index 9ca8673a..1aa24388 100644 --- a/bolt/connectors/hive/storage_adapters/s3fs/CMakeLists.txt +++ b/bolt/connectors/hive/storage_adapters/s3fs/CMakeLists.txt @@ -39,4 +39,7 @@ if(BOLT_ENABLE_S3) if(${BOLT_BUILD_TESTING}) add_subdirectory(tests) endif() + if(${BOLT_BUILD_BENCHMARKS}) + add_subdirectory(benchmark) + endif() endif() diff --git a/bolt/connectors/hive/storage_adapters/s3fs/S3Config.h b/bolt/connectors/hive/storage_adapters/s3fs/S3Config.h index 88889762..8aff97ae 100644 --- a/bolt/connectors/hive/storage_adapters/s3fs/S3Config.h +++ b/bolt/connectors/hive/storage_adapters/s3fs/S3Config.h @@ -91,6 +91,10 @@ class S3Config { kRetryMode, kUseProxyFromEnv, kCredentialsProvider, + kPartUploadAsync, + kPartUploadSize, + kMaxConcurrentUploadNum, + kUploadThreads, kEnd }; @@ -129,6 +133,13 @@ class S3Config { std::make_pair("use-proxy-from-env", "false")}, {Keys::kCredentialsProvider, std::make_pair("aws-credentials-provider", std::nullopt)}, + {Keys::kPartUploadAsync, + std::make_pair("part-upload-async", "false")}, + {Keys::kPartUploadSize, + std::make_pair("part-upload-size", "10485760")}, + {Keys::kMaxConcurrentUploadNum, + std::make_pair("max-concurrent-upload-num", "4")}, + {Keys::kUploadThreads, std::make_pair("upload-threads", "16")}, }; return config; } @@ -258,6 +269,33 @@ class S3Config { return config_.find(Keys::kCredentialsProvider)->second; } + /// If true, enables asynchronous upload of parts for S3 multipart uploads, + /// false otherwise. + bool partUploadAsync() const { + auto value = config_.find(Keys::kPartUploadAsync)->second.value(); + return folly::to(value); + } + + /// Return the size (in bytes) of each part for S3 multipart uploads. + int32_t partUploadSize() const { + auto value = config_.find(Keys::kPartUploadSize)->second.value(); + return folly::to(value); + } + + /// Return the maximum number of concurrent uploads for S3 multipart uploads, + /// applicable only when asynchronous uploads are enabled. + int32_t maxConcurrentUploadNum() const { + auto value = config_.find(Keys::kMaxConcurrentUploadNum)->second.value(); + return folly::to(value); + } + + /// Return the number of threads to use for S3 multipart uploads, + /// applicable only when asynchronous uploads are enabled. + int32_t uploadThreads() const { + auto value = config_.find(Keys::kUploadThreads)->second.value(); + return folly::to(value); + } + private: std::unordered_map> config_; std::string payloadSigningPolicy_; diff --git a/bolt/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp b/bolt/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp index fc15391e..ea999ca1 100644 --- a/bolt/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp +++ b/bolt/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp @@ -316,6 +316,7 @@ class S3FileSystem::Impl { client_ = std::make_shared( credentialsProvider, nullptr /* endpointProvider */, clientConfig); + s3Config_ = std::make_shared(s3Config); ++fileSystemCount; } @@ -453,6 +454,10 @@ class S3FileSystem::Impl { return client_.get(); } + S3Config* s3Config() const { + return s3Config_.get(); + } + std::string getLogLevelName() const { return getAwsInstance()->getLogLevelName(); } @@ -463,6 +468,7 @@ class S3FileSystem::Impl { private: std::shared_ptr client_; + std::shared_ptr s3Config_; }; S3FileSystem::S3FileSystem( @@ -494,8 +500,8 @@ std::unique_ptr S3FileSystem::openFileForWrite( std::string_view s3Path, const FileOptions& options) { const auto path = getPath(s3Path); - auto s3file = - std::make_unique(path, impl_->s3Client(), options.pool); + auto s3file = std::make_unique( + path, impl_->s3Client(), options.pool, impl_->s3Config()); return s3file; } diff --git a/bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp b/bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp index 1e1cef1d..0fc9d9c3 100644 --- a/bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp +++ b/bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp @@ -29,6 +29,9 @@ */ #include "bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.h" +#include +#include +#include #include "bolt/common/base/StatsReporter.h" #include "bolt/connectors/hive/storage_adapters/s3fs/S3Counters.h" #include "bolt/connectors/hive/storage_adapters/s3fs/S3Util.h" @@ -52,14 +55,29 @@ class S3WriteFile::Impl { explicit Impl( std::string_view path, Aws::S3::S3Client* client, - memory::MemoryPool* pool) + memory::MemoryPool* pool, + S3Config* s3Config) : client_(client), pool_(pool) { BOLT_CHECK_NOT_NULL(client); BOLT_CHECK_NOT_NULL(pool); + BOLT_CHECK_NOT_NULL(s3Config); + partUploadSize_ = s3Config->partUploadSize(); + if (s3Config->partUploadAsync()) { + maxConcurrentUploadNum_ = std::make_unique( + s3Config->maxConcurrentUploadNum()); + if (!uploadThreadPool_) { + uploadThreadPool_ = std::make_shared( + s3Config->uploadThreads(), + std::make_shared("upload-thread")); + } + } else { + uploadThreadPool_ = nullptr; + } + getBucketAndKeyFromPath(path, bucket_, key_); currentPart_ = std::make_unique>(*pool_); - currentPart_->reserve(kPartUploadSize); // Check that the object doesn't exist, if it does throw an error. + currentPart_->reserve(partUploadSize_); { Aws::S3::Model::HeadObjectRequest request; request.SetBucket(awsString(bucket_)); @@ -103,6 +121,7 @@ class S3WriteFile::Impl { /// (https://github.com/apache/arrow/issues/11934). So we instead default /// to application/octet-stream which is less misleading. request.SetContentType(kApplicationOctetStream); + request.SetChecksumAlgorithm(Aws::S3::Model::ChecksumAlgorithm::CRC32); auto outcome = client_->CreateMultipartUpload(request); BOLT_CHECK_AWS_OUTCOME( outcome, "Failed initiating multiple part upload", bucket_, key_); @@ -115,7 +134,7 @@ class S3WriteFile::Impl { // Appends data to the end of the file. void append(std::string_view data) { BOLT_CHECK(!closed(), "File is closed"); - if (data.size() + currentPart_->size() >= kPartUploadSize) { + if (data.size() + currentPart_->size() >= partUploadSize_) { upload(data); } else { // Append to current part. @@ -129,7 +148,7 @@ class S3WriteFile::Impl { BOLT_CHECK(!closed(), "File is closed"); /// currentPartSize must be less than kPartUploadSize since /// append() would have already flushed after reaching kUploadPartSize. - BOLT_CHECK_LT(currentPart_->size(), kPartUploadSize); + BOLT_CHECK_LT(currentPart_->size(), partUploadSize_); } // Complete the multipart upload and close the file. @@ -139,6 +158,20 @@ class S3WriteFile::Impl { } RECORD_METRIC_VALUE(kMetricS3StartedUploads); uploadPart({currentPart_->data(), currentPart_->size()}, true); + if (uploadThreadPool_) { + if (!futures_.empty()) { + folly::collectAll(std::move(futures_)).get(); + } + // The list of parts should be in ascending order. + std::sort( + uploadState_.completedParts.begin(), + uploadState_.completedParts.end(), + [](const Aws::S3::Model::CompletedPart& a, + const Aws::S3::Model::CompletedPart& b) { + return a.GetPartNumber() < b.GetPartNumber(); + }); + } + BOLT_CHECK_EQ(uploadState_.partNumber, uploadState_.completedParts.size()); // Complete the multipart upload. { @@ -172,7 +205,6 @@ class S3WriteFile::Impl { } private: - static constexpr int64_t kPartUploadSize = 10 * 1024 * 1024; static constexpr const char* kApplicationOctetStream = "application/octet-stream"; @@ -186,10 +218,9 @@ class S3WriteFile::Impl { int64_t partNumber = 0; Aws::String id; }; - UploadState uploadState_; - // Data can be smaller or larger than the kPartUploadSize. - // Complete the currentPart_ and upload kPartUploadSize chunks of data. + // Data can be smaller or larger than the partUploadSize_. + // Complete the currentPart_ and upload partUploadSize_ chunks of data. // Save the remaining into currentPart_. void upload(const std::string_view data) { auto dataPtr = data.data(); @@ -200,44 +231,90 @@ class S3WriteFile::Impl { uploadPart({currentPart_->data(), currentPart_->size()}); dataPtr += remainingBufferSize; dataSize -= remainingBufferSize; - while (dataSize > kPartUploadSize) { - uploadPart({dataPtr, kPartUploadSize}); - dataPtr += kPartUploadSize; - dataSize -= kPartUploadSize; + while (dataSize > partUploadSize_) { + uploadPart({dataPtr, partUploadSize_}); + dataPtr += partUploadSize_; + dataSize -= partUploadSize_; } // Stash the remaining at the beginning of currentPart. currentPart_->unsafeAppend(0, dataPtr, dataSize); } void uploadPart(const std::string_view part, bool isLast = false) { - // Only the last part can be less than kPartUploadSize. - BOLT_CHECK(isLast || (!isLast && (part.size() == kPartUploadSize))); - // Upload the part. - { - Aws::S3::Model::UploadPartRequest request; - request.SetBucket(bucket_); - request.SetKey(key_); - request.SetUploadId(uploadState_.id); - request.SetPartNumber(++uploadState_.partNumber); - request.SetContentLength(part.size()); - request.SetBody( - std::make_shared(part.data(), part.size())); - auto outcome = client_->UploadPart(request); - BOLT_CHECK_AWS_OUTCOME(outcome, "Failed to upload", bucket_, key_); - // Append ETag and part number for this uploaded part. - // This will be needed for upload completion in Close(). - auto result = outcome.GetResult(); - Aws::S3::Model::CompletedPart part; + // Only the last part can be less than partUploadSize_. + BOLT_CHECK(isLast || part.size() == partUploadSize_); + auto uploadPartSync = [&](const std::string_view partData) { + Aws::S3::Model::CompletedPart completedPart = + uploadPartSeq(uploadState_.id, ++uploadState_.partNumber, partData); + uploadState_.completedParts.push_back(std::move(completedPart)); + }; + // If this is the last part and no parts have been uploaded yet, + // use the synchronous upload method. + bool useSyncUpload = + !uploadThreadPool_ || (isLast && uploadState_.partNumber == 0); + if (useSyncUpload) { + uploadPartSync(part); + } else { + uploadPartAsync(part); + } + } - part.SetPartNumber(uploadState_.partNumber); - part.SetETag(result.GetETag()); - // Don't add the checksum to the part if the checksum is empty. - // Some filesystems such as IBM COS require this to be not set. - if (!result.GetChecksumCRC32().empty()) { - part.SetChecksumCRC32(result.GetChecksumCRC32()); - } - uploadState_.completedParts.push_back(std::move(part)); + // Common logic for uploading a part. + Aws::S3::Model::CompletedPart uploadPartSeq( + const Aws::String& uploadId, + const int64_t partNumber, + const std::string_view part) { + Aws::S3::Model::UploadPartRequest request; + request.SetBucket(bucket_); + request.SetKey(key_); + request.SetUploadId(uploadId); + request.SetPartNumber(partNumber); + request.SetContentLength(part.size()); + request.SetBody( + std::make_shared(part.data(), part.size())); + // The default algorithm used is MD5. However, MD5 is not supported with + // fips and can cause a SIGSEGV. Set CRC32 instead which is a standard for + // checksum computation and is not restricted by fips. + request.SetChecksumAlgorithm(Aws::S3::Model::ChecksumAlgorithm::CRC32); + auto outcome = client_->UploadPart(request); + BOLT_CHECK_AWS_OUTCOME(outcome, "Failed to upload", bucket_, key_); + // Append ETag and part number for this uploaded part. + // This will be needed for upload completion in Close(). + auto result = outcome.GetResult(); + Aws::S3::Model::CompletedPart completedPart; + completedPart.SetPartNumber(partNumber); + completedPart.SetETag(result.GetETag()); + // Don't add the checksum to the part if the checksum is empty. + // Some filesystems such as IBM COS require this to be not set. + if (!result.GetChecksumCRC32().empty()) { + completedPart.SetChecksumCRC32(result.GetChecksumCRC32()); } + return completedPart; + } + + // Upload the part asynchronously. + void uploadPartAsync(const std::string_view part) { + // NOLINT(readability-convert-member-functions-to-static) + maxConcurrentUploadNum_->wait(); + const int64_t partNumber = ++uploadState_.partNumber; + std::shared_ptr partStr = + std::make_shared(part.data(), part.size()); + futures_.emplace_back( + folly::via(uploadThreadPool_.get(), [this, partNumber, partStr]() { + SCOPE_EXIT { + maxConcurrentUploadNum_->post(); + }; + try { + Aws::S3::Model::CompletedPart completedPart = + uploadPartSeq(uploadState_.id, partNumber, *partStr); + std::lock_guard lock(uploadStateMutex_); + uploadState_.completedParts.push_back(std::move(completedPart)); + } catch (const std::exception& e) { + LOG(ERROR) << "Exception during async upload: " << e.what(); + } catch (...) { + LOG(ERROR) << "Unknown exception during async upload."; + } + })); } Aws::S3::S3Client* client_; @@ -246,13 +323,22 @@ class S3WriteFile::Impl { std::string bucket_; std::string key_; size_t fileSize_ = -1; + UploadState uploadState_; + std::mutex uploadStateMutex_; + std::vector> futures_; + size_t partUploadSize_; + // maxConcurrentUploadNum_ controls the concurrency of asynchronous uploads to + // S3 for each S3WriteFile, preventing excessive memory usage. + std::unique_ptr maxConcurrentUploadNum_; + inline static std::shared_ptr uploadThreadPool_; }; S3WriteFile::S3WriteFile( std::string_view path, Aws::S3::S3Client* client, - memory::MemoryPool* pool) { - impl_ = std::make_shared(path, client, pool); + memory::MemoryPool* pool, + S3Config* s3Config) { + impl_ = std::make_shared(path, client, pool, s3Config); } void S3WriteFile::append(std::string_view data) { diff --git a/bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.h b/bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.h index a98f1676..c9fabfca 100644 --- a/bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.h +++ b/bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.h @@ -32,6 +32,7 @@ #include "bolt/common/file/File.h" #include "bolt/common/memory/MemoryPool.h" +#include "bolt/connectors/hive/storage_adapters/s3fs/S3Config.h" namespace Aws::S3 { class S3Client; @@ -64,7 +65,8 @@ class S3WriteFile : public WriteFile { S3WriteFile( std::string_view path, Aws::S3::S3Client* client, - memory::MemoryPool* pool); + memory::MemoryPool* pool, + S3Config* s3Config); /// Appends data to the end of the file. /// Uploads a part on reaching part size limit. diff --git a/bolt/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt b/bolt/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt new file mode 100644 index 00000000..e1f78ae4 --- /dev/null +++ b/bolt/connectors/hive/storage_adapters/s3fs/benchmark/CMakeLists.txt @@ -0,0 +1,24 @@ +# Copyright (c) ByteDance Ltd. and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_executable(bolt_s3asyncupload_benchmark S3AsyncUploadBenchmark.cpp) + +add_test(bolt_s3asyncupload_benchmark bolt_s3asyncupload_benchmark) +target_link_libraries( + bolt_s3asyncupload_benchmark + bolt_s3fs + bolt_testutils + GTest::gtest + ${FOLLY_BENCHMARK} +) diff --git a/bolt/connectors/hive/storage_adapters/s3fs/benchmark/S3AsyncUploadBenchmark.cpp b/bolt/connectors/hive/storage_adapters/s3fs/benchmark/S3AsyncUploadBenchmark.cpp new file mode 100644 index 00000000..388bcc44 --- /dev/null +++ b/bolt/connectors/hive/storage_adapters/s3fs/benchmark/S3AsyncUploadBenchmark.cpp @@ -0,0 +1,128 @@ +/* + * Copyright (c) ByteDance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "bolt/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h" +#include "bolt/connectors/hive/storage_adapters/s3fs/S3WriteFile.h" +#include "bolt/connectors/hive/storage_adapters/s3fs/tests/S3Test.h" +#include "bolt/functions/lib/benchmarks/FunctionBenchmarkBase.h" + +#include + +namespace { +using namespace bytedance::bolt::filesystems; + +class S3AsyncUploadBenchmark { + public: + S3AsyncUploadBenchmark() { + minioServer_ = std::make_unique(); + minioServer_->start(); + ioExecutor_ = std::make_unique(3); + filesystems::initializeS3("Info", kLogLocation_); + } + + ~S3AsyncUploadBenchmark() { + minioServer_->stop(); + filesystems::finalizeS3(); + } + + std::unique_ptr minioServer_; + std::unique_ptr ioExecutor_; + std::string_view kLogLocation_ = "/tmp/foobar/"; + + std::string localPath(const char* directory) { + return minioServer_->path() + "/" + directory; + } + + void addBucket(const char* bucket) { + minioServer_->addBucket(bucket); + } + + void + run(const std::string& name, bool enableUploadPartAsync, int32_t sizeMiB) { + folly::BenchmarkSuspender suspender; + const auto bucketName = "writedata"; + const auto file = fmt::format("test_{}_{}.txt", name, sizeMiB); + const auto filename = localPath(bucketName) + "/" + file.c_str(); + addBucket(bucketName); + const auto s3File = s3URI(bucketName, file.c_str()); + auto hiveConfig = minioServer_->hiveConfig( + {{"hive.s3.part-upload-async", + enableUploadPartAsync ? "true" : "false"}}); + filesystems::S3FileSystem s3fs(bucketName, hiveConfig); + + suspender.dismiss(); + auto pool = memory::memoryManager()->addLeafPool("S3AsyncUploadBenchmark"); + auto writeFile = + s3fs.openFileForWrite(s3File, {{}, pool.get(), std::nullopt}); + auto s3WriteFile = dynamic_cast(writeFile.get()); + std::string dataContent(1024, 'a'); + + EXPECT_EQ(writeFile->size(), 0); + std::int64_t contentSize = dataContent.length(); + // dataContent length is 1024. + EXPECT_EQ(contentSize, 1024); + + // Append and flush a small batch of data. + writeFile->append(dataContent.substr(0, 10)); + EXPECT_EQ(writeFile->size(), 10); + writeFile->append(dataContent.substr(10, contentSize - 10)); + EXPECT_EQ(writeFile->size(), contentSize); + writeFile->flush(); + // No parts must have been uploaded. + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 0); + + // Append data + for (int i = 0; i < 1024 * sizeMiB - 1; ++i) { + writeFile->append(dataContent); + } + writeFile->close(); + EXPECT_EQ( + writeFile->size(), static_cast(1024) * 1024 * sizeMiB); + } +}; + +auto benchmark = S3AsyncUploadBenchmark(); + +#define DEFINE_BENCHMARKS(size) \ + BENCHMARK(sync_upload_##size##M) { \ + benchmark.run("sync_upload", false, size); \ + } \ + BENCHMARK_RELATIVE(async_upload_##size##M) { \ + benchmark.run("async_upload", true, size); \ + } + +DEFINE_BENCHMARKS(4) +DEFINE_BENCHMARKS(8) +DEFINE_BENCHMARKS(16) +DEFINE_BENCHMARKS(32) +DEFINE_BENCHMARKS(64) +DEFINE_BENCHMARKS(128) +DEFINE_BENCHMARKS(256) +DEFINE_BENCHMARKS(512) +DEFINE_BENCHMARKS(1024) +DEFINE_BENCHMARKS(2048) +} // namespace + +int main(int argc, char** argv) { + folly::Init init{&argc, &argv}; + bytedance::bolt::memory::MemoryManager::initialize( + bytedance::bolt::memory::MemoryManager::Options{}); + folly::runBenchmarks(); + return 0; +} diff --git a/bolt/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp b/bolt/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp index 46d97288..93e903f1 100644 --- a/bolt/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp +++ b/bolt/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp @@ -255,80 +255,86 @@ TEST_F(S3FileSystemTest, writeFileAndRead) { const auto file = "test.txt"; const auto filename = localPath(bucketName) + "/" + file; const auto s3File = s3URI(bucketName, file); - - auto hiveConfig = minioServer_->hiveConfig(); - filesystems::S3FileSystem s3fs(bucketName, hiveConfig); auto pool = memory::memoryManager()->addLeafPool("S3FileSystemTest"); - auto writeFile = s3fs.openFileForWrite( - s3File, filesystems::FileOptions{.pool = pool.get()}); - auto s3WriteFile = dynamic_cast(writeFile.get()); - std::string dataContent = - "Dance me to your beauty with a burning violin" - "Dance me through the panic till I'm gathered safely in" - "Lift me like an olive branch and be my homeward dove" - "Dance me to the end of love"; - - EXPECT_EQ(writeFile->size(), 0); - std::int64_t contentSize = dataContent.length(); - // dataContent length is 178. - EXPECT_EQ(contentSize, 178); - - // Append and flush a small batch of data. - writeFile->append(dataContent.substr(0, 10)); - EXPECT_EQ(writeFile->size(), 10); - writeFile->append(dataContent.substr(10, contentSize - 10)); - EXPECT_EQ(writeFile->size(), contentSize); - writeFile->flush(); - // No parts must have been uploaded. - EXPECT_EQ(s3WriteFile->numPartsUploaded(), 0); - - // Append data 178 * 100'000 ~ 16MiB. - // Should have 1 part in total with kUploadPartSize = 10MiB. - for (int i = 0; i < 100'000; ++i) { - writeFile->append(dataContent); - } - EXPECT_EQ(s3WriteFile->numPartsUploaded(), 1); - EXPECT_EQ(writeFile->size(), 100'001 * contentSize); - - // Append a large data buffer 178 * 150'000 ~ 25MiB (2 parts). - std::vector largeBuffer(contentSize * 150'000); - for (int i = 0; i < 150'000; ++i) { - memcpy( - largeBuffer.data() + (i * contentSize), - dataContent.data(), - contentSize); - } - - writeFile->append({largeBuffer.data(), largeBuffer.size()}); - EXPECT_EQ(writeFile->size(), 250'001 * contentSize); - // Total data = ~41 MB = 5 parts. - // But parts uploaded will be 4. - EXPECT_EQ(s3WriteFile->numPartsUploaded(), 4); - - // Upload the last part. - writeFile->close(); - EXPECT_EQ(s3WriteFile->numPartsUploaded(), 5); + auto uploadPart = [&](bool uploadPartAsync) { + auto hiveConfig = minioServer_->hiveConfig( + {{"hive.s3.part-upload-async", uploadPartAsync ? "true" : "false"}}); + filesystems::S3FileSystem s3fs(bucketName, hiveConfig); + auto writeFile = s3fs.openFileForWrite( + s3File, filesystems::FileOptions{.pool = pool.get()}); + auto s3WriteFile = dynamic_cast(writeFile.get()); + std::string dataContent = + "Dance me to your beauty with a burning violin" + "Dance me through the panic till I'm gathered safely in" + "Lift me like an olive branch and be my homeward dove" + "Dance me to the end of love"; + + EXPECT_EQ(writeFile->size(), 0); + std::int64_t contentSize = dataContent.length(); + // dataContent length is 178. + EXPECT_EQ(contentSize, 178); + + // Append and flush a small batch of data. + writeFile->append(dataContent.substr(0, 10)); + EXPECT_EQ(writeFile->size(), 10); + writeFile->append(dataContent.substr(10, contentSize - 10)); + EXPECT_EQ(writeFile->size(), contentSize); + writeFile->flush(); + // No parts must have been uploaded. + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 0); + + // Append data 178 * 100'000 ~ 16MiB. + // Should have 1 part in total with kUploadPartSize = 10MiB. + for (int i = 0; i < 100'000; ++i) { + writeFile->append(dataContent); + } + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 1); + EXPECT_EQ(writeFile->size(), 100'001 * contentSize); + + // Append a large data buffer 178 * 150'000 ~ 25MiB (2 parts). + std::vector largeBuffer(contentSize * 150'000); + for (int i = 0; i < 150'000; ++i) { + memcpy( + largeBuffer.data() + (i * contentSize), + dataContent.data(), + contentSize); + } + + writeFile->append({largeBuffer.data(), largeBuffer.size()}); + EXPECT_EQ(writeFile->size(), 250'001 * contentSize); + // Total data = ~41 MB = 5 parts. + // But parts uploaded will be 4. + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 4); + + // Upload the last part. + writeFile->close(); + EXPECT_EQ(s3WriteFile->numPartsUploaded(), 5); - BOLT_ASSERT_THROW( - writeFile->append(dataContent.substr(0, 10)), "File is closed"); + BOLT_ASSERT_THROW( + writeFile->append(dataContent.substr(0, 10)), "File is closed"); - auto readFile = s3fs.openFileForRead(s3File); - ASSERT_EQ(readFile->size(), contentSize * 250'001); - // Sample and verify every 1'000 dataContent chunks. - for (int i = 0; i < 250; ++i) { - ASSERT_EQ( - readFile->pread(i * (1'000 * contentSize), contentSize), dataContent); - } - // Verify the last chunk. - ASSERT_EQ(readFile->pread(contentSize * 250'000, contentSize), dataContent); + auto readFile = s3fs.openFileForRead(s3File); + ASSERT_EQ(readFile->size(), contentSize * 250'001); + // Sample and verify every 1'000 dataContent chunks. + for (int i = 0; i < 250; ++i) { + ASSERT_EQ( + readFile->pread(i * (1'000 * contentSize), contentSize), dataContent); + } + // Verify the last chunk. + ASSERT_EQ(readFile->pread(contentSize * 250'000, contentSize), dataContent); - // Verify the S3 list function. - auto result = s3fs.list(s3File); + // Verify the S3 list function. + auto result = s3fs.list(s3File); - ASSERT_EQ(result.size(), 1); - ASSERT_TRUE(result[0] == file); + ASSERT_EQ(result.size(), 1); + ASSERT_TRUE(result[0] == file); - ASSERT_TRUE(s3fs.exists(s3File)); + ASSERT_TRUE(s3fs.exists(s3File)); + }; + // Upload parts synchronously. + uploadPart(false); + // Upload parts asynchronously. + uploadPart(true); } TEST_F(S3FileSystemTest, invalidConnectionSettings) { From 5da4a88b8fd8ea5c34a67832310cdea032dd25de Mon Sep 17 00:00:00 2001 From: yingsu00 Date: Tue, 3 Mar 2026 07:04:52 -0800 Subject: [PATCH 4/4] Make WriterOptions and SpillConfig serializable --- bolt/common/base/SpillConfig.cpp | 88 +++++++++++++ bolt/common/base/SpillConfig.h | 14 +- bolt/common/base/tests/SpillConfigTest.cpp | 144 +++++++++++++++++++++ bolt/dwio/common/Options.cpp | 100 ++++++++++++++ bolt/dwio/common/Options.h | 10 +- bolt/dwio/common/tests/CMakeLists.txt | 1 + bolt/dwio/common/tests/OptionsTests.cpp | 141 ++++++++++++++++++++ 7 files changed, 495 insertions(+), 3 deletions(-) diff --git a/bolt/common/base/SpillConfig.cpp b/bolt/common/base/SpillConfig.cpp index ffa493bf..ae8f90cb 100644 --- a/bolt/common/base/SpillConfig.cpp +++ b/bolt/common/base/SpillConfig.cpp @@ -31,9 +31,11 @@ #include "bolt/common/base/SpillConfig.h" #include +#include #include "bolt/common/base/Exceptions.h" #include "bolt/common/base/SuccinctPrinter.h" +#include "bolt/common/compression/Compression.h" namespace bytedance::bolt::common { @@ -225,6 +227,92 @@ std::string SpillConfig::toString() const { jitEnabled); } +folly::dynamic SpillConfig::serialize() const { + folly::dynamic obj = folly::dynamic::object; + obj["name"] = "SpillConfig"; + obj["fileNamePrefix"] = fileNamePrefix; + obj["maxFileSize"] = static_cast(maxFileSize); + obj["spillUringEnabled"] = spillUringEnabled; + obj["writeBufferSize"] = static_cast(writeBufferSize); + obj["minSpillableReservationPct"] = minSpillableReservationPct; + obj["spillableReservationGrowthPct"] = spillableReservationGrowthPct; + obj["startPartitionBit"] = static_cast(startPartitionBit); + obj["joinPartitionBits"] = static_cast(joinPartitionBits); + obj["joinRepartitionBits"] = static_cast(joinRepartitionBits); + obj["maxSpillLevel"] = maxSpillLevel; + obj["maxSpillRunRows"] = static_cast(maxSpillRunRows); + obj["writerFlushThresholdSize"] = + static_cast(writerFlushThresholdSize); + obj["testSpillPct"] = testSpillPct; + obj["compressionKind"] = static_cast(compressionKind); + obj["fileCreateConfig"] = fileCreateConfig; + obj["rowBasedSpillMode"] = static_cast(rowBasedSpillMode); + obj["singlePartitionSerdeKind"] = singlePartitionSerdeKind; + obj["spillPartitionsAdaptiveThreshold"] = + static_cast(spillPartitionsAdaptiveThreshold); + obj["jitEnabled"] = jitEnabled; + obj["needSetNextEqual"] = needSetNextEqual; + obj["aggBypassHTEqualNum"] = static_cast(aggBypassHTEqualNum); + // getSpillDirPathCb, updateAndCheckSpillLimitCb, and executor are omitted; + // they must be re-injected by the host after deserialization. + return obj; +} + +std::shared_ptr SpillConfig::deserialize( + const folly::dynamic& obj) { + auto spillConfig = std::make_shared(); + spillConfig->fileNamePrefix = obj["fileNamePrefix"].asString(); + spillConfig->maxFileSize = static_cast(obj["maxFileSize"].asInt()); + spillConfig->spillUringEnabled = obj["spillUringEnabled"].asBool(); + spillConfig->writeBufferSize = + static_cast(obj["writeBufferSize"].asInt()); + spillConfig->minSpillableReservationPct = + static_cast(obj["minSpillableReservationPct"].asInt()); + spillConfig->spillableReservationGrowthPct = + static_cast(obj["spillableReservationGrowthPct"].asInt()); + spillConfig->startPartitionBit = + static_cast(obj["startPartitionBit"].asInt()); + spillConfig->joinPartitionBits = + static_cast(obj["joinPartitionBits"].asInt()); + spillConfig->joinRepartitionBits = + static_cast(obj["joinRepartitionBits"].asInt()); + spillConfig->maxSpillLevel = + static_cast(obj["maxSpillLevel"].asInt()); + spillConfig->maxSpillRunRows = + static_cast(obj["maxSpillRunRows"].asInt()); + spillConfig->writerFlushThresholdSize = + static_cast(obj["writerFlushThresholdSize"].asInt()); + spillConfig->testSpillPct = static_cast(obj["testSpillPct"].asInt()); + spillConfig->compressionKind = + static_cast(obj["compressionKind"].asInt()); + spillConfig->fileCreateConfig = obj["fileCreateConfig"].asString(); + spillConfig->rowBasedSpillMode = + static_cast(obj["rowBasedSpillMode"].asInt()); + spillConfig->singlePartitionSerdeKind = + obj["singlePartitionSerdeKind"].asString(); + spillConfig->spillPartitionsAdaptiveThreshold = + static_cast(obj["spillPartitionsAdaptiveThreshold"].asInt()); + spillConfig->jitEnabled = obj["jitEnabled"].asBool(); + spillConfig->needSetNextEqual = obj["needSetNextEqual"].asBool(); + spillConfig->aggBypassHTEqualNum = + static_cast(obj["aggBypassHTEqualNum"].asInt()); + // getSpillDirPathCb, updateAndCheckSpillLimitCb, and executor remain + // default-initialized (null); callers must re-inject them as needed. + return spillConfig; +} + +void SpillConfig::registerSerDe() { + auto& registry = bolt::DeserializationRegistryForSharedPtr(); + registry.Register("SpillConfig", SpillConfig::deserialize); +} + +namespace { +const bool kSpillConfigSerdeRegistered = []() { + SpillConfig::registerSerDe(); + return true; +}(); +} // namespace + SpillConfig& SpillConfig::setJITenableForSpill(bool enabled) noexcept { jitEnabled = enabled; return *this; diff --git a/bolt/common/base/SpillConfig.h b/bolt/common/base/SpillConfig.h index ad59c1e8..44e0f58b 100644 --- a/bolt/common/base/SpillConfig.h +++ b/bolt/common/base/SpillConfig.h @@ -33,9 +33,11 @@ #include #include +#include #include #include #include "bolt/common/compression/Compression.h" +#include "bolt/common/serialization/Serializable.h" #include "bolt/vector/VectorStream.h" namespace bytedance::bolt::common { @@ -73,7 +75,7 @@ enum class RowBasedSpillMode { RowBasedSpillMode strToRowBasedSpillMode(const std::string& str); /// Specifies the config for spilling. -struct SpillConfig { +struct SpillConfig : public bytedance::bolt::ISerializable { SpillConfig() = default; SpillConfig( GetSpillDirectoryPathCB _getSpillDirPathCb, @@ -112,6 +114,16 @@ struct SpillConfig { std::string toString() const; + /// Serializes all value fields. Callbacks and executor are omitted and must + /// be re-injected by the host after deserialization. + folly::dynamic serialize() const override; + + /// Reconstructs a SpillConfig from a serialized object. Callbacks and + /// executor are left at their default (null/empty) values. + static std::shared_ptr deserialize(const folly::dynamic& obj); + + static void registerSerDe(); + /// A callback function that returns the spill directory path. Implementations /// can use it to ensure the path exists before returning. GetSpillDirectoryPathCB getSpillDirPathCb; diff --git a/bolt/common/base/tests/SpillConfigTest.cpp b/bolt/common/base/tests/SpillConfigTest.cpp index 4e82b835..d4da2097 100644 --- a/bolt/common/base/tests/SpillConfigTest.cpp +++ b/bolt/common/base/tests/SpillConfigTest.cpp @@ -31,7 +31,9 @@ #include "bolt/common/base/SpillConfig.h" #include #include "bolt/common/base/tests/GTestUtils.h" +#include "bolt/common/serialization/Serializable.h" #include "bolt/exec/HashBitRange.h" +using namespace bytedance::bolt; using namespace bytedance::bolt::common; using namespace bytedance::bolt::exec; @@ -159,6 +161,148 @@ TEST(SpillConfig, spillLevelLimit) { } } +// ---- Serialization helpers -------------------------------------------------- + +namespace { + +// Returns a SpillConfig with every serializable field set to a non-default +// value, making it easy to detect fields that were silently dropped. +SpillConfig makeFullConfig() { + SpillConfig cfg; + cfg.fileNamePrefix = "test_spill"; + cfg.maxFileSize = 512UL * 1024 * 1024; + cfg.spillUringEnabled = true; + cfg.writeBufferSize = 8UL * 1024 * 1024; + cfg.minSpillableReservationPct = 10; + cfg.spillableReservationGrowthPct = 25; + cfg.startPartitionBit = 29; + cfg.joinPartitionBits = 4; + cfg.joinRepartitionBits = 2; + cfg.maxSpillLevel = 3; + cfg.maxSpillRunRows = 100'000; + cfg.writerFlushThresholdSize = 64UL * 1024 * 1024; + cfg.testSpillPct = 5; + cfg.compressionKind = CompressionKind_ZSTD; + cfg.fileCreateConfig = R"({"option":"value"})"; + cfg.rowBasedSpillMode = RowBasedSpillMode::RAW; + cfg.singlePartitionSerdeKind = "Arrow"; + cfg.spillPartitionsAdaptiveThreshold = 64; + cfg.jitEnabled = false; + cfg.needSetNextEqual = true; + cfg.aggBypassHTEqualNum = 42; + return cfg; +} + +void assertFieldsEqual(const SpillConfig& e, const SpillConfig& a) { + EXPECT_EQ(e.fileNamePrefix, a.fileNamePrefix); + EXPECT_EQ(e.maxFileSize, a.maxFileSize); + EXPECT_EQ(e.spillUringEnabled, a.spillUringEnabled); + EXPECT_EQ(e.writeBufferSize, a.writeBufferSize); + EXPECT_EQ(e.minSpillableReservationPct, a.minSpillableReservationPct); + EXPECT_EQ(e.spillableReservationGrowthPct, a.spillableReservationGrowthPct); + EXPECT_EQ(e.startPartitionBit, a.startPartitionBit); + EXPECT_EQ(e.joinPartitionBits, a.joinPartitionBits); + EXPECT_EQ(e.joinRepartitionBits, a.joinRepartitionBits); + EXPECT_EQ(e.maxSpillLevel, a.maxSpillLevel); + EXPECT_EQ(e.maxSpillRunRows, a.maxSpillRunRows); + EXPECT_EQ(e.writerFlushThresholdSize, a.writerFlushThresholdSize); + EXPECT_EQ(e.testSpillPct, a.testSpillPct); + EXPECT_EQ(e.compressionKind, a.compressionKind); + EXPECT_EQ(e.fileCreateConfig, a.fileCreateConfig); + EXPECT_EQ(e.rowBasedSpillMode, a.rowBasedSpillMode); + EXPECT_EQ(e.singlePartitionSerdeKind, a.singlePartitionSerdeKind); + EXPECT_EQ( + e.spillPartitionsAdaptiveThreshold, a.spillPartitionsAdaptiveThreshold); + EXPECT_EQ(e.jitEnabled, a.jitEnabled); + EXPECT_EQ(e.needSetNextEqual, a.needSetNextEqual); + EXPECT_EQ(e.aggBypassHTEqualNum, a.aggBypassHTEqualNum); +} + +} // namespace + +// ---- Serialization tests ---------------------------------------------------- + +TEST(SpillConfig, serializeContainsNameField) { + folly::dynamic dyn = makeFullConfig().serialize(); + ASSERT_TRUE(dyn.isObject()); + ASSERT_EQ("SpillConfig", dyn["name"].asString()); +} + +TEST(SpillConfig, roundTripAllFields) { + SpillConfig cfg = makeFullConfig(); + auto rt = SpillConfig::deserialize(cfg.serialize()); + ASSERT_NE(nullptr, rt); + assertFieldsEqual(cfg, *rt); +} + +TEST(SpillConfig, callbacksAndExecutorNullAfterRoundTrip) { + SpillConfig cfg = makeFullConfig(); + cfg.getSpillDirPathCb = []() -> const std::string& { + static const std::string path = "/tmp/spill"; + return path; + }; + cfg.updateAndCheckSpillLimitCb = [](uint64_t) {}; + + auto rt = SpillConfig::deserialize(cfg.serialize()); + ASSERT_NE(nullptr, rt); + EXPECT_FALSE(static_cast(rt->getSpillDirPathCb)); + EXPECT_FALSE(static_cast(rt->updateAndCheckSpillLimitCb)); + EXPECT_EQ(nullptr, rt->executor); +} + +TEST(SpillConfig, roundTripViaISerializableRegistry) { + SpillConfig cfg = makeFullConfig(); + auto rt = ISerializable::deserialize(cfg.serialize()); + ASSERT_NE(nullptr, rt); + assertFieldsEqual(cfg, *rt); +} + +TEST(SpillConfig, roundTripDefaultConstructed) { + SpillConfig cfg; + auto rt = SpillConfig::deserialize(cfg.serialize()); + ASSERT_NE(nullptr, rt); + assertFieldsEqual(cfg, *rt); +} + +TEST(SpillConfig, roundTripRowBasedSpillModes) { + for (auto mode : + {RowBasedSpillMode::DISABLE, + RowBasedSpillMode::RAW, + RowBasedSpillMode::COMPRESSION}) { + SpillConfig cfg = makeFullConfig(); + cfg.rowBasedSpillMode = mode; + auto rt = SpillConfig::deserialize(cfg.serialize()); + ASSERT_NE(nullptr, rt); + EXPECT_EQ(mode, rt->rowBasedSpillMode); + } +} + +TEST(SpillConfig, roundTripCompressionKinds) { + for (auto kind : + {CompressionKind_NONE, + CompressionKind_ZLIB, + CompressionKind_SNAPPY, + CompressionKind_ZSTD, + CompressionKind_LZ4, + CompressionKind_GZIP}) { + SpillConfig cfg = makeFullConfig(); + cfg.compressionKind = kind; + auto rt = SpillConfig::deserialize(cfg.serialize()); + ASSERT_NE(nullptr, rt); + EXPECT_EQ(kind, rt->compressionKind); + } +} + +TEST(SpillConfig, maxSpillLevelUnlimited) { + SpillConfig cfg = makeFullConfig(); + cfg.maxSpillLevel = -1; // -1 means no limit + auto rt = SpillConfig::deserialize(cfg.serialize()); + ASSERT_NE(nullptr, rt); + EXPECT_EQ(-1, rt->maxSpillLevel); +} + +// ---- spillableReservationPercentages ---------------------------------------- + TEST(SpillConfig, spillableReservationPercentages) { struct { uint32_t growthPct; diff --git a/bolt/dwio/common/Options.cpp b/bolt/dwio/common/Options.cpp index 779ddc93..95526886 100644 --- a/bolt/dwio/common/Options.cpp +++ b/bolt/dwio/common/Options.cpp @@ -30,6 +30,7 @@ #include "bolt/dwio/common/Options.h" #include +#include "bolt/type/Type.h" namespace bytedance::bolt::dwio::common { FileFormat toFileFormat(std::string s) { @@ -98,4 +99,103 @@ ColumnReaderOptions makeColumnReaderOptions(const ReaderOptions& options) { return columnReaderOptions; } +// We do *not* serialize pool, nonReclaimableSection, or the callbacks / +// executor inside spillConfig—they must be re‐injected by the host. +folly::dynamic WriterOptions::serialize() const { + folly::dynamic obj = folly::dynamic::object; + + if (schema) { + obj["schema"] = schema->serialize(); + } + + if (compressionKind) { + obj["compressionKind"] = static_cast(*compressionKind); + } + + if (!serdeParameters.empty()) { + folly::dynamic mapObj = folly::dynamic::object; + for (const auto& [k, v] : serdeParameters) { + mapObj[k] = v; + } + obj["serdeParameters"] = std::move(mapObj); + } + + if (maxStripeSize) { + obj["maxStripeSize"] = static_cast(*maxStripeSize); + } + + if (arrowBridgeTimestampUnit) { + obj["arrowBridgeTimestampUnit"] = + static_cast(*arrowBridgeTimestampUnit); + } + + if (zlibCompressionLevel) { + obj["zlibCompressionLevel"] = static_cast(*zlibCompressionLevel); + } + + // spillConfig (value fields only; callbacks/executor re-injected by host) + if (spillConfig) { + obj["spillConfig"] = spillConfig->serialize(); + } + + return obj; +} + +// pool, nonReclaimableSection, and spillConfig callbacks/executor remain at +// default and must be re-injected by the host. +std::shared_ptr WriterOptions::deserialize( + const folly::dynamic& obj) { + auto opts = std::make_shared(); + + // 1) schema + if (auto p = obj.get_ptr("schema")) { + opts->schema = ISerializable::deserialize(*p); + } + + // 2) compressionKind + if (auto p = obj.get_ptr("compressionKind")) { + opts->compressionKind = + static_cast(p->asInt()); + } + + // 3) serdeParameters + if (auto p = obj.get_ptr("serdeParameters")) { + opts->serdeParameters.clear(); + for (auto& kv : p->items()) { + opts->serdeParameters.emplace(kv.first.asString(), kv.second.asString()); + } + } + + // 4) maxStripeSize + if (auto p = obj.get_ptr("maxStripeSize")) { + opts->maxStripeSize = static_cast(p->asInt()); + } + + // 5) arrowBridgeTimestampUnit + if (auto p = obj.get_ptr("arrowBridgeTimestampUnit")) { + opts->arrowBridgeTimestampUnit = static_cast(p->asInt()); + } + + // 6) zlibCompressionLevel + if (auto p = obj.get_ptr("zlibCompressionLevel")) { + opts->zlibCompressionLevel = static_cast(p->asInt()); + } + + // 7) spillConfig + if (auto p = obj.get_ptr("spillConfig")) { + opts->ownedSpillConfig = + ISerializable::deserialize(*p); + opts->spillConfig = opts->ownedSpillConfig.get(); + } + + return opts; +} + +void WriterOptions::registerSerDe() { + bolt::Type::registerSerDe(); + bolt::common::SpillConfig::registerSerDe(); + auto& registry = DeserializationRegistryForSharedPtr(); + registry.Register("WriterOptions", WriterOptions::deserialize); +} + } // namespace bytedance::bolt::dwio::common diff --git a/bolt/dwio/common/Options.h b/bolt/dwio/common/Options.h index a11217f1..13af360c 100644 --- a/bolt/dwio/common/Options.h +++ b/bolt/dwio/common/Options.h @@ -725,10 +725,12 @@ class ReaderOptions : public io::ReaderOptions { } }; -struct WriterOptions { +struct WriterOptions : public ISerializable { TypePtr schema; - bolt::memory::MemoryPool* memoryPool; + bolt::memory::MemoryPool* memoryPool{nullptr}; const bolt::common::SpillConfig* spillConfig{nullptr}; + // Owns the SpillConfig when it was produced by deserialize(). + std::shared_ptr ownedSpillConfig; tsan_atomic* nonReclaimableSection{nullptr}; std::optional compressionKind; std::optional maxStripeSize{std::nullopt}; @@ -743,6 +745,10 @@ struct WriterOptions { virtual void processHiveConnectorConfigs(const config::ConfigBase&) {} virtual ~WriterOptions() = default; + + folly::dynamic serialize() const override; + static std::shared_ptr deserialize(const folly::dynamic& obj); + static void registerSerDe(); }; struct ColumnReaderOptions { diff --git a/bolt/dwio/common/tests/CMakeLists.txt b/bolt/dwio/common/tests/CMakeLists.txt index 69da6f8d..e09917f0 100644 --- a/bolt/dwio/common/tests/CMakeLists.txt +++ b/bolt/dwio/common/tests/CMakeLists.txt @@ -38,6 +38,7 @@ add_executable( ExecutorBarrierTest.cpp LocalFileSinkTest.cpp LoggedExceptionTest.cpp + OptionsTests.cpp ParallelForTest.cpp RangeTests.cpp ReaderTest.cpp diff --git a/bolt/dwio/common/tests/OptionsTests.cpp b/bolt/dwio/common/tests/OptionsTests.cpp index 7a672805..9d2c5403 100644 --- a/bolt/dwio/common/tests/OptionsTests.cpp +++ b/bolt/dwio/common/tests/OptionsTests.cpp @@ -29,11 +29,23 @@ */ #include + +#include "bolt/common/base/SpillConfig.h" #include "bolt/dwio/common/Options.h" +#include "bolt/type/Type.h" using namespace ::testing; +using namespace bytedance::bolt; +using namespace bytedance::bolt::common; using namespace bytedance::bolt::dwio::common; +class WriterOptionsSerDeTest : public ::testing::Test { + protected: + static void SetUpTestSuite() { + WriterOptions::registerSerDe(); + } +}; + TEST(OptionsTests, defaultAppendRowNumberColumnTest) { // appendRowNumberColumn flag should be false by default RowReaderOptions rowReaderOptions; @@ -55,3 +67,132 @@ TEST(OptionsTests, testAppendRowNumberColumnInCopy) { RowReaderOptions rowReaderOptionsSecondCopy{rowReaderOptions}; ASSERT_EQ(true, rowReaderOptionsSecondCopy.getAppendRowNumberColumn()); } + +TEST_F(WriterOptionsSerDeTest, writerOptionsSerializeDeserializeRoundTrip) { + WriterOptions opts; + + // 1) schema (use a simple type so serialization is stable) + opts.schema = ROW({"c1", "c2"}, {INTEGER(), VARCHAR()}); + + // 2) compression kind + opts.compressionKind = common::CompressionKind_ZSTD; + + // 3) serde parameters + opts.serdeParameters["k1"] = "v1"; + opts.serdeParameters["k2"] = "v2"; + + // 4) maxStripeSize + opts.maxStripeSize = 128UL * 1024 * 1024; + + // 5) arrowBridgeTimestampUnit + opts.arrowBridgeTimestampUnit = 3; + + // 6) zlibCompressionLevel + opts.zlibCompressionLevel = 6; + + // ---- serialize ---- + folly::dynamic dyn = opts.serialize(); + + // ---- deserialize ---- + auto roundTrip = WriterOptions::deserialize(dyn); + + // ---- verify ---- + + ASSERT_TRUE(roundTrip->schema != nullptr); + ASSERT_EQ(opts.schema->toString(), roundTrip->schema->toString()); + + ASSERT_TRUE(roundTrip->compressionKind.has_value()); + ASSERT_EQ(opts.compressionKind.value(), roundTrip->compressionKind.value()); + + ASSERT_EQ(opts.serdeParameters.size(), roundTrip->serdeParameters.size()); + ASSERT_EQ(opts.serdeParameters.at("k1"), roundTrip->serdeParameters.at("k1")); + ASSERT_EQ(opts.serdeParameters.at("k2"), roundTrip->serdeParameters.at("k2")); + + ASSERT_TRUE(roundTrip->maxStripeSize.has_value()); + ASSERT_EQ(opts.maxStripeSize.value(), roundTrip->maxStripeSize.value()); + + ASSERT_TRUE(roundTrip->arrowBridgeTimestampUnit.has_value()); + ASSERT_EQ( + opts.arrowBridgeTimestampUnit.value(), + roundTrip->arrowBridgeTimestampUnit.value()); + + ASSERT_TRUE(roundTrip->zlibCompressionLevel.has_value()); + ASSERT_EQ( + opts.zlibCompressionLevel.value(), + roundTrip->zlibCompressionLevel.value()); +} + +TEST_F(WriterOptionsSerDeTest, writerOptionsDefaultsRoundTrip) { + WriterOptions opts; + + folly::dynamic dyn = opts.serialize(); + auto roundTrip = WriterOptions::deserialize(dyn); + + ASSERT_EQ(nullptr, roundTrip->schema); + ASSERT_FALSE(roundTrip->compressionKind.has_value()); + ASSERT_TRUE(roundTrip->serdeParameters.empty()); + ASSERT_FALSE(roundTrip->maxStripeSize.has_value()); + ASSERT_FALSE(roundTrip->arrowBridgeTimestampUnit.has_value()); + ASSERT_FALSE(roundTrip->zlibCompressionLevel.has_value()); + ASSERT_EQ(nullptr, roundTrip->spillConfig); + ASSERT_EQ(nullptr, roundTrip->ownedSpillConfig.get()); +} + +TEST_F(WriterOptionsSerDeTest, writerOptionsWithSpillConfigRoundTrip) { + SpillConfig cfg; + cfg.fileNamePrefix = "writer_spill"; + cfg.maxFileSize = 256UL * 1024 * 1024; + cfg.spillUringEnabled = true; + cfg.writeBufferSize = 8UL * 1024 * 1024; + cfg.minSpillableReservationPct = 5; + cfg.spillableReservationGrowthPct = 15; + cfg.startPartitionBit = 29; + cfg.joinPartitionBits = 4; + cfg.joinRepartitionBits = 4; + cfg.maxSpillLevel = -1; + cfg.maxSpillRunRows = 0; + cfg.writerFlushThresholdSize = 32UL * 1024 * 1024; + cfg.testSpillPct = 0; + cfg.compressionKind = CompressionKind_NONE; + cfg.rowBasedSpillMode = RowBasedSpillMode::RAW; + cfg.jitEnabled = false; + + WriterOptions opts; + opts.spillConfig = &cfg; + + auto rt = WriterOptions::deserialize(opts.serialize()); + + ASSERT_NE(nullptr, rt->spillConfig); + ASSERT_NE(nullptr, rt->ownedSpillConfig.get()); + ASSERT_EQ(rt->spillConfig, rt->ownedSpillConfig.get()); + ASSERT_EQ(cfg.fileNamePrefix, rt->spillConfig->fileNamePrefix); + ASSERT_EQ(cfg.maxFileSize, rt->spillConfig->maxFileSize); + ASSERT_EQ(cfg.spillUringEnabled, rt->spillConfig->spillUringEnabled); + ASSERT_EQ(cfg.rowBasedSpillMode, rt->spillConfig->rowBasedSpillMode); + ASSERT_EQ(cfg.jitEnabled, rt->spillConfig->jitEnabled); +} + +TEST_F(WriterOptionsSerDeTest, writerOptionsNoSpillConfigRoundTrip) { + WriterOptions opts; + ASSERT_EQ(nullptr, opts.spillConfig); + + auto rt = WriterOptions::deserialize(opts.serialize()); + + ASSERT_EQ(nullptr, rt->spillConfig); + ASSERT_EQ(nullptr, rt->ownedSpillConfig.get()); +} + +TEST_F(WriterOptionsSerDeTest, writerOptionsNoSchemaRoundTrip) { + WriterOptions opts; + opts.compressionKind = common::CompressionKind_ZLIB; + opts.serdeParameters["key"] = "value"; + + folly::dynamic dyn = opts.serialize(); + auto roundTrip = WriterOptions::deserialize(dyn); + + ASSERT_EQ(nullptr, roundTrip->schema); + ASSERT_TRUE(roundTrip->compressionKind.has_value()); + ASSERT_EQ(common::CompressionKind_ZLIB, roundTrip->compressionKind.value()); + ASSERT_EQ(1u, roundTrip->serdeParameters.size()); + ASSERT_EQ("value", roundTrip->serdeParameters.at("key")); +}