Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -572,4 +572,12 @@ public static int getJavaVersion() {
? Integer.parseInt(javaVersionString.split("\\.")[1]) // "1.8" -> 8
: Integer.parseInt(javaVersionString.split("\\.")[0]); // "9.x" -> 9, "11.x" -> 11
}

/**
 * Walks the cause chain of the given throwable and returns the innermost cause.
 *
 * <p>The walk stops when a throwable reports no cause, or when the next cause has
 * already been visited (a self-referencing or otherwise cyclic cause chain). In the
 * cyclic case the last throwable reached before the chain repeats is returned, so
 * this method always terminates.
 *
 * @param throwable the throwable whose cause chain is inspected, may be {@code null}
 * @return the deepest throwable in the chain; {@code throwable} itself if it has no
 *         cause; {@code null} if {@code throwable} is {@code null}
 */
public static Throwable findRootCause(Throwable throwable) {
  if (throwable == null) {
    return null;
  }
  // Identity-based set: throwables must be compared by reference, not by equals(),
  // so that a subclass overriding equals() cannot cut the walk short.
  java.util.Set<Throwable> visited =
      java.util.Collections.newSetFromMap(new java.util.IdentityHashMap<Throwable, Boolean>());
  visited.add(throwable);

  Throwable rootCause = throwable;
  Throwable cause = rootCause.getCause();
  // visited.add() returns false once the chain loops back on itself (a -> b -> a),
  // which would otherwise make this loop spin forever.
  while (cause != null && visited.add(cause)) {
    rootCause = cause;
    cause = rootCause.getCause();
  }
  return rootCause;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
Expand Down Expand Up @@ -57,6 +58,7 @@
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.RunnableWithNdc;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.dag.api.InputDescriptor;
Expand Down Expand Up @@ -430,11 +432,11 @@ public void close() throws Exception {
Thread.interrupted();
if (eventRouterThread != null) {
eventRouterThread.interrupt();
LOG.info("Joining on EventRouter");
LOG.info("Joining on EventRouter (on close)");
try {
eventRouterThread.join();
} catch (InterruptedException e) {
LOG.info("Ignoring interrupt while waiting for the router thread to die");
LOG.info("Ignoring interrupt while waiting for the router thread to die (on close)");
Thread.currentThread().interrupt();
}
eventRouterThread = null;
Expand Down Expand Up @@ -748,6 +750,10 @@ private boolean handleEvent(TezEvent e) {
break;
}
} catch (Throwable t) {
if (this.state.get() == State.CLOSED && exceptionCanBeIgnoredInAFinishedTask(t)) {
LOG.info("Ignoring exception while handling an event (as task is already closed)", t);
return true;
}
LOG.warn("Failed to handle event", t);
registerError();
EventMetaData sourceInfo = new EventMetaData(
Expand All @@ -765,6 +771,13 @@ private boolean handleEvent(TezEvent e) {
return true;
}

/**
 * Decides whether a failure raised while handling an event may be ignored,
 * based on the root cause of the given throwable.
 *
 * @param t the throwable caught while handling an event
 * @return {@code true} if the root cause of {@code t} is an
 *         {@link InterruptedException} or a {@link ClosedByInterruptException},
 *         {@code false} otherwise
 */
private boolean exceptionCanBeIgnoredInAFinishedTask(Throwable t) {
  final Throwable deepest = TezCommonUtils.findRootCause(t);
  LOG.debug("Checking exception in exceptionCanBeIgnoredInAFinishedTask: {}", deepest.getClass());

  // any kind of thread interruption
  if (deepest instanceof InterruptedException) {
    return true;
  }
  // exception thrown from MR inputs
  return deepest instanceof ClosedByInterruptException;
}

@Override
public int getMaxEventsToHandle() {
return Math.max(0, maxEventBacklog - eventsToBeProcessed.size());
Expand Down Expand Up @@ -856,11 +869,11 @@ public void cleanup() throws InterruptedException {
setTaskDone();
if (eventRouterThread != null) {
eventRouterThread.interrupt();
LOG.info("Joining on EventRouter");
LOG.info("Joining on EventRouter (on cleanup)");
try {
eventRouterThread.join();
} catch (InterruptedException e) {
LOG.info("Ignoring interrupt while waiting for the router thread to die");
LOG.info("Ignoring interrupt while waiting for the router thread to die (on cleanup)");
Thread.currentThread().interrupt();
}
eventRouterThread = null;
Expand Down