diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
index 3163968908..370711cc91 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
@@ -572,4 +572,36 @@ public static int getJavaVersion() {
         ? Integer.parseInt(javaVersionString.split("\\.")[1]) // "1.8" -> 8
         : Integer.parseInt(javaVersionString.split("\\.")[0]); // "9.x" -> 9, "11.x" -> 11
   }
+
+  /**
+   * Returns the innermost cause of the given throwable, i.e. the end of its causal chain.
+   * <p>
+   * The chain is followed through {@link Throwable#getCause()} until a throwable without a cause
+   * is reached. A chain that loops back on itself is detected, so the walk always terminates.
+   *
+   * @param throwable the throwable to inspect, may be null
+   * @return the root cause, {@code throwable} itself if it has no cause, null for a null input
+   */
+  public static Throwable findRootCause(Throwable throwable) {
+    if (throwable == null) {
+      return null;
+    }
+    Throwable rootCause = throwable;
+    // Trails rootCause at half speed; the two can only meet again if the chain is cyclic.
+    Throwable slowPointer = throwable;
+    boolean advanceSlowPointer = false;
+    Throwable cause;
+    while ((cause = rootCause.getCause()) != null) {
+      rootCause = cause;
+      if (rootCause == slowPointer) {
+        // Cyclic causal chain: there is no real root, stop instead of looping forever.
+        break;
+      }
+      if (advanceSlowPointer) {
+        slowPointer = slowPointer.getCause();
+      }
+      advanceSlowPointer = !advanceSlowPointer;
+    }
+    return rootCause;
+  }
 }
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 583cc0099a..2dfc18a627 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -24,6 +24,7 @@
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Calendar;
@@ -57,6 +58,7 @@
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.RunnableWithNdc;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezExecutors;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.dag.api.InputDescriptor;
@@ -430,11 +432,11 @@ public void close() throws Exception {
       Thread.interrupted();
       if (eventRouterThread != null) {
         eventRouterThread.interrupt();
-        LOG.info("Joining on EventRouter");
+        LOG.info("Joining on EventRouter (on close)");
         try {
           eventRouterThread.join();
         } catch (InterruptedException e) {
-          LOG.info("Ignoring interrupt while waiting for the router thread to die");
+          LOG.info("Ignoring interrupt while waiting for the router thread to die (on close)");
           Thread.currentThread().interrupt();
         }
         eventRouterThread = null;
@@ -748,6 +750,10 @@ private boolean handleEvent(TezEvent e) {
         break;
       }
     } catch (Throwable t) {
+      if (this.state.get() == State.CLOSED && exceptionCanBeIgnoredInAFinishedTask(t)) {
+        LOG.info("Ignoring exception while handling an event (as task is already closed)", t);
+        return true;
+      }
       LOG.warn("Failed to handle event", t);
       registerError();
       EventMetaData sourceInfo = new EventMetaData(
@@ -765,6 +771,21 @@ private boolean handleEvent(TezEvent e) {
     return true;
   }
 
+  /**
+   * Tells whether the root cause of {@code t} is a thread interruption, i.e. an
+   * {@link InterruptedException} or a {@link ClosedByInterruptException}. Used by
+   * {@code handleEvent} to skip error reporting when the task is already CLOSED.
+   *
+   * @param t the exception caught while handling an event
+   * @return true if the root cause of {@code t} is one of the interruption types above
+   */
+  private boolean exceptionCanBeIgnoredInAFinishedTask(Throwable t) {
+    Throwable rootCause = TezCommonUtils.findRootCause(t);
+    LOG.debug("Checking exception in exceptionCanBeIgnoredInAFinishedTask: {}", rootCause.getClass());
+    return rootCause instanceof InterruptedException || // any kind of thread interruption
+        rootCause instanceof ClosedByInterruptException; // exception thrown from MR inputs
+  }
+
   @Override
   public int getMaxEventsToHandle() {
     return Math.max(0, maxEventBacklog - eventsToBeProcessed.size());
@@ -856,11 +877,11 @@ public void cleanup() throws InterruptedException {
     setTaskDone();
     if (eventRouterThread != null) {
       eventRouterThread.interrupt();
-      LOG.info("Joining on EventRouter");
+      LOG.info("Joining on EventRouter (on cleanup)");
       try {
         eventRouterThread.join();
       } catch (InterruptedException e) {
-        LOG.info("Ignoring interrupt while waiting for the router thread to die");
+        LOG.info("Ignoring interrupt while waiting for the router thread to die (on cleanup)");
         Thread.currentThread().interrupt();
       }
       eventRouterThread = null;