From 8f2b973a11fd74e13827bb8d5fbfc1ddee9d1058 Mon Sep 17 00:00:00 2001 From: Amit Dutta Date: Tue, 13 Aug 2024 16:35:32 -0700 Subject: [PATCH] [native] Add a sample plan validator and e2e tests. --- .../presto_cpp/main/PrestoServer.cpp | 4 +- .../presto_cpp/main/PrestoServer.h | 6 +-- .../presto_cpp/main/TaskResource.h | 6 +-- .../presto_cpp/main/common/Configs.cpp | 1 + .../presto_cpp/main/common/Configs.h | 2 + .../presto_cpp/main/tests/TaskManagerTest.cpp | 3 +- .../presto_cpp/main/types/CMakeLists.txt | 5 +- .../main/types/VeloxPlanValidator.cpp | 49 +++++++++++++++++++ ...oxPlanValidator.h => VeloxPlanValidator.h} | 6 +-- .../AbstractTestNativePlanValidation.java | 29 +++++++++++ .../PrestoNativeQueryRunnerUtils.java | 33 +++++++++---- .../TestNativePlanValidation.java} | 16 +++--- 12 files changed, 127 insertions(+), 33 deletions(-) create mode 100644 presto-native-execution/presto_cpp/main/types/VeloxPlanValidator.cpp rename presto-native-execution/presto_cpp/main/types/{PrestoToVeloxPlanValidator.h => VeloxPlanValidator.h} (82%) create mode 100644 presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativePlanValidation.java rename presto-native-execution/{presto_cpp/main/types/PrestoToVeloxPlanValidator.cpp => src/test/java/com/facebook/presto/nativeworker/TestNativePlanValidation.java} (62%) diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index 21384cf218b52..f25362a658aa7 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -1313,10 +1313,10 @@ void PrestoServer::enableWorkerStatsReporting() { void PrestoServer::initPrestoToVeloxPlanValidator() { VELOX_CHECK_NULL(planValidator_); - planValidator_ = std::make_shared(); + planValidator_ = std::make_shared(); } -PrestoToVeloxPlanValidator* PrestoServer::getPlanValidator() { +VeloxPlanValidator* PrestoServer::getPlanValidator() { return planValidator_.get(); } diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.h b/presto-native-execution/presto_cpp/main/PrestoServer.h index f74b7dc65e72c..f2e03056cd08e 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.h +++ b/presto-native-execution/presto_cpp/main/PrestoServer.h @@ -25,7 +25,7 @@ #include "presto_cpp/main/PeriodicHeartbeatManager.h" #include "presto_cpp/main/PrestoExchangeSource.h" #include "presto_cpp/main/PrestoServerOperations.h" -#include "presto_cpp/main/types/PrestoToVeloxPlanValidator.h" +#include "presto_cpp/main/types/VeloxPlanValidator.h" #include "velox/common/caching/AsyncDataCache.h" #include "velox/common/memory/MemoryAllocator.h" #if __has_include("filesystem") @@ -184,7 +184,7 @@ class PrestoServer { /// Invoked to initialize Presto to Velox plan validator. virtual void initPrestoToVeloxPlanValidator(); - PrestoToVeloxPlanValidator* getPlanValidator(); + VeloxPlanValidator* getPlanValidator(); /// Invoked to get the list of filters passed to the http server. std::vector> @@ -246,7 +246,7 @@ class PrestoServer { // Executor for spilling. std::shared_ptr spillerExecutor_; - std::shared_ptr planValidator_; + std::shared_ptr planValidator_; std::unique_ptr exchangeSourceConnectionPool_; diff --git a/presto-native-execution/presto_cpp/main/TaskResource.h b/presto-native-execution/presto_cpp/main/TaskResource.h index 2fbe74d4af369..048588ff8b3f8 100644 --- a/presto-native-execution/presto_cpp/main/TaskResource.h +++ b/presto-native-execution/presto_cpp/main/TaskResource.h @@ -15,7 +15,7 @@ #include "presto_cpp/main/TaskManager.h" #include "presto_cpp/main/http/HttpServer.h" -#include "presto_cpp/main/types/PrestoToVeloxPlanValidator.h" +#include "presto_cpp/main/types/VeloxPlanValidator.h" #include "velox/common/memory/Memory.h" namespace facebook::presto { @@ -25,7 +25,7 @@ class TaskResource { explicit TaskResource( velox::memory::MemoryPool* pool, folly::Executor* httpSrvCpuExecutor, - PrestoToVeloxPlanValidator* planValidator, + VeloxPlanValidator* planValidator, TaskManager& taskManager) : httpSrvCpuExecutor_(httpSrvCpuExecutor), pool_{pool}, @@ -100,7 +100,7 @@ class TaskResource { folly::Executor* const httpSrvCpuExecutor_; velox::memory::MemoryPool* const pool_; - PrestoToVeloxPlanValidator* const planValidator_; + VeloxPlanValidator* const planValidator_; TaskManager& taskManager_; }; diff --git a/presto-native-execution/presto_cpp/main/common/Configs.cpp b/presto-native-execution/presto_cpp/main/common/Configs.cpp index fc6b4e366eca4..50e1bb52cb291 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.cpp +++ b/presto-native-execution/presto_cpp/main/common/Configs.cpp @@ -234,6 +234,7 @@ SystemConfig::SystemConfig() { STR_PROP(kCacheVeloxTtlThreshold, "2d"), STR_PROP(kCacheVeloxTtlCheckInterval, "1h"), BOOL_PROP(kEnableRuntimeMetricsCollection, false), + BOOL_PROP(kPlanValidatorFailOnNestedLoopJoin, false), }; } diff --git a/presto-native-execution/presto_cpp/main/common/Configs.h b/presto-native-execution/presto_cpp/main/common/Configs.h index 74fa387ef6d1e..b4b233a987c49 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.h +++ b/presto-native-execution/presto_cpp/main/common/Configs.h @@ -639,6 +639,8 @@ class SystemConfig : public ConfigBase { static constexpr std::string_view kSinkMaxBufferSize{"sink.max-buffer-size"}; static constexpr std::string_view kDriverMaxPagePartitioningBufferSize{ "driver.max-page-partitioning-buffer-size"}; + static constexpr std::string_view kPlanValidatorFailOnNestedLoopJoin{ + "velox-plan-validator-fail-on-nested-loop-join"}; SystemConfig(); diff --git a/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp b/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp index d940fb033f35a..1da6d2d482fbd 100644 --- a/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp @@ -247,8 +247,7 @@ class TaskManagerTest : public testing::Test { taskManager_ = std::make_unique( driverExecutor_.get(), httpSrvCpuExecutor_.get(), nullptr); - auto validator = - std::make_shared(); + auto validator = std::make_shared(); taskResource_ = std::make_unique( leafPool_.get(), httpSrvCpuExecutor_.get(), diff --git a/presto-native-execution/presto_cpp/main/types/CMakeLists.txt b/presto-native-execution/presto_cpp/main/types/CMakeLists.txt index 167d1c8258634..ecdd1bb30b7ff 100644 --- a/presto-native-execution/presto_cpp/main/types/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/types/CMakeLists.txt @@ -16,9 +16,8 @@ target_link_libraries(presto_type_converter velox_type_parser) add_library( presto_types OBJECT - PrestoToVeloxQueryPlan.cpp PrestoToVeloxExpr.cpp - PrestoToVeloxPlanValidator.cpp PrestoToVeloxSplit.cpp - PrestoToVeloxConnector.cpp) + PrestoToVeloxQueryPlan.cpp PrestoToVeloxExpr.cpp VeloxPlanValidator.cpp + PrestoToVeloxSplit.cpp PrestoToVeloxConnector.cpp) add_dependencies(presto_types presto_operators presto_type_converter velox_type velox_type_fbhive velox_dwio_dwrf_proto) diff --git a/presto-native-execution/presto_cpp/main/types/VeloxPlanValidator.cpp b/presto-native-execution/presto_cpp/main/types/VeloxPlanValidator.cpp new file mode 100644 index 0000000000000..1adba985c1cbb --- /dev/null +++ b/presto-native-execution/presto_cpp/main/types/VeloxPlanValidator.cpp @@ -0,0 +1,49 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "presto_cpp/main/types/VeloxPlanValidator.h" +#include "presto_cpp/main/common/Configs.h" + +namespace facebook::presto { +bool planHasNestedJoinLoop(const velox::core::PlanNodePtr planNode) { + if (auto joinNode = + std::dynamic_pointer_cast( + planNode)) { + return true; + } + + for (auto plan : planNode->sources()) { + if (planHasNestedJoinLoop(plan)) { + return true; + } + } + + return false; +} + +void VeloxPlanValidator::validatePlanFragment( + const velox::core::PlanFragment& fragment) { + const auto failOnNestedLoopJoin = + SystemConfig::instance() + ->optionalProperty( + SystemConfig::kPlanValidatorFailOnNestedLoopJoin) + .value_or(false); + if (failOnNestedLoopJoin) { + VELOX_CHECK( + !planHasNestedJoinLoop(fragment.planNode), + "Velox plan uses nested join loop which isn't supported."); + } +} + +} // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxPlanValidator.h b/presto-native-execution/presto_cpp/main/types/VeloxPlanValidator.h similarity index 82% rename from presto-native-execution/presto_cpp/main/types/PrestoToVeloxPlanValidator.h rename to presto-native-execution/presto_cpp/main/types/VeloxPlanValidator.h index 3ee4f6a35d5aa..810dc9c565aac 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxPlanValidator.h +++ b/presto-native-execution/presto_cpp/main/types/VeloxPlanValidator.h @@ -15,9 +15,9 @@ #include "velox/core/PlanFragment.h" namespace facebook::presto { -class PrestoToVeloxPlanValidator { +class VeloxPlanValidator { public: - virtual bool validatePlanFragment(const velox::core::PlanFragment& fragment); - virtual ~PrestoToVeloxPlanValidator() = default; + virtual void validatePlanFragment(const velox::core::PlanFragment& fragment); + virtual ~VeloxPlanValidator() = default; }; } // namespace facebook::presto diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativePlanValidation.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativePlanValidation.java new file mode 100644 index 0000000000000..f207385e9c3bd --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativePlanValidation.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.nativeworker; + +import com.facebook.presto.tests.AbstractTestQueryFramework; +import org.testng.annotations.Test; + +public abstract class AbstractTestNativePlanValidation + extends AbstractTestQueryFramework +{ + @Test + public void testNestedLoopJoinPlainValidationFailure() + { + assertQueryFails( + "SELECT EXISTS(SELECT 1 WHERE l.orderkey > 0 OR l.orderkey != 3) " + + "FROM lineitem l LIMIT 1", ".*Plan uses nested join loop which isn't supported.*"); + } +} diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java index b4b8dba60eca5..69bb2433b3f18 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java @@ -88,7 +88,7 @@ public static QueryRunner createQueryRunner( defaultQueryRunner.close(); - return createNativeQueryRunner(dataDirectory.get().toString(), prestoServerPath.get(), workerCount, cacheMaxSize, true, Optional.empty(), storageFormat, addStorageFormatToPath); + return createNativeQueryRunner(dataDirectory.get().toString(), prestoServerPath.get(), workerCount, cacheMaxSize, true, Optional.empty(), storageFormat, addStorageFormatToPath, false); } public static QueryRunner createJavaQueryRunner() @@ -251,7 +251,7 @@ public static QueryRunner createNativeIcebergQueryRunner( false, false, OptionalInt.of(workerCount.orElse(4)), - getExternalWorkerLauncher("iceberg", prestoServerPath, cacheMaxSize, remoteFunctionServerUds), + getExternalWorkerLauncher("iceberg", prestoServerPath, cacheMaxSize, remoteFunctionServerUds, false), dataDirectory, addStorageFormatToPath); } @@ -264,7 +264,8 @@ public static QueryRunner createNativeQueryRunner( boolean useThrift, Optional remoteFunctionServerUds, String storageFormat, - boolean addStorageFormatToPath) + boolean addStorageFormatToPath, + Boolean failOnNestedLoopJoin) throws Exception { // The property "hive.allow-drop-table" needs to be set to true because security is always "legacy" in NativeQueryRunner. @@ -287,7 +288,7 @@ public static QueryRunner createNativeQueryRunner( hiveProperties, workerCount, Optional.of(Paths.get(addStorageFormatToPath ? dataDirectory + "/" + storageFormat : dataDirectory)), - getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, remoteFunctionServerUds)); + getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, remoteFunctionServerUds, failOnNestedLoopJoin)); } public static QueryRunner createNativeCteQueryRunner(boolean useThrift, String storageFormat) @@ -330,13 +331,13 @@ public static QueryRunner createNativeCteQueryRunner(boolean useThrift, String s hiveProperties, workerCount, Optional.of(Paths.get(addStorageFormatToPath ? dataDirectory + "/" + storageFormat : dataDirectory)), - getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, Optional.empty())); + getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, Optional.empty(), false)); } public static QueryRunner createNativeQueryRunner(String remoteFunctionServerUds) throws Exception { - return createNativeQueryRunner(false, DEFAULT_STORAGE_FORMAT, Optional.ofNullable(remoteFunctionServerUds)); + return createNativeQueryRunner(false, DEFAULT_STORAGE_FORMAT, Optional.ofNullable(remoteFunctionServerUds), false); } public static QueryRunner createNativeQueryRunner(boolean useThrift) @@ -345,13 +346,19 @@ public static QueryRunner createNativeQueryRunner(boolean useThrift) return createNativeQueryRunner(useThrift, DEFAULT_STORAGE_FORMAT); } + public static QueryRunner createNativeQueryRunner(boolean useThrift, boolean failOnNestedLoopJoin) + throws Exception + { + return createNativeQueryRunner(useThrift, DEFAULT_STORAGE_FORMAT, Optional.empty(), failOnNestedLoopJoin); + } + public static QueryRunner createNativeQueryRunner(boolean useThrift, String storageFormat) throws Exception { - return createNativeQueryRunner(useThrift, storageFormat, Optional.empty()); + return createNativeQueryRunner(useThrift, storageFormat, Optional.empty(), false); } - public static QueryRunner createNativeQueryRunner(boolean useThrift, String storageFormat, Optional remoteFunctionServerUds) + public static QueryRunner createNativeQueryRunner(boolean useThrift, String storageFormat, Optional remoteFunctionServerUds, Boolean failOnNestedLoopJoin) throws Exception { int cacheMaxSize = 0; @@ -364,7 +371,8 @@ public static QueryRunner createNativeQueryRunner(boolean useThrift, String stor useThrift, remoteFunctionServerUds, storageFormat, - true); + true, + failOnNestedLoopJoin); } // Start the remote function server. Return the UDS path used to communicate with it. @@ -411,7 +419,7 @@ public static NativeQueryRunnerParameters getNativeQueryRunnerParameters() return new NativeQueryRunnerParameters(prestoServerPath, dataDirectory, workerCount); } - public static Optional> getExternalWorkerLauncher(String catalogName, String prestoServerPath, int cacheMaxSize, Optional remoteFunctionServerUds) + public static Optional> getExternalWorkerLauncher(String catalogName, String prestoServerPath, int cacheMaxSize, Optional remoteFunctionServerUds, Boolean failOnNestedLoopJoin) { return Optional.of((workerIndex, discoveryUri) -> { @@ -436,6 +444,11 @@ public static Optional> getExternalWorkerLaunc "remote-function-server.serde=presto_page%n" + "remote-function-server.signature.files.directory.path=%s%n", configProperties, REMOTE_FUNCTION_CATALOG_NAME, remoteFunctionServerUds.get(), jsonSignaturesPath); } + + if (failOnNestedLoopJoin) { + configProperties = format("%s%n" + "velox-plan-validator-fail-on-nested-loop-join=true%n", configProperties); + } + Files.write(tempDirectoryPath.resolve("config.properties"), configProperties.getBytes()); Files.write(tempDirectoryPath.resolve("node.properties"), format("node.id=%s%n" + diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxPlanValidator.cpp b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestNativePlanValidation.java similarity index 62% rename from presto-native-execution/presto_cpp/main/types/PrestoToVeloxPlanValidator.cpp rename to presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestNativePlanValidation.java index 4ef9a8179aefb..707d336caf595 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxPlanValidator.cpp +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestNativePlanValidation.java @@ -11,12 +11,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package com.facebook.presto.nativeworker; +import com.facebook.presto.testing.QueryRunner; -#include "presto_cpp/main/types/PrestoToVeloxPlanValidator.h" - -namespace facebook::presto { -bool PrestoToVeloxPlanValidator::validatePlanFragment( - const velox::core::PlanFragment& fragment) { - return true; +public class TestNativePlanValidation + extends AbstractTestNativePlanValidation +{ + protected QueryRunner createQueryRunner() throws Exception + { + return PrestoNativeQueryRunnerUtils.createNativeQueryRunner(false, true); + } } -} // namespace facebook::presto