From 1d40286cc8c11acf1d8873f3f17cc558825e8efb Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Sun, 17 Oct 2021 19:37:26 +0200 Subject: [PATCH] TEZ-4336: ShuffleScheduler should try to report the original exception (when shuffle becomes unhealthy) --- .../common/shuffle/InputAttemptFetchFailure.java | 10 ++++++++++ .../orderedgrouped/FetcherOrderedGrouped.java | 4 ++-- .../shuffle/orderedgrouped/ShuffleScheduler.java | 12 +++++++----- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputAttemptFetchFailure.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputAttemptFetchFailure.java index d94db35c2f..4ce1699cf5 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputAttemptFetchFailure.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputAttemptFetchFailure.java @@ -33,6 +33,7 @@ public class InputAttemptFetchFailure { private final InputAttemptIdentifier inputAttemptIdentifier; private final boolean isLocalFetch; private final boolean isDiskErrorAtSource; + private Throwable cause = null; public InputAttemptFetchFailure(InputAttemptIdentifier inputAttemptIdentifier) { this(inputAttemptIdentifier, false, false); @@ -112,4 +113,13 @@ public String toString() { return String.format("%s, isLocalFetch: %s, isDiskErrorAtSource: %s", inputAttemptIdentifier.toString(), isLocalFetch, isDiskErrorAtSource); } + + public InputAttemptFetchFailure withCause(Throwable throwable) { + this.cause = throwable; + return this; + } + + public Throwable getCause() { + return cause; + } } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index c9bd092f05..a4328af44a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -378,7 +378,7 @@ boolean setupConnection(MapHost host, Collection attempt for (InputAttemptIdentifier left : remaining.values()) { // Need to be handling temporary glitches .. // Report read error to the AM to trigger source failure heuristics - scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(left), host, connectSucceeded, + scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(left).withCause(ie), host, connectSucceeded, !connectSucceeded); } return false; @@ -738,7 +738,7 @@ protected void setupLocalDiskFetch(MapHost host) throws InterruptedException { if (!stopped) { hasFailures = true; ioErrs.increment(1); - scheduler.copyFailed(InputAttemptFetchFailure.fromLocalFetchFailure(srcAttemptId), + scheduler.copyFailed(InputAttemptFetchFailure.fromLocalFetchFailure(srcAttemptId).withCause(e), host, true, false); LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " + host.getHostIdentifier(), e); diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 540d44f409..dd27d45aed 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -178,6 +178,7 @@ enum ShuffleErrors { private final Referee referee; @VisibleForTesting final Map failureCounts = new HashMap(); + final Set uniqueHosts = Sets.newHashSet(); private final Map hostFailures = new HashMap(); private final InputContext inputContext; @@ -792,7 +793,7 @@ public synchronized void copyFailed(InputAttemptFetchFailure fetchFailure, MapHo } //Restart consumer in case shuffle is not healthy - if (!isShuffleHealthy(fetchFailure.getInputAttemptIdentifier())) { + if (!isShuffleHealthy(fetchFailure)) { return; } @@ -1006,8 +1007,8 @@ private boolean isFetcherHealthy(String logContext) { return fetcherHealthy; } - boolean isShuffleHealthy(InputAttemptIdentifier srcAttempt) { - + boolean isShuffleHealthy(InputAttemptFetchFailure fetchFailure) { + InputAttemptIdentifier srcAttempt = fetchFailure.getInputAttemptIdentifier(); if (isAbortLimitExceeedFor(srcAttempt)) { return false; } @@ -1049,14 +1050,15 @@ boolean isShuffleHealthy(InputAttemptIdentifier srcAttempt) { + ", pendingInputs=" + (numInputs - doneMaps) + ", fetcherHealthy=" + fetcherHealthy + ", reducerProgressedEnough=" + reducerProgressedEnough - + ", reducerStalled=" + reducerStalled) + + ", reducerStalled=" + reducerStalled + + ", hostFailures=" + hostFailures) + "]"; LOG.error(errorMsg); if (LOG.isDebugEnabled()) { LOG.debug("Host failures=" + hostFailures.keySet()); } // Shuffle knows how to deal with failures post shutdown via the onFailure hook - exceptionReporter.reportException(new IOException(errorMsg)); + exceptionReporter.reportException(new IOException(errorMsg, fetchFailure.getCause())); return false; } return true;