Skip to content
7 changes: 5 additions & 2 deletions velox/connectors/hive/HiveConnectorSplit.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
std::unordered_map<std::string, std::string> customSplitInfo;
std::shared_ptr<std::string> extraFileInfo;
std::unordered_map<std::string, std::string> serdeParameters;
std::unordered_map<std::string, std::string> metadataColumns;

HiveConnectorSplit(
const std::string& connectorId,
Expand All @@ -50,7 +51,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
std::optional<int32_t> _tableBucketNumber = std::nullopt,
const std::unordered_map<std::string, std::string>& _customSplitInfo = {},
const std::shared_ptr<std::string>& _extraFileInfo = {},
const std::unordered_map<std::string, std::string>& _serdeParameters = {})
const std::unordered_map<std::string, std::string>& _serdeParameters = {},
const std::unordered_map<std::string, std::string>& _metadataColumns = {})
: ConnectorSplit(connectorId),
filePath(_filePath),
fileFormat(_fileFormat),
Expand All @@ -60,7 +62,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
tableBucketNumber(_tableBucketNumber),
customSplitInfo(_customSplitInfo),
extraFileInfo(_extraFileInfo),
serdeParameters(_serdeParameters) {}
serdeParameters(_serdeParameters),
metadataColumns(_metadataColumns) {}

std::string toString() const override {
if (tableBucketNumber.has_value()) {
Expand Down
2 changes: 2 additions & 0 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
const RowTypePtr& dataColumns,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
partitionKeys,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
metadataColumns,
memory::MemoryPool* pool) {
auto spec = std::make_shared<common::ScanSpec>("root");
folly::F14FastMap<std::string, std::vector<const common::Subfield*>>
Expand Down
2 changes: 2 additions & 0 deletions velox/connectors/hive/HiveConnectorUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
const RowTypePtr& dataColumns,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
partitionKeys,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
metadataColumns,
memory::MemoryPool* pool);

void configureReaderOptions(
Expand Down
4 changes: 4 additions & 0 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ HiveDataSource::HiveDataSource(
if (handle->columnType() == HiveColumnHandle::ColumnType::kPartitionKey) {
partitionKeys_.emplace(handle->name(), handle);
}
if (handle->columnType() == HiveColumnHandle::ColumnType::kMetadata) {
metadataColumns_.emplace(handle->name(), handle);
}
}

std::vector<std::string> readerRowNames;
Expand Down Expand Up @@ -206,6 +209,7 @@ HiveDataSource::HiveDataSource(
filters,
hiveTableHandle_->dataColumns(),
partitionKeys_,
metadataColumns_,
pool_);
if (remainingFilter) {
metadataFilter_ = std::make_shared<common::MetadataFilter>(
Expand Down
5 changes: 5 additions & 0 deletions velox/connectors/hive/HiveDataSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ class HiveDataSource : public DataSource {
std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>
partitionKeys_;

// Column handles for the metadata columns, keyed on the metadata
// column name.
std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>
metadataColumns_;

FileHandleFactory* const fileHandleFactory_;
folly::Executor* const executor_;
const ConnectorQueryCtx* const connectorQueryCtx_;
Expand Down
71 changes: 40 additions & 31 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,33 @@ void SplitReader::prepareSplit(
baseRowReader_ = baseReader_->createRowReader(baseRowReaderOpts_);
}

namespace {

/// Converts an optional string representation of a value into a
/// velox::variant of scalar kind 'ToKind'.
///
/// @param value  Textual value carried on the split (e.g. a metadata
///               column value); std::nullopt means NULL.
/// @param toType Target Velox type; only consulted to detect the DATE
///               logical type, which needs special parsing.
/// @return A variant holding the converted value, or a variant carrying
///         only 'ToKind' (a null of that kind) when 'value' is
///         std::nullopt.
template <TypeKind ToKind>
velox::variant convertFromString(
    const std::optional<std::string>& value,
    const TypePtr& toType) {
  if (value.has_value()) {
    // VARCHAR / VARBINARY need no parsing: wrap the string as-is.
    if constexpr (ToKind == TypeKind::VARCHAR) {
      return velox::variant(value.value());
    }
    if constexpr (ToKind == TypeKind::VARBINARY) {
      return velox::variant::binary((value.value()));
    }
    // DATE is a logical type over an integer kind, so it is detected via
    // the type (not the TypeKind) and parsed as an ISO-8601 date string.
    if (toType->isDate()) {
      return velox::variant(util::castFromDateString(
          StringView(value.value()), true /*isIso8601*/));
    }
    auto result = velox::util::Converter<ToKind>::cast(value.value());
    if constexpr (ToKind == TypeKind::TIMESTAMP) {
      // The parsed timestamp is taken to be in the default timezone;
      // normalize it to GMT for the internal representation.
      result.toGMT(Timestamp::defaultTimezone());
    }
    return velox::variant(result);
  }
  // No value supplied: produce a null variant of the requested kind.
  return velox::variant(ToKind);
}

} // namespace

std::vector<TypePtr> SplitReader::adaptColumns(
const RowTypePtr& fileType,
const std::shared_ptr<const velox::RowType>& tableSchema) {
Expand All @@ -166,9 +193,19 @@ std::vector<TypePtr> SplitReader::adaptColumns(
auto* childSpec = childrenSpecs[i].get();
const std::string& fieldName = childSpec->fieldName();

auto iter = hiveSplit_->partitionKeys.find(fieldName);
if (iter != hiveSplit_->partitionKeys.end()) {
setPartitionValue(childSpec, fieldName, iter->second);
auto partitionKey = hiveSplit_->partitionKeys.find(fieldName);
auto metadataColumn = hiveSplit_->metadataColumns.find(fieldName);
if (partitionKey != hiveSplit_->partitionKeys.end()) {
setPartitionValue(childSpec, fieldName, partitionKey->second);
} else if (metadataColumn != hiveSplit_->metadataColumns.end()) {
auto metadataColumnOutputType =
readerOutputType_->childAt(readerOutputType_->getChildIdx(fieldName));
auto constValue = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH(
convertFromString,
metadataColumnOutputType->kind(),
std::make_optional(metadataColumn->second),
metadataColumnOutputType);
setConstantValue(childSpec, metadataColumnOutputType, constValue);
} else if (fieldName == kPath) {
setConstantValue(
childSpec, VARCHAR(), velox::variant(hiveSplit_->filePath));
Expand Down Expand Up @@ -260,34 +297,6 @@ void SplitReader::setNullConstantValue(
type, 1, connectorQueryCtx_->memoryPool()));
}

namespace {

/// Parses an optional string into a velox::variant whose scalar kind is
/// 'ToKind'.
///
/// @param value  The string form of the value; std::nullopt denotes NULL.
/// @param toType Target Velox type, used only to recognize the DATE
///               logical type (which is parsed specially).
/// @return The converted value as a variant; for std::nullopt, a variant
///         constructed from 'ToKind' alone (a null of that kind).
template <TypeKind ToKind>
velox::variant convertFromString(
    const std::optional<std::string>& value,
    const TypePtr& toType) {
  if (value.has_value()) {
    // String-like targets are wrapped directly without conversion.
    if constexpr (ToKind == TypeKind::VARCHAR) {
      return velox::variant(value.value());
    }
    if constexpr (ToKind == TypeKind::VARBINARY) {
      return velox::variant::binary((value.value()));
    }
    // DATE is a logical type layered on an integer kind; detect it from
    // the type and parse the string as an ISO-8601 date.
    if (toType->isDate()) {
      return velox::variant(util::castFromDateString(
          StringView(value.value()), true /*isIso8601*/));
    }
    auto result = velox::util::Converter<ToKind>::cast(value.value());
    if constexpr (ToKind == TypeKind::TIMESTAMP) {
      // Timestamps parsed from strings are normalized to GMT using the
      // default timezone.
      result.toGMT(Timestamp::defaultTimezone());
    }
    return velox::variant(result);
  }
  // Missing value: return a null variant of the requested kind.
  return velox::variant(ToKind);
}

} // namespace

void SplitReader::setPartitionValue(
common::ScanSpec* spec,
const std::string& partitionKey,
Expand Down
1 change: 1 addition & 0 deletions velox/connectors/hive/TableHandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ columnTypeNames() {
{HiveColumnHandle::ColumnType::kPartitionKey, "PartitionKey"},
{HiveColumnHandle::ColumnType::kRegular, "Regular"},
{HiveColumnHandle::ColumnType::kSynthesized, "Synthesized"},
{HiveColumnHandle::ColumnType::kMetadata, "Metadata"},
};
}

Expand Down
2 changes: 1 addition & 1 deletion velox/connectors/hive/TableHandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ using SubfieldFilters =

class HiveColumnHandle : public ColumnHandle {
public:
enum class ColumnType { kPartitionKey, kRegular, kSynthesized };
enum class ColumnType { kPartitionKey, kRegular, kSynthesized, kMetadata };

/// NOTE: 'dataType' is the column type in target write table. 'hiveType' is
/// converted type of the corresponding column in source table which might not
Expand Down
17 changes: 14 additions & 3 deletions velox/connectors/hive/tests/HiveConnectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_multilevel) {
auto rowType = ROW({{"c0", columnType}});
auto subfields = makeSubfields({"c0.c0c1[3][\"foo\"].c0c1c0"});
auto scanSpec = makeScanSpec(
rowType, groupSubfields(subfields), {}, nullptr, {}, pool_.get());
rowType, groupSubfields(subfields), {}, nullptr, {}, {}, pool_.get());
auto* c0c0 = scanSpec->childByName("c0")->childByName("c0c0");
validateNullConstant(*c0c0, *BIGINT());
auto* c0c1 = scanSpec->childByName("c0")->childByName("c0c1");
Expand Down Expand Up @@ -122,6 +122,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeFields) {
{},
nullptr,
{},
{},
pool_.get());
auto* c0c0 = scanSpec->childByName("c0")->childByName("c0c0");
ASSERT_FALSE(c0c0->childByName("c0c0c0")->isConstant());
Expand All @@ -144,6 +145,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArray) {
{},
nullptr,
{},
{},
pool_.get());
auto* c0 = scanSpec->childByName("c0");
ASSERT_EQ(c0->maxArrayElementsCount(), 2);
Expand All @@ -160,7 +162,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArrayNegative) {
auto subfields = makeSubfields({"c0[1].c0c0", "c0[-1].c0c2"});
auto groupedSubfields = groupSubfields(subfields);
VELOX_ASSERT_USER_THROW(
makeScanSpec(rowType, groupedSubfields, {}, nullptr, {}, pool_.get()),
makeScanSpec(rowType, groupedSubfields, {}, nullptr, {}, {}, pool_.get()),
"Non-positive array subscript cannot be push down");
}

Expand All @@ -175,6 +177,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeMap) {
{},
nullptr,
{},
{},
pool_.get());
auto* c0 = scanSpec->childByName("c0");
auto* keysFilter = c0->childByName(ScanSpec::kMapKeysFieldName)->filter();
Expand All @@ -200,6 +203,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) {
{},
nullptr,
{},
{},
pool_.get());
auto* c0 = scanSpec->childByName("c0");
ASSERT_FALSE(c0->childByName(ScanSpec::kMapKeysFieldName)->filter());
Expand All @@ -218,6 +222,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) {
{},
nullptr,
{},
{},
pool_.get());
auto* c0 = scanSpec->childByName("c0");
ASSERT_FALSE(c0->childByName(ScanSpec::kMapKeysFieldName)->filter());
Expand All @@ -240,6 +245,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) {
{},
nullptr,
{},
{},
pool_.get());
auto* keysFilter = scanSpec->childByName("c0")
->childByName(ScanSpec::kMapKeysFieldName)
Expand Down Expand Up @@ -267,6 +273,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) {
{},
nullptr,
{},
{},
pool_.get());
keysFilter = scanSpec->childByName("c0")
->childByName(ScanSpec::kMapKeysFieldName)
Expand All @@ -285,6 +292,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) {
{},
nullptr,
{},
{},
pool_.get());
keysFilter = scanSpec->childByName("c0")
->childByName(ScanSpec::kMapKeysFieldName)
Expand All @@ -300,6 +308,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) {
{},
nullptr,
{},
{},
pool_.get());
keysFilter = scanSpec->childByName("c0")
->childByName(ScanSpec::kMapKeysFieldName)
Expand Down Expand Up @@ -335,6 +344,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_filtersNotInRequiredSubfields) {
filters,
ROW({{"c0", c0Type}, {"c1", c1Type}}),
{},
{},
pool_.get());
auto c0 = scanSpec->childByName("c0");
ASSERT_FALSE(c0->isConstant());
Expand Down Expand Up @@ -379,6 +389,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_duplicateSubfields) {
{},
nullptr,
{},
{},
pool_.get());
auto* c0 = scanSpec->childByName("c0");
ASSERT_EQ(c0->children().size(), 2);
Expand All @@ -392,7 +403,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_filterPartitionKey) {
SubfieldFilters filters;
filters.emplace(Subfield("ds"), exec::equal("2023-10-13"));
auto scanSpec = makeScanSpec(
rowType, {}, filters, rowType, {{"ds", nullptr}}, pool_.get());
rowType, {}, filters, rowType, {{"ds", nullptr}}, {}, pool_.get());
ASSERT_TRUE(scanSpec->childByName("c0")->projectOut());
ASSERT_FALSE(scanSpec->childByName("ds")->projectOut());
}
Expand Down