scheduleSplits(InternalNode node, Multimap newTasks = ImmutableSet.builder();
Collection tasks = this.tasks.get(node);
@@ -572,18 +625,35 @@ private void updateTaskStatus(TaskId taskId, TaskStatus taskStatus)
}
TaskState taskState = taskStatus.getState();
- if (taskState == TaskState.FAILED) {
+ if (taskState == TaskState.GRACEFUL_FAILED) {
// no matter if it is possible to recover - the task is failed
failedTasks.add(taskId);
+ if (taskStatus.getRetryableSplitCount() > 0) {
+ RuntimeStats splitRetryStats = new RuntimeStats();
+ //node and task we are retrying from and destination node and task we are retrying to
+ String retryMetricName = new StringBuilder()
+ .append("coord:pending-splits-task:" + getTaskIdentifier(taskId))
+ .toString();
+ //track how many splits we are retrying from the source task
+ splitRetryStats.addMetricValue(retryMetricName, RuntimeUnit.NONE, taskStatus.getRetryableSplitCount());
+ session.getRuntimeStats().update(splitRetryStats);
+ }
RuntimeException failure = taskStatus.getFailures().stream()
.findFirst()
.map(this::rewriteTransportFailure)
.map(ExecutionFailureInfo::toException)
.orElse(new PrestoException(GENERIC_INTERNAL_ERROR, "A task failed for an unknown reason"));
- if (isRecoverable(taskStatus.getFailures())) {
+
+ if (isFailedTasksBelowThreshold()) {
try {
stageTaskRecoveryCallback.get().recover(taskId);
+
+ RemoteTask failedTask = getAllTasks().stream()
+ .filter(task -> task.getTaskId().equals(taskId))
+ .collect(onlyElement());
+ failedTask.setIsRetried();
+
finishedTasks.add(taskId);
}
catch (Throwable t) {
@@ -598,6 +668,17 @@ private void updateTaskStatus(TaskId taskId, TaskStatus taskStatus)
stateMachine.transitionToFailed(failure);
}
}
+ else if (taskState == TaskState.FAILED) {
+ // no matter if it is possible to recover - the task is failed
+ failedTasks.add(taskId);
+ RuntimeException failure = taskStatus.getFailures().stream()
+ .findFirst()
+ .map(this::rewriteTransportFailure)
+ .map(ExecutionFailureInfo::toException)
+ .orElse(new PrestoException(GENERIC_INTERNAL_ERROR, "A task failed for an unknown reason"));
+ finishedTasks.add(taskId);
+ stateMachine.transitionToFailed(failure);
+ }
else if (taskState == TaskState.ABORTED) {
// A task should only be in the aborted state if the STAGE is done (ABORTED or FAILED)
stateMachine.transitionToFailed(new PrestoException(GENERIC_INTERNAL_ERROR, "A task is in the ABORTED state but stage is " + stageExecutionState));
@@ -608,25 +689,102 @@ else if (taskState == TaskState.FINISHED) {
// The finishedTasks.add(taskStatus.getTaskId()) must happen before the getState() (see schedulingComplete)
stageExecutionState = getState();
+ if (isRetryOfFailedSplitsEnabled && planFragment.isLeaf() && stageExecutionState == StageExecutionState.SCHEDULING_RETRIED_SPLITS) {
+ if (!isFailedTasksBelowThreshold() || noMoreRetry()) {
+ log.info("QueryId = %s, whenNoMoreRetry is triggered.", taskId.getQueryId());
+ whenNoMoreRetry.set(null);
+ return;
+ }
+ }
+
if (stageExecutionState == StageExecutionState.SCHEDULED || stageExecutionState == StageExecutionState.RUNNING) {
if (taskState == TaskState.RUNNING) {
stateMachine.transitionToRunning();
}
+
if (finishedTasks.size() == allTasks.size()) {
stateMachine.transitionToFinished();
}
}
}
    /**
     * Future used to signal the scheduler that this stage execution will attempt
     * no further split retries; it is completed (with null) from updateTaskStatus
     * when noMoreRetry() holds or the failed-task threshold is exceeded.
     */
    public ListenableFuture<?> getBlocked()
    {
        return whenNoMoreRetry;
    }
+
+ public synchronized boolean noMoreRetry()
+ {
+ checkState(planFragment.isLeaf());
+
+ if (failedTasks.isEmpty()) {
+ checkState(finishedTasks.isEmpty());
+ List idleRunningHttpRemoteTasks = getAllTasks().stream()
+ .filter(task -> task.getTaskStatus().getState() == TaskState.RUNNING)
+ .filter(task -> task.isTaskIdling())
+ .collect(toList());
+ boolean result = idleRunningHttpRemoteTasks.size() == allTasks.size();
+
+ if (result) {
+ log.info("QueryId = %s, noMoreRetry in failedTasks empty branch. idleRunningHttpRemoteTasks = %s, allTasks = %s", getStageExecutionId().getStageId().getQueryId(), idleRunningHttpRemoteTasks.size(), allTasks.size());
}
+ return result;
+ }
+
+ return noMoreRetryWithFailedTasks();
+ }
+
    /**
     * Decides whether retry scheduling should stop given that at least one task has
     * failed. Returns true when either (a) the failed-task count has reached the
     * configured threshold, or (b) every surviving task is idling AND every failed
     * task has already had its splits retried.
     */
    private boolean noMoreRetryWithFailedTasks()
    {
        // If every task had finished we would not be deciding about retries at all.
        checkState(finishedTasks.size() != allTasks.size());

        if (!isFailedTasksBelowThreshold()) {
            // Too many failures - stop retrying altogether.
            log.info("QueryId = %s, noMoreRetry in noMoreRetryWithFailedTasks. isFailedTasksBelowThreshold exceeds the threshold: failedTasks = %s, allTasks = %s", getStageExecutionId().getStageId().getQueryId(), failedTasks.size(), allTasks.size());
            return true;
        }

        // Tasks still RUNNING but with no work currently queued.
        List<RemoteTask> idleRunningHttpRemoteTasks = getAllTasks().stream()
                .filter(task -> task.getTaskStatus().getState() == TaskState.RUNNING)
                .filter(task -> task.isTaskIdling())
                .collect(toList());

        // Gracefully-failed tasks whose retryable splits were already resubmitted.
        long retriedFailedTaskCount = getAllTasks().stream()
                .filter(task -> task.getTaskStatus().getState() == TaskState.GRACEFUL_FAILED)
                .filter(RemoteTask::isRetried)
                .count();

        boolean isAllTasksEitherIdlingOrFailedTasksHaveBeenRetriedOrTooManyFailedTasks;
        // NOTE(review): the only caller visible here (noMoreRetry) is already synchronized,
        // so this inner block looks redundant (reentrant lock) - confirm other call sites
        // before removing. It pins failedTasks/allTasks for the compound comparison below.
        synchronized (this) {
            isAllTasksEitherIdlingOrFailedTasksHaveBeenRetriedOrTooManyFailedTasks = (idleRunningHttpRemoteTasks.size() == allTasks.size() - failedTasks.size() && retriedFailedTaskCount == failedTasks.size());
        }

        if (isAllTasksEitherIdlingOrFailedTasksHaveBeenRetriedOrTooManyFailedTasks) {
            log.info("QueryId = %s, noMoreRetry in isAllTasksEitherIdlingOrFailedTasksHaveBeenRetriedOrTooManyFailedTasks. idleRunningHttpRemoteTasks = %s, allTasks = %s, failedTask = %s, retriedFailedTaskCount = %s", getStageExecutionId().getStageId().getQueryId(), idleRunningHttpRemoteTasks.size(), allTasks.size(), failedTasks.size(), retriedFailedTaskCount);
        }

        return isAllTasksEitherIdlingOrFailedTasksHaveBeenRetriedOrTooManyFailedTasks;
    }
+
    /**
     * Returns true while the number of failed tasks is still below the configured
     * retry budget (maxFailedTaskPercentage of all tasks).
     */
    private synchronized boolean isFailedTasksBelowThreshold()
    {
        // Even though failedTasks and allTasks are individually guarded, the whole
        // expression must be evaluated atomically (hence synchronized) so a callback
        // thread cannot update either collection mid-evaluation.
        return failedTasks.size() < allTasks.size() * maxFailedTaskPercentage;
    }
+// private boolean isRecoverable(List failures)
+// {
+// for (ExecutionFailureInfo failure : failures) {
+// if (!getRecoverableErrorCodes().contains(failure.getErrorCode())) {
+// return false;
+// }
+// }
+// boolean isRecoverable = stageTaskRecoveryCallback.isPresent() && failedTasks.size() < allTasks.size() * maxFailedTaskPercentage;
+// log.info("Failure recovery error check , isRecoverable = %s, failure error codes = %s", isRecoverable, failures.stream().map(failure -> failure.getErrorCode()).collect(toImmutableList()));
+// return isRecoverable;
+// }
+
    // Error codes considered recoverable for split retry.
    // NOTE(review): the element type of this Set was stripped by extraction in this
    // view - presumably ErrorCode, matching DEFAULT_RECOVERABLE_ERROR_CODES; confirm.
    public Set getRecoverableErrorCodes()
    {
        return DEFAULT_RECOVERABLE_ERROR_CODES;
    }
private synchronized void updateFinalTaskInfo(TaskInfo finalTaskInfo)
@@ -669,7 +827,8 @@ private ExecutionFailureInfo rewriteTransportFailure(ExecutionFailureInfo execut
executionFailureInfo.getErrorLocation(),
REMOTE_HOST_GONE.toErrorCode(),
executionFailureInfo.getRemoteHost(),
- executionFailureInfo.getErrorCause());
+ executionFailureInfo.getErrorCause(),
+ executionFailureInfo.getFailureDetectionTimeInNanos());
}
@Override
@@ -757,4 +916,18 @@ public synchronized void invoke(T payload, Executor executor)
}
}
}
+
+ public String getTaskIdentifier(com.facebook.presto.execution.TaskId taskId)
+ {
+ return new StringBuilder()
+ .append("S")
+ .append(taskId.getStageExecutionId().getStageId().getId())
+ .append(".")
+ .append(taskId.getStageExecutionId().getId())
+ .append(".")
+ .append(taskId.getId())
+ .append(".")
+ .append(taskId.getAttemptNumber())
+ .toString();
+ }
}
diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlTask.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlTask.java
index 4c9645e010c42..4c06a7aeac5d1 100644
--- a/presto-main/src/main/java/com/facebook/presto/execution/SqlTask.java
+++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlTask.java
@@ -35,6 +35,7 @@
import com.facebook.presto.operator.TaskStats;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorMetadataUpdateHandle;
+import com.facebook.presto.spi.NodePoolType;
import com.facebook.presto.spi.connector.ConnectorMetadataUpdater;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.sql.planner.PlanFragment;
@@ -44,6 +45,8 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
+import it.unimi.dsi.fastutil.longs.LongArraySet;
+import it.unimi.dsi.fastutil.longs.LongSet;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
@@ -60,6 +63,7 @@
import static com.facebook.presto.execution.TaskState.ABORTED;
import static com.facebook.presto.execution.TaskState.FAILED;
+import static com.facebook.presto.execution.TaskState.GRACEFUL_FAILED;
import static com.facebook.presto.util.Failures.toFailures;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
@@ -72,6 +76,7 @@ public class SqlTask
private static final Logger log = Logger.get(SqlTask.class);
private final TaskId taskId;
+ private final NodePoolType poolType;
private final TaskInstanceId taskInstanceId;
private final URI location;
private final String nodeId;
@@ -100,7 +105,8 @@ public static SqlTask createSqlTask(
Function onDone,
DataSize maxBufferSize,
CounterStat failedTasks,
- SpoolingOutputBufferFactory spoolingOutputBufferFactory)
+ SpoolingOutputBufferFactory spoolingOutputBufferFactory,
+ NodePoolType poolType)
{
SqlTask sqlTask = new SqlTask(
taskId,
@@ -111,7 +117,8 @@ public static SqlTask createSqlTask(
exchangeClientSupplier,
taskNotificationExecutor,
maxBufferSize,
- spoolingOutputBufferFactory);
+ spoolingOutputBufferFactory,
+ poolType);
sqlTask.initialize(onDone, failedTasks);
return sqlTask;
}
@@ -125,7 +132,8 @@ private SqlTask(
ExchangeClientSupplier exchangeClientSupplier,
ExecutorService taskNotificationExecutor,
DataSize maxBufferSize,
- SpoolingOutputBufferFactory spoolingOutputBufferFactory)
+ SpoolingOutputBufferFactory spoolingOutputBufferFactory,
+ NodePoolType poolType)
{
this.taskId = requireNonNull(taskId, "taskId is null");
this.taskInstanceId = new TaskInstanceId(UUID.randomUUID());
@@ -133,6 +141,7 @@ private SqlTask(
this.nodeId = requireNonNull(nodeId, "nodeId is null");
this.queryContext = requireNonNull(queryContext, "queryContext is null");
this.sqlTaskExecutionFactory = requireNonNull(sqlTaskExecutionFactory, "sqlTaskExecutionFactory is null");
+ this.poolType = requireNonNull(poolType, "poolType is null");
requireNonNull(exchangeClientSupplier, "exchangeClientSupplier is null");
requireNonNull(taskNotificationExecutor, "taskNotificationExecutor is null");
requireNonNull(maxBufferSize, "maxBufferSize is null");
@@ -149,6 +158,7 @@ private SqlTask(
() -> queryContext.getTaskContextByTaskId(taskId).localSystemMemoryContext(),
spoolingOutputBufferFactory);
taskStateMachine = new TaskStateMachine(taskId, taskNotificationExecutor);
+ taskStateMachine.setPoolType(poolType);
}
// this is a separate method to ensure that the `this` reference is not leaked during construction
@@ -189,6 +199,9 @@ public void stateChanged(TaskState newState)
// closed buffers signal to upstream tasks that everything finished cleanly
outputBuffer.fail();
}
+ else if (newState == GRACEFUL_FAILED) {
+ outputBuffer.destroy();
+ }
else {
outputBuffer.destroy();
}
@@ -277,6 +290,9 @@ private TaskStatus createTaskStatus(TaskHolder taskHolder)
long fullGcCount = 0;
long fullGcTimeInMillis = 0L;
long totalCpuTimeInNanos = 0L;
+ long retryableSplitCount = 0L;
+ LongSet completedSplits = LongArraySet.of();
+ boolean isTaskIdling = false;
if (taskHolder.getFinalTaskInfo() != null) {
TaskStats taskStats = taskHolder.getFinalTaskInfo().getStats();
queuedPartitionedDrivers = taskStats.getQueuedPartitionedDrivers();
@@ -289,6 +305,7 @@ private TaskStatus createTaskStatus(TaskHolder taskHolder)
fullGcCount = taskStats.getFullGcCount();
fullGcTimeInMillis = taskStats.getFullGcTimeInMillis();
totalCpuTimeInNanos = taskStats.getTotalCpuTimeInNanos();
+ retryableSplitCount = taskStats.getRetryableSplitCount();
}
else if (taskHolder.getTaskExecution() != null) {
long physicalWrittenBytes = 0;
@@ -308,8 +325,10 @@ else if (taskHolder.getTaskExecution() != null) {
completedDriverGroups = taskContext.getCompletedDriverGroups();
fullGcCount = taskContext.getFullGcCount();
fullGcTimeInMillis = taskContext.getFullGcTime().toMillis();
+ completedSplits = taskContext.getCompletedSplitSequenceIds();
+ retryableSplitCount = taskContext.getRetryableSplitCount();
+ isTaskIdling = taskHolder.getTaskExecution().isTaskIdling();
}
-
return new TaskStatus(
taskInstanceId.getUuidLeastSignificantBits(),
taskInstanceId.getUuidMostSignificantBits(),
@@ -317,6 +336,7 @@ else if (taskHolder.getTaskExecution() != null) {
state,
location,
completedDriverGroups,
+ completedSplits,
failures,
queuedPartitionedDrivers,
runningPartitionedDrivers,
@@ -331,7 +351,9 @@ else if (taskHolder.getTaskExecution() != null) {
totalCpuTimeInNanos,
taskStatusAgeInMillis,
queuedPartitionedSplitsWeight,
- runningPartitionedSplitsWeight);
+ runningPartitionedSplitsWeight,
+ retryableSplitCount,
+ isTaskIdling);
}
private TaskStats getTaskStats(TaskHolder taskHolder)
diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskExecution.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskExecution.java
index e03b1416132bc..7ed5bd52da41a 100644
--- a/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskExecution.java
+++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskExecution.java
@@ -14,16 +14,19 @@
package com.facebook.presto.execution;
import com.facebook.airlift.concurrent.SetThreadName;
+import com.facebook.airlift.log.Logger;
import com.facebook.presto.event.SplitMonitor;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.execution.buffer.BufferState;
import com.facebook.presto.execution.buffer.OutputBuffer;
import com.facebook.presto.execution.executor.TaskExecutor;
import com.facebook.presto.execution.executor.TaskHandle;
+import com.facebook.presto.execution.executor.TaskShutdownManager;
import com.facebook.presto.operator.Driver;
import com.facebook.presto.operator.DriverContext;
import com.facebook.presto.operator.DriverFactory;
import com.facebook.presto.operator.DriverStats;
+import com.facebook.presto.operator.HostShuttingDownException;
import com.facebook.presto.operator.PipelineContext;
import com.facebook.presto.operator.PipelineExecutionStrategy;
import com.facebook.presto.operator.StageExecutionDescriptor;
@@ -108,6 +111,7 @@ public class SqlTaskExecution
// In this case,
// * a driver could belong to pipeline 1 and driver life cycle 42.
// * another driver could belong to pipeline 3 and task-wide driver life cycle.
+ private static final Logger log = Logger.get(SqlTaskExecution.class);
private final TaskId taskId;
private final TaskStateMachine taskStateMachine;
@@ -259,12 +263,15 @@ private static TaskHandle createTaskHandle(
LocalExecutionPlan localExecutionPlan,
TaskExecutor taskExecutor)
{
+ TaskShutdownManager taskShutdownManager = new TaskShutdownManager(taskStateMachine, taskContext);
TaskHandle taskHandle = taskExecutor.addTask(
taskStateMachine.getTaskId(),
outputBuffer::getUtilization,
getInitialSplitsPerNode(taskContext.getSession()),
getSplitConcurrencyAdjustmentInterval(taskContext.getSession()),
- getMaxDriversPerTask(taskContext.getSession()));
+ getMaxDriversPerTask(taskContext.getSession()),
+ Optional.of(taskShutdownManager),
+ Optional.of(outputBuffer));
taskStateMachine.addStateChangeListener(state -> {
if (state.isDone()) {
taskExecutor.removeTask(taskHandle);
@@ -548,21 +555,21 @@ private void scheduleDriversForDriverGroupLifeCycle(Lifespan lifespan)
private synchronized void enqueueDriverSplitRunner(boolean forceRunSplit, List runners)
{
// schedule driver to be executed
- List> finishedFutures = taskExecutor.enqueueSplits(taskHandle, forceRunSplit, runners);
+ List> finishedFutures = taskExecutor.enqueueSplits(taskHandle, forceRunSplit, runners);
checkState(finishedFutures.size() == runners.size(), "Expected %s futures but got %s", runners.size(), finishedFutures.size());
// when driver completes, update state and fire events
for (int i = 0; i < finishedFutures.size(); i++) {
- ListenableFuture> finishedFuture = finishedFutures.get(i);
+ ListenableFuture finishedFuture = finishedFutures.get(i);
final DriverSplitRunner splitRunner = runners.get(i);
// record new driver
status.incrementRemainingDriver(splitRunner.getLifespan());
- Futures.addCallback(finishedFuture, new FutureCallback