Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 49 additions & 30 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,29 @@ bool shouldEagerlyMaterialize(

} // namespace

// Routes one column handle into the bookkeeping structure that matches its
// category: partition keys and synthesized (info) columns are recorded by
// name, while the row-index and row-id pseudo-columns each fill a dedicated
// slot in 'specialColumns_'.  Regular data columns need no bookkeeping here.
void HiveDataSource::processColumnHandle(const HiveColumnHandlePtr& handle) {
  switch (handle->columnType()) {
    case HiveColumnHandle::ColumnType::kRegular:
      // Read directly from the file; nothing to record.
      break;
    case HiveColumnHandle::ColumnType::kPartitionKey:
      partitionKeys_.emplace(handle->name(), handle);
      break;
    case HiveColumnHandle::ColumnType::kSynthesized:
      infoColumns_.emplace(handle->name(), handle);
      break;
    case HiveColumnHandle::ColumnType::kRowIndex:
      specialColumns_.rowIndex = handle->name();
      break;
    case HiveColumnHandle::ColumnType::kRowId:
      specialColumns_.rowId = handle->name();
      break;
  }
}

HiveDataSource::HiveDataSource(
const RowTypePtr& outputType,
const connector::ConnectorTableHandlePtr& tableHandle,
const connector::ColumnHandleMap& columnHandles,
const connector::ColumnHandleMap& assignments,
FileHandleFactory* fileHandleFactory,
folly::Executor* ioExecutor,
const ConnectorQueryCtx* connectorQueryCtx,
Expand All @@ -67,38 +86,42 @@ HiveDataSource::HiveDataSource(
pool_(connectorQueryCtx->memoryPool()),
outputType_(outputType),
expressionEvaluator_(connectorQueryCtx->expressionEvaluator()) {
// Column handles keyed on the column alias, the name used in the query.
for (const auto& [canonicalizedName, columnHandle] : columnHandles) {
auto handle =
std::dynamic_pointer_cast<const HiveColumnHandle>(columnHandle);
VELOX_CHECK_NOT_NULL(
handle,
"ColumnHandle must be an instance of HiveColumnHandle for {}",
canonicalizedName);
switch (handle->columnType()) {
case HiveColumnHandle::ColumnType::kRegular:
break;
case HiveColumnHandle::ColumnType::kPartitionKey:
partitionKeys_.emplace(handle->name(), handle);
break;
case HiveColumnHandle::ColumnType::kSynthesized:
infoColumns_.emplace(handle->name(), handle);
break;
case HiveColumnHandle::ColumnType::kRowIndex:
specialColumns_.rowIndex = handle->name();
break;
case HiveColumnHandle::ColumnType::kRowId:
specialColumns_.rowId = handle->name();
break;
hiveTableHandle_ =
std::dynamic_pointer_cast<const HiveTableHandle>(tableHandle);
VELOX_CHECK_NOT_NULL(
hiveTableHandle_, "TableHandle must be an instance of HiveTableHandle");

if (hiveTableHandle_->columnHandles().empty()) {
// Column handles keyed on the column alias, the name used in the query.
for (const auto& [canonicalizedName, columnHandle] : assignments) {
auto handle =
std::dynamic_pointer_cast<const HiveColumnHandle>(columnHandle);
VELOX_CHECK_NOT_NULL(
handle,
"ColumnHandle must be an instance of HiveColumnHandle for {}",
canonicalizedName);
processColumnHandle(handle);
}
} else {
folly::F14FastSet<const connector::ColumnHandle*> assignmentHandles;
for (const auto& [_, columnHandle] : assignments) {
assignmentHandles.insert(columnHandle.get());
}
for (auto& handle : hiveTableHandle_->columnHandles()) {
assignmentHandles.erase(handle.get());
processColumnHandle(handle);
}
VELOX_CHECK(
assignmentHandles.empty(),
"Assignments must be a subset of column handles");
}

std::vector<std::string> readColumnNames;
auto readColumnTypes = outputType_->children();
for (const auto& outputName : outputType_->names()) {
auto it = columnHandles.find(outputName);
auto it = assignments.find(outputName);
VELOX_CHECK(
it != columnHandles.end(),
it != assignments.end(),
"ColumnHandle is missing for output column: {}",
outputName);

Expand All @@ -113,10 +136,6 @@ HiveDataSource::HiveDataSource(
}
}

hiveTableHandle_ =
std::dynamic_pointer_cast<const HiveTableHandle>(tableHandle);
VELOX_CHECK_NOT_NULL(
hiveTableHandle_, "TableHandle must be an instance of HiveTableHandle");
if (hiveConfig_->isFileColumnNamesReadAsLowerCase(
connectorQueryCtx->sessionProperties())) {
checkColumnNameLowerCase(outputType_);
Expand Down
6 changes: 5 additions & 1 deletion velox/connectors/hive/HiveDataSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class HiveDataSource : public DataSource {
HiveDataSource(
const RowTypePtr& outputType,
const connector::ConnectorTableHandlePtr& tableHandle,
const connector::ColumnHandleMap& columnHandles,
const connector::ColumnHandleMap& assignments,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
const ConnectorQueryCtx* connectorQueryCtx,
Expand Down Expand Up @@ -147,6 +147,10 @@ class HiveDataSource : public DataSource {
return emptyOutput_;
}

// Adds the information from the given column handle to the corresponding
// fields of this data source (partition keys, info columns, or the special
// row-index / row-id columns).
void processColumnHandle(const HiveColumnHandlePtr& handle);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you add a comment to explain what this method is doing? Should we verify consistency between assignments and columnHandles? E.g. what if column x is reported as partition key in assignment and regular column in columnHandles?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's something I have in mind but did not add it, let me add some very strict check for now since it's new


// The row type for the data source output, not including filter-only columns
const RowTypePtr outputType_;
core::ExpressionEvaluator* const expressionEvaluator_;
Expand Down
6 changes: 4 additions & 2 deletions velox/connectors/hive/TableHandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,16 @@ HiveTableHandle::HiveTableHandle(
common::SubfieldFilters subfieldFilters,
const core::TypedExprPtr& remainingFilter,
const RowTypePtr& dataColumns,
const std::unordered_map<std::string, std::string>& tableParameters)
const std::unordered_map<std::string, std::string>& tableParameters,
std::vector<HiveColumnHandlePtr> columnHandles)
: ConnectorTableHandle(std::move(connectorId)),
tableName_(tableName),
filterPushdownEnabled_(filterPushdownEnabled),
subfieldFilters_(std::move(subfieldFilters)),
remainingFilter_(remainingFilter),
dataColumns_(dataColumns),
tableParameters_(tableParameters) {}
tableParameters_(tableParameters),
columnHandles_(std::move(columnHandles)) {}

std::string HiveTableHandle::toString() const {
std::stringstream out;
Expand Down
28 changes: 25 additions & 3 deletions velox/connectors/hive/TableHandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ class HiveTableHandle : public ConnectorTableHandle {
common::SubfieldFilters subfieldFilters,
const core::TypedExprPtr& remainingFilter,
const RowTypePtr& dataColumns = nullptr,
const std::unordered_map<std::string, std::string>& tableParameters = {});
const std::unordered_map<std::string, std::string>& tableParameters = {},
std::vector<HiveColumnHandlePtr> columnHandles = {});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you document this new argument?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the document on the accessor method below


const std::string& tableName() const {
return tableName_;
Expand All @@ -157,27 +158,47 @@ class HiveTableHandle : public ConnectorTableHandle {
return tableName();
}

bool isFilterPushdownEnabled() const {
[[deprecated]] bool isFilterPushdownEnabled() const {
return filterPushdownEnabled_;
}

  /// Single-field filters that can be applied efficiently during file
  /// reading.
  const common::SubfieldFilters& subfieldFilters() const {
    return subfieldFilters_;
  }

  /// Everything else that cannot be converted into subfield filters but that
  /// the data source must still filter out.  This is usually less efficient
  /// than subfield filters but supports arbitrary boolean expressions.
  const core::TypedExprPtr& remainingFilter() const {
    return remainingFilter_;
  }

// Schema of the table. Need this for reading TEXTFILE.
  /// The subset of the table schema that is stored in the data files (i.e.
  /// the non-partitioning columns).  Columns must appear in the exact order
  /// they occur in the files (trailing file columns may be omitted), but
  /// carry the table schema types as of read time.
  ///
  /// This is needed for multiple purposes, including reading TEXTFILE and
  /// handling schema evolution.
  const RowTypePtr& dataColumns() const {
    return dataColumns_;
  }

  /// Extra parameters passed down to the file format reader layer.  Keys
  /// should be names declared in dwio::common::TableParameter.
  const std::unordered_map<std::string, std::string>& tableParameters() const {
    return tableParameters_;
  }

/// Full schema including partitioning columns and data columns. If this is
/// non-empty, it should be a superset of the column handles in data source
/// assignments parameter (the shared_ptrs should pointing to the exact same
/// objects).
const std::vector<HiveColumnHandlePtr> columnHandles() const {
return columnHandles_;
}

std::string toString() const override;

folly::dynamic serialize() const override;
Expand All @@ -195,6 +216,7 @@ class HiveTableHandle : public ConnectorTableHandle {
const core::TypedExprPtr remainingFilter_;
const RowTypePtr dataColumns_;
const std::unordered_map<std::string, std::string> tableParameters_;
const std::vector<HiveColumnHandlePtr> columnHandles_;
};

using HiveTableHandlePtr = std::shared_ptr<const HiveTableHandle>;
Expand Down
26 changes: 26 additions & 0 deletions velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6143,5 +6143,31 @@ TEST_F(TableScanTest, parallelUnitLoader) {
ASSERT_GT(stats.count("waitForUnitReadyNanos"), 0);
}

// Verifies that a table scan can be driven by the full set of column handles
// stored on HiveTableHandle while 'assignments' names only a subset of them
// (here, only data column 'a' aliased as 'x').
TEST_F(TableScanTest, allColumnHandles) {
  auto data = makeVectors(1, 10, ROW({"a", "b"}, BIGINT()));
  auto filePath = TempFilePath::create();
  writeToFile(filePath->getPath(), data);
  // Full table schema: one partition key followed by the two data columns.
  std::vector<HiveColumnHandlePtr> columnHandles = {
      partitionKey("ds", VARCHAR()),
      regularColumn("a", BIGINT()),
      regularColumn("b", BIGINT()),
  };
  // Assignments must be a subset of 'columnHandles' and share the exact same
  // handle objects.
  connector::ColumnHandleMap assignments = {{"x", columnHandles[1]}};
  auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath())
                   .partitionKey("ds", "2025-10-23")
                   .build();
  auto plan = PlanBuilder()
                  .startTableScan()
                  .outputType(ROW({"x"}, {BIGINT()}))
                  .assignments(assignments)
                  // dataColumns is still needed for schema evolution: the
                  // column handles alone do not fix column positions in file.
                  .dataColumns(asRowType(data[0]->type()))
                  .columnHandles(columnHandles)
                  // Exercises both a partition key and a data column that are
                  // absent from the output type.
                  .remainingFilter("length(ds) + a % 2 > 0")
                  .endTableScan()
                  .planNode();
  AssertQueryBuilder(plan).split(split).assertResults(
      makeRowVector({data[0]->childAt(0)}));
}

} // namespace
} // namespace facebook::velox::exec
19 changes: 17 additions & 2 deletions velox/exec/tests/utils/PlanBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,20 @@ core::PlanNodePtr PlanBuilder::TableScanBuilder::build(core::PlanNodeId id) {
}
}

const RowTypePtr& parseType = dataColumns_ ? dataColumns_ : outputType_;
RowTypePtr parseType;
if (!columnHandles_.empty()) {
std::vector<std::string> names;
std::vector<TypePtr> types;
for (auto& handle : columnHandles_) {
names.push_back(handle->name());
types.push_back(handle->hiveType());
}
parseType = ROW(std::move(names), std::move(types));
} else if (dataColumns_) {
parseType = dataColumns_;
} else {
parseType = outputType_;
}

core::TypedExprPtr filterNodeExpr;

Expand Down Expand Up @@ -319,7 +332,9 @@ core::PlanNodePtr PlanBuilder::TableScanBuilder::build(core::PlanNodeId id) {
true,
std::move(subfieldFiltersMap_),
remainingFilterExpr,
dataColumns_);
dataColumns_,
/*tableParameters=*/std::unordered_map<std::string, std::string>{},
columnHandles_);
}
core::PlanNodePtr result = std::make_shared<core::TableScanNode>(
id, outputType_, tableHandle_, assignments_);
Expand Down
7 changes: 7 additions & 0 deletions velox/exec/tests/utils/PlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,12 @@ class PlanBuilder {
return *this;
}

  /// Sets the column handles for the full table schema (partitioning and data
  /// columns) to be stored on the generated HiveTableHandle.  When provided,
  /// the handles in 'assignments' must be a subset of these (HiveDataSource
  /// checks this and that the shared_ptrs point to the same objects).
  TableScanBuilder& columnHandles(
      std::vector<connector::hive::HiveColumnHandlePtr> columnHandles) {
    columnHandles_ = std::move(columnHandles);
    return *this;
  }

/// @param assignments Optional ColumnHandles.
/// outputType names should match the keys in the 'assignments' map. The
/// 'assignments' map may contain more columns than 'outputType' if some
Expand All @@ -341,6 +347,7 @@ class PlanBuilder {
RowTypePtr outputType_;
core::ExprPtr remainingFilter_;
RowTypePtr dataColumns_;
std::vector<connector::hive::HiveColumnHandlePtr> columnHandles_;
std::unordered_map<std::string, std::string> columnAliases_;
connector::ConnectorTableHandlePtr tableHandle_;
connector::ColumnHandleMap assignments_;
Expand Down
Loading