@@ -759,20 +759,22 @@ private void logProgress() {
     }
   }
 
-  public synchronized void copyFailed(InputAttemptFetchFailure fetchFailure, MapHost host,
+  public void copyFailed(InputAttemptFetchFailure fetchFailure, MapHost host,
       boolean readError, boolean connectError) {
     failedShuffleCounter.increment(1);
     inputContext.notifyProgress();
-    int failures = incrementAndGetFailureAttempt(fetchFailure.getInputAttemptIdentifier());
+    int failures;
-
-    if (!fetchFailure.isLocalFetch()) {
-      /**
-       * Track the number of failures that has happened since last completion.
-       * This gets reset on a successful copy.
-       */
-      failedShufflesSinceLastCompletion++;
+    synchronized (this) {
+      failures = incrementAndGetFailureAttempt(fetchFailure.getInputAttemptIdentifier());
+      if (!fetchFailure.isLocalFetch()) {
+        /**
+         * Track the number of failures that has happened since last completion.
+         * This gets reset on a successful copy.
+         */
+        failedShufflesSinceLastCompletion++;
+      }
     }
 
     /**
      * Inform AM:
      * - In case of read/connect error
@@ -794,14 +796,18 @@ public synchronized void copyFailed(InputAttemptFetchFailure fetchFailure, MapHost host,
     }
 
     //Restart consumer in case shuffle is not healthy
-    if (!isShuffleHealthy(fetchFailure)) {
+    try {
+      checkShuffleHealthy(fetchFailure);
+    } catch (IOException e) {
+      // reportException should be called outside synchronized(this) due to TEZ-4334
+      exceptionReporter.reportException(e);
       return;
     }
 
     penalizeHost(host, failures);
   }
 
-  private boolean isAbortLimitExceeedFor(InputAttemptIdentifier srcAttempt) {
+  private void isAbortLimitExceeedFor(InputAttemptIdentifier srcAttempt) throws IOException {
     int attemptFailures = getFailureCount(srcAttempt);
     if (attemptFailures >= abortFailureLimit) {
       // This task has seen too many fetch failures - report it as failed. The
@@ -816,15 +822,11 @@ private boolean isAbortLimitExceeedFor(InputAttemptIdentifier srcAttempt) {
           inputContext.getSourceVertexName(),
           srcAttempt.getInputIdentifier(),
           srcAttempt.getAttemptNumber()) + ". threshold=" + abortFailureLimit;
-      IOException ioe = new IOException(errorMsg);
-      // Shuffle knows how to deal with failures post shutdown via the onFailure hook
-      exceptionReporter.reportException(ioe);
-      return true;
+      throw new IOException(errorMsg);
     }
-    return false;
   }
 
-  private void penalizeHost(MapHost host, int failures) {
+  private synchronized void penalizeHost(MapHost host, int failures) {
     host.penalize();
 
     HostPort hostPort = new HostPort(host.getHost(), host.getPort());
@@ -1008,14 +1010,15 @@ private boolean isFetcherHealthy(String logContext) {
     return fetcherHealthy;
   }
 
-  boolean isShuffleHealthy(InputAttemptFetchFailure fetchFailure) {
+  /**
+   * This method checks if the current shuffle is healthy and throw IOException if it's not,
+   * then the caller is supposed to handle the IOException.
+   */
+  private synchronized void checkShuffleHealthy(InputAttemptFetchFailure fetchFailure)
+      throws IOException {
     InputAttemptIdentifier srcAttempt = fetchFailure.getInputAttemptIdentifier();
-    if (isAbortLimitExceeedFor(srcAttempt)) {
-      return false;
-    }
-
-    final float MIN_REQUIRED_PROGRESS_PERCENT = minReqProgressFraction;
-    final float MAX_ALLOWED_STALL_TIME_PERCENT = maxStallTimeFraction;
+    // supposed to throw IOException if exceeded
+    isAbortLimitExceeedFor(srcAttempt);
 
     int doneMaps = numInputs - remainingMaps.get();
 
@@ -1025,7 +1028,7 @@ boolean isShuffleHealthy(InputAttemptFetchFailure fetchFailure) {
     // check if the reducer has progressed enough
     boolean reducerProgressedEnough =
         (((float)doneMaps / numInputs)
-            >= MIN_REQUIRED_PROGRESS_PERCENT);
+            >= minReqProgressFraction);
 
     // check if the reducer is stalled for a long time
     // duration for which the reducer is stalled
@@ -1038,7 +1041,7 @@ boolean isShuffleHealthy(InputAttemptFetchFailure fetchFailure) {
 
     boolean reducerStalled = (shuffleProgressDuration > 0) &&
         (((float)stallDuration / shuffleProgressDuration)
-            >= MAX_ALLOWED_STALL_TIME_PERCENT);
+            >= maxStallTimeFraction);
 
     // kill if not healthy and has insufficient progress
     if ((failureCounts.size() >= maxFailedUniqueFetches ||
@@ -1059,10 +1062,8 @@ boolean isShuffleHealthy(InputAttemptFetchFailure fetchFailure) {
         LOG.debug("Host failures=" + hostFailures.keySet());
       }
       // Shuffle knows how to deal with failures post shutdown via the onFailure hook
-      exceptionReporter.reportException(new IOException(errorMsg, fetchFailure.getCause()));
-      return false;
+      throw new IOException(errorMsg, fetchFailure.getCause());
     }
-    return true;
   }
 
   public synchronized void addKnownMapOutput(String inputHostName,
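Note (not part of the diff): the change above moves `exceptionReporter.reportException(...)` out of code paths that run while holding the `ShuffleScheduler` monitor; per the TEZ-4334 comment in the diff, the reporter can call back into components that take their own locks, so reporting while synchronized risks a deadlock. Below is a minimal, self-contained sketch of that "decide under the lock, report outside the lock" pattern. It is illustrative only: `ExceptionReporter`, `copyFailed`, `checkHealthy`, and the failure threshold here are simplified stand-ins, not the actual Tez classes or signatures.

```java
import java.io.IOException;

// Illustrative sketch of the locking pattern adopted by this change.
// Names and thresholds are hypothetical, not Tez APIs.
public class LockSafeReporting {

  // Stand-in for Tez's ExceptionReporter: its implementation may take other locks.
  interface ExceptionReporter {
    void reportException(Throwable t);
  }

  private final ExceptionReporter reporter;
  private int failuresSinceLastCompletion = 0;

  LockSafeReporting(ExceptionReporter reporter) {
    this.reporter = reporter;
  }

  public void copyFailed(boolean localFetch) {
    try {
      // 1. Update shared state and decide health while holding the monitor.
      checkHealthy(localFetch);
    } catch (IOException e) {
      // 2. Report only after the monitor is released, so the reporter's
      //    callbacks cannot deadlock against a thread waiting on this lock.
      reporter.reportException(e);
      return;
    }
    // ... continue with penalization / retry logic ...
  }

  private synchronized void checkHealthy(boolean localFetch) throws IOException {
    if (!localFetch) {
      failuresSinceLastCompletion++;
    }
    if (failuresSinceLastCompletion > 10) {
      // Signal the problem to the caller instead of reporting while locked.
      throw new IOException("too many fetch failures: " + failuresSinceLastCompletion);
    }
  }

  public static void main(String[] args) {
    LockSafeReporting scheduler =
        new LockSafeReporting(t -> System.err.println("reported: " + t.getMessage()));
    for (int i = 0; i < 12; i++) {
      scheduler.copyFailed(false);
    }
  }
}
```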