feat: Support multi-threaded asynchronous data upload to object storage. #14472
weixiuli wants to merge 9 commits into facebookincubator:main
Conversation
weixiuli force-pushed from 0120710 to 8cba9e9.
What's the config when you got this performance?

We set the

@weixiuli Thanks for your optimization.

@JkSelf @jinchengchenghh Thanks for your review, PTAL.
weixiuli force-pushed from 2d6a94e to 9456784.
JkSelf left a comment:
Thanks for your work. Leaving some comments.
@weixiuli Please help to resolve the conflict. Thanks.
weixiuli force-pushed from 9456784 to 61dcd92.
weixiuli force-pushed from 2a07c32 to a1217ff.
@JkSelf @jinchengchenghh @majetideepak Could you help to review this PR? Thanks.
weixiuli force-pushed from a1217ff to c697ff5.
@pedroerp Could you help to review the PR? Thanks.
majetideepak left a comment:
@weixiuli Did you evaluate the S3 Transfer Manager?

Yes. Velox could bypass the local disk and use TransferManager directly for parallel uploads to S3, but that would require implementing a custom sink connector that feeds in-memory serialized Velox data into TransferManager's multipart upload API. That design is considerably more complex, while this PR achieves asynchronous upload through a small memory buffer, with fewer modifications and a significant performance improvement.
weixiuli force-pushed from 5576747 to c4daa54.
```cpp
// Upload the part asynchronously.
void uploadPartAsync(const std::string_view part) {
  maxConcurrentUploadNum_->wait();
```
What is the purpose of this wait?

maxConcurrentUploadNum_ is a semaphore that controls the concurrency of asynchronous uploads to S3 for each S3WriteFile, preventing excessive memory usage.
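The semaphore pattern described here can be sketched with standard C++. This is a minimal stand-in, not the Velox implementation: `Semaphore` mimics folly-style `wait()`/`post()` semantics, and `runUploads` is a hypothetical driver showing that at most `limit` part uploads (and hence part buffers) are ever in flight at once.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Minimal counting semaphore, a stand-in for the folly one used by the PR.
class Semaphore {
 public:
  explicit Semaphore(int count) : count_(count) {}

  // Blocks until a slot is available, then takes it.
  void wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return count_ > 0; });
    --count_;
  }

  // Returns a slot and wakes one waiter.
  void post() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      ++count_;
    }
    cv_.notify_one();
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  int count_;
};

// Launches numParts fake uploads and returns the peak number in flight.
int runUploads(int numParts, int limit) {
  Semaphore slots(limit);
  std::atomic<int> inFlight{0};
  std::atomic<int> peak{0};
  std::vector<std::thread> uploads;
  for (int part = 0; part < numParts; ++part) {
    slots.wait(); // Like maxConcurrentUploadNum_->wait(): blocks when full.
    uploads.emplace_back([&] {
      int now = ++inFlight;
      int prev = peak.load();
      while (now > prev && !peak.compare_exchange_weak(prev, now)) {
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(2)); // "Upload".
      --inFlight;
      slots.post(); // Frees a slot for the next buffered part.
    });
  }
  for (auto& t : uploads) {
    t.join();
  }
  return peak.load();
}
```

Because the producer blocks in `wait()` before handing off each part, buffered part data is bounded by roughly `limit * partUploadSize_` bytes per file.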
I don't understand the control flow here. The futures are run during close(). Don't we end up adding more tasks than maxConcurrentUploadNum_ at this point?

@majetideepak The maxConcurrentUploadNum_ only limits the concurrency of asynchronous uploads within each S3WriteFile, to prevent excessive memory usage. If multiple S3WriteFile instances exist, more upload tasks can still run concurrently, but the total is bounded by hive.s3.upload-threads.
```cpp
DEFINE_BENCHMARKS(2048)
} // namespace

int main(int argc, char** argv) {
```
What is the output of this benchmark?

The benchmark output is the same as in the PR description.
JkSelf left a comment:
Thanks for the update. Leaving some nits.
```cpp
        s3Config->uploadThreads(),
        std::make_shared<folly::NamedThreadFactory>("upload-thread"));
  }
} else {
```
Could we remove the else branch since we've already initialized it to null in the parameter definition?

Since uploadThreadPool_ is declared as a static member of S3WriteFile::Impl, we need the else branch to handle the case where hive.s3.upload-part-async changes from true to false in benchmarks and unit tests. In this branch we reset uploadThreadPool_, while keeping maxConcurrentUploadNum_ unchanged (no reset needed).
Since tests can run in parallel, this can lead to non-deterministic failures. Since uploadThreadPool_ is static across different filesystem instances, we have a race condition here. Can we keep it non-static?
```cpp
  kRetryMode,
  kUseProxyFromEnv,
  kCredentialsProvider,
  KUploadPartAsync,
```
KUploadPartAsync -> KPartUploadAsync, matching the kPartUploadSize.
```cpp
  return config_.find(Keys::kCredentialsProvider)->second;
}

bool uploadPartAsync() const {
```
weixiuli force-pushed from 6589956 to 2faf0f4.
JkSelf left a comment:
LGTM. Thanks for your fix.
Thanks @JkSelf. cc @jinchengchenghh @majetideepak

@majetideepak @jinchengchenghh Could you help to review the PR again? Thanks.
```cpp
// Only the last part can be less than partUploadSize_.
VELOX_CHECK(isLast || (!isLast && (part.size() == partUploadSize_)));
auto uploadPartSync = [&](const std::string_view partData) {
  Aws::S3::Model::CompletedPart completedPart =
```
```cpp
/// and creates the object.
/// https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html
/// https://github.com/apache/arrow/blob/main/cpp/src/arrow/filesystem/s3fs.cc
/// S3WriteFile is not thread-safe.
```

How did you resolve this issue? Thanks.

This PR optimizes the internal processing logic of S3WriteFile. We guard uploadState_.completedParts with uploadStateMutex_ and ensure the list of parts is in ascending order when the file is closed. The internal processing of S3WriteFile is thread-safe.
This pull request has been automatically marked as stale because it has not had recent activity. If you'd still like this PR merged, please comment on the PR, make sure you've addressed reviewer comments, and rebase on the latest main. Thank you for your contributions!

@FelixYBW Some of the review comments have not been addressed.

@weixiuli Would you rebase the PR? I am picking it back for internal testing.


#14471

Support multi-threaded asynchronous data upload to object storage.
This PR also adds a benchmark named
S3AsyncUploadBenchmark, the output of benchmark show below :We used this PR on spark +gluten+ velox incubator-gluten and set the
hive.s3.uploadPartAsyncto be true in our environment, and the average write performance improved by 85%.Before this PR: Total Time Across All Tasks: 64.3 h

After this PR: Total Time Across All Tasks: 32.4 h
