diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp
index 5eaab02e648..f87547d6bcf 100644
--- a/velox/connectors/hive/HiveConfig.cpp
+++ b/velox/connectors/hive/HiveConfig.cpp
@@ -67,6 +67,11 @@ uint32_t HiveConfig::maxPartitionsPerWriters(
       config_->get<uint32_t>(kMaxPartitionsPerWriters, 128));
 }
 
+uint32_t HiveConfig::maxBucketCount(const config::ConfigBase* session) const {
+  return session->get<uint32_t>(
+      kMaxBucketCountSession, config_->get<uint32_t>(kMaxBucketCount, 100'000));
+}
+
 bool HiveConfig::immutablePartitions() const {
   return config_->get<bool>(kImmutablePartitions, false);
 }
diff --git a/velox/connectors/hive/HiveConfig.h b/velox/connectors/hive/HiveConfig.h
index 04dbd754b81..36d6283ab1c 100644
--- a/velox/connectors/hive/HiveConfig.h
+++ b/velox/connectors/hive/HiveConfig.h
@@ -49,6 +49,10 @@ class HiveConfig {
   static constexpr const char* kMaxPartitionsPerWritersSession =
       "max_partitions_per_writers";
 
+  /// Maximum bucket count allowed when writing a bucketed table.
+  static constexpr const char* kMaxBucketCount = "max-bucket-count";
+  static constexpr const char* kMaxBucketCountSession = "max_bucket_count";
+
   /// Whether new data can be inserted into an unpartition table.
   /// Velox currently does not support appending data to existing partitions.
   static constexpr const char* kImmutablePartitions =
@@ -188,6 +192,8 @@ class HiveConfig {
 
   uint32_t maxPartitionsPerWriters(const config::ConfigBase* session) const;
 
+  uint32_t maxBucketCount(const config::ConfigBase* session) const;
+
   bool immutablePartitions() const;
 
   std::string gcsEndpoint() const;
diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp
index 36ec297220d..2b9ab173c1c 100644
--- a/velox/connectors/hive/HiveDataSink.cpp
+++ b/velox/connectors/hive/HiveDataSink.cpp
@@ -166,9 +166,10 @@ std::unique_ptr<core::PartitionFunction> createBucketFunction(
 
 std::string computeBucketedFileName(
     const std::string& queryId,
-    uint32_t bucket) {
+    uint32_t bucket,
+    uint32_t maxBucketCount) {
   static const uint32_t kMaxBucketCountPadding =
-      std::to_string(HiveDataSink::maxBucketCount() - 1).size();
+      std::to_string(maxBucketCount - 1).size();
   const std::string bucketValueStr = std::to_string(bucket);
   return fmt::format(
       "0{:0>{}}_0_{}", bucketValueStr, kMaxBucketCountPadding, queryId);
@@ -400,6 +401,8 @@ HiveDataSink::HiveDataSink(
       updateMode_(getUpdateMode()),
       maxOpenWriters_(hiveConfig_->maxPartitionsPerWriters(
           connectorQueryCtx->sessionProperties())),
+      maxBucketCount_(
+          hiveConfig_->maxBucketCount(connectorQueryCtx->sessionProperties())),
       partitionChannels_(getPartitionChannels(insertTableHandle_)),
       partitionIdGenerator_(
           !partitionChannels_.empty()
@@ -424,7 +427,7 @@ HiveDataSink::HiveDataSink(
       fileNameGenerator_(insertTableHandle_->fileNameGenerator()) {
   if (isBucketed()) {
     VELOX_USER_CHECK_LT(
-        bucketCount_, maxBucketCount(), "bucketCount exceeds the limit");
+        bucketCount_, maxBucketCount_, "bucketCount exceeds the limit");
   }
   VELOX_USER_CHECK(
       (commitStrategy_ == CommitStrategy::kNoCommit) ||
@@ -933,11 +936,16 @@ HiveWriterParameters HiveDataSink::getWriterParameters(
 std::pair<std::string, std::string> HiveDataSink::getWriterFileNames(
     std::optional<uint32_t> bucketId) const {
   return fileNameGenerator_->gen(
-      bucketId, insertTableHandle_, *connectorQueryCtx_, isCommitRequired());
+      bucketId,
+      maxBucketCount_,
+      insertTableHandle_,
+      *connectorQueryCtx_,
+      isCommitRequired());
 }
 
 std::pair<std::string, std::string> HiveInsertFileNameGenerator::gen(
     std::optional<uint32_t> bucketId,
+    std::optional<uint32_t> maxBucketCount,
     const std::shared_ptr<const HiveInsertTableHandle> insertTableHandle,
     const ConnectorQueryCtx& connectorQueryCtx,
     bool commitRequired) const {
@@ -946,8 +954,8 @@ std::pair<std::string, std::string> HiveInsertFileNameGenerator::gen(
   if (bucketId.has_value()) {
     VELOX_CHECK(generateFileName);
     // TODO: add hive.file_renaming_enabled support.
-    targetFileName =
-        computeBucketedFileName(connectorQueryCtx.queryId(), bucketId.value());
+    targetFileName = computeBucketedFileName(
+        connectorQueryCtx.queryId(), bucketId.value(), maxBucketCount.value());
   } else if (generateFileName) {
     // targetFileName includes planNodeId and Uuid. As a result, different
     // table writers run by the same task driver or the same table writer
diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h
index c1354b1ca6b..31615f3102c 100644
--- a/velox/connectors/hive/HiveDataSink.h
+++ b/velox/connectors/hive/HiveDataSink.h
@@ -200,6 +200,7 @@ class FileNameGenerator : public ISerializable {
   virtual std::pair<std::string, std::string> gen(
       std::optional<uint32_t> bucketId,
+      std::optional<uint32_t> maxBucketCount,
       const std::shared_ptr<const HiveInsertTableHandle> insertTableHandle,
       const ConnectorQueryCtx& connectorQueryCtx,
       bool commitRequired) const = 0;
 
@@ -213,6 +214,7 @@ class HiveInsertFileNameGenerator : public FileNameGenerator {
   std::pair<std::string, std::string> gen(
       std::optional<uint32_t> bucketId,
+      std::optional<uint32_t> maxBucketCount,
       const std::shared_ptr<const HiveInsertTableHandle> insertTableHandle,
       const ConnectorQueryCtx& connectorQueryCtx,
       bool commitRequired) const override;
 
@@ -525,11 +527,6 @@ class HiveDataSink : public DataSink {
       uint32_t bucketCount,
       std::unique_ptr<core::PartitionFunction> bucketFunction);
 
-  static uint32_t maxBucketCount() {
-    static const uint32_t kMaxBucketCount = 100'000;
-    return kMaxBucketCount;
-  }
-
   void appendData(RowVectorPtr input) override;
 
   bool finish() override;
@@ -665,6 +662,7 @@ class HiveDataSink : public DataSink {
   const std::shared_ptr<const HiveConfig> hiveConfig_;
   const HiveWriterParameters::UpdateMode updateMode_;
   const uint32_t maxOpenWriters_;
+  const uint32_t maxBucketCount_;
   const std::vector<column_index_t> partitionChannels_;
   const std::unique_ptr<PartitionIdGenerator> partitionIdGenerator_;
   // Indices of dataChannel are stored in ascending order
diff --git a/velox/connectors/hive/tests/HiveConfigTest.cpp b/velox/connectors/hive/tests/HiveConfigTest.cpp
index ae76870f492..db791868685 100644
--- a/velox/connectors/hive/tests/HiveConfigTest.cpp
+++ b/velox/connectors/hive/tests/HiveConfigTest.cpp
@@ -32,6 +32,7 @@ TEST(HiveConfigTest, defaultConfig) {
       facebook::velox::connector::hive::HiveConfig::
           InsertExistingPartitionsBehavior::kError);
   ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(emptySession.get()), 128);
+  ASSERT_EQ(hiveConfig.maxBucketCount(emptySession.get()), 100'000);
   ASSERT_EQ(hiveConfig.immutablePartitions(), false);
   ASSERT_EQ(hiveConfig.gcsEndpoint(), "");
   ASSERT_EQ(hiveConfig.gcsCredentialsPath(), "");
@@ -59,6 +60,7 @@ TEST(HiveConfigTest, overrideConfig) {
   std::unordered_map<std::string, std::string> configFromFile = {
       {HiveConfig::kInsertExistingPartitionsBehavior, "OVERWRITE"},
       {HiveConfig::kMaxPartitionsPerWriters, "120"},
+      {HiveConfig::kMaxBucketCount, "200"},
       {HiveConfig::kImmutablePartitions, "true"},
       {HiveConfig::kGcsEndpoint, "hey"},
       {HiveConfig::kGcsCredentialsPath, "hey"},
@@ -84,6 +86,7 @@ TEST(HiveConfigTest, overrideConfig) {
       facebook::velox::connector::hive::HiveConfig::
           InsertExistingPartitionsBehavior::kOverwrite);
   ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(emptySession.get()), 120);
+  ASSERT_EQ(hiveConfig.maxBucketCount(emptySession.get()), 200);
   ASSERT_TRUE(hiveConfig.immutablePartitions());
   ASSERT_EQ(hiveConfig.gcsEndpoint(), "hey");
   ASSERT_EQ(hiveConfig.gcsCredentialsPath(), "hey");
@@ -121,6 +124,7 @@ TEST(HiveConfigTest, overrideSession) {
       {HiveConfig::kAllowNullPartitionKeysSession, "false"},
       {HiveConfig::kIgnoreMissingFilesSession, "true"},
       {HiveConfig::kReadStatsBasedFilterReorderDisabledSession, "true"},
+      {HiveConfig::kMaxBucketCountSession, "200"},
       {HiveConfig::kLoadQuantumSession, std::to_string(4 << 20)}};
   const auto session =
       std::make_unique<config::ConfigBase>(std::move(sessionOverride));
@@ -129,6 +133,7 @@ TEST(HiveConfigTest, overrideSession) {
       facebook::velox::connector::hive::HiveConfig::
           InsertExistingPartitionsBehavior::kOverwrite);
   ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(session.get()), 128);
+  ASSERT_EQ(hiveConfig.maxBucketCount(session.get()), 200);
   ASSERT_FALSE(hiveConfig.immutablePartitions());
   ASSERT_EQ(hiveConfig.gcsEndpoint(), "");
   ASSERT_EQ(hiveConfig.gcsCredentialsPath(), "");
diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst
index 9c19b600e06..d3a77ee14cc 100644
--- a/velox/docs/configs.rst
+++ b/velox/docs/configs.rst
@@ -521,6 +521,11 @@ Each query can override the config by setting corresponding query session properties.
      - integer
      - 100
      - Maximum number of (bucketed) partitions per a single table writer instance.
+   * - hive.max-bucket-count
+     - max_bucket_count
+     - integer
+     - 100000
+     - Maximum bucket count allowed when writing a bucketed table.
    * - insert-existing-partitions-behavior
      - insert_existing_partitions_behavior
      - string
diff --git a/velox/exec/tests/TableWriterTest.cpp b/velox/exec/tests/TableWriterTest.cpp
index 461d3a124dc..f5ae37b4e95 100644
--- a/velox/exec/tests/TableWriterTest.cpp
+++ b/velox/exec/tests/TableWriterTest.cpp
@@ -1557,6 +1557,14 @@ TEST_P(BucketedTableOnlyWriteTest, bucketCountLimit) {
   SCOPED_TRACE(testParam_.toString());
   auto input = makeVectors(1, 100);
   createDuckDbTable(input);
+
+  // Get the default maxBucketCount from config.
+  HiveConfig hiveConfig(std::make_shared<config::ConfigBase>(
+      std::unordered_map<std::string, std::string>()));
+  const auto emptySession = std::make_unique<config::ConfigBase>(
+      std::unordered_map<std::string, std::string>());
+  uint32_t maxBucketCount = hiveConfig.maxBucketCount(emptySession.get());
+
   struct {
     uint32_t bucketCount;
     bool expectedError;
@@ -1568,10 +1576,10 @@ TEST_P(BucketedTableOnlyWriteTest, bucketCountLimit) {
   } testSettings[] = {
       {1, false},
       {3, false},
-      {HiveDataSink::maxBucketCount() - 1, false},
-      {HiveDataSink::maxBucketCount(), true},
-      {HiveDataSink::maxBucketCount() + 1, true},
-      {HiveDataSink::maxBucketCount() * 2, true}};
+      {maxBucketCount - 1, false},
+      {maxBucketCount, true},
+      {maxBucketCount + 1, true},
+      {maxBucketCount * 2, true}};
   for (const auto& testData : testSettings) {
     SCOPED_TRACE(testData.debugString());
     auto outputDirectory = TempDirectoryPath::create();