diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp index 900beb05b4741..5396cad2b1d71 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp @@ -1028,6 +1028,41 @@ toHiveBucketProperty( sortedBy); } +std::unique_ptr +toVeloxHiveColumnHandle( + const protocol::ColumnHandle* column, + const TypeParser& typeParser) { + auto* hiveColumn = dynamic_cast(column); + VELOX_CHECK_NOT_NULL( + hiveColumn, "Unexpected column handle type {}", column->_type); + velox::type::fbhive::HiveTypeParser hiveTypeParser; + // TODO(spershin): Should we pass something different than 'typeSignature' + // to 'hiveType' argument of the 'HiveColumnHandle' constructor? + return std::make_unique( + hiveColumn->name, + toHiveColumnType(hiveColumn->columnType), + stringToType(hiveColumn->typeSignature, typeParser), + hiveTypeParser.parse(hiveColumn->hiveType), + toRequiredSubfields(hiveColumn->requiredSubfields)); +} + +velox::connector::hive::HiveBucketConversion toVeloxBucketConversion( + const protocol::BucketConversion& bucketConversion) { + velox::connector::hive::HiveBucketConversion veloxBucketConversion; + // Current table bucket count (new). + veloxBucketConversion.tableBucketCount = bucketConversion.tableBucketCount; + // Partition bucket count (old). + veloxBucketConversion.partitionBucketCount = + bucketConversion.partitionBucketCount; + TypeParser typeParser; + for (const auto& column : bucketConversion.bucketColumnHandles) { + // Columns used as bucket input. + veloxBucketConversion.bucketColumnHandles.push_back( + toVeloxHiveColumnHandle(&column, typeParser)); + } + return veloxBucketConversion; +} + velox::connector::hive::iceberg::FileContent toVeloxFileContent( const presto::protocol::FileContent content) { if (content == protocol::FileContent::DATA) { @@ -1075,39 +1110,35 @@ HivePrestoToVeloxConnector::toVeloxSplit( infoColumns.insert( {"$file_modified_time", std::to_string(hiveSplit->fileSplit.fileModifiedTime)}); - return std::make_unique( - catalogId, - hiveSplit->fileSplit.path, - toVeloxFileFormat(hiveSplit->storage.storageFormat), - hiveSplit->fileSplit.start, - hiveSplit->fileSplit.length, - partitionKeys, - hiveSplit->tableBucketNumber - ? std::optional(*hiveSplit->tableBucketNumber) - : std::nullopt, - customSplitInfo, - extraFileInfo, - serdeParameters, - hiveSplit->splitWeight, - infoColumns); + auto veloxSplit = + std::make_unique( + catalogId, + hiveSplit->fileSplit.path, + toVeloxFileFormat(hiveSplit->storage.storageFormat), + hiveSplit->fileSplit.start, + hiveSplit->fileSplit.length, + partitionKeys, + hiveSplit->tableBucketNumber + ? std::optional(*hiveSplit->tableBucketNumber) + : std::nullopt, + customSplitInfo, + extraFileInfo, + serdeParameters, + hiveSplit->splitWeight, + infoColumns); + if (hiveSplit->bucketConversion) { + VELOX_CHECK_NOT_NULL(hiveSplit->tableBucketNumber); + veloxSplit->bucketConversion = + toVeloxBucketConversion(*hiveSplit->bucketConversion); + } + return veloxSplit; } std::unique_ptr HivePrestoToVeloxConnector::toVeloxColumnHandle( const protocol::ColumnHandle* column, const TypeParser& typeParser) const { - auto hiveColumn = dynamic_cast(column); - VELOX_CHECK_NOT_NULL( - hiveColumn, "Unexpected column handle type {}", column->_type); - velox::type::fbhive::HiveTypeParser hiveTypeParser; - // TODO(spershin): Should we pass something different than 'typeSignature' - // to 'hiveType' argument of the 'HiveColumnHandle' constructor? - return std::make_unique( - hiveColumn->name, - toHiveColumnType(hiveColumn->columnType), - stringToType(hiveColumn->typeSignature, typeParser), - hiveTypeParser.parse(hiveColumn->hiveType), - toRequiredSubfields(hiveColumn->requiredSubfields)); + return toVeloxHiveColumnHandle(column, typeParser); } std::unique_ptr diff --git a/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxSplitTest.cpp b/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxSplitTest.cpp index 951b6900a6531..5e3a8f98c295a 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxSplitTest.cpp +++ b/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxSplitTest.cpp @@ -135,3 +135,33 @@ TEST_F(PrestoToVeloxSplitTest, serdeParameters) { dwio::common::SerDeOptions::kMapKeyDelim), "|"); } + +TEST_F(PrestoToVeloxSplitTest, bucketConversion) { + auto scheduledSplit = makeHiveScheduledSplit(); + auto& hiveSplit = + static_cast(*scheduledSplit.split.connectorSplit); + hiveSplit.tableBucketNumber = std::make_shared(42); + hiveSplit.bucketConversion = std::make_shared(); + hiveSplit.bucketConversion->tableBucketCount = 4096; + hiveSplit.bucketConversion->partitionBucketCount = 512; + auto& column = hiveSplit.bucketConversion->bucketColumnHandles.emplace_back(); + column.name = "c0"; + column.hiveType = "bigint"; + column.typeSignature = "bigint"; + column.columnType = protocol::ColumnType::REGULAR; + auto veloxSplit = toVeloxSplit(scheduledSplit); + const auto& veloxHiveSplit = + static_cast( + *veloxSplit.connectorSplit); + ASSERT_TRUE(veloxHiveSplit.bucketConversion.has_value()); + ASSERT_EQ(veloxHiveSplit.bucketConversion->tableBucketCount, 4096); + ASSERT_EQ(veloxHiveSplit.bucketConversion->partitionBucketCount, 512); + ASSERT_EQ(veloxHiveSplit.bucketConversion->bucketColumnHandles.size(), 1); + auto& veloxColumn = veloxHiveSplit.bucketConversion->bucketColumnHandles[0]; + ASSERT_EQ(veloxColumn->name(), "c0"); + ASSERT_EQ(*veloxColumn->dataType(), *BIGINT()); + ASSERT_EQ(*veloxColumn->hiveType(), *BIGINT()); + ASSERT_EQ( + veloxColumn->columnType(), + connector::hive::HiveColumnHandle::ColumnType::kRegular); +} diff --git a/presto-native-execution/velox b/presto-native-execution/velox index 39743cd9a96f3..db287c99773d9 160000 --- a/presto-native-execution/velox +++ b/presto-native-execution/velox @@ -1 +1 @@ -Subproject commit 39743cd9a96f3d2d83ab4a77554dc5584e08bfb6 +Subproject commit db287c99773d9c3ed50cc1465be4beccbbb1b616