Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions presto-docs/src/main/sphinx/admin/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ Number of local parallel table writer threads per worker for partitioned writes.
set, the number set by ``task_writer_count`` will be used. It is required to be a power
of two for a Java query engine.

``eager-plan-validation-enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``false``

This property enables the eager building and validation of a logical plan.
When enabled, the logical plan will begin to be built and validated before
queueing and allocation of cluster resources so that any errors or
incompatibilities in the query plan will fail quickly and inform the user.

.. _tuning-memory:

Memory Management Properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ public final class SystemSessionProperties
public static final String REWRITE_EXPRESSION_WITH_CONSTANT_EXPRESSION = "rewrite_expression_with_constant_expression";
public static final String PRINT_ESTIMATED_STATS_FROM_CACHE = "print_estimated_stats_from_cache";
public static final String REMOVE_CROSS_JOIN_WITH_CONSTANT_SINGLE_ROW_INPUT = "remove_cross_join_with_constant_single_row_input";
public static final String EAGER_PLAN_VALIDATION_ENABLED = "eager_plan_validation_enabled";

// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future.
public static final String NATIVE_SIMPLIFIED_EXPRESSION_EVALUATION_ENABLED = "native_simplified_expression_evaluation_enabled";
Expand Down Expand Up @@ -2018,6 +2019,11 @@ public SystemSessionProperties(
"If one input of the cross join is a single row with constant value, remove this cross join and replace with a project node",
featuresConfig.isRemoveCrossJoinWithSingleConstantRow(),
false),
booleanProperty(
EAGER_PLAN_VALIDATION_ENABLED,
"Enable eager building and validation of logical plan before queueing",
featuresConfig.isEagerPlanValidationEnabled(),
false),
new PropertyMetadata<>(
DEFAULT_VIEW_SECURITY_MODE,
format("Set default view security mode. Options are: %s",
Expand Down Expand Up @@ -3344,6 +3350,11 @@ public static boolean isRewriteExpressionWithConstantEnabled(Session session)
return session.getSystemProperty(REWRITE_EXPRESSION_WITH_CONSTANT_EXPRESSION, Boolean.class);
}

public static boolean isEagerPlanValidationEnabled(Session session)
{
return session.getSystemProperty(EAGER_PLAN_VALIDATION_ENABLED, Boolean.class);
}

public static CreateView.Security getDefaultViewSecurityMode(Session session)
{
return session.getSystemProperty(DEFAULT_VIEW_SECURITY_MODE, CreateView.Security.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution;

import com.facebook.airlift.concurrent.ThreadPoolExecutorMBean;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import javax.inject.Inject;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

public class EagerPlanValidationExecutionMBean
{
private final ThreadPoolExecutorMBean executorMBean;

@Inject
public EagerPlanValidationExecutionMBean(@ForEagerPlanValidation ExecutorService executor)
{
this.executorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) executor);
}

@Managed
@Nested
public ThreadPoolExecutorMBean getExecutor()
{
return executorMBean;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution;

import javax.inject.Qualifier;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@Qualifier
public @interface ForEagerPlanValidation
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,16 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static com.facebook.presto.SystemSessionProperties.getExecutionPolicy;
import static com.facebook.presto.SystemSessionProperties.getQueryAnalyzerTimeout;
import static com.facebook.presto.SystemSessionProperties.isEagerPlanValidationEnabled;
import static com.facebook.presto.SystemSessionProperties.isLogInvokedFunctionNamesEnabled;
import static com.facebook.presto.SystemSessionProperties.isSpoolingOutputBufferEnabled;
import static com.facebook.presto.common.RuntimeMetricName.FRAGMENT_PLAN_TIME_NANOS;
Expand Down Expand Up @@ -141,6 +144,8 @@ public class SqlQueryExecution
private final PlanCanonicalInfoProvider planCanonicalInfoProvider;
private final QueryAnalysis queryAnalysis;
private final AnalyzerContext analyzerContext;
private final CompletableFuture<PlanRoot> planFuture;
private final AtomicBoolean planFutureLocked = new AtomicBoolean();

private SqlQueryExecution(
QueryAnalyzer queryAnalyzer,
Expand All @@ -159,6 +164,7 @@ private SqlQueryExecution(
ExecutorService queryExecutor,
ScheduledExecutorService timeoutThreadExecutor,
SectionExecutionFactory sectionExecutionFactory,
ExecutorService eagerPlanValidationExecutor,
InternalNodeManager internalNodeManager,
ExecutionPolicy executionPolicy,
SplitSchedulerStats schedulerStats,
Expand Down Expand Up @@ -243,6 +249,10 @@ private SqlQueryExecution(
}
}
}

// Optionally build and validate plan immediately, before execution begins
planFuture = isEagerPlanValidationEnabled(getSession()) ?
CompletableFuture.supplyAsync(this::runCreateLogicalPlanAsync, eagerPlanValidationExecutor) : null;
}
}

Expand Down Expand Up @@ -460,8 +470,13 @@ public void start()
Thread.currentThread(),
timeoutThreadExecutor,
getQueryAnalyzerTimeout(getSession()))) {
// create logical plan for the query
plan = createLogicalPlanAndOptimize();
// If planFuture has not started, cancel and build plan in current thread
if (planFuture != null && !planFutureLocked.compareAndSet(false, true)) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the discussion at #23649 (comment), trying to call planFuture.cancel(false) will mark the future as cancelled, but the thread will continue to run. Using the AtomicBoolean planFutureLocked will effectively cancel the planFuture if set here first, then the future result (null) is discarded.

plan = planFuture.get();
}
else {
plan = createLogicalPlanAndOptimize();
}
}

metadata.beginQuery(getSession(), plan.getConnectors());
Expand Down Expand Up @@ -590,6 +605,21 @@ private PlanRoot createLogicalPlanAndOptimize()
}
}

private PlanRoot runCreateLogicalPlanAsync()
{
try {
// Check if creating plan async has been cancelled
if (planFutureLocked.compareAndSet(false, true)) {
return createLogicalPlanAndOptimize();
}
return null;
}
catch (Throwable e) {
fail(e);
throw e;
}
}

private void planDistribution(PlanRoot plan)
{
CloseableSplitSourceProvider splitSourceProvider = new CloseableSplitSourceProvider(splitManager::getSplits);
Expand Down Expand Up @@ -862,6 +892,7 @@ public static class SqlQueryExecutionFactory
private final ScheduledExecutorService timeoutThreadExecutor;
private final ExecutorService queryExecutor;
private final SectionExecutionFactory sectionExecutionFactory;
private final ExecutorService eagerPlanValidationExecutor;
private final InternalNodeManager internalNodeManager;
private final Map<String, ExecutionPolicy> executionPolicies;
private final StatsCalculator statsCalculator;
Expand All @@ -883,6 +914,7 @@ public static class SqlQueryExecutionFactory
@ForQueryExecution ExecutorService queryExecutor,
@ForTimeoutThread ScheduledExecutorService timeoutThreadExecutor,
SectionExecutionFactory sectionExecutionFactory,
@ForEagerPlanValidation ExecutorService eagerPlanValidationExecutor,
InternalNodeManager internalNodeManager,
Map<String, ExecutionPolicy> executionPolicies,
SplitSchedulerStats schedulerStats,
Expand All @@ -904,6 +936,7 @@ public static class SqlQueryExecutionFactory
this.queryExecutor = requireNonNull(queryExecutor, "queryExecutor is null");
this.timeoutThreadExecutor = requireNonNull(timeoutThreadExecutor, "timeoutThreadExecutor is null");
this.sectionExecutionFactory = requireNonNull(sectionExecutionFactory, "sectionExecutionFactory is null");
this.eagerPlanValidationExecutor = requireNonNull(eagerPlanValidationExecutor, "eagerPlanValidationExecutor is null");
this.internalNodeManager = requireNonNull(internalNodeManager, "internalNodeManager is null");
this.executionPolicies = requireNonNull(executionPolicies, "schedulerPolicies is null");
this.planOptimizers = planOptimizers.getPlanningTimeOptimizers();
Expand Down Expand Up @@ -946,6 +979,7 @@ public QueryExecution createQueryExecution(
queryExecutor,
timeoutThreadExecutor,
sectionExecutionFactory,
eagerPlanValidationExecutor,
internalNodeManager,
executionPolicy,
schedulerStats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ private StreamingPlanSection tryCostBasedOptimize(StreamingPlanSection section)
.forEach(currentSubPlan -> {
Optional<PlanFragment> newPlanFragment = performRuntimeOptimizations(currentSubPlan);
if (newPlanFragment.isPresent()) {
planChecker.validatePlanFragment(newPlanFragment.get().getRoot(), session, metadata, warningCollector);
planChecker.validatePlanFragment(newPlanFragment.get(), session, metadata, warningCollector);
oldToNewFragment.put(currentSubPlan.getFragment(), newPlanFragment.get());
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import com.facebook.presto.event.QueryMonitorConfig;
import com.facebook.presto.event.QueryProgressMonitor;
import com.facebook.presto.execution.ClusterSizeMonitor;
import com.facebook.presto.execution.EagerPlanValidationExecutionMBean;
import com.facebook.presto.execution.ExecutionFactoriesManager;
import com.facebook.presto.execution.ExplainAnalyzeContext;
import com.facebook.presto.execution.ForEagerPlanValidation;
import com.facebook.presto.execution.ForQueryExecution;
import com.facebook.presto.execution.ForTimeoutThread;
import com.facebook.presto.execution.NodeResourceStatusConfig;
Expand Down Expand Up @@ -83,6 +85,7 @@
import com.facebook.presto.server.remotetask.RemoteTaskStats;
import com.facebook.presto.spi.memory.ClusterMemoryPoolManager;
import com.facebook.presto.spi.security.SelectedRole;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.analyzer.QueryExplainer;
import com.facebook.presto.sql.planner.PlanFragmenter;
import com.facebook.presto.sql.planner.PlanOptimizers;
Expand All @@ -104,8 +107,11 @@

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.airlift.concurrent.Threads.threadsNamed;
Expand Down Expand Up @@ -264,6 +270,8 @@ protected void setup(Binder binder)
.toInstance(newCachedThreadPool(threadsNamed("query-execution-%s")));
binder.bind(QueryExecutionMBean.class).in(Scopes.SINGLETON);
newExporter(binder).export(QueryExecutionMBean.class).as(generatedNameOf(QueryExecution.class));
binder.bind(EagerPlanValidationExecutionMBean.class).in(Scopes.SINGLETON);
newExporter(binder).export(EagerPlanValidationExecutionMBean.class).withGeneratedName();

binder.bind(SplitSchedulerStats.class).in(Scopes.SINGLETON);
newExporter(binder).export(SplitSchedulerStats.class).withGeneratedName();
Expand Down Expand Up @@ -376,6 +384,14 @@ public static ScheduledExecutorService createTimeoutThreadExecutor()
return executor;
}

@Provides
@Singleton
@ForEagerPlanValidation
public static ExecutorService createEagerPlanValidationExecutor(FeaturesConfig featuresConfig)
{
return new ThreadPoolExecutor(0, featuresConfig.getEagerPlanValidationThreadPoolSize(), 1L, TimeUnit.MINUTES, new LinkedBlockingQueue(), threadsNamed("plan-validation-%s"));
}

private void bindLowMemoryKiller(String name, Class<? extends LowMemoryKiller> clazz)
{
install(installModuleIf(
Expand All @@ -395,7 +411,8 @@ public ExecutorCleanup(
@ForQueryExecution ExecutorService queryExecutionExecutor,
@ForScheduler ScheduledExecutorService schedulerExecutor,
@ForTransactionManager ExecutorService transactionFinishingExecutor,
@ForTransactionManager ScheduledExecutorService transactionIdleExecutor)
@ForTransactionManager ScheduledExecutorService transactionIdleExecutor,
@ForEagerPlanValidation ExecutorService eagerPlanValidationExecutor)
{
executors = ImmutableList.<ExecutorService>builder()
.add(statementResponseExecutor)
Expand All @@ -404,6 +421,7 @@ public ExecutorCleanup(
.add(schedulerExecutor)
.add(transactionFinishingExecutor)
.add(transactionIdleExecutor)
.add(eagerPlanValidationExecutor)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.facebook.presto.spi.eventlistener.EventListenerFactory;
import com.facebook.presto.spi.function.FunctionNamespaceManagerFactory;
import com.facebook.presto.spi.nodestatus.NodeStatusNotificationProviderFactory;
import com.facebook.presto.spi.plan.PlanCheckerProviderFactory;
import com.facebook.presto.spi.prerequisites.QueryPrerequisitesFactory;
import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManagerFactory;
import com.facebook.presto.spi.security.PasswordAuthenticatorFactory;
Expand All @@ -48,6 +49,7 @@
import com.facebook.presto.spi.ttl.NodeTtlFetcherFactory;
import com.facebook.presto.sql.analyzer.AnalyzerProviderManager;
import com.facebook.presto.sql.analyzer.QueryPreparerProviderManager;
import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager;
import com.facebook.presto.storage.TempStorageManager;
import com.facebook.presto.tracing.TracerProviderManager;
import com.facebook.presto.ttl.clusterttlprovidermanagers.ClusterTtlProviderManager;
Expand Down Expand Up @@ -131,6 +133,7 @@ public class PluginManager
private final AnalyzerProviderManager analyzerProviderManager;
private final QueryPreparerProviderManager queryPreparerProviderManager;
private final NodeStatusNotificationManager nodeStatusNotificationManager;
private final PlanCheckerProviderManager planCheckerProviderManager;

@Inject
public PluginManager(
Expand All @@ -152,7 +155,8 @@ public PluginManager(
ClusterTtlProviderManager clusterTtlProviderManager,
HistoryBasedPlanStatisticsManager historyBasedPlanStatisticsManager,
TracerProviderManager tracerProviderManager,
NodeStatusNotificationManager nodeStatusNotificationManager)
NodeStatusNotificationManager nodeStatusNotificationManager,
PlanCheckerProviderManager planCheckerProviderManager)
{
requireNonNull(nodeInfo, "nodeInfo is null");
requireNonNull(config, "config is null");
Expand Down Expand Up @@ -184,6 +188,7 @@ public PluginManager(
this.analyzerProviderManager = requireNonNull(analyzerProviderManager, "analyzerProviderManager is null");
this.queryPreparerProviderManager = requireNonNull(queryPreparerProviderManager, "queryPreparerProviderManager is null");
this.nodeStatusNotificationManager = requireNonNull(nodeStatusNotificationManager, "nodeStatusNotificationManager is null");
this.planCheckerProviderManager = requireNonNull(planCheckerProviderManager, "planCheckerProviderManager is null");
}

public void loadPlugins()
Expand Down Expand Up @@ -348,6 +353,11 @@ public void installPlugin(Plugin plugin)
log.info("Registering node status notification provider %s", nodeStatusNotificationProviderFactory.getName());
nodeStatusNotificationManager.addNodeStatusNotificationProviderFactory(nodeStatusNotificationProviderFactory);
}

for (PlanCheckerProviderFactory planCheckerProviderFactory : plugin.getPlanCheckerProviderFactories()) {
log.info("Registering plan checker provider factory %s", planCheckerProviderFactory.getName());
planCheckerProviderManager.addPlanCheckerProviderFactory(planCheckerProviderFactory);
}
}

public void installCoordinatorPlugin(CoordinatorPlugin plugin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.facebook.presto.server.security.ServerSecurityModule;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.parser.SqlParserOptions;
import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager;
import com.facebook.presto.storage.TempStorageManager;
import com.facebook.presto.storage.TempStorageModule;
import com.facebook.presto.tracing.TracerProviderManager;
Expand Down Expand Up @@ -177,6 +178,7 @@ public void run()
injector.getInstance(TracerProviderManager.class).loadTracerProvider();
injector.getInstance(NodeStatusNotificationManager.class).loadNodeStatusNotificationProvider();
injector.getInstance(GracefulShutdownHandler.class).loadNodeStatusNotification();
injector.getInstance(PlanCheckerProviderManager.class).loadPlanCheckerProviders();
startAssociatedProcesses(injector);

injector.getInstance(Announcer.class).start();
Expand Down
Loading