Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,7 @@ if(VELOX_ENABLE_S3)
if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
endif()
if(${VELOX_ENABLE_BENCHMARKS})
add_subdirectory(benchmark)
endif()
endif()
38 changes: 38 additions & 0 deletions velox/connectors/hive/storage_adapters/s3fs/S3Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ class S3Config {
kRetryMode,
kUseProxyFromEnv,
kCredentialsProvider,
KPartUploadAsync,
kPartUploadSize,
KMaxConcurrentUploadNum,
KUploadThreads,
kEnd
};

Expand Down Expand Up @@ -114,6 +118,13 @@ class S3Config {
std::make_pair("use-proxy-from-env", "false")},
{Keys::kCredentialsProvider,
std::make_pair("aws-credentials-provider", std::nullopt)},
{Keys::KPartUploadAsync,
std::make_pair("part-upload-async", "false")},
{Keys::kPartUploadSize,
Comment thread
weixiuli marked this conversation as resolved.
std::make_pair("part-upload-size", "10485760")},
{Keys::KMaxConcurrentUploadNum,
std::make_pair("max-concurrent-upload-num", "4")},
{Keys::KUploadThreads, std::make_pair("upload-threads", "16")},
};
return config;
}
Expand Down Expand Up @@ -243,6 +254,33 @@ class S3Config {
return config_.find(Keys::kCredentialsProvider)->second;
}

/// If true, enables asynchronous upload of parts for S3 multipart uploads,
/// false otherwise.
bool partUploadAsync() const {
Comment thread
weixiuli marked this conversation as resolved.
auto value = config_.find(Keys::KPartUploadAsync)->second.value();
return folly::to<bool>(value);
}

/// Returns the size (in bytes) of each part for S3 multipart uploads
/// ("part-upload-size" config key, default 10485760 = 10 MiB).
int32_t partUploadSize() const {
  auto value = config_.find(Keys::kPartUploadSize)->second.value();
  // Convert directly to the return type: folly::to<int32_t> throws a
  // ConversionError on out-of-range input instead of silently wrapping
  // through an intermediate uint32_t.
  return folly::to<int32_t>(value);
}

/// Returns the maximum number of concurrent part uploads per file for S3
/// multipart uploads ("max-concurrent-upload-num" config key, default 4);
/// applicable only when asynchronous part uploads are enabled.
int32_t maxConcurrentUploadNum() const {
  auto value = config_.find(Keys::KMaxConcurrentUploadNum)->second.value();
  // Convert directly to the return type to avoid a silent uint32_t ->
  // int32_t narrowing; folly::to throws on out-of-range values.
  return folly::to<int32_t>(value);
}

/// Returns the number of threads to use for S3 multipart uploads
/// ("upload-threads" config key, default 16); applicable only when
/// asynchronous part uploads are enabled.
int32_t uploadThreads() const {
  auto value = config_.find(Keys::KUploadThreads)->second.value();
  // Convert directly to the return type to avoid a silent uint32_t ->
  // int32_t narrowing; folly::to throws on out-of-range values.
  return folly::to<int32_t>(value);
}

private:
std::unordered_map<Keys, std::optional<std::string>> config_;
std::string payloadSigningPolicy_;
Expand Down
10 changes: 8 additions & 2 deletions velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ class S3FileSystem::Impl {

auto credentialsProvider = getCredentialsProvider(s3Config);

s3Config_ = std::make_shared<S3Config>(s3Config);
client_ = std::make_shared<Aws::S3::S3Client>(
credentialsProvider, nullptr /* endpointProvider */, clientConfig);
++fileSystemCount;
Expand Down Expand Up @@ -433,6 +434,10 @@ class S3FileSystem::Impl {
return client_.get();
}

/// Returns a non-owning pointer to the S3Config held by this Impl
/// (populated when the S3 client is initialized).
S3Config* s3Config() const {
return s3Config_.get();
}

std::string getLogLevelName() const {
return getAwsInstance()->getLogLevelName();
}
Expand All @@ -443,6 +448,7 @@ class S3FileSystem::Impl {

private:
std::shared_ptr<Aws::S3::S3Client> client_;
std::shared_ptr<S3Config> s3Config_;
};

S3FileSystem::S3FileSystem(
Expand Down Expand Up @@ -474,8 +480,8 @@ std::unique_ptr<WriteFile> S3FileSystem::openFileForWrite(
std::string_view s3Path,
const FileOptions& options) {
const auto path = getPath(s3Path);
auto s3file =
std::make_unique<S3WriteFile>(path, impl_->s3Client(), options.pool);
auto s3file = std::make_unique<S3WriteFile>(
path, impl_->s3Client(), options.pool, impl_->s3Config());
return s3file;
}

Expand Down
170 changes: 124 additions & 46 deletions velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/

#include "velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h"
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/synchronization/ThrottledLifoSem.h>
#include "velox/common/base/StatsReporter.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Counters.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Util.h"
Expand All @@ -38,13 +40,27 @@ class S3WriteFile::Impl {
explicit Impl(
std::string_view path,
Aws::S3::S3Client* client,
memory::MemoryPool* pool)
memory::MemoryPool* pool,
S3Config* s3Config)
: client_(client), pool_(pool) {
VELOX_CHECK_NOT_NULL(client);
VELOX_CHECK_NOT_NULL(pool);
partUploadSize_ = s3Config->partUploadSize();
if (s3Config->partUploadAsync()) {
maxConcurrentUploadNum_ = std::make_unique<folly::ThrottledLifoSem>(
s3Config->maxConcurrentUploadNum());
if (!uploadThreadPool_) {
uploadThreadPool_ = std::make_shared<folly::CPUThreadPoolExecutor>(
s3Config->uploadThreads(),
std::make_shared<folly::NamedThreadFactory>("upload-thread"));
}
} else {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we remove the else branch since we've already initialized it to null in the parameter definition?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since uploadThreadPool_ is declared as a static member of S3WriteFile::Impl, we need to add an else branch to handle the case where hive.s3.upload-part-async changes from true to false in benchmarks and unit tests. In this branch, we should reset uploadThreadPool_, while keeping maxConcurrentUploadNum_ unchanged (i.e., no reset needed).

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since tests can run in parallel, this can lead to non-deterministic failures.
Since uploadThreadPool_ is static across different filesystem instances, we have a race condition here.
Can we keep it non-static?

uploadThreadPool_ = nullptr;
}

getBucketAndKeyFromPath(path, bucket_, key_);
currentPart_ = std::make_unique<dwio::common::DataBuffer<char>>(*pool_);
currentPart_->reserve(kPartUploadSize);
currentPart_->reserve(partUploadSize_);
// Check that the object doesn't exist, if it does throw an error.
{
Aws::S3::Model::HeadObjectRequest request;
Expand Down Expand Up @@ -106,7 +122,7 @@ class S3WriteFile::Impl {
// Appends data to the end of the file.
void append(std::string_view data) {
VELOX_CHECK(!closed(), "File is closed");
if (data.size() + currentPart_->size() >= kPartUploadSize) {
if (data.size() + currentPart_->size() >= partUploadSize_) {
upload(data);
} else {
// Append to current part.
Expand All @@ -118,9 +134,9 @@ class S3WriteFile::Impl {
// No-op: append() uploads full parts eagerly and close() uploads whatever
// remains, so there is nothing to flush here.
void flush() {
  VELOX_CHECK(!closed(), "File is closed");
  // currentPart_ must hold fewer than partUploadSize_ bytes at this point
  // because append() uploads as soon as partUploadSize_ bytes accumulate.
  VELOX_CHECK_LT(currentPart_->size(), partUploadSize_);
}

// Complete the multipart upload and close the file.
Expand All @@ -130,6 +146,20 @@ class S3WriteFile::Impl {
}
RECORD_METRIC_VALUE(kMetricS3StartedUploads);
uploadPart({currentPart_->data(), currentPart_->size()}, true);
if (uploadThreadPool_) {
if (futures_.size() > 0) {
folly::collectAll(std::move(futures_)).get();
}
// The list of parts should be in ascending order.
std::sort(
uploadState_.completedParts.begin(),
uploadState_.completedParts.end(),
[](const Aws::S3::Model::CompletedPart& a,
const Aws::S3::Model::CompletedPart& b) {
return a.GetPartNumber() < b.GetPartNumber();
});
}

VELOX_CHECK_EQ(uploadState_.partNumber, uploadState_.completedParts.size());
// Complete the multipart upload.
{
Expand Down Expand Up @@ -163,7 +193,6 @@ class S3WriteFile::Impl {
}

private:
static constexpr int64_t kPartUploadSize = 10 * 1024 * 1024;
static constexpr const char* kApplicationOctetStream =
"application/octet-stream";

Expand All @@ -177,10 +206,9 @@ class S3WriteFile::Impl {
int64_t partNumber = 0;
Aws::String id;
};
UploadState uploadState_;

// Data can be smaller or larger than the kPartUploadSize.
// Complete the currentPart_ and upload kPartUploadSize chunks of data.
// Data can be smaller or larger than the partUploadSize_.
// Complete the currentPart_ and upload partUploadSize_ chunks of data.
// Save the remaining into currentPart_.
void upload(const std::string_view data) {
auto dataPtr = data.data();
Expand All @@ -191,63 +219,113 @@ class S3WriteFile::Impl {
uploadPart({currentPart_->data(), currentPart_->size()});
dataPtr += remainingBufferSize;
dataSize -= remainingBufferSize;
while (dataSize > kPartUploadSize) {
uploadPart({dataPtr, kPartUploadSize});
dataPtr += kPartUploadSize;
dataSize -= kPartUploadSize;
while (dataSize > partUploadSize_) {
uploadPart({dataPtr, partUploadSize_});
dataPtr += partUploadSize_;
dataSize -= partUploadSize_;
}
// Stash the remaining at the beginning of currentPart.
currentPart_->unsafeAppend(0, dataPtr, dataSize);
}

void uploadPart(const std::string_view part, bool isLast = false) {
// Only the last part can be less than kPartUploadSize.
VELOX_CHECK(isLast || (!isLast && (part.size() == kPartUploadSize)));
// Upload the part.
{
Aws::S3::Model::UploadPartRequest request;
request.SetBucket(bucket_);
request.SetKey(key_);
request.SetUploadId(uploadState_.id);
request.SetPartNumber(++uploadState_.partNumber);
request.SetContentLength(part.size());
request.SetBody(
std::make_shared<StringViewStream>(part.data(), part.size()));
// The default algorithm used is MD5. However, MD5 is not supported with
// fips and can cause a SIGSEGV. Set CRC32 instead which is a standard for
// checksum computation and is not restricted by fips.
request.SetChecksumAlgorithm(Aws::S3::Model::ChecksumAlgorithm::CRC32);
auto outcome = client_->UploadPart(request);
VELOX_CHECK_AWS_OUTCOME(outcome, "Failed to upload", bucket_, key_);
// Append ETag and part number for this uploaded part.
// This will be needed for upload completion in Close().
auto result = outcome.GetResult();
Aws::S3::Model::CompletedPart part;

part.SetPartNumber(uploadState_.partNumber);
part.SetETag(result.GetETag());
// Don't add the checksum to the part if the checksum is empty.
// Some filesystems such as IBM COS require this to be not set.
if (!result.GetChecksumCRC32().empty()) {
part.SetChecksumCRC32(result.GetChecksumCRC32());
}
uploadState_.completedParts.push_back(std::move(part));
// Only the last part can be less than partUploadSize_.
VELOX_CHECK(isLast || (!isLast && (part.size() == partUploadSize_)));
auto uploadPartSync = [&](const std::string_view partData) {
Aws::S3::Model::CompletedPart completedPart =
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use auto?

uploadPartSeq(uploadState_.id, ++uploadState_.partNumber, partData);
uploadState_.completedParts.push_back(std::move(completedPart));
};
// If this is the last part and no parts have been uploaded yet,
// use the synchronous upload method.
bool useSyncUpload =
!uploadThreadPool_ || (isLast && uploadState_.partNumber == 0);
if (useSyncUpload) {
uploadPartSync(part);
} else {
uploadPartAsync(part);
}
}

// Synchronously uploads one part via the S3 UploadPart API and returns the
// CompletedPart (part number, ETag and, when the service reports one, the
// CRC32 checksum) that close() needs for CompleteMultipartUpload. Shared by
// both the synchronous and asynchronous upload paths.
Aws::S3::Model::CompletedPart uploadPartSeq(
const Aws::String& uploadId,
const int64_t partNumber,
const std::string_view part) {
Aws::S3::Model::UploadPartRequest request;
request.SetBucket(bucket_);
request.SetKey(key_);
request.SetUploadId(uploadId);
request.SetPartNumber(partNumber);
request.SetContentLength(part.size());
// The request body wraps `part` without copying; the caller must keep the
// underlying buffer alive until UploadPart() returns.
request.SetBody(
std::make_shared<StringViewStream>(part.data(), part.size()));
// The default algorithm used is MD5. However, MD5 is not supported with
// fips and can cause a SIGSEGV. Set CRC32 instead which is a standard for
// checksum computation and is not restricted by fips.
request.SetChecksumAlgorithm(Aws::S3::Model::ChecksumAlgorithm::CRC32);
auto outcome = client_->UploadPart(request);
VELOX_CHECK_AWS_OUTCOME(outcome, "Failed to upload", bucket_, key_);
// Append ETag and part number for this uploaded part.
// This will be needed for upload completion in Close().
auto result = outcome.GetResult();
Aws::S3::Model::CompletedPart completedPart;
completedPart.SetPartNumber(partNumber);
completedPart.SetETag(result.GetETag());
// Don't add the checksum to the part if the checksum is empty.
// Some filesystems such as IBM COS require this to be not set.
if (!result.GetChecksumCRC32().empty()) {
completedPart.SetChecksumCRC32(result.GetChecksumCRC32());
}
return completedPart;
}

// Upload the part asynchronously.
void uploadPartAsync(const std::string_view part) {
maxConcurrentUploadNum_->wait();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this wait?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maxConcurrentUploadNum_ is a semaphore and controls the concurrency of asynchronous uploads to
S3 for each S3WriteFile, preventing excessive memory usage.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the control flow here. The futures are run during close(). Don't we end up adding more tasks than maxConcurrentUploadNum_ at this point?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the control flow here. The futures are run during close(). Don't we end up adding more tasks than maxConcurrentUploadNum_ at this point?

@majetideepak The maxConcurrentUploadNum_ only limits the concurrency of asynchronous uploads within each S3WriteFile to prevent excessive memory usage. If multiple S3WriteFile instances exist, more upload tasks can still run concurrently , but it should be less than or equal to hive.s3.upload-threads.

const int64_t partNumber = ++uploadState_.partNumber;
std::shared_ptr<std::string> partStr =
std::make_shared<std::string>(part.data(), part.size());
futures_.emplace_back(
folly::via(uploadThreadPool_.get(), [this, partNumber, partStr]() {
SCOPE_EXIT {
maxConcurrentUploadNum_->post();
};
try {
Aws::S3::Model::CompletedPart completedPart =
uploadPartSeq(uploadState_.id, partNumber, *partStr);
std::lock_guard<std::mutex> lock(uploadStateMutex_);
uploadState_.completedParts.push_back(std::move(completedPart));
} catch (const std::exception& e) {
LOG(ERROR) << "Exception during async upload: " << e.what();
} catch (...) {
LOG(ERROR) << "Unknown exception during async upload.";
}
}));
}

// Non-owning raw pointer supplied at construction; the S3FileSystem owns
// the client.
Aws::S3::S3Client* client_;
memory::MemoryPool* pool_;
// Buffer that accumulates appended bytes until a full part is ready.
std::unique_ptr<dwio::common::DataBuffer<char>> currentPart_;
std::string bucket_;
std::string key_;
size_t fileSize_ = -1;
UploadState uploadState_;
// Guards uploadState_.completedParts, which async upload tasks mutate
// concurrently.
std::mutex uploadStateMutex_;
// One future per asynchronously submitted part; drained in close().
std::vector<folly::Future<folly::Unit>> futures_;
// Part size threshold (bytes) taken from S3Config::partUploadSize().
size_t partUploadSize_;
// maxConcurrentUploadNum_ controls the concurrency of asynchronous uploads to
// S3 for each S3WriteFile, preventing excessive memory usage.
std::unique_ptr<folly::ThrottledLifoSem> maxConcurrentUploadNum_;
// NOTE(review): static, so the pool is shared by all S3WriteFile instances
// and reassigned in each Impl constructor — confirm this cannot race when
// files are opened concurrently (see review discussion).
inline static std::shared_ptr<folly::CPUThreadPoolExecutor> uploadThreadPool_;
};

S3WriteFile::S3WriteFile(
std::string_view path,
Aws::S3::S3Client* client,
memory::MemoryPool* pool) {
impl_ = std::make_shared<Impl>(path, client, pool);
memory::MemoryPool* pool,
S3Config* s3Config) {
impl_ = std::make_shared<Impl>(path, client, pool, s3Config);
}

void S3WriteFile::append(std::string_view data) {
Expand Down
4 changes: 3 additions & 1 deletion velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "velox/common/file/File.h"
#include "velox/common/memory/MemoryPool.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Config.h"

namespace Aws::S3 {
class S3Client;
Expand Down Expand Up @@ -50,7 +51,8 @@ class S3WriteFile : public WriteFile {
S3WriteFile(
std::string_view path,
Aws::S3::S3Client* client,
memory::MemoryPool* pool);
memory::MemoryPool* pool,
S3Config* s3Config);

/// Appends data to the end of the file.
/// Uploads a part on reaching part size limit.
Expand Down
Loading