Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2228,15 +2228,49 @@ PlanNodePtr RowNumberNode::create(const folly::dynamic& obj, void* context) {
source);
}

namespace {
std::unordered_map<TopNRowNumberNode::RankFunction, std::string>
rankFunctionNames() {
return {
{TopNRowNumberNode::RankFunction::kRowNumber, "row_number"},
{TopNRowNumberNode::RankFunction::kRank, "rank"},
{TopNRowNumberNode::RankFunction::kDenseRank, "dense_rank"},
};
}
} // namespace

// static
/// Returns the textual name of 'function' ("row_number", "rank" or
/// "dense_rank").
///
/// The returned pointer refers to a string stored in a function-local static
/// table, so callers must not free it.
///
/// Fails a VELOX_CHECK if 'function' has no entry in the name table, e.g. when
/// an out-of-range integer was cast to RankFunction.
const char* TopNRowNumberNode::rankFunctionName(
    TopNRowNumberNode::RankFunction function) {
  // Built once on first use and reused by all subsequent calls.
  static const auto kFunctionNames = rankFunctionNames();
  const auto it = kFunctionNames.find(function);
  // Report the raw integer value since an unknown enum value has no name.
  VELOX_CHECK(
      it != kFunctionNames.end(),
      "Invalid rank function {}",
      static_cast<int>(function));
  return it->second.c_str();
}

// static
/// Returns the RankFunction whose textual name equals 'name'. This is the
/// inverse of rankFunctionName().
///
/// Fails a VELOX_CHECK with a message that includes 'name' if no rank function
/// has that name.
TopNRowNumberNode::RankFunction TopNRowNumberNode::rankFunctionFromName(
const std::string& name) {
// Name -> function table, built once on first use from the
// function -> name table. NOTE(review): invertMap is defined elsewhere;
// presumably it swaps keys and values - confirm.
static const auto kFunctionNames = invertMap(rankFunctionNames());
auto it = kFunctionNames.find(name);
VELOX_CHECK(it != kFunctionNames.end(), "Invalid rank function " + name);
return it->second;
}

TopNRowNumberNode::TopNRowNumberNode(
PlanNodeId id,
RankFunction function,
std::vector<FieldAccessTypedExprPtr> partitionKeys,
std::vector<FieldAccessTypedExprPtr> sortingKeys,
std::vector<SortOrder> sortingOrders,
const std::optional<std::string>& rowNumberColumnName,
int32_t limit,
PlanNodePtr source)
: PlanNode(std::move(id)),
function_(function),
partitionKeys_{std::move(partitionKeys)},
sortingKeys_{std::move(sortingKeys)},
sortingOrders_{std::move(sortingOrders)},
Expand Down Expand Up @@ -2274,6 +2308,8 @@ TopNRowNumberNode::TopNRowNumberNode(
}

void TopNRowNumberNode::addDetails(std::stringstream& stream) const {
stream << rankFunctionName(function_) << " ";

if (!partitionKeys_.empty()) {
stream << "partition by (";
addFields(stream, partitionKeys_);
Expand All @@ -2289,6 +2325,7 @@ void TopNRowNumberNode::addDetails(std::stringstream& stream) const {

folly::dynamic TopNRowNumberNode::serialize() const {
auto obj = PlanNode::serialize();
obj["function"] = rankFunctionName(function_);
obj["partitionKeys"] = ISerializable::serialize(partitionKeys_);
obj["sortingKeys"] = ISerializable::serialize(sortingKeys_);
obj["sortingOrders"] = serializeSortingOrders(sortingOrders_);
Expand All @@ -2310,6 +2347,7 @@ PlanNodePtr TopNRowNumberNode::create(
const folly::dynamic& obj,
void* context) {
auto source = deserializeSingleSource(obj, context);
auto function = rankFunctionFromName(obj["function"].asString());
auto partitionKeys = deserializeFields(obj["partitionKeys"], context);
auto sortingKeys = deserializeFields(obj["sortingKeys"], context);

Expand All @@ -2322,6 +2360,7 @@ PlanNodePtr TopNRowNumberNode::create(

return std::make_shared<TopNRowNumberNode>(
deserializePlanNodeId(obj),
function,
partitionKeys,
sortingKeys,
sortingOrders,
Expand Down
58 changes: 53 additions & 5 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -4733,31 +4733,73 @@ class MarkDistinctNode : public PlanNode {
const RowTypePtr outputType_;
};

/// Optimized version of a WindowNode for a single row_number function with a
/// limit over sorted partitions.
/// The output of this node contains all input columns followed by an optional
/// Optimized version of a WindowNode for a single row_number, rank or
/// dense_rank function with a limit over sorted partitions. The output of this
/// node contains all input columns followed by an optional
/// 'rowNumberColumnName' BIGINT column.
class TopNRowNumberNode : public PlanNode {
public:
/// Ranking function computed by this node. Each value has a textual name,
/// available via rankFunctionName(), that is used when printing and
/// serializing the node.
enum class RankFunction {
/// Named "row_number".
kRowNumber,
/// Named "rank".
kRank,
/// Named "dense_rank".
kDenseRank,
};

/// Returns the textual name of 'function': "row_number", "rank" or
/// "dense_rank". Fails a VELOX_CHECK if 'function' is not a known value.
static const char* rankFunctionName(TopNRowNumberNode::RankFunction function);

/// Returns the RankFunction whose textual name is 'name'. Fails a VELOX_CHECK
/// if 'name' does not match any rank function.
static RankFunction rankFunctionFromName(const std::string& name);

/// @param function Rank function (row_number, rank, dense_rank) for TopN.
/// @param partitionKeys Partitioning keys. May be empty.
/// @param sortingKeys Sorting keys. May not be empty and may not intersect
/// with 'partitionKeys'.
/// @param sortingOrders Sorting orders, one per sorting key.
/// @param rowNumberColumnName Optional name of the column containing row
/// numbers. If not specified, the output doesn't include 'row number'
/// column. This is used when computing partial results.
/// numbers (or rank and dense_rank). If not specified, the output doesn't
/// include 'row number' column. This is used when computing partial results.
/// @param limit Per-partition limit. The number of
/// rows produced by this node will not exceed this value for any given
/// partition. Extra rows will be dropped.
TopNRowNumberNode(
PlanNodeId id,
RankFunction function,
std::vector<FieldAccessTypedExprPtr> partitionKeys,
std::vector<FieldAccessTypedExprPtr> sortingKeys,
std::vector<SortOrder> sortingOrders,
const std::optional<std::string>& rowNumberColumnName,
int32_t limit,
PlanNodePtr source);

/// Note: This constructor is for backwards compatibility. Remove it after
/// migrating Prestissimo to use the new constructor. It delegates to the
/// constructor that takes a RankFunction, using RankFunction::kRowNumber.
/// @param id Plan node ID.
/// @param partitionKeys Partitioning keys. May be empty.
/// @param sortingKeys Sorting keys. May not be empty and may not intersect
/// with 'partitionKeys'.
/// @param sortingOrders Sorting orders, one per sorting key.
/// @param rowNumberColumnName Optional name of the column containing row
/// numbers. If not specified, the output doesn't include 'row number'
/// column. This is used when computing partial results.
/// @param limit Per-partition limit. The number of
/// rows produced by this node will not exceed this value for any given
/// partition. Extra rows will be dropped.
/// @param source Input plan node.
TopNRowNumberNode(
    PlanNodeId id,
    std::vector<FieldAccessTypedExprPtr> partitionKeys,
    std::vector<FieldAccessTypedExprPtr> sortingKeys,
    std::vector<SortOrder> sortingOrders,
    const std::optional<std::string>& rowNumberColumnName,
    int32_t limit,
    PlanNodePtr source)
    // The by-value arguments are sinks: move them into the delegated
    // constructor instead of copying them a second time.
    : TopNRowNumberNode(
          std::move(id),
          RankFunction::kRowNumber,
          std::move(partitionKeys),
          std::move(sortingKeys),
          std::move(sortingOrders),
          rowNumberColumnName,
          limit,
          std::move(source)) {}

class Builder {
public:
Builder() = default;
Expand Down Expand Up @@ -4884,6 +4926,10 @@ class TopNRowNumberNode : public PlanNode {
return limit_;
}

/// Returns the rank function this node was constructed with.
RankFunction rankFunction() const {
return function_;
}

/// Returns true if this node's output type has more columns than the output
/// type of its first source, i.e. the node appends a column to its input.
bool generateRowNumber() const {
  const auto numInputColumns = sources_[0]->outputType()->size();
  const auto numOutputColumns = outputType_->size();
  return numInputColumns < numOutputColumns;
}
Expand All @@ -4899,6 +4945,8 @@ class TopNRowNumberNode : public PlanNode {
private:
void addDetails(std::stringstream& stream) const override;

// Rank function supplied to the constructor; exposed via rankFunction().
const RankFunction function_;

const std::vector<FieldAccessTypedExprPtr> partitionKeys_;

const std::vector<FieldAccessTypedExprPtr> sortingKeys_;
Expand Down
Loading
Loading