diff --git a/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java index 810d2cbf5ccad..abe2fa4a44b73 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -342,9 +342,6 @@ public final class SystemSessionProperties // TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future. public static final String NATIVE_AGGREGATION_SPILL_ALL = "native_aggregation_spill_all"; public static final String NATIVE_EXECUTION_ENABLED = "native_execution_enabled"; - private static final String NATIVE_EXECUTION_EXECUTABLE_PATH = "native_execution_executable_path"; - private static final String NATIVE_EXECUTION_PROGRAM_ARGUMENTS = "native_execution_program_arguments"; - public static final String NATIVE_EXECUTION_PROCESS_REUSE_ENABLED = "native_execution_process_reuse_enabled"; public static final String INNER_JOIN_PUSHDOWN_ENABLED = "optimizer_inner_join_pushdown_enabled"; public static final String INEQUALITY_JOIN_PUSHDOWN_ENABLED = "optimizer_inequality_join_pushdown_enabled"; public static final String NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING = "native_min_columnar_encoding_channels_to_prefer_row_wise_encoding"; @@ -1569,11 +1566,6 @@ public SystemSessionProperties( "Enable execution on native engine", featuresConfig.isNativeExecutionEnabled(), true), - booleanProperty( - NATIVE_EXECUTION_PROCESS_REUSE_ENABLED, - "Enable reuse the native process within the same JVM", - true, - false), booleanProperty( NATIVE_ENFORCE_JOIN_BUILD_INPUT_PARTITION, "Enforce that the join build input is partitioned on join key", @@ -3001,11 +2993,6 @@ public static boolean shouldPushRemoteExchangeThroughGroupId(Session session) return session.getSystemProperty(PUSH_REMOTE_EXCHANGE_THROUGH_GROUP_ID, Boolean.class); } - public static boolean isNativeExecutionProcessReuseEnabled(Session session) - { - return session.getSystemProperty(NATIVE_EXECUTION_PROCESS_REUSE_ENABLED, Boolean.class); - } - public static boolean isNativeJoinBuildPartitionEnforced(Session session) { return session.getSystemProperty(NATIVE_ENFORCE_JOIN_BUILD_INPUT_PARTITION, Boolean.class); diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index 684f491ba730c..fa5927f8f169a 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -241,7 +241,6 @@ public class FeaturesConfig private boolean disableIPAddressForNative; private String nativeExecutionExecutablePath = "./presto_server"; private String nativeExecutionProgramArguments = ""; - private boolean nativeExecutionProcessReuseEnabled = true; private boolean nativeEnforceJoinBuildInputPartition = true; private boolean randomizeOuterJoinNullKey; private RandomizeOuterJoinNullKeyStrategy randomizeOuterJoinNullKeyStrategy = RandomizeOuterJoinNullKeyStrategy.DISABLED; @@ -2347,19 +2346,6 @@ public String getNativeExecutionProgramArguments() return this.nativeExecutionProgramArguments; } - @Config("native-execution-process-reuse-enabled") - @ConfigDescription("Enable reuse the native process within the same JVM") - public FeaturesConfig setNativeExecutionProcessReuseEnabled(boolean nativeExecutionProcessReuseEnabled) - { - this.nativeExecutionProcessReuseEnabled = nativeExecutionProcessReuseEnabled; - return this; - } - - public boolean isNativeExecutionProcessReuseEnabled() - { - return this.nativeExecutionProcessReuseEnabled; - } - @Config("native-enforce-join-build-input-partition") @ConfigDescription("Enforce that the join build input is partitioned on join key") public FeaturesConfig setNativeEnforceJoinBuildInputPartition(boolean nativeEnforceJoinBuildInputPartition) diff --git a/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index 4ec81d7cb0f55..2fa59a556288b 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -204,7 +204,6 @@ public void testDefaults() .setDisableIPAddressForNative(false) .setNativeExecutionExecutablePath("./presto_server") .setNativeExecutionProgramArguments("") - .setNativeExecutionProcessReuseEnabled(true) .setNativeEnforceJoinBuildInputPartition(true) .setRandomizeOuterJoinNullKeyEnabled(false) .setRandomizeOuterJoinNullKeyStrategy(RandomizeOuterJoinNullKeyStrategy.DISABLED) @@ -627,7 +626,6 @@ public void testExplicitPropertyMappings() .setDisableIPAddressForNative(true) .setNativeExecutionExecutablePath("/bin/echo") .setNativeExecutionProgramArguments("--v 1") - .setNativeExecutionProcessReuseEnabled(false) .setNativeEnforceJoinBuildInputPartition(false) .setRandomizeOuterJoinNullKeyEnabled(true) .setRandomizeOuterJoinNullKeyStrategy(RandomizeOuterJoinNullKeyStrategy.KEY_FROM_OUTER_JOIN) diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSessionProperties.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSessionProperties.java index 41707774f216f..03059b6d2d0e9 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSessionProperties.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSessionProperties.java @@ -15,6 +15,7 @@ import com.facebook.presto.Session; import com.facebook.presto.spi.session.PropertyMetadata; +import com.facebook.presto.sql.analyzer.FeaturesConfig; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import io.airlift.units.DataSize; @@ -72,6 +73,9 @@ public class PrestoSparkSessionProperties public static final String SPARK_HASH_PARTITION_COUNT_SCALING_FACTOR_ON_OUT_OF_MEMORY = "spark_hash_partition_count_scaling_factor_on_out_of_memory"; public static final String SPARK_ADAPTIVE_QUERY_EXECUTION_ENABLED = "spark_adaptive_query_execution_enabled"; public static final String ADAPTIVE_JOIN_SIDE_SWITCHING_ENABLED = "adaptive_join_side_switching_enabled"; + public static final String NATIVE_EXECUTION_EXECUTABLE_PATH = "native_execution_executable_path"; + public static final String NATIVE_EXECUTION_PROGRAM_ARGUMENTS = "native_execution_program_arguments"; + public static final String NATIVE_EXECUTION_PROCESS_REUSE_ENABLED = "native_execution_process_reuse_enabled"; public static final String NATIVE_EXECUTION_BROADCAST_BASE_PATH = "native_execution_broadcast_base_path"; public static final String NATIVE_TERMINATE_WITH_CORE_WHEN_UNRESPONSIVE_ENABLED = "native_terminate_with_core_when_unresponsive_enabled"; public static final String NATIVE_TERMINATE_WITH_CORE_TIMEOUT = "native_terminate_with_core_timeout"; @@ -84,11 +88,11 @@ public class PrestoSparkSessionProperties public PrestoSparkSessionProperties() { - this(new PrestoSparkConfig()); + this(new PrestoSparkConfig(), new FeaturesConfig()); } @Inject - public PrestoSparkSessionProperties(PrestoSparkConfig prestoSparkConfig) + public PrestoSparkSessionProperties(PrestoSparkConfig prestoSparkConfig, FeaturesConfig featuresConfig) { executionStrategyValidator = new ExecutionStrategyValidator(); sessionProperties = ImmutableList.of( @@ -267,6 +271,31 @@ public PrestoSparkSessionProperties(PrestoSparkConfig prestoSparkConfig) "Enables the adaptive optimizer to switch the build and probe sides of a join", prestoSparkConfig.isAdaptiveJoinSideSwitchingEnabled(), false), + stringProperty( + NATIVE_EXECUTION_EXECUTABLE_PATH, + "The native engine executable file path for pos native engine execution", + featuresConfig.getNativeExecutionExecutablePath(), + false), + stringProperty( + NATIVE_EXECUTION_PROGRAM_ARGUMENTS, + "Program arguments for native engine execution. The main target use case for this " + + "property is to control logging levels using glog flags. E,g, to enable verbose mode, add " + + "'--v 1'. More advanced glog gflags usage can be found at " + + "https://rpg.ifi.uzh.ch/docs/glog.html\n" + + "e.g. --vmodule=mapreduce=2,file=1,gfs*=3 --v=0\n" + + "will:\n" + + "\n" + + "a. Print VLOG(2) and lower messages from mapreduce.{h,cc}\n" + + "b. Print VLOG(1) and lower messages from file.{h,cc}\n" + + "c. Print VLOG(3) and lower messages from files prefixed with \"gfs\"\n" + + "d. Print VLOG(0) and lower messages from elsewhere", + featuresConfig.getNativeExecutionProgramArguments(), + false), + booleanProperty( + NATIVE_EXECUTION_PROCESS_REUSE_ENABLED, + "Enable reuse the native process within the same JVM", + true, + false), stringProperty( NATIVE_EXECUTION_BROADCAST_BASE_PATH, "Base path for temporary storage of broadcast data", @@ -488,4 +517,19 @@ public static int getAttemptNumberToApplyDynamicMemoryPoolTuning(Session session { return session.getSystemProperty(ATTEMPT_NUMBER_TO_APPLY_DYNAMIC_MEMORY_POOL_TUNING, Integer.class); } + + public static boolean isNativeExecutionProcessReuseEnabled(Session session) + { + return session.getSystemProperty(NATIVE_EXECUTION_PROCESS_REUSE_ENABLED, Boolean.class); + } + + public static String getNativeExecutionExecutablePath(Session session) + { + return session.getSystemProperty(NATIVE_EXECUTION_EXECUTABLE_PATH, String.class); + } + + public static String getNativeExecutionProgramArguments(Session session) + { + return session.getSystemProperty(NATIVE_EXECUTION_PROGRAM_ARGUMENTS, String.class); + } } diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/DetachedNativeExecutionProcessFactory.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/DetachedNativeExecutionProcessFactory.java index 30727ba857aa2..50a7c1a912c0d 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/DetachedNativeExecutionProcessFactory.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/DetachedNativeExecutionProcessFactory.java @@ -20,7 +20,6 @@ import com.facebook.presto.spark.execution.property.WorkerProperty; import com.facebook.presto.spark.execution.task.ForNativeExecutionTask; import com.facebook.presto.spi.PrestoException; -import com.facebook.presto.sql.analyzer.FeaturesConfig; import com.google.inject.Inject; import io.airlift.units.Duration; @@ -29,6 +28,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static com.facebook.presto.spark.PrestoSparkSessionProperties.getNativeExecutionExecutablePath; +import static com.facebook.presto.spark.PrestoSparkSessionProperties.getNativeExecutionProgramArguments; import static com.facebook.presto.spi.StandardErrorCode.NATIVE_EXECUTION_PROCESS_LAUNCH_ERROR; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -48,10 +49,9 @@ public DetachedNativeExecutionProcessFactory( ExecutorService coreExecutor, ScheduledExecutorService errorRetryScheduledExecutor, JsonCodec serverInfoCodec, - WorkerProperty workerProperty, - FeaturesConfig featuresConfig) + WorkerProperty workerProperty) { - super(httpClient, coreExecutor, errorRetryScheduledExecutor, serverInfoCodec, workerProperty, featuresConfig); + super(httpClient, coreExecutor, errorRetryScheduledExecutor, serverInfoCodec, workerProperty); this.httpClient = requireNonNull(httpClient, "httpClient is null"); this.coreExecutor = requireNonNull(coreExecutor, "ecoreExecutor is null"); this.errorRetryScheduledExecutor = requireNonNull(errorRetryScheduledExecutor, "errorRetryScheduledExecutor is null"); @@ -70,8 +70,8 @@ public NativeExecutionProcess createNativeExecutionProcess(Session session, Dura { try { return new DetachedNativeExecutionProcess( - getExecutablePath(), - getProgramArguments(), + getNativeExecutionExecutablePath(session), + getNativeExecutionProgramArguments(session), session, httpClient, coreExecutor, diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/NativeExecutionProcessFactory.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/NativeExecutionProcessFactory.java index cb3b677630b50..d9f4425b671a3 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/NativeExecutionProcessFactory.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/NativeExecutionProcessFactory.java @@ -20,7 +20,6 @@ import com.facebook.presto.spark.execution.property.WorkerProperty; import com.facebook.presto.spark.execution.task.ForNativeExecutionTask; import com.facebook.presto.spi.PrestoException; -import com.facebook.presto.sql.analyzer.FeaturesConfig; import io.airlift.units.Duration; import javax.annotation.PreDestroy; @@ -31,7 +30,9 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import static com.facebook.presto.SystemSessionProperties.isNativeExecutionProcessReuseEnabled; +import static com.facebook.presto.spark.PrestoSparkSessionProperties.getNativeExecutionExecutablePath; +import static com.facebook.presto.spark.PrestoSparkSessionProperties.getNativeExecutionProgramArguments; +import static com.facebook.presto.spark.PrestoSparkSessionProperties.isNativeExecutionProcessReuseEnabled; import static com.facebook.presto.spi.StandardErrorCode.NATIVE_EXECUTION_PROCESS_LAUNCH_ERROR; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -44,8 +45,6 @@ public class NativeExecutionProcessFactory private final ScheduledExecutorService errorRetryScheduledExecutor; private final JsonCodec serverInfoCodec; private final WorkerProperty workerProperty; - private final String executablePath; - private final String programArguments; private static NativeExecutionProcess process; @@ -55,16 +54,13 @@ public NativeExecutionProcessFactory( ExecutorService coreExecutor, ScheduledExecutorService errorRetryScheduledExecutor, JsonCodec serverInfoCodec, - WorkerProperty workerProperty, - FeaturesConfig featuresConfig) + WorkerProperty workerProperty) { this.httpClient = requireNonNull(httpClient, "httpClient is null"); this.coreExecutor = requireNonNull(coreExecutor, "coreExecutor is null"); this.errorRetryScheduledExecutor = requireNonNull(errorRetryScheduledExecutor, "errorRetryScheduledExecutor is null"); this.serverInfoCodec = requireNonNull(serverInfoCodec, "serverInfoCodec is null"); this.workerProperty = requireNonNull(workerProperty, "workerProperty is null"); - this.executablePath = featuresConfig.getNativeExecutionExecutablePath(); - this.programArguments = featuresConfig.getNativeExecutionProgramArguments(); } public synchronized NativeExecutionProcess getNativeExecutionProcess(Session session) @@ -79,8 +75,8 @@ public NativeExecutionProcess createNativeExecutionProcess(Session session, Dura { try { return new NativeExecutionProcess( - executablePath, - programArguments, + getNativeExecutionExecutablePath(session), + getNativeExecutionProgramArguments(session), session, httpClient, coreExecutor, @@ -103,14 +99,4 @@ public void stop() process.close(); } } - - protected String getExecutablePath() - { - return executablePath; - } - - protected String getProgramArguments() - { - return programArguments; - } } diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/execution/TestNativeExecutionProcess.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/execution/TestNativeExecutionProcess.java index 36dbd84327de5..fb557e3c71667 100644 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/execution/TestNativeExecutionProcess.java +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/execution/TestNativeExecutionProcess.java @@ -26,14 +26,13 @@ import com.facebook.presto.spark.execution.property.NativeExecutionSystemConfig; import com.facebook.presto.spark.execution.property.NativeExecutionVeloxConfig; import com.facebook.presto.spark.execution.property.PrestoSparkWorkerProperty; -import com.facebook.presto.sql.analyzer.FeaturesConfig; import io.airlift.units.Duration; import org.testng.annotations.Test; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import static com.facebook.presto.testing.TestingSession.testSessionBuilder; +import static com.facebook.presto.spark.util.PrestoSparkTestSessionBuilder.getPrestoSparkTestingSessionBuilder; import static java.util.concurrent.Executors.newScheduledThreadPool; import static java.util.concurrent.Executors.newSingleThreadExecutor; import static org.testng.Assert.assertFalse; @@ -48,7 +47,7 @@ public class TestNativeExecutionProcess @Test public void testNativeProcessIsAlive() { - Session session = testSessionBuilder().build(); + Session session = getPrestoSparkTestingSessionBuilder().build(); NativeExecutionProcessFactory factory = createNativeExecutionProcessFactory(); NativeExecutionProcess process = factory.getNativeExecutionProcess(session); // Simulate the process is closed (crashed) @@ -59,7 +58,7 @@ public void testNativeProcessIsAlive() @Test public void testNativeProcessRelaunch() { - Session session = testSessionBuilder().build(); + Session session = getPrestoSparkTestingSessionBuilder().build(); NativeExecutionProcessFactory factory = createNativeExecutionProcessFactory(); NativeExecutionProcess process = factory.getNativeExecutionProcess(session); // Simulate the process is closed (crashed) @@ -74,7 +73,7 @@ public void testNativeProcessRelaunch() @Test public void testNativeProcessShutdown() { - Session session = testSessionBuilder().build(); + Session session = getPrestoSparkTestingSessionBuilder().build(); NativeExecutionProcessFactory factory = createNativeExecutionProcessFactory(); // Set the maxRetryDuration to 0 ms to allow the RequestErrorTracker failing immediately NativeExecutionProcess process = factory.createNativeExecutionProcess(session, new Duration(0, TimeUnit.MILLISECONDS)); @@ -99,8 +98,7 @@ private NativeExecutionProcessFactory createNativeExecutionProcessFactory() newSingleThreadExecutor(), errorScheduler, SERVER_INFO_JSON_CODEC, - workerProperty, - new FeaturesConfig().setNativeExecutionExecutablePath("/bin/echo")); + workerProperty); return factory; } } diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/execution/http/TestPrestoSparkHttpClient.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/execution/http/TestPrestoSparkHttpClient.java index f3f516aede37d..fdf903e36c044 100644 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/execution/http/TestPrestoSparkHttpClient.java +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/execution/http/TestPrestoSparkHttpClient.java @@ -50,7 +50,6 @@ import com.facebook.presto.spi.page.PageCodecMarker; import com.facebook.presto.spi.page.PagesSerdeUtil; import com.facebook.presto.spi.page.SerializedPage; -import com.facebook.presto.sql.analyzer.FeaturesConfig; import com.facebook.presto.sql.planner.PlanFragment; import com.facebook.presto.testing.TestingSession; import com.google.common.collect.ArrayListMultimap; @@ -93,6 +92,7 @@ import static com.facebook.presto.execution.TaskTestUtils.createPlanFragment; import static com.facebook.presto.execution.buffer.OutputBuffers.BufferType.PARTITIONED; import static com.facebook.presto.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers; +import static com.facebook.presto.spark.util.PrestoSparkTestSessionBuilder.getPrestoSparkTestingSessionBuilder; import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static com.facebook.presto.testing.TestingSession.testSessionBuilder; import static com.google.common.net.HttpHeaders.CONTENT_TYPE; @@ -904,9 +904,8 @@ private NativeExecutionProcess createNativeExecutionProcess( scheduledExecutorService, scheduledExecutorService, SERVER_INFO_JSON_CODEC, - workerProperty, - new FeaturesConfig()); - return factory.createNativeExecutionProcess(testSessionBuilder().build(), maxErrorDuration); + workerProperty); + return factory.createNativeExecutionProcess(getPrestoSparkTestingSessionBuilder().build(), maxErrorDuration); } private HttpNativeExecutionTaskInfoFetcher createTaskInfoFetcher(TaskId taskId, TestingResponseManager testingResponseManager) diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/util/PrestoSparkTestSessionBuilder.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/util/PrestoSparkTestSessionBuilder.java new file mode 100644 index 0000000000000..d962fa904c64a --- /dev/null +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/util/PrestoSparkTestSessionBuilder.java @@ -0,0 +1,41 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spark.util; + +import com.facebook.presto.Session; +import com.facebook.presto.SystemSessionProperties; +import com.facebook.presto.spark.PrestoSparkSessionProperties; +import com.facebook.presto.spiller.NodeSpillConfig; +import com.facebook.presto.sql.analyzer.JavaFeaturesConfig; +import com.google.common.collect.Streams; + +import static com.facebook.presto.metadata.SessionPropertyManager.createTestingSessionPropertyManager; +import static com.facebook.presto.testing.TestingSession.testSessionBuilder; +import static com.google.common.collect.ImmutableList.toImmutableList; + +public class PrestoSparkTestSessionBuilder +{ + private PrestoSparkTestSessionBuilder() {} + + public static Session.SessionBuilder getPrestoSparkTestingSessionBuilder() + { + return testSessionBuilder(createTestingSessionPropertyManager( + Streams.concat( + new SystemSessionProperties().getSessionProperties().stream(), + new PrestoSparkSessionProperties().getSessionProperties().stream() + ).collect(toImmutableList()), + new JavaFeaturesConfig(), + new NodeSpillConfig())); + } +}