diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index bcd9c7a5c8..2d1f58b3c9 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -841,7 +841,17 @@ public TezConfiguration(boolean loadDefaults) { public static final boolean TEZ_AM_DAG_CLEANUP_ON_COMPLETION_DEFAULT = false; /** - * Int value. Upper limit on the number of threads used to delete DAG directories on nodes. + * Boolean value. Instructs AM to delete intermediate attempt data for failed task attempts. + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type="boolean") + public static final String TEZ_AM_TASK_ATTEMPT_CLEANUP_ON_FAILURE = TEZ_AM_PREFIX + + "task.attempt.cleanup.on.failure"; + public static final boolean TEZ_AM_TASK_ATTEMPT_CLEANUP_ON_FAILURE_DEFAULT = false; + + /** + * Int value. Upper limit on the number of threads used to delete DAG directories and failed task attempts + * directories on nodes. */ @ConfigurationScope(Scope.AM) @ConfigurationProperty(type="integer") diff --git a/tez-common/src/main/java/org/apache/tez/common/DagContainerLauncher.java b/tez-common/src/main/java/org/apache/tez/common/DagContainerLauncher.java index e3bd385a1a..6bda0a8da3 100644 --- a/tez-common/src/main/java/org/apache/tez/common/DagContainerLauncher.java +++ b/tez-common/src/main/java/org/apache/tez/common/DagContainerLauncher.java @@ -20,8 +20,10 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; @@ -40,4 +42,7 @@ public DagContainerLauncher(ContainerLauncherContext containerLauncherContext) { } public abstract void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager); + + public abstract void taskAttemptFailed(TezTaskAttemptID taskAttemptID, + JobTokenSecretManager jobTokenSecretManager, NodeId nodeId); } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index fc01b8b6bf..f1486e8ba0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -68,6 +68,7 @@ import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Options; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.client.CallerContext; import org.apache.tez.client.TezClientUtils; import org.apache.tez.common.TezUtils; @@ -179,6 +180,7 @@ import org.apache.tez.dag.history.events.DAGSubmittedEvent; import org.apache.tez.dag.history.utils.DAGUtils; import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.utils.Graph; import org.apache.tez.dag.utils.RelocalizationUtils; @@ -2781,4 +2783,7 @@ String buildPluginComponentLog(List namedEntityDescriptor return sb.toString(); } + public void taskAttemptFailed(TezTaskAttemptID attemptID, NodeId nodeId) { + getContainerLauncherManager().taskAttemptFailed(attemptID, jobTokenSecretManager, nodeId); + } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index cb8545f8ca..b840798c26 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -39,13 +39,13 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.dag.app.dag.event.TaskEventTAFailed; import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; @@ -1263,6 +1263,13 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) { } // The attempt would have informed the scheduler about it's failure + // Delete the intermediate shuffle data for failed task attempt + TaskAttempt taskAttempt = task.getAttempt(castEvent.getTaskAttemptID()); + if (taskAttempt.getAssignedContainer() != null) { + NodeId nodeId = taskAttempt.getAssignedContainer().getNodeId(); + task.appContext.getAppMaster().taskAttemptFailed(taskAttempt.getID(), nodeId); + } + task.taskAttemptStatus.put(castEvent.getTaskAttemptID().getId(), true); if (task.failedAttempts < task.maxFailedAttempts && castEvent.getTaskFailureType() == TaskFailureType.NON_FATAL) { diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java index d55787e25a..b0e0f0cf0d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.Utils; @@ -35,6 +36,7 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError; import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; @@ -200,6 +202,12 @@ public void dagComplete(TezDAGID dag, JobTokenSecretManager secretManager) { } } + public void taskAttemptFailed(TezTaskAttemptID taskAttemptID, JobTokenSecretManager secretManager, NodeId nodeId) { + for (int i = 0; i < containerLaunchers.length; i++) { + containerLaunchers[i].taskAttemptFailed(taskAttemptID, secretManager, nodeId); + } + } + public void dagSubmitted() { // Nothing to do right now. Indicates that a new DAG has been submitted and // the context has updated information. diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java index 8ecac14856..5d262bdab4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java @@ -15,8 +15,10 @@ package org.apache.tez.dag.app.launcher; import org.apache.tez.common.DagContainerLauncher; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerStopRequest; @@ -46,4 +48,11 @@ public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManage ((DagContainerLauncher)real).dagComplete(dag, jobTokenSecretManager); } } + + public void taskAttemptFailed(TezTaskAttemptID taskAttemptID, JobTokenSecretManager jobTokenSecretManager, + NodeId nodeId) { + if (real instanceof DagContainerLauncher) { + ((DagContainerLauncher) real).taskAttemptFailed(taskAttemptID, jobTokenSecretManager, nodeId); + } + } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java index 27ece70513..87b7366bfc 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; public abstract class DeletionTracker { @@ -35,6 +36,11 @@ public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManage //do nothing } + public void taskAttemptFailed(TezTaskAttemptID taskAttemptID, JobTokenSecretManager jobTokenSecretManager, + NodeId nodeId) { + //do nothing + } + public void addNodeShufflePort(NodeId nodeId, int port) { //do nothing } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java index 06dae2d2b0..e4204bfc71 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java @@ -34,6 +34,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.hadoop.conf.Configuration; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,6 +71,26 @@ public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManage } } + @Override + public void taskAttemptFailed(TezTaskAttemptID taskAttemptID, JobTokenSecretManager jobTokenSecretManager, + NodeId nodeId) { + super.taskAttemptFailed(taskAttemptID, jobTokenSecretManager, nodeId); + if (nodeIdShufflePortMap == null || nodeIdShufflePortMap.get(nodeId) == null) { + LOG.warn("Unable to find the shuffle port for shuffle data deletion of failed task attempt."); + return; + } + int shufflePort = nodeIdShufflePortMap.get(nodeId); + if (shufflePort != TezRuntimeUtils.INVALID_PORT) { + TaskAttemptFailedRunnable taskAttemptFailedRunnable = new TaskAttemptFailedRunnable(nodeId, shufflePort, + taskAttemptID, TezRuntimeUtils.getHttpConnectionParams(conf), jobTokenSecretManager); + try { + dagCleanupService.submit(taskAttemptFailedRunnable); + } catch (RejectedExecutionException rejectedException) { + LOG.info("Ignoring failed task attempt deletion request for " + taskAttemptFailedRunnable); + } + } + } + @Override public void addNodeShufflePort(NodeId nodeId, int port) { if (port != TezRuntimeUtils.INVALID_PORT) { diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index ae308098a7..ebc8f95566 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -43,11 +43,13 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.DagContainerLauncher; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; @@ -93,6 +95,8 @@ public class LocalContainerLauncher extends DagContainerLauncher { private final boolean isLocalMode; int shufflePort = TezRuntimeUtils.INVALID_PORT; private DeletionTracker deletionTracker; + private boolean dagDelete; + private boolean failedTaskAttemptDelete; private final ConcurrentHashMap> runningContainers = @@ -155,10 +159,14 @@ public LocalContainerLauncher(ContainerLauncherContext containerLauncherContext, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LocalTaskExecutionThread #%d") .build()); this.taskExecutorService = MoreExecutors.listeningDecorator(rawExecutor); - boolean cleanupDagDataOnComplete = ShuffleUtils.isTezShuffleHandler(conf) - && conf.getBoolean(TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION, + dagDelete = ShuffleUtils.isTezShuffleHandler(conf) && + conf.getBoolean(TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION, TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION_DEFAULT); - if (cleanupDagDataOnComplete) { + failedTaskAttemptDelete = ShuffleUtils.isTezShuffleHandler(conf) && + conf.getBoolean(TezConfiguration.TEZ_AM_TASK_ATTEMPT_CLEANUP_ON_FAILURE, + TezConfiguration.TEZ_AM_TASK_ATTEMPT_CLEANUP_ON_FAILURE_DEFAULT); + + if (dagDelete || failedTaskAttemptDelete) { String deletionTrackerClassName = conf.get(TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS, TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT); deletionTracker = ReflectionUtils.createClazzInstance( @@ -441,9 +449,16 @@ public void stopContainer(ContainerStopRequest stopRequest) { @Override public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) { - if (deletionTracker != null) { + if (dagDelete && deletionTracker != null) { deletionTracker.dagComplete(dag, jobTokenSecretManager); } } + @Override + public void taskAttemptFailed(TezTaskAttemptID taskAttemptID, JobTokenSecretManager jobTokenSecretManager, + NodeId nodeId) { + if (failedTaskAttemptDelete && deletionTracker != null) { + deletionTracker.taskAttemptFailed(taskAttemptID, jobTokenSecretManager, nodeId); + } + } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TaskAttemptFailedDeleteRunnable.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TaskAttemptFailedDeleteRunnable.java new file mode 100644 index 0000000000..d2587b5190 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TaskAttemptFailedDeleteRunnable.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.launcher; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.http.BaseHttpConnection; +import org.apache.tez.http.HttpConnectionParams; +import org.apache.tez.runtime.library.common.TezRuntimeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URL; + +class TaskAttemptFailedRunnable implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(TaskAttemptFailedRunnable.class); + private final NodeId nodeId; + private final TezTaskAttemptID taskAttemptID; + private final JobTokenSecretManager jobTokenSecretManager; + private final int shufflePort; + private final HttpConnectionParams httpConnectionParams; + + TaskAttemptFailedRunnable(NodeId nodeId, int shufflePort, TezTaskAttemptID taskAttemptID, + HttpConnectionParams httpConnectionParams, + JobTokenSecretManager jobTokenSecretMgr) { + this.nodeId = nodeId; + this.shufflePort = shufflePort; + this.taskAttemptID = taskAttemptID; + this.httpConnectionParams = httpConnectionParams; + this.jobTokenSecretManager = jobTokenSecretMgr; + } + + @Override + public void run() { + BaseHttpConnection httpConnection = null; + try { + URL baseURL = TezRuntimeUtils.constructBaseURIForShuffleHandlerTaskAttemptFailed( + nodeId.getHost(), shufflePort, taskAttemptID.getTaskID().getVertexID().getDAGId(). + getApplicationId().toString(), taskAttemptID.getTaskID().getVertexID().getDAGId().getId(), + taskAttemptID.toString(), false); + httpConnection = TezRuntimeUtils.getHttpConnection(true, baseURL, httpConnectionParams, + "FailedTaskAttemptDelete", jobTokenSecretManager); + httpConnection.connect(); + httpConnection.getInputStream(); + } catch (Exception e) { + LOG.warn("Could not setup HTTP Connection to the node " + nodeId.getHost() + + " for failed task attempt delete. ", e); + } finally { + try { + if (httpConnection != null) { + httpConnection.cleanup(true); + } + } catch (IOException ioe) { + LOG.warn("Encountered IOException for " + nodeId.getHost() + " during close. ", ioe); + } + } + } + + @Override + public String toString() { + return "TaskAttemptFailedRunnable nodeId=" + nodeId + ", shufflePort=" + shufflePort + ", taskAttemptId=" + + taskAttemptID.toString(); + } +} diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java index f5be74683e..88ed4f7b89 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java @@ -34,6 +34,7 @@ import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.DagContainerLauncher; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezUtils; @@ -41,6 +42,7 @@ import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; @@ -90,6 +92,8 @@ public class TezContainerLauncherImpl extends DagContainerLauncher { private ContainerManagementProtocolProxy cmProxy; private AtomicBoolean serviceStopped = new AtomicBoolean(false); private DeletionTracker deletionTracker = null; + private boolean dagDelete; + private boolean failedTaskAttemptDelete; private Container getContainer(ContainerOp event) { ContainerId id = event.getBaseOperation().getContainerId(); @@ -332,10 +336,14 @@ public void run() { }; eventHandlingThread.setName("ContainerLauncher Event Handler"); eventHandlingThread.start(); - boolean cleanupDagDataOnComplete = ShuffleUtils.isTezShuffleHandler(conf) - && conf.getBoolean(TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION, - TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION_DEFAULT); - if (cleanupDagDataOnComplete) { + dagDelete = ShuffleUtils.isTezShuffleHandler(conf) && + conf.getBoolean(TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION, + TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION_DEFAULT); + failedTaskAttemptDelete = ShuffleUtils.isTezShuffleHandler(conf) && + conf.getBoolean(TezConfiguration.TEZ_AM_TASK_ATTEMPT_CLEANUP_ON_FAILURE, + TezConfiguration.TEZ_AM_TASK_ATTEMPT_CLEANUP_ON_FAILURE_DEFAULT); + + if (dagDelete || failedTaskAttemptDelete) { String deletionTrackerClassName = conf.get(TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS, TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT); deletionTracker = ReflectionUtils.createClazzInstance( @@ -441,9 +449,16 @@ public void stopContainer(ContainerStopRequest stopRequest) { @Override public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) { - if (deletionTracker != null) { + if (dagDelete && deletionTracker != null) { deletionTracker.dagComplete(dag, jobTokenSecretManager); } } + @Override + public void taskAttemptFailed(TezTaskAttemptID taskAttemptID, JobTokenSecretManager jobTokenSecretManager, + NodeId nodeId) { + if (failedTaskAttemptDelete && deletionTracker != null) { + deletionTracker.taskAttemptFailed(taskAttemptID, jobTokenSecretManager, nodeId); + } + } } diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherWrapper.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherWrapper.java index 8778f32d58..c4f4eff0cf 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherWrapper.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherWrapper.java @@ -24,7 +24,7 @@ public class TestContainerLauncherWrapper { @Test(timeout = 5000) public void testDelegation() throws Exception { PluginWrapperTestHelpers.testDelegation(ContainerLauncherWrapper.class, ContainerLauncher.class, - Sets.newHashSet("getContainerLauncher", "dagComplete")); + Sets.newHashSet("getContainerLauncher", "dagComplete", "taskAttemptFailed")); } } diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java index 1bdddde529..610a260f58 100644 --- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java @@ -57,6 +57,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputByteBuffer; @@ -998,6 +1000,7 @@ private void handleRequest(ChannelHandlerContext ctx, FullHttpRequest request) final Map> q = new QueryStringDecoder(request.getUri()).parameters(); final List keepAliveList = q.get("keepAlive"); final List dagCompletedQ = q.get("dagAction"); + final List taskAttemptFailedQ = q.get("taskAttemptAction"); boolean keepAliveParam = false; if (keepAliveList != null && keepAliveList.size() == 1) { keepAliveParam = Boolean.parseBoolean(keepAliveList.get(0)); @@ -1019,6 +1022,9 @@ private void handleRequest(ChannelHandlerContext ctx, FullHttpRequest request) if (deleteDagDirectories(ctx.channel(), dagCompletedQ, jobQ, dagIdQ)) { return; } + if (deleteTaskAttemptDirectories(ctx.channel(), taskAttemptFailedQ, jobQ, dagIdQ, mapIds)) { + return; + } if (mapIds == null || reduceRange == null || jobQ == null || dagIdQ == null) { sendError(ctx, "Required param job, dag, map and reduce", BAD_REQUEST); return; @@ -1098,14 +1104,24 @@ private void handleRequest(ChannelHandlerContext ctx, FullHttpRequest request) ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); } + private boolean isNullOrEmpty(List entries) { + return entries == null || entries.isEmpty(); + } + + private boolean notEmptyAndContains(List entries, String key) { + if (entries == null || entries.isEmpty()) { + return false; + } + return entries.get(0).contains(key); + } + private boolean deleteDagDirectories(Channel channel, List dagCompletedQ, List jobQ, List dagIdQ) { if (jobQ == null || jobQ.isEmpty()) { return false; } - if (dagCompletedQ != null && !dagCompletedQ.isEmpty() && dagCompletedQ.get(0).contains("delete") - && dagIdQ != null && !dagIdQ.isEmpty()) { + if (notEmptyAndContains(dagCompletedQ,"delete") && !isNullOrEmpty(dagIdQ)) { String base = getDagLocation(jobQ.get(0), dagIdQ.get(0), userRsrc.get(jobQ.get(0))); try { FileContext lfc = FileContext.getLocalFSFileContext(); @@ -1122,6 +1138,40 @@ private boolean deleteDagDirectories(Channel channel, return false; } + private boolean deleteTaskAttemptDirectories(Channel channel, List taskAttemptFailedQ, + List jobQ, List dagIdQ, List taskAttemptIdQ) { + if (jobQ == null || jobQ.isEmpty()) { + return false; + } + if (notEmptyAndContains(taskAttemptFailedQ,"delete") && !isNullOrEmpty(taskAttemptIdQ)) { + for (String taskAttemptId : taskAttemptIdQ) { + String baseStr = getBaseLocation(jobQ.get(0), dagIdQ.get(0), userRsrc.get(jobQ.get(0))); + try { + FileSystem fs = FileSystem.getLocal(conf).getRaw(); + for (Path basePath : lDirAlloc.getAllLocalPathsToRead(baseStr, conf)) { + for (FileStatus fileStatus : fs.listStatus(basePath)) { + Path taskAttemptPath = fileStatus.getPath(); + if (taskAttemptPath.getName().startsWith(taskAttemptId)) { + if (fs.delete(taskAttemptPath, true)) { + LOG.info("Deleted directory : " + taskAttemptPath); + // remove entry from IndexCache + indexCache.removeMap(taskAttemptPath.getName()); + break; + } + } + } + } + } catch (IOException e) { + LOG.warn("Encountered exception during failed task attempt delete " + e); + } + } + channel.writeAndFlush(new DefaultHttpResponse(HTTP_1_1, OK)) + .addListener(ChannelFutureListener.CLOSE); + return true; + } + return false; + } + /** * Calls sendMapOutput for the mapId pointed by ReduceContext.mapsToSend * and increments it. This method is first called by channelRead() diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java index 2bf9cb2ad0..ba8da1bdf6 100644 --- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java @@ -1263,6 +1263,88 @@ protected void sendError(ChannelHandlerContext ctx, String message, } } + @Test(timeout = 5000) + public void testFailedTaskAttemptDelete() throws Exception { + final ArrayList failures = new ArrayList(1); + Configuration conf = new Configuration(); + conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "simple"); + UserGroupInformation.setConfiguration(conf); + File absLogDir = new File("target", TestShuffleHandler.class. + getSimpleName() + "LocDir").getAbsoluteFile(); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath()); + ApplicationId appId = ApplicationId.newInstance(12345, 1); + String appAttemptId = "attempt_12345_1_m_1_0"; + String user = "randomUser"; + List fileMap = new ArrayList(); + String taskAttemptDirStr = + StringUtils.join(Path.SEPARATOR, + new String[] {absLogDir.getAbsolutePath(), + ShuffleHandler.USERCACHE, user, + ShuffleHandler.APPCACHE, appId.toString(), "dag_1/output/", appAttemptId}); + File taskAttemptDir = new File(taskAttemptDirStr); + Assert.assertFalse("Task Attempt Directory should not exist", taskAttemptDir.exists()); + createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, + conf, fileMap); + ShuffleHandler shuffleHandler = new ShuffleHandler() { + @Override + protected Shuffle getShuffle(Configuration conf) { + // replace the shuffle handler with one stubbed for testing + return new Shuffle(conf) { + @Override + protected void sendError(ChannelHandlerContext ctx, String message, + HttpResponseStatus status) { + if (failures.size() == 0) { + failures.add(new Error(message)); + ctx.channel().close(); + } + } + }; + } + }; + shuffleHandler.init(conf); + try { + shuffleHandler.start(); + DataOutputBuffer outputBuffer = new DataOutputBuffer(); + outputBuffer.reset(); + Token jt = + new Token("identifier".getBytes(), + "password".getBytes(), new Text(user), new Text("shuffleService")); + jt.write(outputBuffer); + shuffleHandler + .initializeApplication(new ApplicationInitializationContext(user, + appId, ByteBuffer.wrap(outputBuffer.getData(), 0, + outputBuffer.getLength()))); + URL url = + new URL( + "http://127.0.0.1:" + + shuffleHandler.getConfig().get( + ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) + + "/mapOutput?taskAttemptAction=delete&job=job_12345_0001&dag=1&map=" + appAttemptId); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + Assert.assertTrue("Task Attempt Directory does not exist!", taskAttemptDir.exists()); + conn.connect(); + try { + DataInputStream is = new DataInputStream(conn.getInputStream()); + is.close(); + Assert.assertFalse("Task Attempt file was not deleted!", taskAttemptDir.exists()); + } catch (EOFException e) { + // ignore + } + Assert.assertEquals("sendError called due to shuffle error", + 0, failures.size()); + } finally { + shuffleHandler.stop(); + FileUtil.fullyDelete(absLogDir); + } + } + @Test(timeout = 4000) public void testSendMapCount() throws Exception { final List listenerList = diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java index 9d9b8c16c0..48b23bc694 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java @@ -187,6 +187,25 @@ public static URL constructBaseURIForShuffleHandlerDagComplete( return new URL(sb.toString()); } + public static URL constructBaseURIForShuffleHandlerTaskAttemptFailed( + String host, int port, String appId, int dagIdentifier, String taskAttemptIdentifier, boolean sslShuffle) + throws MalformedURLException { + String httpProtocol = (sslShuffle) ? "https://" : "http://"; + StringBuilder sb = new StringBuilder(httpProtocol); + sb.append(host); + sb.append(":"); + sb.append(port); + sb.append("/"); + sb.append("mapOutput?taskAttemptAction=delete"); + sb.append("&job="); + sb.append(appId.replace("application", "job")); + sb.append("&dag="); + sb.append(String.valueOf(dagIdentifier)); + sb.append("&map="); + sb.append(String.valueOf(taskAttemptIdentifier)); + return new URL(sb.toString()); + } + public static HttpConnectionParams getHttpConnectionParams(Configuration conf) { int connectionTimeout = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,