Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -342,9 +342,6 @@ public final class SystemSessionProperties
// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future.
public static final String NATIVE_AGGREGATION_SPILL_ALL = "native_aggregation_spill_all";
public static final String NATIVE_EXECUTION_ENABLED = "native_execution_enabled";
private static final String NATIVE_EXECUTION_EXECUTABLE_PATH = "native_execution_executable_path";
private static final String NATIVE_EXECUTION_PROGRAM_ARGUMENTS = "native_execution_program_arguments";
public static final String NATIVE_EXECUTION_PROCESS_REUSE_ENABLED = "native_execution_process_reuse_enabled";
public static final String INNER_JOIN_PUSHDOWN_ENABLED = "optimizer_inner_join_pushdown_enabled";
public static final String INEQUALITY_JOIN_PUSHDOWN_ENABLED = "optimizer_inequality_join_pushdown_enabled";
public static final String NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING = "native_min_columnar_encoding_channels_to_prefer_row_wise_encoding";
Expand Down Expand Up @@ -1569,11 +1566,6 @@ public SystemSessionProperties(
"Enable execution on native engine",
featuresConfig.isNativeExecutionEnabled(),
true),
booleanProperty(
NATIVE_EXECUTION_PROCESS_REUSE_ENABLED,
"Enable reuse the native process within the same JVM",
true,
false),
booleanProperty(
NATIVE_ENFORCE_JOIN_BUILD_INPUT_PARTITION,
"Enforce that the join build input is partitioned on join key",
Expand Down Expand Up @@ -3001,11 +2993,6 @@ public static boolean shouldPushRemoteExchangeThroughGroupId(Session session)
return session.getSystemProperty(PUSH_REMOTE_EXCHANGE_THROUGH_GROUP_ID, Boolean.class);
}

public static boolean isNativeExecutionProcessReuseEnabled(Session session)
{
return session.getSystemProperty(NATIVE_EXECUTION_PROCESS_REUSE_ENABLED, Boolean.class);
}

public static boolean isNativeJoinBuildPartitionEnforced(Session session)
{
return session.getSystemProperty(NATIVE_ENFORCE_JOIN_BUILD_INPUT_PARTITION, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ public class FeaturesConfig
private boolean disableIPAddressForNative;
private String nativeExecutionExecutablePath = "./presto_server";
private String nativeExecutionProgramArguments = "";
private boolean nativeExecutionProcessReuseEnabled = true;
private boolean nativeEnforceJoinBuildInputPartition = true;
private boolean randomizeOuterJoinNullKey;
private RandomizeOuterJoinNullKeyStrategy randomizeOuterJoinNullKeyStrategy = RandomizeOuterJoinNullKeyStrategy.DISABLED;
Expand Down Expand Up @@ -2347,19 +2346,6 @@ public String getNativeExecutionProgramArguments()
return this.nativeExecutionProgramArguments;
}

@Config("native-execution-process-reuse-enabled")
@ConfigDescription("Enable reuse the native process within the same JVM")
public FeaturesConfig setNativeExecutionProcessReuseEnabled(boolean nativeExecutionProcessReuseEnabled)
{
this.nativeExecutionProcessReuseEnabled = nativeExecutionProcessReuseEnabled;
return this;
}

public boolean isNativeExecutionProcessReuseEnabled()
{
return this.nativeExecutionProcessReuseEnabled;
}

@Config("native-enforce-join-build-input-partition")
@ConfigDescription("Enforce that the join build input is partitioned on join key")
public FeaturesConfig setNativeEnforceJoinBuildInputPartition(boolean nativeEnforceJoinBuildInputPartition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ public void testDefaults()
.setDisableIPAddressForNative(false)
.setNativeExecutionExecutablePath("./presto_server")
.setNativeExecutionProgramArguments("")
.setNativeExecutionProcessReuseEnabled(true)
.setNativeEnforceJoinBuildInputPartition(true)
.setRandomizeOuterJoinNullKeyEnabled(false)
.setRandomizeOuterJoinNullKeyStrategy(RandomizeOuterJoinNullKeyStrategy.DISABLED)
Expand Down Expand Up @@ -627,7 +626,6 @@ public void testExplicitPropertyMappings()
.setDisableIPAddressForNative(true)
.setNativeExecutionExecutablePath("/bin/echo")
.setNativeExecutionProgramArguments("--v 1")
.setNativeExecutionProcessReuseEnabled(false)
.setNativeEnforceJoinBuildInputPartition(false)
.setRandomizeOuterJoinNullKeyEnabled(true)
.setRandomizeOuterJoinNullKeyStrategy(RandomizeOuterJoinNullKeyStrategy.KEY_FROM_OUTER_JOIN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.Session;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
Expand Down Expand Up @@ -72,6 +73,9 @@ public class PrestoSparkSessionProperties
public static final String SPARK_HASH_PARTITION_COUNT_SCALING_FACTOR_ON_OUT_OF_MEMORY = "spark_hash_partition_count_scaling_factor_on_out_of_memory";
public static final String SPARK_ADAPTIVE_QUERY_EXECUTION_ENABLED = "spark_adaptive_query_execution_enabled";
public static final String ADAPTIVE_JOIN_SIDE_SWITCHING_ENABLED = "adaptive_join_side_switching_enabled";
public static final String NATIVE_EXECUTION_EXECUTABLE_PATH = "native_execution_executable_path";
public static final String NATIVE_EXECUTION_PROGRAM_ARGUMENTS = "native_execution_program_arguments";
public static final String NATIVE_EXECUTION_PROCESS_REUSE_ENABLED = "native_execution_process_reuse_enabled";
public static final String NATIVE_EXECUTION_BROADCAST_BASE_PATH = "native_execution_broadcast_base_path";
public static final String NATIVE_TERMINATE_WITH_CORE_WHEN_UNRESPONSIVE_ENABLED = "native_terminate_with_core_when_unresponsive_enabled";
public static final String NATIVE_TERMINATE_WITH_CORE_TIMEOUT = "native_terminate_with_core_timeout";
Expand All @@ -84,11 +88,11 @@ public class PrestoSparkSessionProperties

public PrestoSparkSessionProperties()
{
this(new PrestoSparkConfig());
this(new PrestoSparkConfig(), new FeaturesConfig());
}

@Inject
public PrestoSparkSessionProperties(PrestoSparkConfig prestoSparkConfig)
public PrestoSparkSessionProperties(PrestoSparkConfig prestoSparkConfig, FeaturesConfig featuresConfig)
{
executionStrategyValidator = new ExecutionStrategyValidator();
sessionProperties = ImmutableList.of(
Expand Down Expand Up @@ -267,6 +271,31 @@ public PrestoSparkSessionProperties(PrestoSparkConfig prestoSparkConfig)
"Enables the adaptive optimizer to switch the build and probe sides of a join",
prestoSparkConfig.isAdaptiveJoinSideSwitchingEnabled(),
false),
stringProperty(
NATIVE_EXECUTION_EXECUTABLE_PATH,
"The native engine executable file path for pos native engine execution",
featuresConfig.getNativeExecutionExecutablePath(),
false),
stringProperty(
NATIVE_EXECUTION_PROGRAM_ARGUMENTS,
"Program arguments for native engine execution. The main target use case for this " +
"property is to control logging levels using glog flags. E,g, to enable verbose mode, add " +
"'--v 1'. More advanced glog gflags usage can be found at " +
"https://rpg.ifi.uzh.ch/docs/glog.html\n" +
"e.g. --vmodule=mapreduce=2,file=1,gfs*=3 --v=0\n" +
"will:\n" +
"\n" +
"a. Print VLOG(2) and lower messages from mapreduce.{h,cc}\n" +
"b. Print VLOG(1) and lower messages from file.{h,cc}\n" +
"c. Print VLOG(3) and lower messages from files prefixed with \"gfs\"\n" +
"d. Print VLOG(0) and lower messages from elsewhere",
featuresConfig.getNativeExecutionProgramArguments(),
false),
booleanProperty(
NATIVE_EXECUTION_PROCESS_REUSE_ENABLED,
"Enable reuse the native process within the same JVM",
true,
false),
stringProperty(
NATIVE_EXECUTION_BROADCAST_BASE_PATH,
"Base path for temporary storage of broadcast data",
Expand Down Expand Up @@ -488,4 +517,19 @@ public static int getAttemptNumberToApplyDynamicMemoryPoolTuning(Session session
{
return session.getSystemProperty(ATTEMPT_NUMBER_TO_APPLY_DYNAMIC_MEMORY_POOL_TUNING, Integer.class);
}

public static boolean isNativeExecutionProcessReuseEnabled(Session session)
{
return session.getSystemProperty(NATIVE_EXECUTION_PROCESS_REUSE_ENABLED, Boolean.class);
}

public static String getNativeExecutionExecutablePath(Session session)
{
return session.getSystemProperty(NATIVE_EXECUTION_EXECUTABLE_PATH, String.class);
}

public static String getNativeExecutionProgramArguments(Session session)
{
return session.getSystemProperty(NATIVE_EXECUTION_PROGRAM_ARGUMENTS, String.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.facebook.presto.spark.execution.property.WorkerProperty;
import com.facebook.presto.spark.execution.task.ForNativeExecutionTask;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.google.inject.Inject;
import io.airlift.units.Duration;

Expand All @@ -29,6 +28,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static com.facebook.presto.spark.PrestoSparkSessionProperties.getNativeExecutionExecutablePath;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.getNativeExecutionProgramArguments;
import static com.facebook.presto.spi.StandardErrorCode.NATIVE_EXECUTION_PROCESS_LAUNCH_ERROR;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
Expand All @@ -48,10 +49,9 @@ public DetachedNativeExecutionProcessFactory(
ExecutorService coreExecutor,
ScheduledExecutorService errorRetryScheduledExecutor,
JsonCodec<ServerInfo> serverInfoCodec,
WorkerProperty<?, ?, ?, ?> workerProperty,
FeaturesConfig featuresConfig)
WorkerProperty<?, ?, ?, ?> workerProperty)
{
super(httpClient, coreExecutor, errorRetryScheduledExecutor, serverInfoCodec, workerProperty, featuresConfig);
super(httpClient, coreExecutor, errorRetryScheduledExecutor, serverInfoCodec, workerProperty);
this.httpClient = requireNonNull(httpClient, "httpClient is null");
this.coreExecutor = requireNonNull(coreExecutor, "ecoreExecutor is null");
this.errorRetryScheduledExecutor = requireNonNull(errorRetryScheduledExecutor, "errorRetryScheduledExecutor is null");
Expand All @@ -70,8 +70,8 @@ public NativeExecutionProcess createNativeExecutionProcess(Session session, Dura
{
try {
return new DetachedNativeExecutionProcess(
getExecutablePath(),
getProgramArguments(),
getNativeExecutionExecutablePath(session),
getNativeExecutionProgramArguments(session),
session,
httpClient,
coreExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.facebook.presto.spark.execution.property.WorkerProperty;
import com.facebook.presto.spark.execution.task.ForNativeExecutionTask;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import io.airlift.units.Duration;

import javax.annotation.PreDestroy;
Expand All @@ -31,7 +30,9 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static com.facebook.presto.SystemSessionProperties.isNativeExecutionProcessReuseEnabled;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.getNativeExecutionExecutablePath;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.getNativeExecutionProgramArguments;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.isNativeExecutionProcessReuseEnabled;
import static com.facebook.presto.spi.StandardErrorCode.NATIVE_EXECUTION_PROCESS_LAUNCH_ERROR;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
Expand All @@ -44,8 +45,6 @@ public class NativeExecutionProcessFactory
private final ScheduledExecutorService errorRetryScheduledExecutor;
private final JsonCodec<ServerInfo> serverInfoCodec;
private final WorkerProperty<?, ?, ?, ?> workerProperty;
private final String executablePath;
private final String programArguments;

private static NativeExecutionProcess process;

Expand All @@ -55,16 +54,13 @@ public NativeExecutionProcessFactory(
ExecutorService coreExecutor,
ScheduledExecutorService errorRetryScheduledExecutor,
JsonCodec<ServerInfo> serverInfoCodec,
WorkerProperty<?, ?, ?, ?> workerProperty,
FeaturesConfig featuresConfig)
WorkerProperty<?, ?, ?, ?> workerProperty)
{
this.httpClient = requireNonNull(httpClient, "httpClient is null");
this.coreExecutor = requireNonNull(coreExecutor, "coreExecutor is null");
this.errorRetryScheduledExecutor = requireNonNull(errorRetryScheduledExecutor, "errorRetryScheduledExecutor is null");
this.serverInfoCodec = requireNonNull(serverInfoCodec, "serverInfoCodec is null");
this.workerProperty = requireNonNull(workerProperty, "workerProperty is null");
this.executablePath = featuresConfig.getNativeExecutionExecutablePath();
this.programArguments = featuresConfig.getNativeExecutionProgramArguments();
}

public synchronized NativeExecutionProcess getNativeExecutionProcess(Session session)
Expand All @@ -79,8 +75,8 @@ public NativeExecutionProcess createNativeExecutionProcess(Session session, Dura
{
try {
return new NativeExecutionProcess(
executablePath,
programArguments,
getNativeExecutionExecutablePath(session),
getNativeExecutionProgramArguments(session),
session,
httpClient,
coreExecutor,
Expand All @@ -103,14 +99,4 @@ public void stop()
process.close();
}
}

protected String getExecutablePath()
{
return executablePath;
}

protected String getProgramArguments()
{
return programArguments;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@
import com.facebook.presto.spark.execution.property.NativeExecutionSystemConfig;
import com.facebook.presto.spark.execution.property.NativeExecutionVeloxConfig;
import com.facebook.presto.spark.execution.property.PrestoSparkWorkerProperty;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import io.airlift.units.Duration;
import org.testng.annotations.Test;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static com.facebook.presto.spark.util.PrestoSparkTestSessionBuilder.getPrestoSparkTestingSessionBuilder;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.testng.Assert.assertFalse;
Expand All @@ -48,7 +47,7 @@ public class TestNativeExecutionProcess
@Test
public void testNativeProcessIsAlive()
{
Session session = testSessionBuilder().build();
Session session = getPrestoSparkTestingSessionBuilder().build();
NativeExecutionProcessFactory factory = createNativeExecutionProcessFactory();
NativeExecutionProcess process = factory.getNativeExecutionProcess(session);
// Simulate the process is closed (crashed)
Expand All @@ -59,7 +58,7 @@ public void testNativeProcessIsAlive()
@Test
public void testNativeProcessRelaunch()
{
Session session = testSessionBuilder().build();
Session session = getPrestoSparkTestingSessionBuilder().build();
NativeExecutionProcessFactory factory = createNativeExecutionProcessFactory();
NativeExecutionProcess process = factory.getNativeExecutionProcess(session);
// Simulate the process is closed (crashed)
Expand All @@ -74,7 +73,7 @@ public void testNativeProcessRelaunch()
@Test
public void testNativeProcessShutdown()
{
Session session = testSessionBuilder().build();
Session session = getPrestoSparkTestingSessionBuilder().build();
NativeExecutionProcessFactory factory = createNativeExecutionProcessFactory();
// Set the maxRetryDuration to 0 ms to allow the RequestErrorTracker failing immediately
NativeExecutionProcess process = factory.createNativeExecutionProcess(session, new Duration(0, TimeUnit.MILLISECONDS));
Expand All @@ -99,8 +98,7 @@ private NativeExecutionProcessFactory createNativeExecutionProcessFactory()
newSingleThreadExecutor(),
errorScheduler,
SERVER_INFO_JSON_CODEC,
workerProperty,
new FeaturesConfig().setNativeExecutionExecutablePath("/bin/echo"));
workerProperty);
return factory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import com.facebook.presto.spi.page.PageCodecMarker;
import com.facebook.presto.spi.page.PagesSerdeUtil;
import com.facebook.presto.spi.page.SerializedPage;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.testing.TestingSession;
import com.google.common.collect.ArrayListMultimap;
Expand Down Expand Up @@ -93,6 +92,7 @@
import static com.facebook.presto.execution.TaskTestUtils.createPlanFragment;
import static com.facebook.presto.execution.buffer.OutputBuffers.BufferType.PARTITIONED;
import static com.facebook.presto.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers;
import static com.facebook.presto.spark.util.PrestoSparkTestSessionBuilder.getPrestoSparkTestingSessionBuilder;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
Expand Down Expand Up @@ -904,9 +904,8 @@ private NativeExecutionProcess createNativeExecutionProcess(
scheduledExecutorService,
scheduledExecutorService,
SERVER_INFO_JSON_CODEC,
workerProperty,
new FeaturesConfig());
return factory.createNativeExecutionProcess(testSessionBuilder().build(), maxErrorDuration);
workerProperty);
return factory.createNativeExecutionProcess(getPrestoSparkTestingSessionBuilder().build(), maxErrorDuration);
}

private HttpNativeExecutionTaskInfoFetcher createTaskInfoFetcher(TaskId taskId, TestingResponseManager testingResponseManager)
Expand Down
Loading
Loading