Skip to content

Commit

Permalink
Move probe rows of Join. (#4283)
Browse files Browse the repository at this point in the history
Co-authored-by: Sophie <[email protected]>
  • Loading branch information
Shylock-Hg and Sophie-Xie authored Jun 13, 2022
1 parent 2b35538 commit 7ca2bad
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 35 deletions.
3 changes: 3 additions & 0 deletions src/graph/executor/Executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ class Executor : private boost::noncopyable, private cpp::NonMovable {
// Check whether the variable is movable, it's movable when reach end of lifetime
// This method shouldn't call after `finish` method!
bool movable(const Variable *var);
bool movable(const std::string &var) {
return movable(qctx_->symTable()->getVar(var));
}

// Store the result of this executor to execution context
Status finish(Result &&result);
Expand Down
74 changes: 57 additions & 17 deletions src/graph/executor/query/InnerJoinExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,25 @@ folly::Future<Status> InnerJoinExecutor::join(const std::vector<Expression*>& ha
hashTable.reserve(bucketSize);
if (lhsIter_->size() < rhsIter_->size()) {
buildSingleKeyHashTable(hashKeys.front(), lhsIter_.get(), hashTable);
mv_ = movable(rightVar());
result = singleKeyProbe(probeKeys.front(), rhsIter_.get(), hashTable);
} else {
exchange_ = true;
buildSingleKeyHashTable(probeKeys.front(), rhsIter_.get(), hashTable);
mv_ = movable(leftVar());
result = singleKeyProbe(hashKeys.front(), lhsIter_.get(), hashTable);
}
} else {
std::unordered_map<List, std::vector<const Row*>> hashTable;
hashTable.reserve(bucketSize);
if (lhsIter_->size() < rhsIter_->size()) {
buildHashTable(hashKeys, lhsIter_.get(), hashTable);
mv_ = movable(rightVar());
result = probe(probeKeys, rhsIter_.get(), hashTable);
} else {
exchange_ = true;
buildHashTable(probeKeys, rhsIter_.get(), hashTable);
mv_ = movable(leftVar());
result = probe(hashKeys, lhsIter_.get(), hashTable);
}
}
Expand All @@ -74,7 +78,13 @@ DataSet InnerJoinExecutor::probe(
Value val = col->eval(ctx(probeIter));
list.values.emplace_back(std::move(val));
}
buildNewRow<List>(hashTable, list, *probeIter->row(), ds);
if (mv_) {
// Probe row only match key in HashTable once, so we could move it directly,
// key/value in HashTable will be matched multiple times, so we can't move it.
buildNewRow<List>(hashTable, list, probeIter->moveRow(), ds);
} else {
buildNewRow<List>(hashTable, list, *probeIter->row(), ds);
}
}
return ds;
}
Expand All @@ -87,37 +97,66 @@ DataSet InnerJoinExecutor::singleKeyProbe(
QueryExpressionContext ctx(ectx_);
for (; probeIter->valid(); probeIter->next()) {
auto& val = probeKey->eval(ctx(probeIter));
buildNewRow<Value>(hashTable, val, *probeIter->row(), ds);
if (mv_) {
// Probe row only match key in HashTable once, so we could move it directly,
// key/value in HashTable will be matched multiple times, so we can't move it.
buildNewRow<Value>(hashTable, val, probeIter->moveRow(), ds);
} else {
buildNewRow<Value>(hashTable, val, *probeIter->row(), ds);
}
}
return ds;
}

template <class T>
void InnerJoinExecutor::buildNewRow(const std::unordered_map<T, std::vector<const Row*>>& hashTable,
const T& val,
const Row& rRow,
Row rRow,
DataSet& ds) const {
const auto& range = hashTable.find(val);
if (range == hashTable.end()) {
return;
}
for (auto* row : range->second) {
auto& lRow = *row;
Row newRow;
newRow.reserve(lRow.size() + rRow.size());
auto& values = newRow.values;
for (std::size_t i = 0; i < (range->second.size() - 1); ++i) {
if (exchange_) {
values.insert(values.end(),
std::make_move_iterator(rRow.values.begin()),
std::make_move_iterator(rRow.values.end()));
values.insert(values.end(), lRow.values.begin(), lRow.values.end());
ds.rows.emplace_back(newRow(rRow, *range->second[i]));
} else {
values.insert(values.end(), lRow.values.begin(), lRow.values.end());
values.insert(values.end(),
std::make_move_iterator(rRow.values.begin()),
std::make_move_iterator(rRow.values.end()));
ds.rows.emplace_back(newRow(*range->second[i], rRow));
}
ds.rows.emplace_back(std::move(newRow));
}
// Move probe row in last new row creating
if (exchange_) {
ds.rows.emplace_back(newRow(std::move(rRow), *range->second.back()));
} else {
ds.rows.emplace_back(newRow(*range->second.back(), std::move(rRow)));
}
}

Row InnerJoinExecutor::newRow(Row left, Row right) const {
Row r;
r.reserve(left.size() + right.size());
r.values.insert(r.values.end(),
std::make_move_iterator(left.values.begin()),
std::make_move_iterator(left.values.end()));
r.values.insert(r.values.end(),
std::make_move_iterator(right.values.begin()),
std::make_move_iterator(right.values.end()));
return r;
}

const std::string& InnerJoinExecutor::leftVar() const {
if (node_->kind() == PlanNode::Kind::kBiInnerJoin) {
return node_->asNode<BiJoin>()->leftInputVar();
} else {
return node_->asNode<Join>()->leftVar().first;
}
}

const std::string& InnerJoinExecutor::rightVar() const {
if (node_->kind() == PlanNode::Kind::kBiInnerJoin) {
return node_->asNode<BiJoin>()->rightInputVar();
} else {
return node_->asNode<Join>()->rightVar().first;
}
}

Expand All @@ -132,5 +171,6 @@ folly::Future<Status> BiInnerJoinExecutor::execute() {
NG_RETURN_IF_ERROR(checkBiInputDataSets());
return join(joinNode->hashKeys(), joinNode->probeKeys(), joinNode->colNames());
}

} // namespace graph
} // namespace nebula
11 changes: 10 additions & 1 deletion src/graph/executor/query/InnerJoinExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,20 @@ class InnerJoinExecutor : public JoinExecutor {
template <class T>
void buildNewRow(const std::unordered_map<T, std::vector<const Row*>>& hashTable,
const T& val,
const Row& rRow,
Row rRow,
DataSet& ds) const;

// concat rows
Row newRow(Row left, Row right) const;

const std::string& leftVar() const;

const std::string& rightVar() const;

private:
bool exchange_{false};
// Does the probe result movable?
bool mv_{false};
};

// No diffrence with inner join in processing data, but the dependencies would be executed in
Expand Down
12 changes: 12 additions & 0 deletions src/graph/executor/query/JoinExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,17 @@ void JoinExecutor::buildSingleKeyHashTable(
}
}

Row JoinExecutor::newRow(Row left, Row right) const {
Row r;
r.reserve(left.size() + right.size());
r.values.insert(r.values.end(),
std::make_move_iterator(left.values.begin()),
std::make_move_iterator(left.values.end()));
r.values.insert(r.values.end(),
std::make_move_iterator(right.values.begin()),
std::make_move_iterator(right.values.end()));
return r;
}

} // namespace graph
} // namespace nebula
3 changes: 3 additions & 0 deletions src/graph/executor/query/JoinExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ class JoinExecutor : public Executor {
Iterator* iter,
std::unordered_map<Value, std::vector<const Row*>>& hashTable) const;

// concat rows
Row newRow(Row left, Row right) const;

std::unique_ptr<Iterator> lhsIter_;
std::unique_ptr<Iterator> rhsIter_;
size_t colSize_{0};
Expand Down
34 changes: 21 additions & 13 deletions src/graph/executor/query/LeftJoinExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ folly::Future<Status> LeftJoinExecutor::join(const std::vector<Expression*>& has
hashTable.reserve(rhsIter_->empty() ? 1 : rhsIter_->size());
if (!lhsIter_->empty()) {
buildSingleKeyHashTable(probeKeys.front(), rhsIter_.get(), hashTable);
mv_ = movable(node()->inputVars()[0]);
result = singleKeyProbe(hashKeys.front(), lhsIter_.get(), hashTable);
}
} else {
std::unordered_map<List, std::vector<const Row*>> hashTable;
hashTable.reserve(rhsIter_->empty() ? 1 : rhsIter_->size());
if (!lhsIter_->empty()) {
buildHashTable(probeKeys, rhsIter_.get(), hashTable);
mv_ = movable(node()->inputVars()[0]);
result = probe(hashKeys, lhsIter_.get(), hashTable);
}
}
Expand All @@ -62,7 +64,13 @@ DataSet LeftJoinExecutor::probe(
list.values.emplace_back(std::move(val));
}

buildNewRow<List>(hashTable, list, *probeIter->row(), ds);
if (mv_) {
// Probe row only match key in HashTable once, so we could move it directly,
// key/value in HashTable will be matched multiple times, so we can't move it.
buildNewRow<List>(hashTable, list, probeIter->moveRow(), ds);
} else {
buildNewRow<List>(hashTable, list, *probeIter->row(), ds);
}
}
return ds;
}
Expand All @@ -76,15 +84,21 @@ DataSet LeftJoinExecutor::singleKeyProbe(
QueryExpressionContext ctx(ectx_);
for (; probeIter->valid(); probeIter->next()) {
auto& val = probeKey->eval(ctx(probeIter));
buildNewRow<Value>(hashTable, val, *probeIter->row(), ds);
if (mv_) {
// Probe row only match key in HashTable once, so we could move it directly,
// key/value in HashTable will be matched multiple times, so we can't move it.
buildNewRow<Value>(hashTable, val, probeIter->moveRow(), ds);
} else {
buildNewRow<Value>(hashTable, val, *probeIter->row(), ds);
}
}
return ds;
}

template <class T>
void LeftJoinExecutor::buildNewRow(const std::unordered_map<T, std::vector<const Row*>>& hashTable,
const T& val,
const Row& lRow,
Row lRow,
DataSet& ds) const {
auto range = hashTable.find(val);
if (range == hashTable.end()) {
Expand All @@ -98,17 +112,11 @@ void LeftJoinExecutor::buildNewRow(const std::unordered_map<T, std::vector<const
values.insert(values.end(), colSize_ - lRowSize, Value::kNullValue);
ds.rows.emplace_back(std::move(newRow));
} else {
for (auto* row : range->second) {
auto& rRow = *row;
Row newRow;
auto& values = newRow.values;
values.reserve(lRow.size() + rRow.size());
values.insert(values.end(),
std::make_move_iterator(lRow.values.begin()),
std::make_move_iterator(lRow.values.end()));
values.insert(values.end(), rRow.values.begin(), rRow.values.end());
ds.rows.emplace_back(std::move(newRow));
for (std::size_t i = 0; i < (range->second.size() - 1); ++i) {
ds.rows.emplace_back(newRow(lRow, *range->second[i]));
}
// Move probe row in last new row creating
ds.rows.emplace_back(newRow(std::move(lRow), *range->second.back()));
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/graph/executor/query/LeftJoinExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ class LeftJoinExecutor : public JoinExecutor {
template <class T>
void buildNewRow(const std::unordered_map<T, std::vector<const Row*>>& hashTable,
const T& val,
const Row& lRow,
Row lRow,
DataSet& ds) const;

// Does the probe result movable?
bool mv_{false};
size_t rightColSize_{0};
};

Expand Down
7 changes: 4 additions & 3 deletions src/graph/executor/query/RollUpApplyExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ DataSet RollUpApplyExecutor::probeZeroKey(Iterator* probeIter, const List& hashT
ds.rows.reserve(probeIter->size());
QueryExpressionContext ctx(ectx_);
for (; probeIter->valid(); probeIter->next()) {
Row row = *probeIter->row();
Row row = mv_ ? probeIter->moveRow() : *probeIter->row();
row.emplace_back(std::move(hashTable));
ds.rows.emplace_back(std::move(row));
}
Expand All @@ -104,7 +104,7 @@ DataSet RollUpApplyExecutor::probeSingleKey(Expression* probeKey,
if (found != hashTable.end()) {
vals = found->second;
}
Row row = *probeIter->row();
Row row = mv_ ? probeIter->moveRow() : *probeIter->row();
row.emplace_back(std::move(vals));
ds.rows.emplace_back(std::move(row));
}
Expand All @@ -130,7 +130,7 @@ DataSet RollUpApplyExecutor::probe(std::vector<Expression*> probeKeys,
if (found != hashTable.end()) {
vals = found->second;
}
Row row = *probeIter->row();
Row row = mv_ ? probeIter->moveRow() : *probeIter->row();
row.emplace_back(std::move(vals));
ds.rows.emplace_back(std::move(row));
}
Expand All @@ -141,6 +141,7 @@ folly::Future<Status> RollUpApplyExecutor::rollUpApply() {
auto* rollUpApplyNode = asNode<RollUpApply>(node());
NG_RETURN_IF_ERROR(checkBiInputDataSets());
DataSet result;
mv_ = movable(node()->inputVars()[0]);
if (rollUpApplyNode->compareCols().size() == 0) {
List hashTable;
buildZeroKeyHashTable(rollUpApplyNode->collectCol(), rhsIter_.get(), hashTable);
Expand Down
2 changes: 2 additions & 0 deletions src/graph/executor/query/RollUpApplyExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class RollUpApplyExecutor : public Executor {
std::unique_ptr<Iterator> lhsIter_;
std::unique_ptr<Iterator> rhsIter_;
size_t colSize_{0};
// Does the probe result movable?
bool mv_{false};
};

} // namespace graph
Expand Down

0 comments on commit 7ca2bad

Please sign in to comment.