Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions presto-native-execution/presto_cpp/main/PrestoTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -590,10 +590,20 @@ void PrestoTask::updateExecutionInfoLocked(
prestoTaskStats.outputPositions = 0;
prestoTaskStats.outputDataSizeInBytes = 0;

prestoTaskStats.queuedDrivers = veloxTaskStats.numQueuedDrivers;
prestoTaskStats.totalDrivers = veloxTaskStats.numTotalDrivers;
// Presto Java reports number of drivers to number of splits in Presto UI
// because split and driver are 1 to 1 mapping relationship. This is not true
// in Prestissimo where 1 driver handles many splits. In order to quickly
// unblock developers from viewing the correct progress of splits in
// Prestissimo's coordinator UI, we put number of splits in total, queued, and
// finished to indicate the progress of the query. Number of running drivers
// are passed as it is to have a proper running drivers count in UI.
//
// TODO: We should really extend the API (protocol::TaskStats and Presto
Comment thread
aditi-pandit marked this conversation as resolved.
// coordinator UI) to have splits information as a proper fix.
prestoTaskStats.totalDrivers = veloxTaskStats.numTotalSplits;
prestoTaskStats.queuedDrivers = veloxTaskStats.numQueuedSplits;
prestoTaskStats.runningDrivers = veloxTaskStats.numRunningDrivers;
prestoTaskStats.completedDrivers = veloxTaskStats.numCompletedDrivers;
prestoTaskStats.completedDrivers = veloxTaskStats.numFinishedSplits;

prestoTaskStats.pipelines.resize(veloxTaskStats.pipelineStats.size());
for (int i = 0; i < veloxTaskStats.pipelineStats.size(); ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1109,13 +1109,15 @@ HivePrestoToVeloxConnector::toVeloxSplit(
for (const auto& [key, value] : hiveSplit->storage.serdeParameters) {
serdeParameters[key] = value;
}
std::unordered_map<std::string, std::string> infoColumns;
infoColumns.reserve(2);
infoColumns.insert(
{"$file_size", std::to_string(hiveSplit->fileSplit.fileSize)});
infoColumns.insert(
std::unordered_map<std::string, std::string> infoColumns = {
Comment thread
aditi-pandit marked this conversation as resolved.
{"$path", hiveSplit->fileSplit.path},
{"$file_size", std::to_string(hiveSplit->fileSplit.fileSize)},
{"$file_modified_time",
std::to_string(hiveSplit->fileSplit.fileModifiedTime)});
std::to_string(hiveSplit->fileSplit.fileModifiedTime)},
};
if (hiveSplit->tableBucketNumber) {
infoColumns["$bucket"] = std::to_string(*hiveSplit->tableBucketNumber);
}
auto veloxSplit =
std::make_unique<velox::connector::hive::HiveConnectorSplit>(
catalogId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,29 @@ core::PlanNodePtr VeloxQueryPlanConverterBase::toVeloxQueryPlan(
left->outputType()));
}

// For ScanFilter and ScanFilterProject, the planner sometimes put the
// remaining filter in a FilterNode after the TableScan. We need to put it
// back to TableScan so that Velox can leverage it to do stripe level
// skipping. Otherwise we only get row level skipping and lose some
// optimization opportunity in case of very low selectivity.
if (auto tableScan = std::dynamic_pointer_cast<const protocol::TableScanNode>(
node->source)) {
if (auto* tableLayout = dynamic_cast<protocol::HiveTableLayoutHandle*>(
tableScan->table.connectorTableLayout.get())) {
auto remainingFilter =
Comment thread
aditi-pandit marked this conversation as resolved.
exprConverter_.toVeloxExpr(tableLayout->remainingPredicate);
if (auto* constant = dynamic_cast<const core::ConstantTypedExpr*>(
remainingFilter.get())) {
bool value = constant->value().value<bool>();
// We should get empty values node instead of table scan if the
// remaining filter is constantly false.
VELOX_CHECK(value, "Unexpected always-false remaining predicate");
tableLayout->remainingPredicate = node->predicate;
return toVeloxQueryPlan(tableScan, tableWriteInfo, taskId);
}
}
}

return std::make_shared<core::FilterNode>(
node->id,
exprConverter_.toVeloxExpr(node->predicate),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ TEST_F(PlanConverterTest, scanAgg) {
ASSERT_EQ(
tableHandle->dataColumns()->toString(),
"ROW<nationkey:BIGINT,name:VARCHAR,regionkey:BIGINT,complex_type:ARRAY<MAP<VARCHAR,ROW<id:BIGINT,description:VARCHAR>>>,comment:VARCHAR>");
ASSERT_TRUE(tableHandle->remainingFilter());
ASSERT_EQ(
tableHandle->remainingFilter()->toString(),
"presto.default.lt(presto.default.rand(),0.0001)");

auto tableParameters = tableHandle->tableParameters();
ASSERT_EQ(tableParameters.size(), 6);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ TEST_F(PrestoToVeloxSplitTest, bucketConversion) {
ASSERT_EQ(veloxHiveSplit.bucketConversion->tableBucketCount, 4096);
ASSERT_EQ(veloxHiveSplit.bucketConversion->partitionBucketCount, 512);
ASSERT_EQ(veloxHiveSplit.bucketConversion->bucketColumnHandles.size(), 1);
ASSERT_EQ(veloxHiveSplit.infoColumns.at("$path"), hiveSplit.fileSplit.path);
ASSERT_EQ(veloxHiveSplit.infoColumns.at("$bucket"), "42");
auto& veloxColumn = veloxHiveSplit.bucketConversion->bucketColumnHandles[0];
ASSERT_EQ(veloxColumn->name(), "c0");
ASSERT_EQ(*veloxColumn->dataType(), *BIGINT());
Expand Down
Loading