Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 72 additions & 29 deletions velox/exec/tests/TableEvolutionFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,43 @@ TableEvolutionFuzzer::parseFileFormats(std::string input) {

namespace {

// Appends randomly generated aggregate expressions of the form
// "<function>(<column name>)" to 'aggregates'.
//
// Columns are drawn from 'availableColumns' without replacement, so each
// column is referenced by at most one aggregate produced by a single call.
// The function for each aggregate is drawn uniformly from
// 'supportedAggFuncs'.
//
// @param availableColumns Indices into 'schema' of the candidate columns.
// @param supportedAggFuncs Names of aggregate functions applicable to the
// candidate columns.
// @param schema Row type used to resolve column indices to column names.
// @param rng Random generator; advanced by this call.
// @param aggregates Output list; new expressions are appended and existing
// entries are left untouched.
//
// The call is a no-op when either 'availableColumns' or 'supportedAggFuncs' is
// empty. It may also append nothing for non-empty inputs because every
// candidate slot is kept only with probability 1/2.
void generateAggregatesForColumns(
    const std::vector<int>& availableColumns,
    const std::vector<std::string>& supportedAggFuncs,
    const RowTypePtr& schema,
    FuzzerGenerator& rng,
    std::vector<std::string>& aggregates) {
  // Without columns there is nothing to aggregate; without functions the
  // lookup into 'supportedAggFuncs' below would index an empty vector.
  if (availableColumns.empty() || supportedAggFuncs.empty()) {
    return;
  }

  constexpr uint32_t kMaxAggregates = 5;
  const auto numColumns = static_cast<uint32_t>(availableColumns.size());
  const auto numFuncs = static_cast<uint32_t>(supportedAggFuncs.size());

  // rand32(min, max, rng) draws from [min, max), so the draw is in
  // [1, numColumns]; the cap keeps the number of slots at kMaxAggregates or
  // fewer. The result never exceeds numColumns, which guarantees that the
  // rejection loop below can always find an unused column.
  const uint32_t numSlots = std::min(
      kMaxAggregates, folly::Random::rand32(1, numColumns + 1, rng));

  std::unordered_set<uint32_t> pickedPositions;
  for (uint32_t slot = 0; slot < numSlots; ++slot) {
    // Keep each slot with probability 1/2 so that the number of aggregates
    // emitted varies between 0 and numSlots.
    if (!folly::Random::oneIn(2, rng)) {
      continue;
    }

    // Rejection-sample a position in 'availableColumns' that has not been
    // used yet; insert() reports whether the position was new.
    uint32_t position;
    do {
      position = folly::Random::rand32(numColumns, rng);
    } while (!pickedPositions.insert(position).second);

    const std::string& aggFunc =
        supportedAggFuncs[folly::Random::rand32(numFuncs, rng)];
    aggregates.push_back(fmt::format(
        "{}({})", aggFunc, schema->nameOf(availableColumns[position])));
  }
}

std::vector<std::vector<RowVectorPtr>> runTaskCursors(
const std::vector<std::shared_ptr<TaskCursor>>& cursors,
folly::Executor& executor) {
Expand Down Expand Up @@ -410,7 +447,7 @@ fuzzer::ExpressionFuzzer::FuzzedExpressionData generateRemainingFilters(

// Generate random aggregation configuration for pushdown testing.
// Only generates aggregations that are eligible for pushdown:
// - Supported aggregate functions: min, max, sum
// - Supported aggregate functions: min, max, bool_and, bool_or,
//   bitwise_and_agg, bitwise_or_agg, bitwise_xor_agg
// - Each column can only be used by at most one aggregate
// - Grouping keys are optional (can be empty for global aggregation)
// - Columns with filters (subfield or remaining) are excluded to enable
Expand All @@ -421,7 +458,11 @@ std::optional<AggregationConfig> generateAggregationConfig(
const std::unordered_set<std::string>& filteredColumns) {
// List of aggregate functions that support pushdown
// Note: Excluding 'sum' to avoid integer overflow in fuzzer with random data
static const std::vector<std::string> supportedAggs = {"min", "max"};
static const std::vector<std::string> supportedNumericAggs = {"min", "max"};
static const std::vector<std::string> supportedBooleanAggs = {
"bool_and", "bool_or"};
static const std::vector<std::string> supportedIntegerAggs = {
"bitwise_and_agg", "bitwise_or_agg", "bitwise_xor_agg"};

// Randomly decide number of grouping keys (0 to 2)
int numGroupingKeys = folly::Random::rand32(3, rng);
Expand All @@ -441,7 +482,9 @@ std::optional<AggregationConfig> generateAggregationConfig(
// For aggregation pushdown to work, each column should only be used once
// and columns with filters should be excluded
std::vector<std::string> aggregates;
std::vector<int> availableColumns;
std::vector<int> availableNumericColumns;
std::vector<int> availableIntegerColumns;
std::vector<int> availableBooleanColumns;
for (int i = 0; i < schema->size(); ++i) {
if (usedColumnIndices.count(i) == 0) {
auto columnName = schema->nameOf(i);
Expand All @@ -451,41 +494,41 @@ std::optional<AggregationConfig> generateAggregationConfig(
}

auto type = schema->childAt(i);
// Only numeric types support min/max/sum
// Check if it's a primitive numeric type (not decimal)
// Integer types: randomly choose between min/max or bitwise aggregations
// Note: Exclude DATE type as it doesn't support bitwise aggregations
if ((type->isInteger() || type->isBigint() || type->isSmallint() ||
type->isTinyint() || type->isReal() || type->isDouble()) &&
!type->isDecimal()) {
availableColumns.push_back(i);
type->isTinyint()) &&
!type->isDate()) {
if (folly::Random::oneIn(2, rng)) {
availableIntegerColumns.push_back(i);
} else {
availableNumericColumns.push_back(i);
}
}
// Float types support min/max only
else if ((type->isReal() || type->isDouble()) && !type->isDecimal()) {
availableNumericColumns.push_back(i);
}
// Boolean types support bool_and/bool_or
else if (type->isBoolean()) {
availableBooleanColumns.push_back(i);
}
}
}

if (availableColumns.empty()) {
// Need at least one column to aggregate
if (availableNumericColumns.empty() && availableBooleanColumns.empty() &&
availableIntegerColumns.empty()) {
return std::nullopt;
}

// Randomly select 1-5 aggregates
int numAggregates = std::min(
static_cast<int>(5),
static_cast<int>(
folly::Random::rand32(1, availableColumns.size() + 1, rng)));

// Randomly pick columns for aggregates without replacement
std::unordered_set<int> selectedIndices;
for (int i = 0; i < numAggregates; ++i) {
int randomIdx;
do {
randomIdx = folly::Random::rand32(availableColumns.size(), rng);
} while (selectedIndices.count(randomIdx) > 0);
selectedIndices.insert(randomIdx);

int colIdx = availableColumns[randomIdx];
std::string aggFunc =
supportedAggs[folly::Random::rand32(supportedAggs.size(), rng)];
aggregates.push_back(
fmt::format("{}({})", aggFunc, schema->nameOf(colIdx)));
}
generateAggregatesForColumns(
availableNumericColumns, supportedNumericAggs, schema, rng, aggregates);
generateAggregatesForColumns(
availableBooleanColumns, supportedBooleanAggs, schema, rng, aggregates);
generateAggregatesForColumns(
availableIntegerColumns, supportedIntegerAggs, schema, rng, aggregates);

if (aggregates.empty()) {
return std::nullopt;
Expand Down
Loading