Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 53 additions & 15 deletions velox/experimental/cudf/exec/CudfLocalPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,14 @@ CudfLocalPartition::CudfLocalPartition(
partitionKeyIndices_.push_back(fieldIndex);
}
}
partitionFunctionType_ = PartitionFunctionType::kHash;
} else if (
numPartitions_ > 1 && spec.find("ROUND ROBIN") != std::string::npos) {
partitionFunctionType_ = PartitionFunctionType::kRoundRobin;
}
VELOX_CHECK(numPartitions_ == 1 || partitionKeyIndices_.size() > 0);
VELOX_CHECK(
numPartitions_ == 1 || partitionKeyIndices_.size() > 0 ||
partitionFunctionType_ == PartitionFunctionType::kRoundRobin);

// Since we're replacing the LocalPartition with CudfLocalPartition, the
// number of producers is already set. Adding producer only adds to a counter
Expand All @@ -110,27 +116,58 @@ CudfLocalPartition::CudfLocalPartition(
// }
}

void CudfLocalPartition::recordOutputStats(RowVectorPtr& input) {
{
auto lockedStats = stats_.wlock();
lockedStats->addOutputVector(input->estimateFlatSize(), input->size());
}
}

void CudfLocalPartition::flushVectorPool() {
// We reuse the LocalExchangeQueue from the CPU implementation. That impl
// stores used vectors in a vector pool for the CPU LocalPartition to re-use.
// CudfLocalPartition does not need it and does not extract it. This results
// in unnecessary extension of the lifetimes of vectors that were exchanged,
// resulting in kind of a memory leak.
// This is a hack to forcefully flush the vector pools.

for (auto& queue : queues_) {
queue->getVector();
}
}

void CudfLocalPartition::addInput(RowVectorPtr input) {
flushVectorPool();
VELOX_NVTX_OPERATOR_FUNC_RANGE();
recordOutputStats(input);
auto cudfVector = std::dynamic_pointer_cast<CudfVector>(input);
VELOX_CHECK(cudfVector, "Input must be a CudfVector");
auto stream = cudfVector->stream();

if (numPartitions_ > 1) {
// Use cudf hash partitioning
auto tableView = cudfVector->getTableView();
std::vector<cudf::size_type> partitionKeyIndices;
for (const auto& idx : partitionKeyIndices_) {
partitionKeyIndices.push_back(static_cast<cudf::size_type>(idx));
}

auto [partitionedTable, partitionOffsets] = cudf::hash_partition(
tableView,
partitionKeyIndices,
numPartitions_,
cudf::hash_id::HASH_MURMUR3,
cudf::DEFAULT_HASH_SEED,
stream);
auto [partitionedTable, partitionOffsets] = [&]() {
auto tableView = cudfVector->getTableView();
// Use cudf hash partitioning
if (partitionFunctionType_ == PartitionFunctionType::kHash) {
std::vector<cudf::size_type> partitionKeyIndices;
for (const auto& idx : partitionKeyIndices_) {
partitionKeyIndices.push_back(static_cast<cudf::size_type>(idx));
}

return cudf::hash_partition(
tableView,
partitionKeyIndices,
numPartitions_,
cudf::hash_id::HASH_MURMUR3,
cudf::DEFAULT_HASH_SEED,
stream);
} else if (partitionFunctionType_ == PartitionFunctionType::kRoundRobin) {
return cudf::round_robin_partition(
tableView, numPartitions_, counter_, stream);
counter_ = (counter_ + cudfVector->size()) % numPartitions_;
}
VELOX_FAIL("Unsupported partition function");
}();

VELOX_CHECK(partitionOffsets.size() == numPartitions_);
VELOX_CHECK(partitionOffsets[0] == 0);
Expand Down Expand Up @@ -203,6 +240,7 @@ bool CudfLocalPartition::isFinished() {
if (!futures_.empty() || !noMoreInput_) {
return false;
}
flushVectorPool();

return true;
}
Expand Down
12 changes: 12 additions & 0 deletions velox/experimental/cudf/exec/CudfLocalPartition.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@

namespace facebook::velox::cudf_velox {

enum class PartitionFunctionType {
kHash,
kRoundRobin,
};

class CudfLocalPartition : public exec::Operator, public NvtxHelper {
public:
CudfLocalPartition(
Expand All @@ -33,6 +38,8 @@ class CudfLocalPartition : public exec::Operator, public NvtxHelper {
return fmt::format("CudfLocalPartition({})", numPartitions_);
}

void recordOutputStats(RowVectorPtr& input);

void addInput(RowVectorPtr input) override;

RowVectorPtr getOutput() override {
Expand All @@ -54,9 +61,14 @@ class CudfLocalPartition : public exec::Operator, public NvtxHelper {
static bool shouldReplace(
const std::shared_ptr<const core::LocalPartitionNode>& planNode);

private:
void flushVectorPool();

protected:
const std::vector<std::shared_ptr<exec::LocalExchangeQueue>> queues_;
const size_t numPartitions_;
PartitionFunctionType partitionFunctionType_;
size_t counter_{0};

std::vector<exec::BlockingReason> blockingReasons_;
std::vector<ContinueFuture> futures_;
Expand Down
9 changes: 7 additions & 2 deletions velox/experimental/cudf/exec/ToCudf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ bool CompileState::compile(bool allowCpuFallback) {
return isAnyOf<
exec::OrderBy,
exec::HashAggregation,
exec::StreamingAggregation,
exec::Limit,
exec::LocalPartition,
exec::LocalExchange,
Expand All @@ -179,6 +180,7 @@ bool CompileState::compile(bool allowCpuFallback) {
return isAnyOf<
exec::OrderBy,
exec::HashAggregation,
exec::StreamingAggregation,
exec::Limit,
exec::LocalPartition,
exec::AssignUniqueId>(op) ||
Expand All @@ -190,6 +192,7 @@ bool CompileState::compile(bool allowCpuFallback) {
return isAnyOf<
exec::OrderBy,
exec::HashAggregation,
exec::StreamingAggregation,
exec::Limit,
exec::LocalExchange,
exec::AssignUniqueId>(op) ||
Expand Down Expand Up @@ -266,9 +269,11 @@ bool CompileState::compile(bool allowCpuFallback) {
getPlanNode(orderByOp->planNodeId()));
VELOX_CHECK(planNode != nullptr);
replaceOp.push_back(std::make_unique<CudfOrderBy>(id, ctx, planNode));
} else if (auto hashAggOp = dynamic_cast<exec::HashAggregation*>(oper)) {
} else if (
dynamic_cast<exec::HashAggregation*>(oper) or
dynamic_cast<exec::StreamingAggregation*>(oper)) {
auto planNode = std::dynamic_pointer_cast<const core::AggregationNode>(
getPlanNode(hashAggOp->planNodeId()));
getPlanNode(oper->planNodeId()));
VELOX_CHECK(planNode != nullptr);
replaceOp.push_back(
std::make_unique<CudfHashAggregation>(id, ctx, planNode));
Expand Down
Loading