diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 470b04cc5f..0812c6a610 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -902,29 +902,21 @@ private void informAM(InputAttemptFetchFailure fetchFailure) { private boolean hasFailedAcrossNodes(String logContext) { int numUniqueHosts = uniqueHosts.size(); Preconditions.checkArgument(numUniqueHosts > 0, "No values in unique hosts"); - int threshold = Math.max(3, - (int) Math.ceil(numUniqueHosts * hostFailureFraction)); - int total = 0; - boolean failedAcrossNodes = false; + int threshold = Math.max(3, (int) Math.ceil(numUniqueHosts * hostFailureFraction)); + int totalFailures = 0; + int totalThreshold = threshold * minFailurePerHost; for(HostPort host : uniqueHosts) { IntWritable failures = hostFailures.get(host); if (failures != null && failures.get() > minFailurePerHost) { - total++; - failedAcrossNodes = (total > (threshold * minFailurePerHost)); - if (failedAcrossNodes) { - break; + totalFailures++; + if (totalFailures > totalThreshold) { + LOG.info("Number of failures across nodes ({}) has exceeded the total threshold limit ({}) " + + "for InputAttemptIdentifier: {}", totalFailures, totalThreshold, logContext); + return true; } } } - - LOG.info(logContext + ", numUniqueHosts=" + numUniqueHosts - + ", hostFailureThreshold=" + threshold - + ", hostFailuresCount=" + hostFailures.size() - + ", hosts crossing threshold=" + total - + ", reducerFetchIssues=" + failedAcrossNodes - ); - - return failedAcrossNodes; + return false; } private boolean allEventsReceived() {