Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@

import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;
import com.facebook.airlift.configuration.LegacyConfig;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;

import javax.validation.constraints.DecimalMax;
import javax.validation.constraints.DecimalMin;
Expand All @@ -30,6 +32,7 @@
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.TimeUnit.MINUTES;

public class PrestoSparkConfig
{
Expand Down Expand Up @@ -67,7 +70,8 @@ public class PrestoSparkConfig
private boolean adaptiveQueryExecutionEnabled;
private boolean adaptiveJoinSideSwitchingEnabled;
private String nativeExecutionBroadcastBasePath;
private boolean nativeTriggerCoredumpWhenUnresponsiveEnabled;
private boolean nativeTerminateWithCoreWhenUnresponsiveEnabled;
private Duration nativeTerminateWithCoreTimeout = new Duration(5, MINUTES);

public boolean isSparkPartitionCountAutoTuneEnabled()
{
Expand Down Expand Up @@ -493,16 +497,31 @@ public PrestoSparkConfig setNativeExecutionBroadcastBasePath(String nativeExecut
return this;
}

public boolean isNativeTriggerCoredumpWhenUnresponsiveEnabled()
public boolean isNativeTerminateWithCoreWhenUnresponsiveEnabled()
{
return nativeTriggerCoredumpWhenUnresponsiveEnabled;
return nativeTerminateWithCoreWhenUnresponsiveEnabled;
}

@Config("native-trigger-coredump-when-unresponsive-enabled")
@ConfigDescription("Trigger coredump of the native execution process when it becomes unresponsive")
public PrestoSparkConfig setNativeTriggerCoredumpWhenUnresponsiveEnabled(boolean nativeTriggerCoredumpWhenUnresponsiveEnabled)
@Config("native-terminate-with-core-when-unresponsive-enabled")
@LegacyConfig("native-trigger-coredump-when-unresponsive-enabled")
@ConfigDescription("Terminate native execution process with core when it becomes unresponsive")
public PrestoSparkConfig setNativeTerminateWithCoreWhenUnresponsiveEnabled(boolean nativeTerminateWithCoreWhenUnresponsiveEnabled)
{
this.nativeTriggerCoredumpWhenUnresponsiveEnabled = nativeTriggerCoredumpWhenUnresponsiveEnabled;
this.nativeTerminateWithCoreWhenUnresponsiveEnabled = nativeTerminateWithCoreWhenUnresponsiveEnabled;
return this;
}

@NotNull
public Duration getNativeTerminateWithCoreTimeout()
{
return nativeTerminateWithCoreTimeout;
}

@Config("native-terminate-with-core-timeout")
@ConfigDescription("Timeout for native execution process termination with core. The process is forcefully killed on timeout")
public PrestoSparkConfig setNativeTerminateWithCoreTimeout(Duration nativeTerminateWithCoreTimeout)
{
this.nativeTerminateWithCoreTimeout = nativeTerminateWithCoreTimeout;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;

import javax.inject.Inject;

Expand All @@ -30,6 +31,7 @@
import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.dataSizeProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.doubleProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.durationProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty;
import static com.google.common.base.Strings.nullToEmpty;
Expand Down Expand Up @@ -71,7 +73,8 @@ public class PrestoSparkSessionProperties
public static final String SPARK_ADAPTIVE_QUERY_EXECUTION_ENABLED = "spark_adaptive_query_execution_enabled";
public static final String ADAPTIVE_JOIN_SIDE_SWITCHING_ENABLED = "adaptive_join_side_switching_enabled";
public static final String NATIVE_EXECUTION_BROADCAST_BASE_PATH = "native_execution_broadcast_base_path";
public static final String NATIVE_TRIGGER_COREDUMP_WHEN_UNRESPONSIVE_ENABLED = "native_trigger_coredump_when_unresponsive_enabled";
public static final String NATIVE_TERMINATE_WITH_CORE_WHEN_UNRESPONSIVE_ENABLED = "native_terminate_with_core_when_unresponsive_enabled";
public static final String NATIVE_TERMINATE_WITH_CORE_TIMEOUT = "native_terminate_with_core_timeout";

private final List<PropertyMetadata<?>> sessionProperties;
private final ExecutionStrategyValidator executionStrategyValidator;
Expand Down Expand Up @@ -267,9 +270,14 @@ public PrestoSparkSessionProperties(PrestoSparkConfig prestoSparkConfig)
prestoSparkConfig.getNativeExecutionBroadcastBasePath(),
false),
booleanProperty(
NATIVE_TRIGGER_COREDUMP_WHEN_UNRESPONSIVE_ENABLED,
"Trigger coredump of the native execution process when it becomes unresponsive",
prestoSparkConfig.isNativeTriggerCoredumpWhenUnresponsiveEnabled(),
NATIVE_TERMINATE_WITH_CORE_WHEN_UNRESPONSIVE_ENABLED,
"Terminate native execution process with core when it becomes unresponsive",
prestoSparkConfig.isNativeTerminateWithCoreWhenUnresponsiveEnabled(),
false),
durationProperty(
NATIVE_TERMINATE_WITH_CORE_TIMEOUT,
"Timeout for native execution process termination with core. The process is forcefully killed on timeout",
prestoSparkConfig.getNativeTerminateWithCoreTimeout(),
false));
}

Expand Down Expand Up @@ -438,8 +446,13 @@ public static String getNativeExecutionBroadcastBasePath(Session session)
return session.getSystemProperty(NATIVE_EXECUTION_BROADCAST_BASE_PATH, String.class);
}

public static boolean isNativeTriggerCoredumpWhenUnresponsiveEnabled(Session session)
public static boolean isNativeTerminateWithCoreWhenUnresponsiveEnabled(Session session)
{
return session.getSystemProperty(NATIVE_TERMINATE_WITH_CORE_WHEN_UNRESPONSIVE_ENABLED, Boolean.class);
}

public static Duration getNativeTerminateWithCoreTimeout(Session session)
{
return session.getSystemProperty(NATIVE_TRIGGER_COREDUMP_WHEN_UNRESPONSIVE_ENABLED, Boolean.class);
return session.getSystemProperty(NATIVE_TERMINATE_WITH_CORE_TIMEOUT, Duration.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class NativeExecutionProcess
implements AutoCloseable
Expand Down Expand Up @@ -175,31 +176,51 @@ public SettableFuture<ServerInfo> getServerInfoWithRetry()
/**
* Triggers coredump (also terminates the process)
*/
public void sendCoreSignal()
public void terminateWithCore(Duration timeout)
{
// chosen as the least likely core signal to occur naturally (invalid sys call)
// https://man7.org/linux/man-pages/man7/signal.7.html
sendSignal(SIGSYS);
Process process = sendSignal(SIGSYS);
if (process == null) {
return;
}
try {
long pid = getPid(process);
log.info("Waiting %s for process %s to terminate", timeout, pid);
if (!process.waitFor(timeout.toMillis(), MILLISECONDS)) {
log.warn("Process %s did not terminate within %s", pid, timeout);
process.destroyForcibly();
}
else {
log.info("Process %s successfully terminated with status code %s", pid, process.exitValue());
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}

public void sendSignal(int signal)
private Process sendSignal(int signal)
{
Process process = this.process;
if (process == null) {
log.warn("Failure sending signal, process does not exist");
return;
return null;
}
long pid = getPid(process);
if (!process.isAlive()) {
log.warn("Failure sending signal, process is dead: %s", pid);
return;
return null;
}
try {
log.info("Sending signal to process %s: %s", pid, signal);
Runtime.getRuntime().exec(format("kill -%s %s", signal, pid));
return process;
}
catch (IOException e) {
log.warn(e, "Failure sending signal to process %s", pid);
return null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.sun.management.OperatingSystemMXBean;
import io.airlift.units.Duration;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.util.CollectionAccumulator;
import scala.Tuple2;
Expand All @@ -95,7 +96,8 @@

import static com.facebook.presto.operator.ExchangeOperator.REMOTE_CONNECTOR_ID;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.getNativeExecutionBroadcastBasePath;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.isNativeTriggerCoredumpWhenUnresponsiveEnabled;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.getNativeTerminateWithCoreTimeout;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.isNativeTerminateWithCoreWhenUnresponsiveEnabled;
import static com.facebook.presto.spark.execution.nativeprocess.NativeExecutionProcessFactory.DEFAULT_URI;
import static com.facebook.presto.spark.util.PrestoSparkUtils.deserializeZstdCompressed;
import static com.facebook.presto.spark.util.PrestoSparkUtils.serializeZstdCompressed;
Expand Down Expand Up @@ -296,7 +298,8 @@ public <T extends PrestoSparkTaskOutput> IPrestoSparkTaskExecutor<T> doCreate(
Optional<String> broadcastDirectory =
isFixedBroadcastDistribution ? Optional.of(getBroadcastDirectoryPath(session)) : Optional.empty();

boolean triggerCoredumpWhenUnresponsive = isNativeTriggerCoredumpWhenUnresponsiveEnabled(session);
boolean terminateWithCoreWhenUnresponsive = isNativeTerminateWithCoreWhenUnresponsiveEnabled(session);
Duration terminateWithCoreTimeout = getNativeTerminateWithCoreTimeout(session);
try {
// 3. Submit the task to cpp process for execution
log.info("Submitting native execution task ");
Expand Down Expand Up @@ -325,10 +328,11 @@ public <T extends PrestoSparkTaskOutput> IPrestoSparkTaskExecutor<T> doCreate(
executionExceptionFactory,
cpuTracker,
nativeExecutionProcess,
triggerCoredumpWhenUnresponsive);
terminateWithCoreWhenUnresponsive,
terminateWithCoreTimeout);
}
catch (RuntimeException e) {
throw processFailure(e, nativeExecutionProcess, triggerCoredumpWhenUnresponsive);
throw processFailure(e, nativeExecutionProcess, terminateWithCoreWhenUnresponsive, terminateWithCoreTimeout);
}
}

Expand Down Expand Up @@ -524,7 +528,8 @@ private static class PrestoSparkNativeTaskOutputIterator<T extends PrestoSparkTa
private final PrestoSparkExecutionExceptionFactory executionExceptionFactory;
private final CpuTracker cpuTracker;
private final NativeExecutionProcess nativeExecutionProcess;
private final boolean triggerCoredumpWhenUnresponsive;
private final boolean terminateWithCoreWhenUnresponsive;
private final Duration terminateWithCoreTimeout;

public PrestoSparkNativeTaskOutputIterator(
int partitionId,
Expand All @@ -535,7 +540,8 @@ public PrestoSparkNativeTaskOutputIterator(
PrestoSparkExecutionExceptionFactory executionExceptionFactory,
CpuTracker cpuTracker,
NativeExecutionProcess nativeExecutionProcess,
boolean triggerCoredumpWhenUnresponsive)
boolean terminateWithCoreWhenUnresponsive,
Duration terminateWithCoreTimeout)
{
this.partitionId = partitionId;
this.nativeExecutionTask = nativeExecutionTask;
Expand All @@ -545,7 +551,8 @@ public PrestoSparkNativeTaskOutputIterator(
this.executionExceptionFactory = executionExceptionFactory;
this.cpuTracker = cpuTracker;
this.nativeExecutionProcess = requireNonNull(nativeExecutionProcess, "nativeExecutionProcess is null");
this.triggerCoredumpWhenUnresponsive = triggerCoredumpWhenUnresponsive;
this.terminateWithCoreWhenUnresponsive = terminateWithCoreWhenUnresponsive;
this.terminateWithCoreTimeout = requireNonNull(terminateWithCoreTimeout, "terminateWithCoreTimeout is null");
}

/**
Expand Down Expand Up @@ -624,7 +631,11 @@ private Optional<SerializedPage> computeNext()
catch (RuntimeException ex) {
// For a failed task, if taskInfo is present we still want to log the metrics
completeTask(false, taskInfoCollectionAccumulator, nativeExecutionTask, taskInfoCodec, cpuTracker);
throw executionExceptionFactory.toPrestoSparkExecutionException(processFailure(ex, nativeExecutionProcess, triggerCoredumpWhenUnresponsive));
throw executionExceptionFactory.toPrestoSparkExecutionException(processFailure(
ex,
nativeExecutionProcess,
terminateWithCoreWhenUnresponsive,
terminateWithCoreTimeout));
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -654,16 +665,17 @@ public Tuple2<MutablePartitionId, T> next()
private static RuntimeException processFailure(
RuntimeException failure,
NativeExecutionProcess process,
boolean triggerCoredumpWhenUnresponsive)
boolean terminateWithCoreWhenUnresponsive,
Duration terminateWithCoreTimeout)
{
if (failure instanceof PrestoTransportException) {
PrestoTransportException transportException = (PrestoTransportException) failure;
String message;
// lost communication with the native execution process
if (process.isAlive()) {
// process is unresponsive
if (triggerCoredumpWhenUnresponsive) {
process.sendCoreSignal();
if (terminateWithCoreWhenUnresponsive) {
process.terminateWithCore(terminateWithCoreTimeout);
}
message = "Native execution process is alive but unresponsive";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.airlift.configuration.testing.ConfigAssertions;
import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.testng.annotations.Test;

import java.util.Map;
Expand All @@ -25,6 +26,7 @@
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.TimeUnit.MINUTES;

public class TestPrestoSparkConfig
{
Expand Down Expand Up @@ -64,7 +66,8 @@ public void testDefaults()
.setExecutorAllocationStrategyEnabled(false)
.setHashPartitionCountAllocationStrategyEnabled(false)
.setNativeExecutionBroadcastBasePath(null)
.setNativeTriggerCoredumpWhenUnresponsiveEnabled(false));
.setNativeTerminateWithCoreWhenUnresponsiveEnabled(false)
.setNativeTerminateWithCoreTimeout(new Duration(5, MINUTES)));
}

@Test
Expand Down Expand Up @@ -103,7 +106,8 @@ public void testExplicitPropertyMappings()
.put("spark.executor-allocation-strategy-enabled", "true")
.put("spark.hash-partition-count-allocation-strategy-enabled", "true")
.put("native-execution-broadcast-base-path", "/tmp/broadcast_path")
.put("native-trigger-coredump-when-unresponsive-enabled", "true")
.put("native-terminate-with-core-when-unresponsive-enabled", "true")
.put("native-terminate-with-core-timeout", "1m")
.build();
PrestoSparkConfig expected = new PrestoSparkConfig()
.setSparkPartitionCountAutoTuneEnabled(false)
Expand Down Expand Up @@ -138,7 +142,8 @@ public void testExplicitPropertyMappings()
.setHashPartitionCountAllocationStrategyEnabled(true)
.setExecutorAllocationStrategyEnabled(true)
.setNativeExecutionBroadcastBasePath("/tmp/broadcast_path")
.setNativeTriggerCoredumpWhenUnresponsiveEnabled(true);
.setNativeTerminateWithCoreWhenUnresponsiveEnabled(true)
.setNativeTerminateWithCoreTimeout(new Duration(1, MINUTES));
assertFullMapping(properties, expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.common.type.Type;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;

import java.util.function.Function;

Expand Down Expand Up @@ -213,4 +214,17 @@ public static PropertyMetadata<DataSize> dataSizeProperty(String name, String de
value -> DataSize.valueOf((String) value),
DataSize::toString);
}

public static PropertyMetadata<Duration> durationProperty(String name, String description, Duration defaultValue, boolean hidden)
{
return new PropertyMetadata<>(
name,
description,
VARCHAR,
Duration.class,
defaultValue,
hidden,
value -> Duration.valueOf((String) value),
Duration::toString);
}
}