Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1313,10 +1313,10 @@ void PrestoServer::enableWorkerStatsReporting() {

void PrestoServer::initPrestoToVeloxPlanValidator() {
VELOX_CHECK_NULL(planValidator_);
planValidator_ = std::make_shared<PrestoToVeloxPlanValidator>();
planValidator_ = std::make_shared<VeloxPlanValidator>();
}

PrestoToVeloxPlanValidator* PrestoServer::getPlanValidator() {
VeloxPlanValidator* PrestoServer::getPlanValidator() {
return planValidator_.get();
}

Expand Down
6 changes: 3 additions & 3 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "presto_cpp/main/PeriodicHeartbeatManager.h"
#include "presto_cpp/main/PrestoExchangeSource.h"
#include "presto_cpp/main/PrestoServerOperations.h"
#include "presto_cpp/main/types/PrestoToVeloxPlanValidator.h"
#include "presto_cpp/main/types/VeloxPlanValidator.h"
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/memory/MemoryAllocator.h"
#if __has_include("filesystem")
Expand Down Expand Up @@ -184,7 +184,7 @@ class PrestoServer {
/// Invoked to initialize Presto to Velox plan validator.
virtual void initPrestoToVeloxPlanValidator();

PrestoToVeloxPlanValidator* getPlanValidator();
VeloxPlanValidator* getPlanValidator();

/// Invoked to get the list of filters passed to the http server.
std::vector<std::unique_ptr<proxygen::RequestHandlerFactory>>
Expand Down Expand Up @@ -246,7 +246,7 @@ class PrestoServer {
// Executor for spilling.
std::shared_ptr<folly::CPUThreadPoolExecutor> spillerExecutor_;

std::shared_ptr<PrestoToVeloxPlanValidator> planValidator_;
std::shared_ptr<VeloxPlanValidator> planValidator_;

std::unique_ptr<http::HttpClientConnectionPool> exchangeSourceConnectionPool_;

Expand Down
6 changes: 3 additions & 3 deletions presto-native-execution/presto_cpp/main/TaskResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

#include "presto_cpp/main/TaskManager.h"
#include "presto_cpp/main/http/HttpServer.h"
#include "presto_cpp/main/types/PrestoToVeloxPlanValidator.h"
#include "presto_cpp/main/types/VeloxPlanValidator.h"
#include "velox/common/memory/Memory.h"

namespace facebook::presto {
Expand All @@ -25,7 +25,7 @@ class TaskResource {
explicit TaskResource(
velox::memory::MemoryPool* pool,
folly::Executor* httpSrvCpuExecutor,
PrestoToVeloxPlanValidator* planValidator,
VeloxPlanValidator* planValidator,
TaskManager& taskManager)
: httpSrvCpuExecutor_(httpSrvCpuExecutor),
pool_{pool},
Expand Down Expand Up @@ -100,7 +100,7 @@ class TaskResource {

folly::Executor* const httpSrvCpuExecutor_;
velox::memory::MemoryPool* const pool_;
PrestoToVeloxPlanValidator* const planValidator_;
VeloxPlanValidator* const planValidator_;

TaskManager& taskManager_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ SystemConfig::SystemConfig() {
STR_PROP(kCacheVeloxTtlThreshold, "2d"),
STR_PROP(kCacheVeloxTtlCheckInterval, "1h"),
BOOL_PROP(kEnableRuntimeMetricsCollection, false),
BOOL_PROP(kPlanValidatorFailOnNestedLoopJoin, false),
};
}

Expand Down
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,8 @@ class SystemConfig : public ConfigBase {
static constexpr std::string_view kSinkMaxBufferSize{"sink.max-buffer-size"};
static constexpr std::string_view kDriverMaxPagePartitioningBufferSize{
"driver.max-page-partitioning-buffer-size"};
static constexpr std::string_view kPlanValidatorFailOnNestedLoopJoin{
"velox-plan-validator-fail-on-nested-loop-join"};

SystemConfig();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,7 @@ class TaskManagerTest : public testing::Test {
taskManager_ = std::make_unique<TaskManager>(
driverExecutor_.get(), httpSrvCpuExecutor_.get(), nullptr);

auto validator =
std::make_shared<facebook::presto::PrestoToVeloxPlanValidator>();
auto validator = std::make_shared<facebook::presto::VeloxPlanValidator>();
taskResource_ = std::make_unique<TaskResource>(
leafPool_.get(),
httpSrvCpuExecutor_.get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ target_link_libraries(presto_type_converter velox_type_parser)

add_library(
presto_types OBJECT
PrestoToVeloxQueryPlan.cpp PrestoToVeloxExpr.cpp
PrestoToVeloxPlanValidator.cpp PrestoToVeloxSplit.cpp
PrestoToVeloxConnector.cpp)
PrestoToVeloxQueryPlan.cpp PrestoToVeloxExpr.cpp VeloxPlanValidator.cpp
PrestoToVeloxSplit.cpp PrestoToVeloxConnector.cpp)
add_dependencies(presto_types presto_operators presto_type_converter velox_type
velox_type_fbhive velox_dwio_dwrf_proto)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "presto_cpp/main/types/VeloxPlanValidator.h"
#include "presto_cpp/main/common/Configs.h"

namespace facebook::presto {
bool planHasNestedJoinLoop(const velox::core::PlanNodePtr planNode) {
if (auto joinNode =
std::dynamic_pointer_cast<const velox::core::NestedLoopJoinNode>(
planNode)) {
return true;
}

for (auto plan : planNode->sources()) {
if (planHasNestedJoinLoop(plan)) {
return true;
}
}

return false;
}

void VeloxPlanValidator::validatePlanFragment(
const velox::core::PlanFragment& fragment) {
const auto failOnNestedLoopJoin =
SystemConfig::instance()
->optionalProperty<bool>(
SystemConfig::kPlanValidatorFailOnNestedLoopJoin)
.value_or(false);
if (failOnNestedLoopJoin) {
VELOX_CHECK(
!planHasNestedJoinLoop(fragment.planNode),
"Velox plan uses nested join loop which isn't supported.");
}
}

} // namespace facebook::presto
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
#include "velox/core/PlanFragment.h"

namespace facebook::presto {
class PrestoToVeloxPlanValidator {
class VeloxPlanValidator {
public:
virtual bool validatePlanFragment(const velox::core::PlanFragment& fragment);
virtual ~PrestoToVeloxPlanValidator() = default;
virtual void validatePlanFragment(const velox::core::PlanFragment& fragment);
virtual ~VeloxPlanValidator() = default;
};
} // namespace facebook::presto
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.nativeworker;

import com.facebook.presto.tests.AbstractTestQueryFramework;
import org.testng.annotations.Test;

public abstract class AbstractTestNativePlanValidation
extends AbstractTestQueryFramework
{
@Test
public void testNestedLoopJoinPlainValidationFailure()
{
assertQueryFails(
"SELECT EXISTS(SELECT 1 WHERE l.orderkey > 0 OR l.orderkey != 3) " +
"FROM lineitem l LIMIT 1", ".*Plan uses nested join loop which isn't supported.*");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public static QueryRunner createQueryRunner(

defaultQueryRunner.close();

return createNativeQueryRunner(dataDirectory.get().toString(), prestoServerPath.get(), workerCount, cacheMaxSize, true, Optional.empty(), storageFormat, addStorageFormatToPath);
return createNativeQueryRunner(dataDirectory.get().toString(), prestoServerPath.get(), workerCount, cacheMaxSize, true, Optional.empty(), storageFormat, addStorageFormatToPath, false);
}

public static QueryRunner createJavaQueryRunner()
Expand Down Expand Up @@ -251,7 +251,7 @@ public static QueryRunner createNativeIcebergQueryRunner(
false,
false,
OptionalInt.of(workerCount.orElse(4)),
getExternalWorkerLauncher("iceberg", prestoServerPath, cacheMaxSize, remoteFunctionServerUds),
getExternalWorkerLauncher("iceberg", prestoServerPath, cacheMaxSize, remoteFunctionServerUds, false),
dataDirectory,
addStorageFormatToPath);
}
Expand All @@ -264,7 +264,8 @@ public static QueryRunner createNativeQueryRunner(
boolean useThrift,
Optional<String> remoteFunctionServerUds,
String storageFormat,
boolean addStorageFormatToPath)
boolean addStorageFormatToPath,
Boolean failOnNestedLoopJoin)
throws Exception
{
// The property "hive.allow-drop-table" needs to be set to true because security is always "legacy" in NativeQueryRunner.
Expand All @@ -287,7 +288,7 @@ public static QueryRunner createNativeQueryRunner(
hiveProperties,
workerCount,
Optional.of(Paths.get(addStorageFormatToPath ? dataDirectory + "/" + storageFormat : dataDirectory)),
getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, remoteFunctionServerUds));
getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, remoteFunctionServerUds, failOnNestedLoopJoin));
}

public static QueryRunner createNativeCteQueryRunner(boolean useThrift, String storageFormat)
Expand Down Expand Up @@ -330,13 +331,13 @@ public static QueryRunner createNativeCteQueryRunner(boolean useThrift, String s
hiveProperties,
workerCount,
Optional.of(Paths.get(addStorageFormatToPath ? dataDirectory + "/" + storageFormat : dataDirectory)),
getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, Optional.empty()));
getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, Optional.empty(), false));
}

public static QueryRunner createNativeQueryRunner(String remoteFunctionServerUds)
throws Exception
{
return createNativeQueryRunner(false, DEFAULT_STORAGE_FORMAT, Optional.ofNullable(remoteFunctionServerUds));
return createNativeQueryRunner(false, DEFAULT_STORAGE_FORMAT, Optional.ofNullable(remoteFunctionServerUds), false);
}

public static QueryRunner createNativeQueryRunner(boolean useThrift)
Expand All @@ -345,13 +346,19 @@ public static QueryRunner createNativeQueryRunner(boolean useThrift)
return createNativeQueryRunner(useThrift, DEFAULT_STORAGE_FORMAT);
}

public static QueryRunner createNativeQueryRunner(boolean useThrift, boolean failOnNestedLoopJoin)
throws Exception
{
return createNativeQueryRunner(useThrift, DEFAULT_STORAGE_FORMAT, Optional.empty(), failOnNestedLoopJoin);
}

public static QueryRunner createNativeQueryRunner(boolean useThrift, String storageFormat)
throws Exception
{
return createNativeQueryRunner(useThrift, storageFormat, Optional.empty());
return createNativeQueryRunner(useThrift, storageFormat, Optional.empty(), false);
}

public static QueryRunner createNativeQueryRunner(boolean useThrift, String storageFormat, Optional<String> remoteFunctionServerUds)
public static QueryRunner createNativeQueryRunner(boolean useThrift, String storageFormat, Optional<String> remoteFunctionServerUds, Boolean failOnNestedLoopJoin)
throws Exception
{
int cacheMaxSize = 0;
Expand All @@ -364,7 +371,8 @@ public static QueryRunner createNativeQueryRunner(boolean useThrift, String stor
useThrift,
remoteFunctionServerUds,
storageFormat,
true);
true,
failOnNestedLoopJoin);
}

// Start the remote function server. Return the UDS path used to communicate with it.
Expand Down Expand Up @@ -411,7 +419,7 @@ public static NativeQueryRunnerParameters getNativeQueryRunnerParameters()
return new NativeQueryRunnerParameters(prestoServerPath, dataDirectory, workerCount);
}

public static Optional<BiFunction<Integer, URI, Process>> getExternalWorkerLauncher(String catalogName, String prestoServerPath, int cacheMaxSize, Optional<String> remoteFunctionServerUds)
public static Optional<BiFunction<Integer, URI, Process>> getExternalWorkerLauncher(String catalogName, String prestoServerPath, int cacheMaxSize, Optional<String> remoteFunctionServerUds, Boolean failOnNestedLoopJoin)
{
return
Optional.of((workerIndex, discoveryUri) -> {
Expand All @@ -436,6 +444,11 @@ public static Optional<BiFunction<Integer, URI, Process>> getExternalWorkerLaunc
"remote-function-server.serde=presto_page%n" +
"remote-function-server.signature.files.directory.path=%s%n", configProperties, REMOTE_FUNCTION_CATALOG_NAME, remoteFunctionServerUds.get(), jsonSignaturesPath);
}

if (failOnNestedLoopJoin) {
configProperties = format("%s%n" + "velox-plan-validator-fail-on-nested-loop-join=true%n", configProperties);
}

Files.write(tempDirectoryPath.resolve("config.properties"), configProperties.getBytes());
Files.write(tempDirectoryPath.resolve("node.properties"),
format("node.id=%s%n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.nativeworker;
import com.facebook.presto.testing.QueryRunner;

#include "presto_cpp/main/types/PrestoToVeloxPlanValidator.h"

namespace facebook::presto {
bool PrestoToVeloxPlanValidator::validatePlanFragment(
const velox::core::PlanFragment& fragment) {
return true;
public class TestNativePlanValidation
extends AbstractTestNativePlanValidation
{
protected QueryRunner createQueryRunner() throws Exception
{
return PrestoNativeQueryRunnerUtils.createNativeQueryRunner(false, true);
}
}
} // namespace facebook::presto