diff --git a/velox/connectors/hive/HiveConnectorSplit.h b/velox/connectors/hive/HiveConnectorSplit.h index 15be998884b..372cbf62895 100644 --- a/velox/connectors/hive/HiveConnectorSplit.h +++ b/velox/connectors/hive/HiveConnectorSplit.h @@ -38,6 +38,7 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { std::unordered_map customSplitInfo; std::shared_ptr extraFileInfo; std::unordered_map serdeParameters; + std::unordered_map metadataColumns; HiveConnectorSplit( const std::string& connectorId, @@ -50,7 +51,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { std::optional _tableBucketNumber = std::nullopt, const std::unordered_map& _customSplitInfo = {}, const std::shared_ptr& _extraFileInfo = {}, - const std::unordered_map& _serdeParameters = {}) + const std::unordered_map& _serdeParameters = {}, + const std::unordered_map& _metadataColumns = {}) : ConnectorSplit(connectorId), filePath(_filePath), fileFormat(_fileFormat), @@ -60,7 +62,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { tableBucketNumber(_tableBucketNumber), customSplitInfo(_customSplitInfo), extraFileInfo(_extraFileInfo), - serdeParameters(_serdeParameters) {} + serdeParameters(_serdeParameters), + metadataColumns(_metadataColumns) {} std::string toString() const override { if (tableBucketNumber.has_value()) { diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index 4fe174ed794..937547c7b5b 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -308,6 +308,8 @@ std::shared_ptr makeScanSpec( const RowTypePtr& dataColumns, const std::unordered_map>& partitionKeys, + const std::unordered_map>& + metadataColumns, memory::MemoryPool* pool) { auto spec = std::make_shared("root"); folly::F14FastMap> diff --git a/velox/connectors/hive/HiveConnectorUtil.h b/velox/connectors/hive/HiveConnectorUtil.h index 51335f09e76..c3c83fbb738 100644 --- a/velox/connectors/hive/HiveConnectorUtil.h +++ b/velox/connectors/hive/HiveConnectorUtil.h @@ -51,6 +51,8 @@ std::shared_ptr makeScanSpec( const RowTypePtr& dataColumns, const std::unordered_map>& partitionKeys, + const std::unordered_map>& + metadataColumns, memory::MemoryPool* pool); void configureReaderOptions( diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 6ff99105364..871928bb92d 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -119,6 +119,9 @@ HiveDataSource::HiveDataSource( if (handle->columnType() == HiveColumnHandle::ColumnType::kPartitionKey) { partitionKeys_.emplace(handle->name(), handle); } + if (handle->columnType() == HiveColumnHandle::ColumnType::kMetadata) { + metadataColumns_.emplace(handle->name(), handle); + } } std::vector readerRowNames; @@ -206,6 +209,7 @@ HiveDataSource::HiveDataSource( filters, hiveTableHandle_->dataColumns(), partitionKeys_, + metadataColumns_, pool_); if (remainingFilter) { metadataFilter_ = std::make_shared( diff --git a/velox/connectors/hive/HiveDataSource.h b/velox/connectors/hive/HiveDataSource.h index b2b4cac7aec..61e076ba772 100644 --- a/velox/connectors/hive/HiveDataSource.h +++ b/velox/connectors/hive/HiveDataSource.h @@ -100,6 +100,11 @@ class HiveDataSource : public DataSource { std::unordered_map> partitionKeys_; + // Column handles for the metadata columns keyed on metadata column + // name. + std::unordered_map> + metadataColumns_; + FileHandleFactory* const fileHandleFactory_; folly::Executor* const executor_; const ConnectorQueryCtx* const connectorQueryCtx_; diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index b6cce986008..8195c49a1da 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -155,6 +155,33 @@ void SplitReader::prepareSplit( baseRowReader_ = baseReader_->createRowReader(baseRowReaderOpts_); } +namespace { +template +velox::variant convertFromString( + const std::optional& value, + const TypePtr& toType) { + if (value.has_value()) { + if constexpr (ToKind == TypeKind::VARCHAR) { + return velox::variant(value.value()); + } + if constexpr (ToKind == TypeKind::VARBINARY) { + return velox::variant::binary((value.value())); + } + if (toType->isDate()) { + return velox::variant(util::castFromDateString( + StringView(value.value()), true /*isIso8601*/)); + } + auto result = velox::util::Converter::cast(value.value()); + if constexpr (ToKind == TypeKind::TIMESTAMP) { + result.toGMT(Timestamp::defaultTimezone()); + } + return velox::variant(result); + } + return velox::variant(ToKind); +} + +} // namespace + std::vector SplitReader::adaptColumns( const RowTypePtr& fileType, const std::shared_ptr& tableSchema) { @@ -166,9 +193,19 @@ std::vector SplitReader::adaptColumns( auto* childSpec = childrenSpecs[i].get(); const std::string& fieldName = childSpec->fieldName(); - auto iter = hiveSplit_->partitionKeys.find(fieldName); - if (iter != hiveSplit_->partitionKeys.end()) { - setPartitionValue(childSpec, fieldName, iter->second); + auto partitionKey = hiveSplit_->partitionKeys.find(fieldName); + auto metadataColumn = hiveSplit_->metadataColumns.find(fieldName); + if (partitionKey != hiveSplit_->partitionKeys.end()) { + setPartitionValue(childSpec, fieldName, partitionKey->second); + } else if (metadataColumn != hiveSplit_->metadataColumns.end()) { + auto metadataColumnOutputType = + readerOutputType_->childAt(readerOutputType_->getChildIdx(fieldName)); + auto constValue = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( + convertFromString, + metadataColumnOutputType->kind(), + std::make_optional(metadataColumn->second), + metadataColumnOutputType); + setConstantValue(childSpec, metadataColumnOutputType, constValue); } else if (fieldName == kPath) { setConstantValue( childSpec, VARCHAR(), velox::variant(hiveSplit_->filePath)); @@ -260,34 +297,6 @@ void SplitReader::setNullConstantValue( type, 1, connectorQueryCtx_->memoryPool())); } -namespace { - -template -velox::variant convertFromString( - const std::optional& value, - const TypePtr& toType) { - if (value.has_value()) { - if constexpr (ToKind == TypeKind::VARCHAR) { - return velox::variant(value.value()); - } - if constexpr (ToKind == TypeKind::VARBINARY) { - return velox::variant::binary((value.value())); - } - if (toType->isDate()) { - return velox::variant(util::castFromDateString( - StringView(value.value()), true /*isIso8601*/)); - } - auto result = velox::util::Converter::cast(value.value()); - if constexpr (ToKind == TypeKind::TIMESTAMP) { - result.toGMT(Timestamp::defaultTimezone()); - } - return velox::variant(result); - } - return velox::variant(ToKind); -} - -} // namespace - void SplitReader::setPartitionValue( common::ScanSpec* spec, const std::string& partitionKey, diff --git a/velox/connectors/hive/TableHandle.cpp b/velox/connectors/hive/TableHandle.cpp index d03b2f57e43..edf35ff36c1 100644 --- a/velox/connectors/hive/TableHandle.cpp +++ b/velox/connectors/hive/TableHandle.cpp @@ -25,6 +25,7 @@ columnTypeNames() { {HiveColumnHandle::ColumnType::kPartitionKey, "PartitionKey"}, {HiveColumnHandle::ColumnType::kRegular, "Regular"}, {HiveColumnHandle::ColumnType::kSynthesized, "Synthesized"}, + {HiveColumnHandle::ColumnType::kMetadata, "Metadata"}, }; } diff --git a/velox/connectors/hive/TableHandle.h b/velox/connectors/hive/TableHandle.h index ee62a0892d7..b44baba208b 100644 --- a/velox/connectors/hive/TableHandle.h +++ b/velox/connectors/hive/TableHandle.h @@ -28,7 +28,7 @@ using SubfieldFilters = class HiveColumnHandle : public ColumnHandle { public: - enum class ColumnType { kPartitionKey, kRegular, kSynthesized }; + enum class ColumnType { kPartitionKey, kRegular, kSynthesized, kMetadata }; /// NOTE: 'dataType' is the column type in target write table. 'hiveType' is /// converted type of the corresponding column in source table which might not diff --git a/velox/connectors/hive/tests/HiveConnectorTest.cpp b/velox/connectors/hive/tests/HiveConnectorTest.cpp index 7f5a6043f52..4723bb0946e 100644 --- a/velox/connectors/hive/tests/HiveConnectorTest.cpp +++ b/velox/connectors/hive/tests/HiveConnectorTest.cpp @@ -87,7 +87,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_multilevel) { auto rowType = ROW({{"c0", columnType}}); auto subfields = makeSubfields({"c0.c0c1[3][\"foo\"].c0c1c0"}); auto scanSpec = makeScanSpec( - rowType, groupSubfields(subfields), {}, nullptr, {}, pool_.get()); + rowType, groupSubfields(subfields), {}, nullptr, {}, {}, pool_.get()); auto* c0c0 = scanSpec->childByName("c0")->childByName("c0c0"); validateNullConstant(*c0c0, *BIGINT()); auto* c0c1 = scanSpec->childByName("c0")->childByName("c0c1"); @@ -122,6 +122,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeFields) { {}, nullptr, {}, + {}, pool_.get()); auto* c0c0 = scanSpec->childByName("c0")->childByName("c0c0"); ASSERT_FALSE(c0c0->childByName("c0c0c0")->isConstant()); @@ -144,6 +145,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArray) { {}, nullptr, {}, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_EQ(c0->maxArrayElementsCount(), 2); @@ -160,7 +162,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArrayNegative) { auto subfields = makeSubfields({"c0[1].c0c0", "c0[-1].c0c2"}); auto groupedSubfields = groupSubfields(subfields); VELOX_ASSERT_USER_THROW( - makeScanSpec(rowType, groupedSubfields, {}, nullptr, {}, pool_.get()), + makeScanSpec(rowType, groupedSubfields, {}, nullptr, {}, {}, pool_.get()), "Non-positive array subscript cannot be push down"); } @@ -175,6 +177,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeMap) { {}, nullptr, {}, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); auto* keysFilter = c0->childByName(ScanSpec::kMapKeysFieldName)->filter(); @@ -200,6 +203,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) { {}, nullptr, {}, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_FALSE(c0->childByName(ScanSpec::kMapKeysFieldName)->filter()); @@ -218,6 +222,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) { {}, nullptr, {}, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_FALSE(c0->childByName(ScanSpec::kMapKeysFieldName)->filter()); @@ -240,6 +245,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { {}, nullptr, {}, + {}, pool_.get()); auto* keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -267,6 +273,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { {}, nullptr, {}, + {}, pool_.get()); keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -285,6 +292,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { {}, nullptr, {}, + {}, pool_.get()); keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -300,6 +308,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { {}, nullptr, {}, + {}, pool_.get()); keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -335,6 +344,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_filtersNotInRequiredSubfields) { filters, ROW({{"c0", c0Type}, {"c1", c1Type}}), {}, + {}, pool_.get()); auto c0 = scanSpec->childByName("c0"); ASSERT_FALSE(c0->isConstant()); @@ -379,6 +389,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_duplicateSubfields) { {}, nullptr, {}, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_EQ(c0->children().size(), 2); @@ -392,7 +403,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_filterPartitionKey) { SubfieldFilters filters; filters.emplace(Subfield("ds"), exec::equal("2023-10-13")); auto scanSpec = makeScanSpec( - rowType, {}, filters, rowType, {{"ds", nullptr}}, pool_.get()); + rowType, {}, filters, rowType, {{"ds", nullptr}}, {}, pool_.get()); ASSERT_TRUE(scanSpec->childByName("c0")->projectOut()); ASSERT_FALSE(scanSpec->childByName("ds")->projectOut()); }