Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit 9ff396a

Browse files
committed
Copy table stats for shuffled data.
Signed-off-by: ienkovich <[email protected]>
1 parent 599eea8 commit 9ff396a

File tree

6 files changed

+93
-0
lines changed

6 files changed

+93
-0
lines changed

omniscidb/QueryEngine/RelAlgExecutor.cpp

+72
Original file line numberDiff line numberDiff line change
@@ -790,6 +790,10 @@ void RelAlgExecutor::executeStepWithPartitionedAggregation(const hdk::ir::Node*
790790
proj.reset();
791791
}
792792

793+
// Currently, we merge shuffle node with simple projections only. Therefore, we can
794+
// assign original table stats to shuffling results to avoid metadata computation.
795+
maybeCopyTableStatsFromInput(shuffle_node.get());
796+
793797
// Create new aggregation node and execute it.
794798
auto part_agg = std::make_shared<hdk::ir::Aggregate>(
795799
agg->getGroupByCount(), agg->getAggs(), agg->getFields(), agg_input_shared);
@@ -826,6 +830,74 @@ void RelAlgExecutor::executeStepWithPartitionedAggregation(const hdk::ir::Node*
826830
temporary_tables_.erase(-new_root->getId());
827831
}
828832

833+
// Best-effort copy of already computed table stats from the original data source to
// the result table of `node`, so that stats do not have to be recomputed for it.
//
// The copy is performed only when:
//   - `node` is a Shuffle or a simple Project and already holds a result with a token;
//   - the input of `node` is reachable through a (possibly empty) chain of simple
//     projections that ends at a Scan or at a node that already holds a result;
//   - the metadata of that original source reports computed table stats.
// In every other case the function logs at VLOG(1) and returns without modifying
// anything. `node` must not be null.
void RelAlgExecutor::maybeCopyTableStatsFromInput(const hdk::ir::Node* node) {
  // Stats are attached to the token of the node's result, so a node without a result
  // (or a result without a token) has no target to write to.
  if (!node->getResult()) {
    VLOG(1) << "Cannot copy table stats for a node without result. " << node->toString();
    return;
  }
  auto target_token = node->getResult()->getToken();
  if (!target_token) {
    VLOG(1) << "Cannot copy table stats for a node without result token. "
            << node->toString();
    return;
  }

  // col_mapping[i] is the index of the input column that produces output column i.
  std::vector<int> col_mapping;
  col_mapping.reserve(node->size());
  // Stats copy is supported for shuffle and simple projections only.
  if (node->is<hdk::ir::Project>()) {
    auto proj = node->as<hdk::ir::Project>();
    if (!proj->isSimple()) {
      VLOG(1) << "Cannot copy table stats for non-simple projection.";
      return;
    }
    for (auto& expr : proj->getExprs()) {
      col_mapping.push_back(expr->as<hdk::ir::ColumnRef>()->index());
    }
  } else if (node->is<hdk::ir::Shuffle>()) {
    for (auto& expr : node->as<hdk::ir::Shuffle>()->exprs()) {
      CHECK(expr->is<hdk::ir::ColumnRef>());
      col_mapping.push_back(expr->as<hdk::ir::ColumnRef>()->index());
    }
  } else {
    VLOG(1) << "Cannot copy table stats for node " << node->toString();
    return;
  }

  // We can traverse through a chain of simple projections to the original data source.
  // On each step the mapping is re-expressed in terms of the projection's own input.
  auto data_source = node->getInput(0);
  while (!data_source->getResult() && !data_source->is<hdk::ir::Scan>()) {
    auto proj = data_source->as<hdk::ir::Project>();
    if (!proj || !proj->isSimple()) {
      VLOG(1) << "Cannot copy table stats due to non-simple projection. "
              << node->toString();
      return;
    }
    for (size_t i = 0; i < col_mapping.size(); ++i) {
      auto idx = static_cast<size_t>(col_mapping[i]);
      CHECK_LT(idx, proj->size());
      col_mapping[i] = proj->getExpr(idx)->as<hdk::ir::ColumnRef>()->index();
    }
    data_source = data_source->getInput(0);
  }

  // The source is identified either by the token of an existing result or by the
  // scanned table. Fall back to the scan only when no token is available, and give up
  // if neither exists instead of dereferencing a null pointer.
  auto input_token =
      data_source->getResult() ? data_source->getResult()->getToken().get() : nullptr;
  auto input_scan = input_token ? nullptr : data_source->as<hdk::ir::Scan>();
  if (!input_token && !input_scan) {
    VLOG(1) << "Cannot copy table stats because data source cannot be identified. "
            << data_source->toString();
    return;
  }

  int input_db_id = input_token ? input_token->dbId() : input_scan->getDatabaseId();
  int input_table_id = input_token ? input_token->tableId() : input_scan->getTableId();
  auto input_meta = data_provider_->getTableMetadata(input_db_id, input_table_id);
  if (!input_meta.hasComputedTableStats()) {
    VLOG(1) << "Cannot copy table stats because original table stats are unavailable.";
    return;
  }

  const auto& orig_stats = input_meta.getTableStats();
  TableStats stats;
  for (size_t i = 0; i < col_mapping.size(); ++i) {
    auto target_col_id = target_token->columnId(i);
    auto input_col_id = input_token
                            ? input_token->columnId(col_mapping[i])
                            : input_scan->getColumnInfo(col_mapping[i])->column_id;
    // Single lookup instead of count() followed by at().
    auto stats_it = orig_stats.find(input_col_id);
    CHECK(stats_it != orig_stats.end())
        << "Cannot find stats for column " << input_col_id
        << ". data_source=" << data_source->toString();
    stats.emplace(target_col_id, stats_it->second);
  }
  target_token->setTableStats(std::move(stats));
  VLOG(1) << "Copy table stats from " << input_db_id << ":" << input_table_id << " to "
          << target_token->dbId() << ":" << target_token->tableId();
}
900+
829901
void RelAlgExecutor::executeStep(const hdk::ir::Node* step_root,
830902
const CompilationOptions& co,
831903
const ExecutionOptions& eo,

omniscidb/QueryEngine/RelAlgExecutor.h

+1
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ class RelAlgExecutor {
131131
const ExecutionOptions& eo,
132132
size_t estimated_buffer_size,
133133
const int64_t queue_time_ms);
134+
// Copies already computed table stats from the original data source to the result
// table of `node`. Only Shuffle nodes and simple projections are handled; for any
// other node, or when the source stats are not computed, it only logs and returns.
void maybeCopyTableStatsFromInput(const hdk::ir::Node* node);
134135
ExecutionResult executeStep(const hdk::ir::Node* step_root,
135136
const CompilationOptions& co,
136137
const ExecutionOptions& eo,

omniscidb/ResultSetRegistry/ResultSetRegistry.cpp

+11
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,17 @@ void ResultSetRegistry::drop(const ResultSetTableToken& token) {
176176
SimpleSchemaProvider::dropTable(token.dbId(), token.tableId());
177177
}
178178

179+
void ResultSetRegistry::setTableStats(const ResultSetTableToken& token,
180+
TableStats stats) {
181+
mapd_shared_lock<mapd_shared_mutex> data_lock(data_mutex_);
182+
CHECK(tables_.count(token.tableId()));
183+
auto* table = tables_.at(token.tableId()).get();
184+
data_lock.unlock();
185+
186+
mapd_unique_lock<mapd_shared_mutex> table_lock(table->mutex);
187+
table->table_stats = std::move(stats);
188+
}
189+
179190
ResultSetTableTokenPtr ResultSetRegistry::head(const ResultSetTableToken& token,
180191
size_t n) {
181192
mapd_shared_lock<mapd_shared_mutex> data_lock(data_mutex_);

omniscidb/ResultSetRegistry/ResultSetRegistry.h

+2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ class ResultSetRegistry : public SimpleSchemaProvider,
3939
ResultSetPtr get(const ResultSetTableToken& token, size_t frag_id) const;
4040
void drop(const ResultSetTableToken& token);
4141

42+
// Replaces the stats stored for the table identified by `token`. The table must be
// registered (enforced by a CHECK in the implementation).
void setTableStats(const ResultSetTableToken& token, TableStats stats);
43+
4244
ResultSetTableTokenPtr head(const ResultSetTableToken& token, size_t n);
4345
ResultSetTableTokenPtr tail(const ResultSetTableToken& token, size_t n);
4446

omniscidb/ResultSetRegistry/ResultSetTableToken.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ void ResultSetTableToken::reset() {
3838
}
3939
}
4040

41+
// Hands `stats` over to the registry referenced by this token, which stores them for
// the table this token identifies.
void ResultSetTableToken::setTableStats(TableStats stats) const {
  auto& owner = *registry_;
  owner.setTableStats(*this, std::move(stats));
}
44+
4145
ResultSetTableTokenPtr ResultSetTableToken::head(size_t n) const {
4246
return registry_->head(*this, n);
4347
}

omniscidb/ResultSetRegistry/ResultSetTableToken.h

+3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include "ResultSetTable.h"
1010

1111
#include "DataMgr/ChunkMetadata.h"
12+
#include "DataProvider/TableFragmentsInfo.h"
1213
#include "SchemaMgr/TableInfo.h"
1314

1415
#include "arrow/api.h"
@@ -54,6 +55,8 @@ class ResultSetTableToken : public std::enable_shared_from_this<ResultSetTableTo
5455

5556
const std::string& tableName() const { return tinfo_->name; }
5657

58+
// Forwards `stats` to the registry's setTableStats for this token.
void setTableStats(TableStats stats) const;
59+
5760
ResultSetTableTokenPtr head(size_t n) const;
5861
ResultSetTableTokenPtr tail(size_t n) const;
5962

0 commit comments

Comments
 (0)