Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -757,18 +757,22 @@ private void logProgress() {
}
}

public synchronized void copyFailed(InputAttemptFetchFailure fetchFailure, MapHost host,
public void copyFailed(InputAttemptFetchFailure fetchFailure, MapHost host,
boolean readError, boolean connectError) {
failedShuffleCounter.increment(1);
inputContext.notifyProgress();
int failures = incrementAndGetFailureAttempt(fetchFailure.getInputAttemptIdentifier());
int failures;

if (!fetchFailure.isLocalFetch()) {
/**
* Track the number of failures that has happened since last completion.
* This gets reset on a successful copy.
*/
failedShufflesSinceLastCompletion++;
synchronized (this) {
failures = incrementAndGetFailureAttempt(fetchFailure.getInputAttemptIdentifier());

if (!fetchFailure.isLocalFetch()) {
/**
* Track the number of failures that has happened since last completion.
* This gets reset on a successful copy.
*/
failedShufflesSinceLastCompletion++;
}
}

/**
Expand Down Expand Up @@ -822,7 +826,7 @@ private boolean isAbortLimitExceeedFor(InputAttemptIdentifier srcAttempt) {
return false;
}

private void penalizeHost(MapHost host, int failures) {
private synchronized void penalizeHost(MapHost host, int failures) {
host.penalize();

HostPort hostPort = new HostPort(host.getHost(), host.getPort());
Expand All @@ -842,7 +846,7 @@ private void penalizeHost(MapHost host, int failures) {
penalties.add(new Penalty(host, penaltyDelay));
}

private int getFailureCount(InputAttemptIdentifier srcAttempt) {
private synchronized int getFailureCount(InputAttemptIdentifier srcAttempt) {
IntWritable failureCount = failureCounts.get(srcAttempt);
return (failureCount == null) ? 0 : failureCount.get();
}
Expand Down Expand Up @@ -1015,51 +1019,63 @@ boolean isShuffleHealthy(InputAttemptIdentifier srcAttempt) {
final float MIN_REQUIRED_PROGRESS_PERCENT = minReqProgressFraction;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you touch this codepath, don't forget to remove these lines of redeclaring stuff:

    final float MIN_REQUIRED_PROGRESS_PERCENT = minReqProgressFraction;
    final float MAX_ALLOWED_STALL_TIME_PERCENT = maxStallTimeFraction;

I don't get what's the purpose...I guess they are leftovers from a code that needed final variables (in-place Runnables or whatever)

final float MAX_ALLOWED_STALL_TIME_PERCENT = maxStallTimeFraction;

int doneMaps = numInputs - remainingMaps.get();
String errorMsg = null;
Copy link
Contributor

@abstractdog abstractdog Feb 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think instead of creating synchronized block here, it's time to refactor this strange and confusing piece of code...we're reporting exceptions and returning with a vague boolean, which is not okay, instead, we should do it in copyFailed: catch all IOExceptions and report them, what about:

    //Restart consumer in case shuffle is not healthy
try{
   checkIfShuffleIsHealthy(fetchFailure)
} catch(IOExcepion e){
   exceptionReporter.reportException(e);
   return;
}

in checkIfShuffleIsHealthy we can do separate things in a synchronized way and don't have to return a boolean:

void checkIfShuffleIsHealthy(){
   checkIfAbortLimitIsExceeedFor(srcAttempt)
   checkWhateverTheRestOfTheMethodDoes(srcAttempt)
}

regarding checkWhateverTheRestOfTheMethodDoes:

  1. this is the logic you're about to make synchronized, you can make it I guess (as exception reporting is handled in caller method copyFailed as suggested above
  2. find a proper method name for checkWhateverTheRestOfTheMethodDoes, which reflects what it actually does

with the method refactoring, you don't have to introduce huge synchronized block, instead you can make it clear with a convenient method name what is it to be syncronized

does it make sense @glapark ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@abstractdog The goal of the patch I submitted was to prevent deadlock, and it was not about simplifying the logic in check???() methods. At the time of submitting the patch, I didn't fully understand the details of the logic in check???() and related methods. Another plan could be to create a new patch for simplifying the logic here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, the original goal has already been achieved I guess
what I really meant was to make a simple refactor in the very same codepath without making the patch bigger, please refer to #273

boolean result;

String logContext = "srcAttempt=" + srcAttempt.toString();
boolean fetcherHealthy = isFetcherHealthy(logContext);

// check if the reducer has progressed enough
boolean reducerProgressedEnough =
(((float)doneMaps / numInputs)
>= MIN_REQUIRED_PROGRESS_PERCENT);

// check if the reducer is stalled for a long time
// duration for which the reducer is stalled
int stallDuration =
(int)(System.currentTimeMillis() - lastProgressTime);

// duration for which the reducer ran with progress
int shuffleProgressDuration =
(int)(lastProgressTime - startTime);

boolean reducerStalled = (shuffleProgressDuration > 0) &&
(((float)stallDuration / shuffleProgressDuration)
>= MAX_ALLOWED_STALL_TIME_PERCENT);

// kill if not healthy and has insufficient progress
if ((failureCounts.size() >= maxFailedUniqueFetches ||
failureCounts.size() == (numInputs - doneMaps))
&& !fetcherHealthy
&& (!reducerProgressedEnough || reducerStalled)) {
String errorMsg = (srcNameTrimmed + ": "
+ "Shuffle failed with too many fetch failures and insufficient progress: "
+ "[failureCounts=" + failureCounts.size()
+ ", pendingInputs=" + (numInputs - doneMaps)
+ ", fetcherHealthy=" + fetcherHealthy
+ ", reducerProgressedEnough=" + reducerProgressedEnough
+ ", reducerStalled=" + reducerStalled)
+ "]";
LOG.error(errorMsg);
if (LOG.isDebugEnabled()) {
LOG.debug("Host failures=" + hostFailures.keySet());
synchronized (this) {

int doneMaps = numInputs - remainingMaps.get();

String logContext = "srcAttempt=" + srcAttempt.toString();
boolean fetcherHealthy = isFetcherHealthy(logContext);

// check if the reducer has progressed enough
boolean reducerProgressedEnough =
(((float)doneMaps / numInputs)
>= MIN_REQUIRED_PROGRESS_PERCENT);

// check if the reducer is stalled for a long time
// duration for which the reducer is stalled
int stallDuration =
(int)(System.currentTimeMillis() - lastProgressTime);

// duration for which the reducer ran with progress
int shuffleProgressDuration =
(int)(lastProgressTime - startTime);

boolean reducerStalled = (shuffleProgressDuration > 0) &&
(((float)stallDuration / shuffleProgressDuration)
>= MAX_ALLOWED_STALL_TIME_PERCENT);

// kill if not healthy and has insufficient progress
if ((failureCounts.size() >= maxFailedUniqueFetches ||
failureCounts.size() == (numInputs - doneMaps))
&& !fetcherHealthy
&& (!reducerProgressedEnough || reducerStalled)) {
errorMsg = (srcNameTrimmed + ": "
+ "Shuffle failed with too many fetch failures and insufficient progress: "
+ "[failureCounts=" + failureCounts.size()
+ ", pendingInputs=" + (numInputs - doneMaps)
+ ", fetcherHealthy=" + fetcherHealthy
+ ", reducerProgressedEnough=" + reducerProgressedEnough
+ ", reducerStalled=" + reducerStalled)
+ "]";
LOG.error(errorMsg);
if (LOG.isDebugEnabled()) {
LOG.debug("Host failures=" + hostFailures.keySet());
}
result = false;
} else {
result = true;
}
}

if (!result) {
// Shuffle knows how to deal with failures post shutdown via the onFailure hook
// reportException() should be called outside synchronized(this)
exceptionReporter.reportException(new IOException(errorMsg));
return false;
}
return true;
return result;
}

public synchronized void addKnownMapOutput(String inputHostName,
Expand Down