diff --git a/velox/exec/fuzzer/CMakeLists.txt b/velox/exec/fuzzer/CMakeLists.txt index cf2ceecc954..014aa605d9b 100644 --- a/velox/exec/fuzzer/CMakeLists.txt +++ b/velox/exec/fuzzer/CMakeLists.txt @@ -33,7 +33,6 @@ target_link_libraries( velox_type_parser Folly::folly velox_hive_connector - velox_dwio_dwrf_reader velox_dwio_dwrf_writer velox_dwio_catalog_fbhive velox_dwio_faulty_file_sink) @@ -79,22 +78,39 @@ target_link_libraries( velox_aggregation_fuzzer_base velox_temp_path) +add_library(velox_row_number_fuzzer_base_lib RowNumberFuzzerBase.cpp) + +target_link_libraries( + velox_row_number_fuzzer_base_lib + velox_dwio_dwrf_reader + velox_fuzzer_util + velox_vector_fuzzer + velox_exec_test_lib) + add_library(velox_row_number_fuzzer_lib RowNumberFuzzer.cpp) +target_link_libraries( + velox_row_number_fuzzer_lib velox_row_number_fuzzer_base_lib velox_type + velox_expression_test_utility) + # RowNumber Fuzzer. add_executable(velox_row_number_fuzzer RowNumberFuzzerRunner.cpp) target_link_libraries( velox_row_number_fuzzer velox_row_number_fuzzer_lib) +add_library(velox_topn_row_number_fuzzer_lib TopNRowNumberFuzzer.cpp) + target_link_libraries( - velox_row_number_fuzzer_lib - velox_fuzzer_util - velox_type - velox_vector_fuzzer - velox_exec_test_lib + velox_topn_row_number_fuzzer_lib velox_row_number_fuzzer_base_lib velox_type velox_expression_test_utility) +# TopNRowNumber Fuzzer. +add_executable(velox_topn_row_number_fuzzer TopNRowNumberFuzzerRunner.cpp) + +target_link_libraries( + velox_topn_row_number_fuzzer velox_topn_row_number_fuzzer_lib) + add_library(velox_join_fuzzer JoinFuzzer.cpp) target_link_libraries( diff --git a/velox/exec/fuzzer/DuckQueryRunner.cpp b/velox/exec/fuzzer/DuckQueryRunner.cpp index 733675fec7e..547a81b58d6 100644 --- a/velox/exec/fuzzer/DuckQueryRunner.cpp +++ b/velox/exec/fuzzer/DuckQueryRunner.cpp @@ -166,6 +166,11 @@ std::optional DuckQueryRunner::toSql( return toSql(rowNumberNode); } + if (const auto topNRowNumberNode = + std::dynamic_pointer_cast(plan)) { + return toSql(topNRowNumberNode); + } + if (const auto joinNode = std::dynamic_pointer_cast(plan)) { return toSql(joinNode); @@ -377,4 +382,55 @@ std::optional DuckQueryRunner::toSql( return sql.str(); } + +std::optional DuckQueryRunner::toSql( + const std::shared_ptr& topNRowNumberNode) { + std::stringstream sql; + sql << "SELECT * FROM (SELECT "; + + const auto& inputType = topNRowNumberNode->sources()[0]->outputType(); + for (auto i = 0; i < inputType->size(); ++i) { + appendComma(i, sql); + sql << inputType->nameOf(i); + } + + sql << ", row_number() OVER ("; + + const auto& partitionKeys = topNRowNumberNode->partitionKeys(); + if (!partitionKeys.empty()) { + sql << "partition by "; + for (auto i = 0; i < partitionKeys.size(); ++i) { + appendComma(i, sql); + sql << partitionKeys[i]->name(); + } + } + + const auto& sortingKeys = topNRowNumberNode->sortingKeys(); + const auto& sortingOrders = topNRowNumberNode->sortingOrders(); + + if (!sortingKeys.empty()) { + sql << " ORDER BY "; + for (auto j = 0; j < sortingKeys.size(); ++j) { + appendComma(j, sql); + sql << sortingKeys[j]->name() << " " << sortingOrders[j].toString(); + } + } + + std::string rowNumberColumnName = topNRowNumberNode->generateRowNumber() + ? topNRowNumberNode->outputType()->nameOf( + topNRowNumberNode->outputType()->children().size() - 1) + : "row_number"; + + // TopNRowNumberNode should have a single source. + std::optional source = toSql(topNRowNumberNode->sources()[0]); + if (!source) { + return std::nullopt; + } + sql << ") as " << rowNumberColumnName << " FROM " << *source << ") "; + sql << " where " << rowNumberColumnName + << " <= " << topNRowNumberNode->limit(); + + return sql.str(); +} + } // namespace facebook::velox::exec::test diff --git a/velox/exec/fuzzer/DuckQueryRunner.h b/velox/exec/fuzzer/DuckQueryRunner.h index 3389d1ac3f4..523ea69806f 100644 --- a/velox/exec/fuzzer/DuckQueryRunner.h +++ b/velox/exec/fuzzer/DuckQueryRunner.h @@ -74,6 +74,9 @@ class DuckQueryRunner : public ReferenceQueryRunner { std::optional toSql( const std::shared_ptr& rowNumberNode); + std::optional toSql( + const std::shared_ptr& topNRowNumberNode); + std::unordered_set aggregateFunctionNames_; }; diff --git a/velox/exec/fuzzer/PrestoQueryRunner.cpp b/velox/exec/fuzzer/PrestoQueryRunner.cpp index 1bfce2bf115..372745ab841 100644 --- a/velox/exec/fuzzer/PrestoQueryRunner.cpp +++ b/velox/exec/fuzzer/PrestoQueryRunner.cpp @@ -183,6 +183,11 @@ std::optional PrestoQueryRunner::toSql( return toSql(rowNumberNode); } + if (const auto topNRowNumberNode = + std::dynamic_pointer_cast(plan)) { + return toSql(topNRowNumberNode); + } + if (auto tableWriteNode = std::dynamic_pointer_cast(plan)) { return toSql(tableWriteNode); @@ -497,6 +502,60 @@ std::optional PrestoQueryRunner::toSql( return sql.str(); } +std::optional PrestoQueryRunner::toSql( + const std::shared_ptr& topNRowNumberNode) { + if (!isSupportedDwrfType(topNRowNumberNode->sources()[0]->outputType())) { + return std::nullopt; + } + + std::stringstream sql; + sql << "SELECT * FROM (SELECT "; + + const auto& inputType = topNRowNumberNode->sources()[0]->outputType(); + for (auto i = 0; i < inputType->size(); ++i) { + appendComma(i, sql); + sql << inputType->nameOf(i); + } + + sql << ", row_number() OVER ("; + + const auto& partitionKeys = topNRowNumberNode->partitionKeys(); + if (!partitionKeys.empty()) { + sql << "partition by "; + for (auto i = 0; i < partitionKeys.size(); ++i) { + appendComma(i, sql); + sql << partitionKeys[i]->name(); + } + } + + const auto& sortingKeys = topNRowNumberNode->sortingKeys(); + const auto& sortingOrders = topNRowNumberNode->sortingOrders(); + + if (!sortingKeys.empty()) { + sql << " ORDER BY "; + for (auto j = 0; j < sortingKeys.size(); ++j) { + appendComma(j, sql); + sql << sortingKeys[j]->name() << " " << sortingOrders[j].toString(); + } + } + + std::string rowNumberColumnName = topNRowNumberNode->generateRowNumber() + ? topNRowNumberNode->outputType()->nameOf( + topNRowNumberNode->outputType()->children().size() - 1) + : "row_number"; + + // TopNRowNumberNode should have a single source. + std::optional source = toSql(topNRowNumberNode->sources()[0]); + if (!source) { + return std::nullopt; + } + sql << ") as " << rowNumberColumnName << " FROM " << *source << ") "; + sql << " where " << rowNumberColumnName + << " <= " << topNRowNumberNode->limit(); + + return sql.str(); +} + std::optional PrestoQueryRunner::toSql( const std::shared_ptr& tableWriteNode) { auto insertTableHandle = diff --git a/velox/exec/fuzzer/PrestoQueryRunner.h b/velox/exec/fuzzer/PrestoQueryRunner.h index ca6083c7a09..a8ec00737bd 100644 --- a/velox/exec/fuzzer/PrestoQueryRunner.h +++ b/velox/exec/fuzzer/PrestoQueryRunner.h @@ -120,6 +120,9 @@ class PrestoQueryRunner : public velox::exec::test::ReferenceQueryRunner { std::optional toSql( const std::shared_ptr& rowNumberNode); + std::optional toSql( + const std::shared_ptr& rowNumberNode); + std::optional toSql( const std::shared_ptr& tableWriteNode); diff --git a/velox/exec/fuzzer/RowNumberFuzzer.cpp b/velox/exec/fuzzer/RowNumberFuzzer.cpp index 4e9ccf1ef5e..2fc3320cdc4 100644 --- a/velox/exec/fuzzer/RowNumberFuzzer.cpp +++ b/velox/exec/fuzzer/RowNumberFuzzer.cpp @@ -15,107 +15,27 @@ */ #include "velox/exec/fuzzer/RowNumberFuzzer.h" -#include + #include #include "velox/common/file/FileSystems.h" -#include "velox/connectors/hive/HiveConnector.h" -#include "velox/connectors/hive/HiveConnectorSplit.h" -#include "velox/dwio/dwrf/RegisterDwrfReader.h" #include "velox/exec/fuzzer/FuzzerUtil.h" -#include "velox/exec/fuzzer/ReferenceQueryRunner.h" -#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/fuzzer/RowNumberFuzzerBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" -#include "velox/serializers/CompactRowSerializer.h" -#include "velox/serializers/PrestoSerializer.h" -#include "velox/serializers/UnsafeRowSerializer.h" -#include "velox/vector/fuzzer/VectorFuzzer.h" - -DEFINE_int32(steps, 10, "Number of plans to generate and test."); - -DEFINE_int32( - duration_sec, - 0, - "For how long it should run (in seconds). If zero, " - "it executes exactly --steps iterations and exits."); - -DEFINE_int32( - batch_size, - 100, - "The number of elements on each generated vector."); - -DEFINE_int32(num_batches, 10, "The number of generated vectors."); - -DEFINE_double( - null_ratio, - 0.1, - "Chance of adding a null value in a vector " - "(expressed as double from 0 to 1)."); - -DEFINE_bool(enable_spill, true, "Whether to test plans with spilling enabled."); - -DEFINE_int32( - max_spill_level, - -1, - "Max spill level, -1 means random [0, 7], otherwise the actual level."); - -DEFINE_bool( - enable_oom_injection, - false, - "When enabled OOMs will randomly be triggered while executing query " - "plans. The goal of this mode is to ensure unexpected exceptions " - "aren't thrown and the process isn't killed in the process of cleaning " - "up after failures. Therefore, results are not compared when this is " - "enabled. Note that this option only works in debug builds."); namespace facebook::velox::exec { namespace { -class RowNumberFuzzer { +class RowNumberFuzzer : public RowNumberFuzzerBase { public: explicit RowNumberFuzzer( size_t initialSeed, std::unique_ptr); - void go(); - - struct PlanWithSplits { - core::PlanNodePtr plan; - std::vector splits; - - explicit PlanWithSplits( - core::PlanNodePtr _plan, - const std::vector& _splits = {}) - : plan(std::move(_plan)), splits(_splits) {} - }; - private: - static VectorFuzzer::Options getFuzzerOptions() { - VectorFuzzer::Options opts; - opts.vectorSize = FLAGS_batch_size; - opts.stringVariableLength = true; - opts.stringLength = 100; - opts.nullRatio = FLAGS_null_ratio; - return opts; - } - - void seed(size_t seed) { - currentSeed_ = seed; - vectorFuzzer_.reSeed(seed); - rng_.seed(currentSeed_); - } - - void reSeed() { - seed(rng_()); - } - // Runs one test iteration from query plans generations, executions and result // verifications. - void verify(); - - int32_t randInt(int32_t min, int32_t max) { - return boost::random::uniform_int_distribution(min, max)(rng_); - } + void runSingleIteration() override; std::pair, std::vector> generatePartitionKeys(); @@ -124,12 +44,6 @@ class RowNumberFuzzer { const std::vector& keyNames, const std::vector& keyTypes); - std::optional computeReferenceResults( - core::PlanNodePtr& plan, - const std::vector& input); - - RowVectorPtr execute(const PlanWithSplits& plan, bool injectSpill); - void addPlansWithTableScan( const std::string& tableDir, const std::vector& partitionKeys, @@ -149,69 +63,12 @@ class RowNumberFuzzer { const RowTypePtr& type, const std::vector& partitionKeys, const std::vector& splits); - - FuzzerGenerator rng_; - size_t currentSeed_{0}; - - std::shared_ptr rootPool_{ - memory::memoryManager()->addRootPool( - "rowNumberFuzzer", - memory::kMaxMemory, - memory::MemoryReclaimer::create())}; - std::shared_ptr pool_{rootPool_->addLeafChild( - "rowNumberFuzzerLeaf", - true, - exec::MemoryReclaimer::create())}; - std::shared_ptr writerPool_{rootPool_->addAggregateChild( - "rowNumberFuzzerWriter", - exec::MemoryReclaimer::create())}; - VectorFuzzer vectorFuzzer_; - std::unique_ptr referenceQueryRunner_; }; RowNumberFuzzer::RowNumberFuzzer( size_t initialSeed, std::unique_ptr referenceQueryRunner) - : vectorFuzzer_{getFuzzerOptions(), pool_.get()}, - referenceQueryRunner_{std::move(referenceQueryRunner)} { - filesystems::registerLocalFileSystem(); - dwrf::registerDwrfReaderFactory(); - - if (!isRegisteredNamedVectorSerde(VectorSerde::Kind::kPresto)) { - serializer::presto::PrestoVectorSerde::registerNamedVectorSerde(); - } - if (!isRegisteredNamedVectorSerde(VectorSerde::Kind::kCompactRow)) { - serializer::CompactRowVectorSerde::registerNamedVectorSerde(); - } - if (!isRegisteredNamedVectorSerde(VectorSerde::Kind::kUnsafeRow)) { - serializer::spark::UnsafeRowVectorSerde::registerNamedVectorSerde(); - } - - // Make sure not to run out of open file descriptors. - std::unordered_map hiveConfig = { - {connector::hive::HiveConfig::kNumCacheFileHandles, "1000"}}; - connector::registerConnectorFactory( - std::make_shared()); - auto hiveConnector = - connector::getConnectorFactory( - connector::hive::HiveConnectorFactory::kHiveConnectorName) - ->newConnector( - test::kHiveConnectorId, - std::make_shared(std::move(hiveConfig))); - connector::registerConnector(hiveConnector); - - seed(initialSeed); -} - -template -bool isDone(size_t i, T startTime) { - if (FLAGS_duration_sec > 0) { - std::chrono::duration elapsed = - std::chrono::system_clock::now() - startTime; - return elapsed.count() >= FLAGS_duration_sec; - } - return i >= FLAGS_steps; -} + : RowNumberFuzzerBase(initialSeed, std::move(referenceQueryRunner)) {} std::pair, std::vector> RowNumberFuzzer::generatePartitionKeys() { @@ -247,7 +104,7 @@ std::vector RowNumberFuzzer::generateInput( return input; } -RowNumberFuzzer::PlanWithSplits RowNumberFuzzer::makeDefaultPlan( +RowNumberFuzzerBase::PlanWithSplits RowNumberFuzzer::makeDefaultPlan( const std::vector& partitionKeys, const std::vector& input) { auto planNodeIdGenerator = std::make_shared(); @@ -261,76 +118,7 @@ RowNumberFuzzer::PlanWithSplits RowNumberFuzzer::makeDefaultPlan( return PlanWithSplits{std::move(plan)}; } -std::optional -RowNumberFuzzer::computeReferenceResults( - core::PlanNodePtr& plan, - const std::vector& input) { - if (test::containsUnsupportedTypes(input[0]->type())) { - return std::nullopt; - } - return referenceQueryRunner_->execute(plan).first; -} - -RowVectorPtr RowNumberFuzzer::execute( - const PlanWithSplits& plan, - bool injectSpill) { - LOG(INFO) << "Executing query plan: " << plan.plan->toString(true, true); - - test::AssertQueryBuilder builder(plan.plan); - if (!plan.splits.empty()) { - builder.splits(plan.splits); - } - - std::shared_ptr spillDirectory; - int32_t spillPct{0}; - if (injectSpill) { - spillDirectory = exec::test::TempDirectoryPath::create(); - const auto maxSpillLevel = - FLAGS_max_spill_level == -1 ? randInt(0, 7) : FLAGS_max_spill_level; - builder.config(core::QueryConfig::kSpillEnabled, true) - .config(core::QueryConfig::kMaxSpillLevel, maxSpillLevel) - .config(core::QueryConfig::kRowNumberSpillEnabled, true) - .spillDirectory(spillDirectory->getPath()); - spillPct = 10; - } - - test::ScopedOOMInjector oomInjector( - []() -> bool { return folly::Random::oneIn(10); }, - 10); // Check the condition every 10 ms. - if (FLAGS_enable_oom_injection) { - oomInjector.enable(); - } - - // Wait for the task to be destroyed before start next query execution to - // avoid the potential interference of the background activities across query - // executions. - auto stopGuard = - folly::makeGuard([&]() { test::waitForAllTasksToBeDeleted(); }); - - TestScopedSpillInjection scopedSpillInjection(spillPct); - RowVectorPtr result; - try { - result = builder.copyResults(pool_.get()); - } catch (VeloxRuntimeError& e) { - if (FLAGS_enable_oom_injection && - e.errorCode() == facebook::velox::error_code::kMemCapExceeded && - e.message() == test::ScopedOOMInjector::kErrorMessage) { - // If we enabled OOM injection we expect the exception thrown by the - // ScopedOOMInjector. - return nullptr; - } - - throw e; - } - - if (VLOG_IS_ON(1)) { - VLOG(1) << std::endl << result->toString(0, result->size()); - } - - return result; -} - -RowNumberFuzzer::PlanWithSplits RowNumberFuzzer::makePlanWithTableScan( +RowNumberFuzzerBase::PlanWithSplits RowNumberFuzzer::makePlanWithTableScan( const RowTypePtr& type, const std::vector& partitionKeys, const std::vector& splits) { @@ -347,31 +135,6 @@ RowNumberFuzzer::PlanWithSplits RowNumberFuzzer::makePlanWithTableScan( return PlanWithSplits{plan, splits}; } -bool isTableScanSupported(const TypePtr& type) { - if (type->kind() == TypeKind::ROW && type->size() == 0) { - return false; - } - if (type->kind() == TypeKind::UNKNOWN) { - return false; - } - if (type->kind() == TypeKind::HUGEINT) { - return false; - } - // Disable testing with TableScan when input contains TIMESTAMP type, due to - // the issue #8127. - if (type->kind() == TypeKind::TIMESTAMP) { - return false; - } - - for (auto i = 0; i < type->size(); ++i) { - if (!isTableScanSupported(type->childAt(i))) { - return false; - } - } - - return true; -} - void RowNumberFuzzer::addPlansWithTableScan( const std::string& tableDir, const std::vector& partitionKeys, @@ -389,24 +152,18 @@ void RowNumberFuzzer::addPlansWithTableScan( asRowType(input[0]->type()), partitionKeys, inputSplits)); } -void RowNumberFuzzer::verify() { +void RowNumberFuzzer::runSingleIteration() { const auto [keyNames, keyTypes] = generatePartitionKeys(); + const auto input = generateInput(keyNames, keyTypes); test::logVectors(input); auto defaultPlan = makeDefaultPlan(keyNames, input); - const auto expected = execute(defaultPlan, /*injectSpill=*/false); + const auto expected = + execute(defaultPlan, pool_, /*injectSpill=*/false, false); if (expected != nullptr) { - if (const auto referenceResult = - computeReferenceResults(defaultPlan.plan, input)) { - VELOX_CHECK( - test::assertEqualResults( - referenceResult.value(), - defaultPlan.plan->outputType(), - {expected}), - "Velox and Reference results don't match"); - } + validateExpectedResults(defaultPlan.plan, input, expected); } std::vector altPlans; @@ -416,64 +173,16 @@ void RowNumberFuzzer::verify() { addPlansWithTableScan(tableScanDir->getPath(), keyNames, input, altPlans); for (auto i = 0; i < altPlans.size(); ++i) { - LOG(INFO) << "Testing plan #" << i; - auto actual = execute(altPlans[i], /*injectSpill=*/false); - if (actual != nullptr && expected != nullptr) { - VELOX_CHECK( - test::assertEqualResults({expected}, {actual}), - "Logically equivalent plans produced different results"); - } else { - VELOX_CHECK( - FLAGS_enable_oom_injection, "Got unexpected nullptr for results"); - } - - if (FLAGS_enable_spill) { - LOG(INFO) << "Testing plan #" << i << " with spilling"; - actual = execute(altPlans[i], /*=injectSpill=*/true); - if (actual != nullptr && expected != nullptr) { - try { - VELOX_CHECK( - test::assertEqualResults({expected}, {actual}), - "Logically equivalent plans produced different results"); - } catch (const VeloxException&) { - LOG(ERROR) << "Expected\n" - << expected->toString(0, expected->size()) << "\nActual\n" - << actual->toString(0, actual->size()); - throw; - } - } else { - VELOX_CHECK( - FLAGS_enable_oom_injection, "Got unexpected nullptr for results"); - } - } + testPlan( + altPlans[i], i, expected, "core::QueryConfig::kRowNumberSpillEnabled"); } } -void RowNumberFuzzer::go() { - VELOX_USER_CHECK( - FLAGS_steps > 0 || FLAGS_duration_sec > 0, - "Either --steps or --duration_sec needs to be greater than zero."); - VELOX_USER_CHECK_GE(FLAGS_batch_size, 10, "Batch size must be at least 10."); - - const auto startTime = std::chrono::system_clock::now(); - size_t iteration = 0; - - while (!isDone(iteration, startTime)) { - LOG(INFO) << "==============================> Started iteration " - << iteration << " (seed: " << currentSeed_ << ")"; - verify(); - LOG(INFO) << "==============================> Done with iteration " - << iteration; - - reSeed(); - ++iteration; - } -} } // namespace void rowNumberFuzzer( size_t seed, std::unique_ptr referenceQueryRunner) { - RowNumberFuzzer(seed, std::move(referenceQueryRunner)).go(); + RowNumberFuzzer(seed, std::move(referenceQueryRunner)).run(); } } // namespace facebook::velox::exec diff --git a/velox/exec/fuzzer/RowNumberFuzzerBase.cpp b/velox/exec/fuzzer/RowNumberFuzzerBase.cpp new file mode 100644 index 00000000000..68101cdb587 --- /dev/null +++ b/velox/exec/fuzzer/RowNumberFuzzerBase.cpp @@ -0,0 +1,285 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/fuzzer/RowNumberFuzzerBase.h" + +#include +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/dwio/dwrf/RegisterDwrfReader.h" +#include "velox/exec/fuzzer/FuzzerUtil.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/serializers/CompactRowSerializer.h" +#include "velox/serializers/PrestoSerializer.h" +#include "velox/serializers/UnsafeRowSerializer.h" + +DEFINE_int32(steps, 10, "Number of plans to generate and test."); + +DEFINE_int32( + duration_sec, + 0, + "For how long it should run (in seconds). If zero, " + "it executes exactly --steps iterations and exits."); + +DEFINE_int32( + batch_size, + 100, + "The number of elements on each generated vector."); + +DEFINE_int32(num_batches, 10, "The number of generated vectors."); + +DEFINE_double( + null_ratio, + 0.1, + "Chance of adding a null value in a vector " + "(expressed as double from 0 to 1)."); + +DEFINE_bool(enable_spill, true, "Whether to test plans with spilling enabled."); + +DEFINE_int32( + max_spill_level, + -1, + "Max spill level, -1 means random [0, 7], otherwise the actual level."); + +DEFINE_bool( + enable_oom_injection, + false, + "When enabled OOMs will randomly be triggered while executing query " + "plans. The goal of this mode is to ensure unexpected exceptions " + "aren't thrown and the process isn't killed in the process of cleaning " + "up after failures. Therefore, results are not compared when this is " + "enabled. Note that this option only works in debug builds."); + +namespace facebook::velox::exec { + +RowNumberFuzzerBase::RowNumberFuzzerBase( + size_t initialSeed, + std::unique_ptr referenceQueryRunner) + : vectorFuzzer_{getFuzzerOptions(), pool_.get()}, + referenceQueryRunner_{std::move(referenceQueryRunner)} { + setupReadWrite(); + seed(initialSeed); +} + +void RowNumberFuzzerBase::setupReadWrite() { + filesystems::registerLocalFileSystem(); + dwrf::registerDwrfReaderFactory(); + + if (!isRegisteredNamedVectorSerde(VectorSerde::Kind::kPresto)) { + serializer::presto::PrestoVectorSerde::registerNamedVectorSerde(); + } + if (!isRegisteredNamedVectorSerde(VectorSerde::Kind::kCompactRow)) { + serializer::CompactRowVectorSerde::registerNamedVectorSerde(); + } + if (!isRegisteredNamedVectorSerde(VectorSerde::Kind::kUnsafeRow)) { + serializer::spark::UnsafeRowVectorSerde::registerNamedVectorSerde(); + } + + // Make sure not to run out of open file descriptors. + std::unordered_map hiveConfig = { + {connector::hive::HiveConfig::kNumCacheFileHandles, "1000"}}; + test::registerHiveConnector(hiveConfig); +} + +// Sometimes we generate zero-column input of type ROW({}) or a column of type +// UNKNOWN(). Such data cannot be written to a file and therefore cannot +// be tested with TableScan. +bool RowNumberFuzzerBase::isTableScanSupported(const TypePtr& type) { + if (type->kind() == TypeKind::ROW && type->size() == 0) { + return false; + } + if (type->kind() == TypeKind::UNKNOWN) { + return false; + } + if (type->kind() == TypeKind::HUGEINT) { + return false; + } + // Disable testing with TableScan when input contains TIMESTAMP type, due to + // the issue #8127. + if (type->kind() == TypeKind::TIMESTAMP) { + return false; + } + + for (auto i = 0; i < type->size(); ++i) { + if (!isTableScanSupported(type->childAt(i))) { + return false; + } + } + + return true; +} + +void RowNumberFuzzerBase::validateExpectedResults( + const core::PlanNodePtr& plan, + const std::vector& input, + const RowVectorPtr& result) { + if (!test::containsUnsupportedTypes(input[0]->type())) { + auto [referenceResult, status] = + test::computeReferenceResults(plan, referenceQueryRunner_.get()); + if (referenceResult.has_value()) { + VELOX_CHECK( + test::assertEqualResults( + referenceResult.value(), plan->outputType(), {result}), + "Velox and Reference results don't match"); + } + } +} + +template +bool isDone(size_t i, T startTime) { + if (FLAGS_duration_sec > 0) { + std::chrono::duration elapsed = + std::chrono::system_clock::now() - startTime; + return elapsed.count() >= FLAGS_duration_sec; + } + return i >= FLAGS_steps; +} + +void RowNumberFuzzerBase::run() { + VELOX_USER_CHECK( + FLAGS_steps > 0 || FLAGS_duration_sec > 0, + "Either --steps or --duration_sec needs to be greater than zero."); + VELOX_USER_CHECK_GE(FLAGS_batch_size, 10, "Batch size must be at least 10."); + + const auto startTime = std::chrono::system_clock::now(); + size_t iteration = 0; + + while (!isDone(iteration, startTime)) { + LOG(INFO) << "==============================> Started iteration " + << iteration << " (seed: " << currentSeed_ << ")"; + runSingleIteration(); + LOG(INFO) << "==============================> Done with iteration " + << iteration; + + reSeed(); + ++iteration; + } +} + +RowVectorPtr RowNumberFuzzerBase::execute( + const PlanWithSplits& plan, + const std::shared_ptr& pool, + bool injectSpill, + bool injectOOM, + const std::optional& spillConfig, + int maxSpillLevel) { + LOG(INFO) << "Executing query plan: " << plan.plan->toString(true, true); + + test::AssertQueryBuilder builder(plan.plan); + if (!plan.splits.empty()) { + builder.splits(plan.splits); + } + + int32_t spillPct{0}; + if (injectSpill) { + VELOX_CHECK( + spillConfig.has_value(), + "Spill config not set for execute with spilling"); + VELOX_CHECK_GE( + maxSpillLevel, 0, "Max spill should be set for execute with spilling"); + std::shared_ptr spillDirectory; + spillDirectory = exec::test::TempDirectoryPath::create(); + builder.config(core::QueryConfig::kSpillEnabled, true) + .config(core::QueryConfig::kMaxSpillLevel, maxSpillLevel) + .config(spillConfig.value(), true) + .spillDirectory(spillDirectory->getPath()); + spillPct = 10; + } + + test::ScopedOOMInjector oomInjector( + []() -> bool { return folly::Random::oneIn(10); }, + 10); // Check the condition every 10 ms. + if (injectOOM) { + oomInjector.enable(); + } + + // Wait for the task to be destroyed before start next query execution to + // avoid the potential interference of the background activities across query + // executions. + auto stopGuard = + folly::makeGuard([&]() { test::waitForAllTasksToBeDeleted(); }); + + TestScopedSpillInjection scopedSpillInjection(spillPct); + RowVectorPtr result; + try { + result = builder.copyResults(pool.get()); + } catch (VeloxRuntimeError& e) { + if (injectOOM && + e.errorCode() == facebook::velox::error_code::kMemCapExceeded && + e.message() == test::ScopedOOMInjector::kErrorMessage) { + // If we enabled OOM injection we expect the exception thrown by the + // ScopedOOMInjector. + return nullptr; + } + + throw e; + } + + if (VLOG_IS_ON(1)) { + VLOG(1) << std::endl << result->toString(0, result->size()); + } + + return result; +} + +void RowNumberFuzzerBase::testPlan( + const PlanWithSplits& plan, + int32_t testNumber, + const RowVectorPtr& expected, + const std::optional& spillConfig) { + LOG(INFO) << "Testing plan #" << testNumber; + + auto actual = + execute(plan, pool_, /*injectSpill=*/false, FLAGS_enable_oom_injection); + if (actual != nullptr && expected != nullptr) { + VELOX_CHECK( + test::assertEqualResults({expected}, {actual}), + "Logically equivalent plans produced different results"); + } else { + VELOX_CHECK( + FLAGS_enable_oom_injection, "Got unexpected nullptr for results"); + } + + if (FLAGS_enable_spill) { + LOG(INFO) << "Testing plan #" << testNumber << " with spilling"; + const auto fuzzMaxSpillLevel = + FLAGS_max_spill_level == -1 ? randInt(0, 7) : FLAGS_max_spill_level; + actual = execute( + plan, + pool_, + /*=injectSpill=*/true, + FLAGS_enable_oom_injection, + spillConfig, + fuzzMaxSpillLevel); + if (actual != nullptr && expected != nullptr) { + try { + VELOX_CHECK( + test::assertEqualResults({expected}, {actual}), + "Logically equivalent plans produced different results"); + } catch (const VeloxException&) { + LOG(ERROR) << "Expected\n" + << expected->toString(0, expected->size()) << "\nActual\n" + << actual->toString(0, actual->size()); + throw; + } + } else { + VELOX_CHECK( + FLAGS_enable_oom_injection, "Got unexpected nullptr for results"); + } + } +} + +} // namespace facebook::velox::exec diff --git a/velox/exec/fuzzer/RowNumberFuzzerBase.h b/velox/exec/fuzzer/RowNumberFuzzerBase.h new file mode 100644 index 00000000000..abae8551c55 --- /dev/null +++ b/velox/exec/fuzzer/RowNumberFuzzerBase.h @@ -0,0 +1,139 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include "velox/common/fuzzer/Utils.h" +#include "velox/exec/Split.h" +#include "velox/exec/fuzzer/ReferenceQueryRunner.h" +#include "velox/vector/fuzzer/VectorFuzzer.h" + +DECLARE_int32(steps); + +DECLARE_int32(duration_sec); + +DECLARE_int32(batch_size); + +DECLARE_int32(num_batches); + +DECLARE_double(null_ratio); + +DECLARE_bool(enable_spill); + +DECLARE_int32(max_spill_level); + +DECLARE_bool(enable_oom_injection); + +namespace facebook::velox::exec { + +class RowNumberFuzzerBase { + public: + explicit RowNumberFuzzerBase( + size_t initialSeed, + std::unique_ptr); + + void run(); + + virtual ~RowNumberFuzzerBase() = default; + + protected: + bool isTableScanSupported(const TypePtr& type); + + // Runs one test iteration from query plans generations, executions and result + // verifications. + virtual void runSingleIteration() = 0; + + // Sets up the Dwrf reader/writer, serializers and Hive connector for the + // fuzzers. + void setupReadWrite(); + + static VectorFuzzer::Options getFuzzerOptions() { + VectorFuzzer::Options opts; + opts.vectorSize = FLAGS_batch_size; + opts.stringVariableLength = true; + opts.stringLength = 100; + opts.nullRatio = FLAGS_null_ratio; + return opts; + } + + void seed(size_t seed) { + currentSeed_ = seed; + vectorFuzzer_.reSeed(seed); + rng_.seed(currentSeed_); + } + + void reSeed() { + seed(rng_()); + } + + int32_t randInt(int32_t min, int32_t max) { + return fuzzer::rand(rng_, min, max); + } + + // Validates the plan with input and result with the reference query runner. + void validateExpectedResults( + const core::PlanNodePtr& plan, + const std::vector& input, + const RowVectorPtr& result); + + struct PlanWithSplits { + core::PlanNodePtr plan; + std::vector splits; + + explicit PlanWithSplits( + core::PlanNodePtr _plan, + const std::vector& _splits = {}) + : plan(std::move(_plan)), splits(_splits) {} + }; + + // Executes a plan with spilling and oom injection possibly. + RowVectorPtr execute( + const PlanWithSplits& plan, + const std::shared_ptr& pool, + bool injectSpill, + bool injectOOM, + const std::optional& spillConfig = std::nullopt, + int maxSpillLevel = -1); + + // Tests a plan by executing it with and without spilling. OOM injection + // also might be done based on FLAG_enable_oom_injection. + void testPlan( + const PlanWithSplits& plan, + int32_t testNumber, + const RowVectorPtr& expected, + const std::optional& spillConfig); + + FuzzerGenerator rng_; + size_t currentSeed_{0}; + + std::shared_ptr rootPool_{ + memory::memoryManager()->addRootPool( + "rowNumberFuzzer", + memory::kMaxMemory, + memory::MemoryReclaimer::create())}; + std::shared_ptr pool_{rootPool_->addLeafChild( + "rowNumberFuzzerLeaf", + true, + memory::MemoryReclaimer::create())}; + std::shared_ptr writerPool_{rootPool_->addAggregateChild( + "rowNumberFuzzerWriter", + memory::MemoryReclaimer::create())}; + VectorFuzzer vectorFuzzer_; + std::unique_ptr referenceQueryRunner_; +}; + +} // namespace facebook::velox::exec diff --git a/velox/exec/fuzzer/TopNRowNumberFuzzer.cpp b/velox/exec/fuzzer/TopNRowNumberFuzzer.cpp new file mode 100644 index 00000000000..7d658aa6888 --- /dev/null +++ b/velox/exec/fuzzer/TopNRowNumberFuzzer.cpp @@ -0,0 +1,278 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/fuzzer/TopNRowNumberFuzzer.h" + +#include + +#include "velox/exec/fuzzer/FuzzerUtil.h" +#include "velox/exec/fuzzer/RowNumberFuzzerBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/vector/tests/utils/VectorMaker.h" + +namespace facebook::velox::exec { +namespace { + +class TopNRowNumberFuzzer : public RowNumberFuzzerBase { + public: + explicit TopNRowNumberFuzzer( + size_t initialSeed, + std::unique_ptr); + + private: + // Runs one test iteration from query plans generations, executions and result + // verifications. + void runSingleIteration() override; + + std::pair, std::vector> generateKeys( + const std::string& prefix); + + std::vector generateInput( + const std::vector& keyNames, + const std::vector& keyTypes, + const std::vector& partitionKeys, + const std::vector& sortingKeys); + + // Makes the query plan with default settings in TopNRowNumberFuzzer. + std::pair makeDefaultPlan( + const std::vector& partitionKeys, + const std::vector& sortKeys, + const std::vector& allKeys, + const std::vector& input); + + PlanWithSplits makePlanWithTableScan( + const std::vector& partitionKeys, + const std::vector& sortKeys, + const std::vector& allKeys, + int limit, + const std::vector& input, + const std::string& tableDir); +}; + +TopNRowNumberFuzzer::TopNRowNumberFuzzer( + size_t initialSeed, + std::unique_ptr referenceQueryRunner) + : RowNumberFuzzerBase(initialSeed, std::move(referenceQueryRunner)) {} + +std::pair, std::vector> +TopNRowNumberFuzzer::generateKeys(const std::string& prefix) { + static const std::vector kKeyTypes{ + BOOLEAN(), + TINYINT(), + SMALLINT(), + INTEGER(), + BIGINT(), + VARCHAR(), + DATE(), + REAL(), + DOUBLE(), + }; + + auto numKeys = randInt(1, 5); + std::vector keys; + std::vector types; + for (auto i = 0; i < numKeys; ++i) { + keys.push_back(fmt::format("{}{}", prefix, i)); + types.push_back(vectorFuzzer_.randOrderableType(kKeyTypes, 1)); + } + + return std::make_pair(keys, types); +} + +std::vector TopNRowNumberFuzzer::generateInput( + const std::vector& keyNames, + const std::vector& keyTypes, + const std::vector& partitionKeys, + const std::vector& sortingKeys) { + std::vector input; + vector_size_t size = vectorFuzzer_.getOptions().vectorSize; + velox::test::VectorMaker vectorMaker{pool_.get()}; + + std::unordered_set partitionKeySet{ + partitionKeys.begin(), partitionKeys.end()}; + std::unordered_set sortingKeySet{ + sortingKeys.begin(), sortingKeys.end()}; + + int64_t rowNumber = 0; + for (auto j = 0; j < FLAGS_num_batches; ++j) { + std::vector children; + + // Rank functions have semantics influenced by "peer" rows. Peer rows are + // rows in the same partition having the same order by + // key. In rank and dense_rank functions, peer rows have the same function + // result value. This code influences the fuzzer to generate such data. + // + // To build such rows the code separates the notions of "peer" groups and + // "partition" groups during data generation. A number of peers are chosen + // between (1, size) of the input. Rows with the same peer number have the + // same order by keys. This means that there are sets of rows in the input + // data which will have the same order by key. + // + // Each peer is then mapped to a partition group. Rows in the same partition + // group have the same partition keys. So a partition can contain a group of + // rows with the same order by key and there can be multiple such groups + // (each with different order by keys) in one partition. + // + // This style of data generation is preferable for ranking functions. + auto numPeerGroups = size ? randInt(1, size) : 1; + auto sortingIndices = vectorFuzzer_.fuzzIndices(size, numPeerGroups); + auto rawSortingIndices = sortingIndices->as(); + auto sortingNulls = vectorFuzzer_.fuzzNulls(size); + + auto numPartitions = randInt(1, numPeerGroups); + auto peerGroupToPartitionIndices = + vectorFuzzer_.fuzzIndices(numPeerGroups, numPartitions); + auto rawPeerGroupToPartitionIndices = + peerGroupToPartitionIndices->as(); + auto partitionIndices = + AlignedBuffer::allocate(size, pool_.get()); + auto rawPartitionIndices = partitionIndices->asMutable(); + auto partitionNulls = vectorFuzzer_.fuzzNulls(size); + for (auto i = 0; i < size; i++) { + auto peerGroup = rawSortingIndices[i]; + rawPartitionIndices[i] = rawPeerGroupToPartitionIndices[peerGroup]; + } + + for (auto i = 0; i < keyTypes.size() - 1; ++i) { + if (partitionKeySet.find(keyNames[i]) != partitionKeySet.end()) { + // The partition keys are built with a dictionary over a smaller set of + // values. This is done to introduce some repetition of key values for + // windowing. + auto baseVector = vectorFuzzer_.fuzz(keyTypes[i], numPartitions); + children.push_back(BaseVector::wrapInDictionary( + partitionNulls, partitionIndices, size, baseVector)); + } else if (sortingKeySet.find(keyNames[i]) != sortingKeySet.end()) { + auto baseVector = vectorFuzzer_.fuzz(keyTypes[i], numPeerGroups); + children.push_back(BaseVector::wrapInDictionary( + sortingNulls, sortingIndices, size, baseVector)); + } else { + children.push_back(vectorFuzzer_.fuzz(keyTypes[i], size)); + } + } + children.push_back(vectorMaker.flatVector( + size, [&](auto /*row*/) { return rowNumber++; })); + input.push_back(vectorMaker.rowVector(keyNames, children)); + } + + return input; +} + +std::pair +TopNRowNumberFuzzer::makeDefaultPlan( + const std::vector& partitionKeys, + const std::vector& sortKeys, + const std::vector& allKeys, + const std::vector& input) { + auto planNodeIdGenerator = std::make_shared(); + std::vector projectFields = allKeys; + projectFields.emplace_back("row_number"); + + int32_t limit = randInt(1, FLAGS_batch_size); + auto plan = test::PlanBuilder() + .values(input) + .topNRowNumber(partitionKeys, sortKeys, limit, true) + .project(projectFields) + .planNode(); + return std::make_pair(PlanWithSplits{std::move(plan)}, limit); +} + +RowNumberFuzzerBase::PlanWithSplits TopNRowNumberFuzzer::makePlanWithTableScan( + const std::vector& partitionKeys, + const std::vector& sortKeys, + const std::vector& allKeys, + int limit, + const std::vector& input, + const std::string& tableDir) { + VELOX_CHECK(!tableDir.empty()); + + std::vector projectFields = allKeys; + projectFields.emplace_back("row_number"); + + auto planNodeIdGenerator = std::make_shared(); + auto plan = test::PlanBuilder(planNodeIdGenerator) + .tableScan(asRowType(input[0]->type())) + .topNRowNumber(partitionKeys, sortKeys, limit, true) + .project(projectFields) + .planNode(); + + const std::vector splits = test::makeSplits( + input, fmt::format("{}/topn_row_number", tableDir), writerPool_); + return PlanWithSplits{plan, splits}; +} + +void TopNRowNumberFuzzer::runSingleIteration() { + const auto [partitionKeys, partitionTypes] = generateKeys("p"); + const auto [sortKeys, sortTypes] = generateKeys("s"); + + std::vector allSortKeys; + std::vector allSortTypes; + allSortKeys.insert(allSortKeys.begin(), sortKeys.begin(), sortKeys.end()); + allSortTypes.insert(allSortTypes.begin(), sortTypes.begin(), sortTypes.end()); + allSortKeys.push_back("row_id"); + allSortTypes.push_back(INTEGER()); + + std::vector allKeys; + std::vector allTypes; + allKeys.insert(allKeys.begin(), partitionKeys.begin(), partitionKeys.end()); + allTypes.insert( + allTypes.begin(), partitionTypes.begin(), partitionTypes.end()); + allKeys.insert(allKeys.end(), allSortKeys.begin(), allSortKeys.end()); + allTypes.insert(allTypes.end(), allSortTypes.begin(), allSortTypes.end()); + + const auto input = generateInput(allKeys, allTypes, partitionKeys, sortKeys); + test::logVectors(input); + + auto [defaultPlan, limit] = + makeDefaultPlan(partitionKeys, allSortKeys, allKeys, input); + + const auto expected = + execute(defaultPlan, pool_, /*injectSpill=*/false, false); + if (expected != nullptr) { + validateExpectedResults(defaultPlan.plan, input, expected); + } + + std::vector altPlans; + altPlans.push_back(std::move(defaultPlan)); + + const auto tableScanDir = exec::test::TempDirectoryPath::create(); + if (isTableScanSupported(input[0]->type())) { + altPlans.push_back(makePlanWithTableScan( + partitionKeys, + allSortKeys, + allKeys, + limit, + input, + tableScanDir->getPath())); + } + + for (auto i = 0; i < altPlans.size(); ++i) { + testPlan( + altPlans[i], + i, + expected, + "core::QueryConfig::kTopNRowNumberSpillEnabled"); + } +} + +} // namespace + +void topNRowNumberFuzzer( + size_t seed, + std::unique_ptr referenceQueryRunner) { + TopNRowNumberFuzzer(seed, std::move(referenceQueryRunner)).run(); +} +} // namespace facebook::velox::exec diff --git a/velox/exec/fuzzer/TopNRowNumberFuzzer.h b/velox/exec/fuzzer/TopNRowNumberFuzzer.h new file mode 100644 index 00000000000..eab13cdee14 --- /dev/null +++ b/velox/exec/fuzzer/TopNRowNumberFuzzer.h @@ -0,0 +1,25 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include "velox/exec/fuzzer/ReferenceQueryRunner.h" + +namespace facebook::velox::exec { +void topNRowNumberFuzzer( + size_t seed, + std::unique_ptr referenceQueryRunner); +} diff --git a/velox/exec/fuzzer/TopNRowNumberFuzzerRunner.cpp b/velox/exec/fuzzer/TopNRowNumberFuzzerRunner.cpp new file mode 100644 index 00000000000..b93b49be367 --- /dev/null +++ b/velox/exec/fuzzer/TopNRowNumberFuzzerRunner.cpp @@ -0,0 +1,98 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "velox/common/memory/SharedArbitrator.h" +#include "velox/exec/fuzzer/DuckQueryRunner.h" +#include "velox/exec/fuzzer/FuzzerUtil.h" +#include "velox/exec/fuzzer/ReferenceQueryRunner.h" +#include "velox/exec/fuzzer/TopNRowNumberFuzzer.h" + +/// TopNRowNumberFuzzerRunner leverages TopNRowNumberFuzzer and VectorFuzzer to +/// automatically generate and execute tests. It works as follows: +/// +/// 1. Plan Generation: Generate two equivalent query plans, one is +/// topn-row-number over ValuesNode and the other is over TableScanNode. +/// 2. Executes a variety of logically equivalent query plans and checks the +/// results are the same. +/// 3. Rinse and repeat. +/// +/// It is used as follows: +/// +/// $ ./velox_topn_row_number_fuzzer_test --duration_sec 600 +/// +/// The flags that configure TopNRowNumberFuzzer's behavior are: +/// +/// --steps: how many iterations to run. +/// --duration_sec: alternatively, for how many seconds it should run (takes +/// precedence over --steps). +/// --seed: pass a deterministic seed to reproduce the behavior (each iteration +/// will print a seed as part of the logs). +/// --v=1: verbose logging; print a lot more details about the execution. +/// --batch_size: size of input vector batches generated. +/// --num_batches: number of input vector batches to generate. +/// --enable_spill: test plans with spilling enabled. +/// --enable_oom_injection: randomly trigger OOM while executing query plans. +/// e.g: +/// +/// $ ./velox_topn_row_number_fuzzer_test \ +/// --seed 123 \ +/// --duration_sec 600 \ +/// --v=1 + +DEFINE_int64( + seed, + 0, + "Initial seed for random number generator used to reproduce previous " + "results (0 means start with random seed)."); + +DEFINE_string( + presto_url, + "", + "Presto coordinator URI along with port. If set, we use Presto " + "source of truth. Otherwise, use DuckDB. Example: " + "--presto_url=http://127.0.0.1:8080"); + +DEFINE_uint32( + req_timeout_ms, + 1000, + "Timeout in milliseconds for HTTP requests made to reference DB, " + "such as Presto. Example: --req_timeout_ms=2000"); + +DEFINE_int64(allocator_capacity, 8L << 30, "Allocator capacity in bytes."); + +DEFINE_int64(arbitrator_capacity, 6L << 30, "Arbitrator capacity in bytes."); + +using namespace facebook::velox; + +int main(int argc, char** argv) { + // Calls common init functions in the necessary order, initializing + // singletons, installing proper signal handlers for better debugging + // experience, and initialize glog and gflags. + folly::Init init(&argc, &argv); + exec::test::setupMemory(FLAGS_allocator_capacity, FLAGS_arbitrator_capacity); + std::shared_ptr rootPool{ + memory::memoryManager()->addRootPool()}; + auto referenceQueryRunner = exec::test::setupReferenceQueryRunner( + rootPool.get(), + FLAGS_presto_url, + "topn_row_number_fuzzer", + FLAGS_req_timeout_ms); + const size_t initialSeed = FLAGS_seed == 0 ? std::time(nullptr) : FLAGS_seed; + exec::topNRowNumberFuzzer(initialSeed, std::move(referenceQueryRunner)); +}