// Patch overview (reviewer notes placed ahead of the first "diff --git"
// header, so they are not part of any hunk).
//
// Purpose: forward serde parameters found in a Hive output table handle's
// additionalTableParameters to the insert table handle built by
// HivePrestoToVeloxConnector::toVeloxInsertTableHandle().
//
// Files touched, in patch order:
//   1. presto_cpp/main/connectors/HivePrestoToVeloxConnector.cpp
//      - adds two #include lines near the top of the file;
//      - calls extractSerdeParameters(
//            hiveOutputTableHandle->additionalTableParameters)
//        and passes std::move(serdeParameters) as one extra trailing argument,
//        right after the std::optional(...) compression-kind argument.
//   2. presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.cpp
//      - extractNimbleSerdeParameters(tableParameters, serdeParameters):
//        copies every entry whose key starts with "nimble." into the
//        caller-supplied output map; assignment goes through operator[], so an
//        entry already present under the same key is overwritten.
//      - extractSerdeParameters(tableParameters): copies the entries whose key
//        is one of field.delim, escape.delim, collection.delim, mapkey.delim,
//        serialization.format, then adds the nimble.* entries through the
//        helper above and returns the resulting map. Every other key (for
//        example presto.version in the tests) is left out.
//   3. presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.h
//      - adds one #include and declares both functions with /// comments.
//   4. presto_cpp/main/types/tests/PrestoToVeloxConnectorTest.cpp
//      - includes velox/connectors/hive/HiveDataSink.h and adds three tests:
//        * ctasPassesTextfileSerdeParameters: TEXTFILE handle carrying four
//          delimiter keys plus presto.version; expects exactly those four
//          delimiter entries in serdeParameters().
//        * ctasPassesNimbleSerdeParameters: ALPHA handle carrying two nimble.*
//          keys plus presto.version; expects exactly the two nimble.* entries.
//        * ctasEmptySerdeParameters: DWRF handle with no
//          additionalTableParameters assigned; expects an empty map.
//
// NOTE(review): "serialization.format" is in the accepted key set but none of
// the three tests supplies it, so that key is not exercised here.
//
// NOTE(review): in this copy the patch's line breaks are collapsed and the
// text that would sit between angle brackets is missing (bare "#include",
// "std::map&", "std::unordered_map&", "std::make_unique(", "std::make_shared()",
// "dynamic_cast(") -- presumably header names and template arguments were lost
// in extraction; confirm against the upstream change before applying. The diff
// text below is reproduced unchanged.
diff --git a/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.cpp index cbb4f5c7d40b7..21885ae277ce3 100644 --- a/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.cpp +++ b/presto-native-execution/presto_cpp/main/connectors/HivePrestoToVeloxConnector.cpp @@ -14,6 +14,9 @@ #include "presto_cpp/main/connectors/HivePrestoToVeloxConnector.h" +#include +#include + #include "presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.h" #include "presto_cpp/main/types/PrestoToVeloxExpr.h" #include "presto_cpp/main/types/TypeParser.h" @@ -403,6 +406,9 @@ HivePrestoToVeloxConnector::toVeloxInsertTableHandle( bool isPartitioned{false}; const auto inputColumns = toHiveColumns( hiveOutputTableHandle->inputColumns, typeParser, isPartitioned); + auto serdeParameters = + extractSerdeParameters(hiveOutputTableHandle->additionalTableParameters); + return std::make_unique( inputColumns, toLocationHandle(hiveOutputTableHandle->locationHandle), @@ -410,7 +416,8 @@ HivePrestoToVeloxConnector::toVeloxInsertTableHandle( toHiveBucketProperty( inputColumns, hiveOutputTableHandle->bucketProperty, typeParser), std::optional( - toFileCompressionKind(hiveOutputTableHandle->compressionCodec))); + toFileCompressionKind(hiveOutputTableHandle->compressionCodec)), + std::move(serdeParameters)); } std::unique_ptr diff --git a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.cpp b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.cpp index 0cf8624456f35..624cb364ff35e 100644 --- a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.cpp +++ b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.cpp @@ -807,4 +807,35 @@ std::unique_ptr toHiveTableHandle( columnHandles); } +void extractNimbleSerdeParameters( + const std::map& tableParameters, + 
std::unordered_map& serdeParameters) { + static constexpr std::string_view kNimblePrefix{"nimble."}; + for (const auto& [key, value] : tableParameters) { + if (key.compare(0, kNimblePrefix.size(), kNimblePrefix) == 0) { + serdeParameters[key] = value; + } + } +} + +std::unordered_map extractSerdeParameters( + const std::map& tableParameters) { + static const std::unordered_set kSerdeKeys = { + "field.delim", + "escape.delim", + "collection.delim", + "mapkey.delim", + "serialization.format", + }; + + std::unordered_map serdeParameters; + for (const auto& [key, value] : tableParameters) { + if (kSerdeKeys.count(key) > 0) { + serdeParameters[key] = value; + } + } + extractNimbleSerdeParameters(tableParameters, serdeParameters); + return serdeParameters; +} + } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.h b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.h index 15e31c0a0f7eb..18f3035e014f8 100644 --- a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.h +++ b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.h @@ -13,6 +13,8 @@ */ #pragma once +#include + #include "presto_cpp/main/types/PrestoToVeloxExpr.h" #include "presto_cpp/presto_protocol/connector/hive/presto_protocol_hive.h" #include "presto_cpp/presto_protocol/core/presto_protocol_core.h" @@ -62,4 +64,15 @@ std::unique_ptr toHiveTableHandle( const VeloxExprConverter& exprConverter, const TypeParser& typeParser); +/// Extracts nimble serde parameters (nimble.*) from table parameters. +void extractNimbleSerdeParameters( + const std::map& tableParameters, + std::unordered_map& serdeParameters); + +/// Extracts serde parameters (textfile delimiters and nimble.* config) from +/// additionalTableParameters during CTAS. +/// Mirrors Java's HiveMetadata.extractSerdeParameters(). 
+std::unordered_map extractSerdeParameters( + const std::map& tableParameters); + } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxConnectorTest.cpp b/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxConnectorTest.cpp index 69458fa279eb5..d8010ba343c0b 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxConnectorTest.cpp +++ b/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxConnectorTest.cpp @@ -20,6 +20,7 @@ #include "presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/connectors/hive/HiveConnector.h" +#include "velox/connectors/hive/HiveDataSink.h" #include "velox/connectors/hive/TableHandle.h" #include "velox/connectors/hive/iceberg/IcebergColumnHandle.h" @@ -293,3 +294,137 @@ TEST_F(PrestoToVeloxConnectorTest, icebergColumnHandleDeeplyNested) { ASSERT_EQ(icebergHandle->field().children[0].children.size(), 1); EXPECT_EQ(icebergHandle->field().children[0].children[0].fieldId, 3); } + +TEST_F(PrestoToVeloxConnectorTest, ctasPassesTextfileSerdeParameters) { + auto hiveOutputTableHandle = + std::make_shared(); + hiveOutputTableHandle->schemaName = "test_schema"; + hiveOutputTableHandle->tableName = "test_table"; + hiveOutputTableHandle->tableOwner = "owner"; + hiveOutputTableHandle->actualStorageFormat = + protocol::hive::HiveStorageFormat::TEXTFILE; + hiveOutputTableHandle->tableStorageFormat = + protocol::hive::HiveStorageFormat::TEXTFILE; + hiveOutputTableHandle->partitionStorageFormat = + protocol::hive::HiveStorageFormat::TEXTFILE; + hiveOutputTableHandle->compressionCodec = + protocol::hive::HiveCompressionCodec::NONE; + hiveOutputTableHandle->locationHandle.targetPath = "/path/to/target"; + hiveOutputTableHandle->locationHandle.writePath = "/path/to/write"; + hiveOutputTableHandle->locationHandle.tableType = + protocol::hive::TableType::NEW; + 
hiveOutputTableHandle->additionalTableParameters = { + {"field.delim", "|"}, + {"escape.delim", "\\"}, + {"collection.delim", "$"}, + {"mapkey.delim", "#"}, + {"presto.version", "0.297"}}; + + protocol::OutputTableHandle outputHandle; + outputHandle.connectorId = "hive"; + outputHandle.connectorHandle = hiveOutputTableHandle; + + protocol::CreateHandle createHandle; + createHandle.handle = outputHandle; + + HivePrestoToVeloxConnector hiveConnector("hive"); + auto result = + hiveConnector.toVeloxInsertTableHandle(&createHandle, *typeParser_); + ASSERT_NE(result, nullptr); + + auto* hiveInsert = + dynamic_cast(result.get()); + ASSERT_NE(hiveInsert, nullptr); + + const auto& serdeParams = hiveInsert->serdeParameters(); + // Only serde keys should be extracted, not table-level keys like + // presto.version. + EXPECT_EQ(serdeParams.size(), 4); + EXPECT_EQ(serdeParams.at("field.delim"), "|"); + EXPECT_EQ(serdeParams.at("escape.delim"), "\\"); + EXPECT_EQ(serdeParams.at("collection.delim"), "$"); + EXPECT_EQ(serdeParams.at("mapkey.delim"), "#"); +} + +TEST_F(PrestoToVeloxConnectorTest, ctasPassesNimbleSerdeParameters) { + auto hiveOutputTableHandle = + std::make_shared(); + hiveOutputTableHandle->schemaName = "test_schema"; + hiveOutputTableHandle->tableName = "test_table"; + hiveOutputTableHandle->tableOwner = "owner"; + hiveOutputTableHandle->actualStorageFormat = + protocol::hive::HiveStorageFormat::ALPHA; + hiveOutputTableHandle->tableStorageFormat = + protocol::hive::HiveStorageFormat::ALPHA; + hiveOutputTableHandle->partitionStorageFormat = + protocol::hive::HiveStorageFormat::ALPHA; + hiveOutputTableHandle->compressionCodec = + protocol::hive::HiveCompressionCodec::NONE; + hiveOutputTableHandle->locationHandle.targetPath = "/path/to/target"; + hiveOutputTableHandle->locationHandle.writePath = "/path/to/write"; + hiveOutputTableHandle->locationHandle.tableType = + protocol::hive::TableType::NEW; + hiveOutputTableHandle->additionalTableParameters = { + 
{"nimble.enable.vectorized.stats", "true"}, + {"nimble.index.columns", "id"}, + {"presto.version", "0.297"}}; + + protocol::OutputTableHandle outputHandle; + outputHandle.connectorId = "hive"; + outputHandle.connectorHandle = hiveOutputTableHandle; + + protocol::CreateHandle createHandle; + createHandle.handle = outputHandle; + + HivePrestoToVeloxConnector hiveConnector("hive"); + auto result = + hiveConnector.toVeloxInsertTableHandle(&createHandle, *typeParser_); + ASSERT_NE(result, nullptr); + + auto* hiveInsert = + dynamic_cast(result.get()); + ASSERT_NE(hiveInsert, nullptr); + + const auto& serdeParams = hiveInsert->serdeParameters(); + EXPECT_EQ(serdeParams.size(), 2); + EXPECT_EQ(serdeParams.at("nimble.enable.vectorized.stats"), "true"); + EXPECT_EQ(serdeParams.at("nimble.index.columns"), "id"); +} + +TEST_F(PrestoToVeloxConnectorTest, ctasEmptySerdeParameters) { + auto hiveOutputTableHandle = + std::make_shared(); + hiveOutputTableHandle->schemaName = "test_schema"; + hiveOutputTableHandle->tableName = "test_table"; + hiveOutputTableHandle->tableOwner = "owner"; + hiveOutputTableHandle->actualStorageFormat = + protocol::hive::HiveStorageFormat::DWRF; + hiveOutputTableHandle->tableStorageFormat = + protocol::hive::HiveStorageFormat::DWRF; + hiveOutputTableHandle->partitionStorageFormat = + protocol::hive::HiveStorageFormat::DWRF; + hiveOutputTableHandle->compressionCodec = + protocol::hive::HiveCompressionCodec::NONE; + hiveOutputTableHandle->locationHandle.targetPath = "/path/to/target"; + hiveOutputTableHandle->locationHandle.writePath = "/path/to/write"; + hiveOutputTableHandle->locationHandle.tableType = + protocol::hive::TableType::NEW; + + protocol::OutputTableHandle outputHandle; + outputHandle.connectorId = "hive"; + outputHandle.connectorHandle = hiveOutputTableHandle; + + protocol::CreateHandle createHandle; + createHandle.handle = outputHandle; + + HivePrestoToVeloxConnector hiveConnector("hive"); + auto result = + 
hiveConnector.toVeloxInsertTableHandle(&createHandle, *typeParser_); + ASSERT_NE(result, nullptr); + + auto* hiveInsert = + dynamic_cast(result.get()); + ASSERT_NE(hiveInsert, nullptr); + + EXPECT_TRUE(hiveInsert->serdeParameters().empty()); +}