diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 470b04cc5f..967f58250e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -759,20 +759,22 @@ private void logProgress() {
     }
   }
 
-  public synchronized void copyFailed(InputAttemptFetchFailure fetchFailure, MapHost host,
+  public void copyFailed(InputAttemptFetchFailure fetchFailure, MapHost host,
       boolean readError, boolean connectError) {
     failedShuffleCounter.increment(1);
     inputContext.notifyProgress();
-    int failures = incrementAndGetFailureAttempt(fetchFailure.getInputAttemptIdentifier());
+    int failures;
 
-    if (!fetchFailure.isLocalFetch()) {
-      /**
-       * Track the number of failures that has happened since last completion.
-       * This gets reset on a successful copy.
-       */
-      failedShufflesSinceLastCompletion++;
+    synchronized (this) {
+      failures = incrementAndGetFailureAttempt(fetchFailure.getInputAttemptIdentifier());
+      if (!fetchFailure.isLocalFetch()) {
+        /**
+         * Track the number of failures that has happened since last completion.
+         * This gets reset on a successful copy.
+         */
+        failedShufflesSinceLastCompletion++;
+      }
     }
-
     /**
      * Inform AM:
      * - In case of read/connect error
@@ -794,14 +796,18 @@ public synchronized void copyFailed(InputAttemptFetchFailure fetchFailure, MapHo
     }
 
     //Restart consumer in case shuffle is not healthy
-    if (!isShuffleHealthy(fetchFailure)) {
+    try {
+      checkShuffleHealthy(fetchFailure);
+    } catch (IOException e) {
+      // reportException should be called outside synchronized(this) due to TEZ-4334
+      exceptionReporter.reportException(e);
       return;
     }
 
     penalizeHost(host, failures);
   }
 
-  private boolean isAbortLimitExceeedFor(InputAttemptIdentifier srcAttempt) {
+  private void isAbortLimitExceeedFor(InputAttemptIdentifier srcAttempt) throws IOException {
     int attemptFailures = getFailureCount(srcAttempt);
     if (attemptFailures >= abortFailureLimit) {
       // This task has seen too many fetch failures - report it as failed. The
@@ -816,15 +822,11 @@ private boolean isAbortLimitExceeedFor(InputAttemptIdentifier srcAttempt) {
          inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier(),
          srcAttempt.getAttemptNumber()) + ". threshold=" + abortFailureLimit;
 
-      IOException ioe = new IOException(errorMsg);
-      // Shuffle knows how to deal with failures post shutdown via the onFailure hook
-      exceptionReporter.reportException(ioe);
-      return true;
+      throw new IOException(errorMsg);
     }
-    return false;
   }
 
-  private void penalizeHost(MapHost host, int failures) {
+  private synchronized void penalizeHost(MapHost host, int failures) {
     host.penalize();
 
     HostPort hostPort = new HostPort(host.getHost(), host.getPort());
@@ -1008,14 +1010,15 @@ private boolean isFetcherHealthy(String logContext) {
     return fetcherHealthy;
   }
 
-  boolean isShuffleHealthy(InputAttemptFetchFailure fetchFailure) {
+  /**
+   * This method checks if the current shuffle is healthy and throws an IOException if it's not;
+   * the caller is supposed to handle the IOException.
+   */
+  private synchronized void checkShuffleHealthy(InputAttemptFetchFailure fetchFailure)
+      throws IOException {
     InputAttemptIdentifier srcAttempt = fetchFailure.getInputAttemptIdentifier();
-    if (isAbortLimitExceeedFor(srcAttempt)) {
-      return false;
-    }
-
-    final float MIN_REQUIRED_PROGRESS_PERCENT = minReqProgressFraction;
-    final float MAX_ALLOWED_STALL_TIME_PERCENT = maxStallTimeFraction;
+    // throws IOException if the abort failure limit is exceeded
+    isAbortLimitExceeedFor(srcAttempt);
 
     int doneMaps = numInputs - remainingMaps.get();
 
@@ -1025,7 +1028,7 @@ boolean isShuffleHealthy(InputAttemptFetchFailure fetchFailure) {
     // check if the reducer has progressed enough
     boolean reducerProgressedEnough =
         (((float)doneMaps / numInputs)
-            >= MIN_REQUIRED_PROGRESS_PERCENT);
+            >= minReqProgressFraction);
 
     // check if the reducer is stalled for a long time
     // duration for which the reducer is stalled
@@ -1038,7 +1041,7 @@ boolean isShuffleHealthy(InputAttemptFetchFailure fetchFailure) {
     boolean reducerStalled =
         (shuffleProgressDuration > 0) &&
         (((float)stallDuration / shuffleProgressDuration)
-            >= MAX_ALLOWED_STALL_TIME_PERCENT);
+            >= maxStallTimeFraction);
 
     // kill if not healthy and has insufficient progress
     if ((failureCounts.size() >= maxFailedUniqueFetches ||
@@ -1059,10 +1062,8 @@ boolean isShuffleHealthy(InputAttemptFetchFailure fetchFailure) {
         LOG.debug("Host failures=" + hostFailures.keySet());
       }
       // Shuffle knows how to deal with failures post shutdown via the onFailure hook
-      exceptionReporter.reportException(new IOException(errorMsg, fetchFailure.getCause()));
-      return false;
+      throw new IOException(errorMsg, fetchFailure.getCause());
     }
-    return true;
   }
 
   public synchronized void addKnownMapOutput(String inputHostName,
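The discipline the patch adopts (mutate and check shared state under the instance lock, signal problems by throwing, and call exceptionReporter.reportException only after the lock is released per the TEZ-4334 comment) generalizes beyond Tez. Below is a minimal, self-contained Java sketch of that pattern; the names LockAwareReporter, Reporter, onCopyFailed and checkHealthy are illustrative stand-ins, not Tez APIs.

import java.io.IOException;

// Minimal sketch of the lock/callback discipline used in the patch above:
// shared state is mutated and checked under the instance lock, failures are
// signalled by throwing, and the external callback is invoked only after the
// lock has been released, so a callback that re-enters this object cannot
// deadlock against threads blocked on its synchronized methods.
public class LockAwareReporter {

  /** Hypothetical stand-in for Tez's ExceptionReporter callback. */
  public interface Reporter {
    void reportException(Throwable t);
  }

  private final Reporter reporter;
  private int failuresSinceLastCompletion;

  public LockAwareReporter(Reporter reporter) {
    this.reporter = reporter;
  }

  /** Unsynchronized entry point, mirroring the reworked copyFailed(). */
  public void onCopyFailed() {
    synchronized (this) {
      failuresSinceLastCompletion++; // mutate shared state only under the lock
    }
    try {
      checkHealthy();                // synchronized check, throws if unhealthy
    } catch (IOException e) {
      reporter.reportException(e);   // reported with the lock already released
    }
  }

  /** Synchronized check that throws instead of calling the reporter directly. */
  private synchronized void checkHealthy() throws IOException {
    if (failuresSinceLastCompletion > 10) {
      throw new IOException("too many fetch failures since last completion");
    }
  }
}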