5 changes: 5 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
@@ -67,6 +67,11 @@ uint32_t HiveConfig::maxPartitionsPerWriters(
config_->get<uint32_t>(kMaxPartitionsPerWriters, 128));
}

uint32_t HiveConfig::maxBucketCount(const config::ConfigBase* session) const {
return session->get<uint32_t>(
kMaxBucketCountSession, config_->get<uint32_t>(kMaxBucketCount, 100'000));
}

bool HiveConfig::immutablePartitions() const {
return config_->get<bool>(kImmutablePartitions, false);
}
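
A minimal sketch of the lookup order this accessor implements (mirroring HiveConfigTest further down; the 8192 value is illustrative):

```cpp
// No session override and no config entry: falls back to the 100'000 default.
auto emptySession = std::make_unique<config::ConfigBase>(
    std::unordered_map<std::string, std::string>());
HiveConfig defaults(std::make_shared<config::ConfigBase>(
    std::unordered_map<std::string, std::string>()));
EXPECT_EQ(defaults.maxBucketCount(emptySession.get()), 100'000);

// A connector config entry overrides the default for all queries.
HiveConfig configured(std::make_shared<config::ConfigBase>(
    std::unordered_map<std::string, std::string>{
        {HiveConfig::kMaxBucketCount, "8192"}}));
EXPECT_EQ(configured.maxBucketCount(emptySession.get()), 8192);
```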
6 changes: 6 additions & 0 deletions velox/connectors/hive/HiveConfig.h
@@ -49,6 +49,10 @@ class HiveConfig {
static constexpr const char* kMaxPartitionsPerWritersSession =
"max_partitions_per_writers";

/// Maximum allowed bucket count.
static constexpr const char* kMaxBucketCount = "max-bucket-count";
Review comment (Collaborator): This should be hive.max-bucket-count. Same with max-partitions-per-writers. Can you fix both?

Review comment (@majetideepak, Collaborator, May 23, 2025): Also kInsertExistingPartitionsBehavior needs the hive. prefix.

static constexpr const char* kMaxBucketCountSession = "max_bucket_count";

/// Whether new data can be inserted into an unpartitioned table.
/// Velox currently does not support appending data to existing partitions.
static constexpr const char* kImmutablePartitions =
@@ -188,6 +192,8 @@ class HiveConfig {

uint32_t maxPartitionsPerWriters(const config::ConfigBase* session) const;

uint32_t maxBucketCount(const config::ConfigBase* session) const;

bool immutablePartitions() const;

std::string gcsEndpoint() const;
20 changes: 14 additions & 6 deletions velox/connectors/hive/HiveDataSink.cpp
@@ -166,9 +166,10 @@ std::unique_ptr<core::PartitionFunction> createBucketFunction(

std::string computeBucketedFileName(
const std::string& queryId,
uint32_t bucket) {
uint32_t bucket,
uint32_t maxBucketCount) {
static const uint32_t kMaxBucketCountPadding =
std::to_string(HiveDataSink::maxBucketCount() - 1).size();
std::to_string(maxBucketCount - 1).size();
const std::string bucketValueStr = std::to_string(bucket);
return fmt::format(
"0{:0>{}}_0_{}", bucketValueStr, kMaxBucketCountPadding, queryId);
@@ -400,6 +401,8 @@ HiveDataSink::HiveDataSink(
updateMode_(getUpdateMode()),
maxOpenWriters_(hiveConfig_->maxPartitionsPerWriters(
connectorQueryCtx->sessionProperties())),
maxBucketCount_(
hiveConfig_->maxBucketCount(connectorQueryCtx->sessionProperties())),
partitionChannels_(getPartitionChannels(insertTableHandle_)),
partitionIdGenerator_(
!partitionChannels_.empty()
@@ -424,7 +427,7 @@ HiveDataSink::HiveDataSink(
fileNameGenerator_(insertTableHandle_->fileNameGenerator()) {
if (isBucketed()) {
VELOX_USER_CHECK_LT(
bucketCount_, maxBucketCount(), "bucketCount exceeds the limit");
bucketCount_, maxBucketCount_, "bucketCount exceeds the limit");
}
VELOX_USER_CHECK(
(commitStrategy_ == CommitStrategy::kNoCommit) ||
@@ -933,11 +936,16 @@ HiveWriterParameters HiveDataSink::getWriterParameters(
std::pair<std::string, std::string> HiveDataSink::getWriterFileNames(
std::optional<uint32_t> bucketId) const {
return fileNameGenerator_->gen(
bucketId, insertTableHandle_, *connectorQueryCtx_, isCommitRequired());
bucketId,
maxBucketCount_,
insertTableHandle_,
*connectorQueryCtx_,
isCommitRequired());
}

std::pair<std::string, std::string> HiveInsertFileNameGenerator::gen(
std::optional<uint32_t> bucketId,
std::optional<uint32_t> maxBucketCount,
const std::shared_ptr<const HiveInsertTableHandle> insertTableHandle,
const ConnectorQueryCtx& connectorQueryCtx,
bool commitRequired) const {
@@ -946,8 +954,8 @@ std::pair<std::string, std::string> HiveInsertFileNameGenerator::gen(
if (bucketId.has_value()) {
VELOX_CHECK(generateFileName);
// TODO: add hive.file_renaming_enabled support.
targetFileName =
computeBucketedFileName(connectorQueryCtx.queryId(), bucketId.value());
targetFileName = computeBucketedFileName(
connectorQueryCtx.queryId(), bucketId.value(), maxBucketCount.value());
} else if (generateFileName) {
// targetFileName includes planNodeId and Uuid. As a result, different
// table writers run by the same task driver or the same table writer
8 changes: 3 additions & 5 deletions velox/connectors/hive/HiveDataSink.h
@@ -200,6 +200,7 @@ class FileNameGenerator : public ISerializable {

virtual std::pair<std::string, std::string> gen(
std::optional<uint32_t> bucketId,
std::optional<uint32_t> maxBucketCount,
Review thread (outdated): JkSelf marked this conversation as resolved.
const std::shared_ptr<const HiveInsertTableHandle> insertTableHandle,
const ConnectorQueryCtx& connectorQueryCtx,
bool commitRequired) const = 0;
@@ -213,6 +214,7 @@ class HiveInsertFileNameGenerator : public FileNameGenerator {

std::pair<std::string, std::string> gen(
std::optional<uint32_t> bucketId,
std::optional<uint32_t> maxBucketCount,
const std::shared_ptr<const HiveInsertTableHandle> insertTableHandle,
const ConnectorQueryCtx& connectorQueryCtx,
bool commitRequired) const override;
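
For subclass authors, a minimal hypothetical implementation of the widened interface; the class name and file names are made up, and the serialize() override assumes ISerializable's usual folly::dynamic contract:

```cpp
class EchoFileNameGenerator : public FileNameGenerator {
 public:
  std::pair<std::string, std::string> gen(
      std::optional<uint32_t> bucketId,
      std::optional<uint32_t> maxBucketCount,
      const std::shared_ptr<const HiveInsertTableHandle> /*insertTableHandle*/,
      const ConnectorQueryCtx& connectorQueryCtx,
      bool /*commitRequired*/) const override {
    // Implementations may ignore maxBucketCount; here both the target and
    // write file names are derived from the query id alone.
    const auto name = fmt::format("echo_{}", connectorQueryCtx.queryId());
    return {name, name};
  }

  folly::dynamic serialize() const override {
    return folly::dynamic::object("name", "EchoFileNameGenerator");
  }
};
```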
@@ -525,11 +527,6 @@ class HiveDataSink : public DataSink {
uint32_t bucketCount,
std::unique_ptr<core::PartitionFunction> bucketFunction);

static uint32_t maxBucketCount() {
static const uint32_t kMaxBucketCount = 100'000;
return kMaxBucketCount;
}

void appendData(RowVectorPtr input) override;

bool finish() override;
@@ -665,6 +662,7 @@ class HiveDataSink : public DataSink {
const std::shared_ptr<const HiveConfig> hiveConfig_;
const HiveWriterParameters::UpdateMode updateMode_;
const uint32_t maxOpenWriters_;
const uint32_t maxBucketCount_;
const std::vector<column_index_t> partitionChannels_;
const std::unique_ptr<PartitionIdGenerator> partitionIdGenerator_;
// Indices of dataChannel are stored in ascending order
5 changes: 5 additions & 0 deletions velox/connectors/hive/tests/HiveConfigTest.cpp
@@ -32,6 +32,7 @@ TEST(HiveConfigTest, defaultConfig) {
facebook::velox::connector::hive::HiveConfig::
InsertExistingPartitionsBehavior::kError);
ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(emptySession.get()), 128);
ASSERT_EQ(hiveConfig.maxBucketCount(emptySession.get()), 100'000);
ASSERT_EQ(hiveConfig.immutablePartitions(), false);
ASSERT_EQ(hiveConfig.gcsEndpoint(), "");
ASSERT_EQ(hiveConfig.gcsCredentialsPath(), "");
@@ -59,6 +60,7 @@ TEST(HiveConfigTest, overrideConfig) {
std::unordered_map<std::string, std::string> configFromFile = {
{HiveConfig::kInsertExistingPartitionsBehavior, "OVERWRITE"},
{HiveConfig::kMaxPartitionsPerWriters, "120"},
{HiveConfig::kMaxBucketCount, "200"},
{HiveConfig::kImmutablePartitions, "true"},
{HiveConfig::kGcsEndpoint, "hey"},
{HiveConfig::kGcsCredentialsPath, "hey"},
@@ -84,6 +86,7 @@
facebook::velox::connector::hive::HiveConfig::
InsertExistingPartitionsBehavior::kOverwrite);
ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(emptySession.get()), 120);
ASSERT_EQ(hiveConfig.maxBucketCount(emptySession.get()), 200);
ASSERT_TRUE(hiveConfig.immutablePartitions());
ASSERT_EQ(hiveConfig.gcsEndpoint(), "hey");
ASSERT_EQ(hiveConfig.gcsCredentialsPath(), "hey");
@@ -121,6 +124,7 @@ TEST(HiveConfigTest, overrideSession) {
{HiveConfig::kAllowNullPartitionKeysSession, "false"},
{HiveConfig::kIgnoreMissingFilesSession, "true"},
{HiveConfig::kReadStatsBasedFilterReorderDisabledSession, "true"},
{HiveConfig::kMaxBucketCountSession, "200"},
{HiveConfig::kLoadQuantumSession, std::to_string(4 << 20)}};
const auto session =
std::make_unique<config::ConfigBase>(std::move(sessionOverride));
@@ -129,6 +133,7 @@
facebook::velox::connector::hive::HiveConfig::
InsertExistingPartitionsBehavior::kOverwrite);
ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(session.get()), 128);
ASSERT_EQ(hiveConfig.maxBucketCount(session.get()), 200);
ASSERT_FALSE(hiveConfig.immutablePartitions());
ASSERT_EQ(hiveConfig.gcsEndpoint(), "");
ASSERT_EQ(hiveConfig.gcsCredentialsPath(), "");
5 changes: 5 additions & 0 deletions velox/docs/configs.rst
@@ -521,6 +521,11 @@ Each query can override the config by setting corresponding query session properties.
- integer
- 100
- Maximum number of (bucketed) partitions per single table writer instance.
* - hive.max-bucket-count
-
- integer
- 100'000
- Maximum bucket count for a table writer.

Review comment (Collaborator): Add the session property name here and above for hive.max-partitions-per-writers.

* - insert-existing-partitions-behavior
- insert_existing_partitions_behavior
- string
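
A per-query override goes through the session property (max_bucket_count, per kMaxBucketCountSession above); a minimal sketch mirroring HiveConfigTest:

```cpp
auto session = std::make_unique<config::ConfigBase>(
    std::unordered_map<std::string, std::string>{
        {HiveConfig::kMaxBucketCountSession, "1024"}});
HiveConfig hiveConfig(std::make_shared<config::ConfigBase>(
    std::unordered_map<std::string, std::string>()));
// The session value wins over both hive.max-bucket-count and the default.
EXPECT_EQ(hiveConfig.maxBucketCount(session.get()), 1024);
```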
16 changes: 12 additions & 4 deletions velox/exec/tests/TableWriterTest.cpp
@@ -1557,6 +1557,14 @@ TEST_P(BucketedTableOnlyWriteTest, bucketCountLimit) {
SCOPED_TRACE(testParam_.toString());
auto input = makeVectors(1, 100);
createDuckDbTable(input);

// Get the default maxBucketCount from config.
HiveConfig hiveConfig(std::make_shared<config::ConfigBase>(
std::unordered_map<std::string, std::string>()));
const auto emptySession = std::make_unique<config::ConfigBase>(
std::unordered_map<std::string, std::string>());
uint32_t maxBucketCount = hiveConfig.maxBucketCount(emptySession.get());

struct {
uint32_t bucketCount;
bool expectedError;
@@ -1568,10 +1576,10 @@
} testSettings[] = {
{1, false},
{3, false},
{HiveDataSink::maxBucketCount() - 1, false},
{HiveDataSink::maxBucketCount(), true},
{HiveDataSink::maxBucketCount() + 1, true},
{HiveDataSink::maxBucketCount() * 2, true}};
{maxBucketCount - 1, false},
{maxBucketCount, true},
{maxBucketCount + 1, true},
{maxBucketCount * 2, true}};
for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
auto outputDirectory = TempDirectoryPath::create();