18 changes: 18 additions & 0 deletions tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -300,6 +300,24 @@ public TezConfiguration(boolean loadDefaults) {
TEZ_AM_PREFIX + "max.allowed.time-sec.for-read-error";
public static final int TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT = 300;

/**
* Double value. When downstream hosts report fetch failures for a given upstream host, this
* config defines the maximum allowed ratio of (distinct blaming downstream hosts) / (all active
* hosts). The set of active hosts is tracked by AMNodeTracker, and its size is used to divide
* the number of distinct downstream hosts blaming source (upstream) tasks in a given vertex.
* If the fraction exceeds this limit, the upstream task attempt is marked as failed (i.e.
* blamed for the fetch failure). E.g. if this is set to 0.2 and 3 different hosts report fetch
* failures for the same upstream host in a cluster that currently utilizes 10 nodes (a fraction
* of 0.3), the upstream task attempt is immediately blamed for the fetch failure.
*
* Expert level setting.
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type="double")
public static final String TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOST_FAILURES_FRACTION =
TEZ_AM_PREFIX + "max.allowed.downstream.host.failures.fraction";
public static final double TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOST_FAILURES_FRACTION_DEFAULT = 0.2;
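
A minimal, standalone sketch (hypothetical values, not part of the patch) of the arithmetic the javadoc above describes:

```java
/** Illustrates the downstream-host failure fraction check; all values are made up. */
public class HostFailureFractionExample {
  public static void main(String[] args) {
    double maxAllowedFraction = 0.2; // TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOST_FAILURES_FRACTION_DEFAULT
    int blamingDownstreamHosts = 3;  // distinct downstream hosts reporting fetch failures
    int activeNodes = 10;            // active nodes, as tracked by AMNodeTracker

    double fraction = (double) blamingDownstreamHosts / activeNodes; // 0.3
    boolean blameUpstreamAttempt = fraction > maxAllowedFraction;    // true -> fail the upstream attempt
    System.out.println("fraction=" + fraction + ", blameUpstreamAttempt=" + blameUpstreamAttempt);
  }
}
```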
Contributor commented:

Mostly looks good. This may not work for small clusters (e.g. < 5 nodes), where a single node failure can cause the source to restart the task. This can be tweaked later.

Contributor Author replied:

I see. I'm assuming that on such a small cluster a fraction of 0.25 might work properly (so in the case of 4 hosts, 1 failing downstream host won't make the source restart immediately; at least 2 reporting downstream hosts are needed).


/**
* Boolean value. Determines when the final outputs to data sinks are committed. Commit is an
* output specific operation and typically involves making the output visible for consumption.
tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java
@@ -62,33 +62,44 @@ public final class InputReadErrorEvent extends Event {
*/
private final boolean isDiskErrorAtSource;

/**
* The localhostName of the destination task attempt.
*/
private final String destinationLocalhostName;

private InputReadErrorEvent(final String diagnostics, final int index, final int version,
final int numFailures, boolean isLocalFetch, boolean isDiskErrorAtSource) {
final int numFailures, boolean isLocalFetch, boolean isDiskErrorAtSource, String destinationLocalhostName) {
super();
this.diagnostics = diagnostics;
this.index = index;
this.version = version;
this.numFailures = numFailures;
this.isLocalFetch = isLocalFetch;
this.isDiskErrorAtSource = isDiskErrorAtSource;
this.destinationLocalhostName = destinationLocalhostName;
}

public static InputReadErrorEvent create(String diagnostics, int index, int version,
boolean isLocalFetch, boolean isDiskErrorAtSource) {
return create(diagnostics, index, version, 1, isLocalFetch, isDiskErrorAtSource);
return create(diagnostics, index, version, 1, isLocalFetch, isDiskErrorAtSource, null);
}

public static InputReadErrorEvent create(String diagnostics, int index, int version) {
return create(diagnostics, index, version, 1, false, false);
return create(diagnostics, index, version, 1, false, false, null);
}

public static InputReadErrorEvent create(String diagnostics, int index, int version, boolean isLocalFetch,
boolean isDiskErrorAtSource, String destinationLocalhostName) {
return create(diagnostics, index, version, 1, isLocalFetch, isDiskErrorAtSource, destinationLocalhostName);
}

/**
* Create an InputReadErrorEvent.
*/
public static InputReadErrorEvent create(final String diagnostics, final int index,
final int version, final int numFailures, boolean isLocalFetch, boolean isDiskErrorAtSource) {
return new InputReadErrorEvent(diagnostics, index, version, numFailures, isLocalFetch,
isDiskErrorAtSource);
public static InputReadErrorEvent create(final String diagnostics, final int index, final int version,
final int numFailures, boolean isLocalFetch, boolean isDiskErrorAtSource, String destinationLocalhostName) {
return new InputReadErrorEvent(diagnostics, index, version, numFailures, isLocalFetch, isDiskErrorAtSource,
destinationLocalhostName);
}

public String getDiagnostics() {
@@ -118,6 +129,10 @@ public boolean isDiskErrorAtSource() {
return isDiskErrorAtSource;
}

public String getDestinationLocalhostName() {
return destinationLocalhostName;
}

@Override
public int hashCode() {
return Objects.hash(index, version);
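A hedged usage sketch of the extended factory method and the new getter; the diagnostics string and host name below are illustrative, not taken from the patch:

```java
// import org.apache.tez.runtime.api.events.InputReadErrorEvent;
InputReadErrorEvent event = InputReadErrorEvent.create(
    "Fetch failure while reading input", /* index */ 0, /* version */ 1,
    /* isLocalFetch */ false, /* isDiskErrorAtSource */ false,
    "downstream-host-1.example.com");

// The AM can now attribute the report to the fetching (downstream) host:
String dHost = event.getDestinationLocalhostName(); // "downstream-host-1.example.com"
```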
tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
@@ -80,7 +80,7 @@ void containerCompleted(ContainerId containerId, int exitStatus, String diagnost
* Get the number of nodes being handled by the specified source
*
* @param sourceName the relevant source name
* @return the initial payload
* @return the number of nodes
*/
int getNumNodes(String sourceName);

1 change: 1 addition & 0 deletions tez-api/src/main/proto/Events.proto
@@ -41,6 +41,7 @@ message InputReadErrorEventProto {
optional int32 version = 3;
optional bool is_local_fetch = 4;
optional bool is_disk_error_at_source = 5;
optional string destination_localhost_name = 6;
}

message InputFailedEventProto {
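A hedged sketch of the wire round-trip for the new optional field, assuming Tez's generated protobuf classes and the standard protobuf Java name mapping for destination_localhost_name:

```java
// import org.apache.tez.runtime.api.events.EventProtos.InputReadErrorEventProto;
InputReadErrorEventProto proto = InputReadErrorEventProto.newBuilder()
    .setVersion(1)
    .setDestinationLocalhostName("downstream-host-1.example.com") // new field, tag 6
    .build();

// optional fields carry explicit presence information:
boolean hasHost = proto.hasDestinationLocalhostName(); // true
String host = proto.getDestinationLocalhostName();
```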
5 changes: 5 additions & 0 deletions tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -230,8 +230,13 @@ interface VertexConfig {
* @return tez.am.max.allowed.time-sec.for-read-error.
*/
int getMaxAllowedTimeForTaskReadErrorSec();
/**
* @return tez.am.max.allowed.downstream.host.failures.fraction.
*/
double getMaxAllowedDownstreamHostFailuresFraction();
}

void incrementRejectedTaskAttemptCount();
int getRejectedTaskAttemptCount();
Map<String, Set<String>> getDownstreamBlamingHosts();
}
tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
@@ -28,9 +28,9 @@ public class TaskAttemptEventOutputFailed extends TaskAttemptEvent
private TezEvent inputFailedEvent;
private int consumerTaskNumber;

public TaskAttemptEventOutputFailed(TezTaskAttemptID attemptId,
public TaskAttemptEventOutputFailed(TezTaskAttemptID sourceTaskAttemptId,
TezEvent tezEvent, int numConsumers) {
super(attemptId, TaskAttemptEventType.TA_OUTPUT_FAILED);
super(sourceTaskAttemptId, TaskAttemptEventType.TA_OUTPUT_FAILED);
this.inputFailedEvent = tezEvent;
this.consumerTaskNumber = numConsumers;
}
15 changes: 7 additions & 8 deletions tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -374,7 +374,7 @@ public void sendTezEventToSourceTasks(TezEvent tezEvent) throws AMUserCodeExcept
if (!bufferEvents.get()) {
switch (tezEvent.getEventType()) {
case INPUT_READ_ERROR_EVENT:
InputReadErrorEvent event = (InputReadErrorEvent) tezEvent.getEvent();
InputReadErrorEvent inputReadErrorEvent = (InputReadErrorEvent) tezEvent.getEvent();
TezTaskAttemptID destAttemptId = tezEvent.getSourceInfo()
.getTaskAttemptID();
int destTaskIndex = destAttemptId.getTaskID().getId();
@@ -383,10 +383,10 @@ public void sendTezEventToSourceTasks(TezEvent tezEvent) throws AMUserCodeExcept
try {
if (onDemandRouting) {
srcTaskIndex = ((EdgeManagerPluginOnDemand) edgeManager).routeInputErrorEventToSource(
destTaskIndex, event.getIndex());
destTaskIndex, inputReadErrorEvent.getIndex());
} else {
srcTaskIndex = edgeManager.routeInputErrorEventToSource(event,
destTaskIndex, event.getIndex());
srcTaskIndex = edgeManager.routeInputErrorEventToSource(inputReadErrorEvent,
destTaskIndex, inputReadErrorEvent.getIndex());
}
Preconditions.checkArgument(srcTaskIndex >= 0,
"SourceTaskIndex should not be negative,"
@@ -414,11 +414,10 @@ public void sendTezEventToSourceTasks(TezEvent tezEvent) throws AMUserCodeExcept
" edgeManager=" + edgeManager.getClass().getName());
}
TezTaskID srcTaskId = srcTask.getTaskId();
int taskAttemptIndex = event.getVersion();
int srcTaskAttemptIndex = inputReadErrorEvent.getVersion();
TezTaskAttemptID srcTaskAttemptId = TezTaskAttemptID.getInstance(srcTaskId,
taskAttemptIndex);
sendEvent(new TaskAttemptEventOutputFailed(srcTaskAttemptId,
tezEvent, numConsumers));
srcTaskAttemptIndex);
sendEvent(new TaskAttemptEventOutputFailed(srcTaskAttemptId, tezEvent, numConsumers));
break;
default:
throw new TezUncheckedException("Unhandled tez event type: "
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -194,7 +194,7 @@ public static DataEventDependencyInfo fromProto(DataEventDependencyInfoProto pro
private Container container;
private long allocationTime;
private ContainerId containerId;
private NodeId containerNodeId;
protected NodeId containerNodeId;
private String nodeHttpAddress;
private String nodeRackName;

@@ -1793,85 +1793,130 @@ protected static class OutputReportedFailedTransition implements
MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {

@Override
public TaskAttemptStateInternal transition(TaskAttemptImpl attempt,
public TaskAttemptStateInternal transition(TaskAttemptImpl sourceAttempt,
TaskAttemptEvent event) {
TaskAttemptEventOutputFailed outputFailedEvent =
(TaskAttemptEventOutputFailed) event;
TezEvent tezEvent = outputFailedEvent.getInputFailedEvent();
TezTaskAttemptID failedDestTaId = tezEvent.getSourceInfo().getTaskAttemptID();
InputReadErrorEvent readErrorEvent = (InputReadErrorEvent)tezEvent.getEvent();
TezEvent inputFailedEvent = outputFailedEvent.getInputFailedEvent();
TezTaskAttemptID failedDestTaId = inputFailedEvent.getSourceInfo().getTaskAttemptID();

InputReadErrorEvent readErrorEvent = (InputReadErrorEvent)inputFailedEvent.getEvent();
int failedInputIndexOnDestTa = readErrorEvent.getIndex();
if (readErrorEvent.getVersion() != attempt.getID().getId()) {
throw new TezUncheckedException(attempt.getID()

if (readErrorEvent.getVersion() != sourceAttempt.getID().getId()) {
throw new TezUncheckedException(sourceAttempt.getID()
+ " incorrectly blamed for read error from " + failedDestTaId
+ " at inputIndex " + failedInputIndexOnDestTa + " version"
+ readErrorEvent.getVersion());
}
LOG.info(attempt.getID()
+ " blamed for read error from " + failedDestTaId
+ " at inputIndex " + failedInputIndexOnDestTa);
long time = attempt.clock.getTime();
Long firstErrReportTime = attempt.uniquefailedOutputReports.get(failedDestTaId);
// source host: where the data input is supposed to come from
String sHost = sourceAttempt.getNodeId().getHost();
// destination: where the data is tried to be fetched to
String dHost = readErrorEvent.getDestinationLocalhostName();

LOG.info("{} (on {}) blamed for read error from {} (on {}) at inputIndex {}", sourceAttempt.getID(),
sHost, failedDestTaId, dHost, failedInputIndexOnDestTa);

boolean tooManyDownstreamHostsBlamedTheSameUpstreamHost = false;
Map<String, Set<String>> downstreamBlamingHosts = sourceAttempt.getVertex().getDownstreamBlamingHosts();
Contributor commented:

Wouldn't the downstreamBlamingHosts structure occupy a lot of memory in the AM on large clusters? E.g. think of a job running at 1000-node scale with 10 vertices. This will easily have 1000 tasks x 1000 nodes entries, which can lead to memory pressure on the AM. Add the number of vertices to this and there will be even more memory pressure.

How about tracking the downstream hostnames in a set and using that as an optional dimension in the computation? This way, even if multiple task attempts were running on the same host, it would be accounted as a single failure (note that in the current master branch it is accounted multiple times).

To be more specific: is it possible to track downstream hostnames in a set and use that set.size() for computing the fraction that determines whether the src has to be re-executed or not?

Contributor Author (@abstractdog) replied on Oct 20, 2021:

@rbalamohan: please note that this map is stored at vertex level...

sourceAttempt.getVertex().getDownstreamBlamingHosts()

...not at task attempt level, so the memory occupied is proportional to the number of vertices. Do you think there is still a memory pressure problem?

Btw, the idea of storing it at vertex level was that if we have already detected a LOST_OUTPUT for source_attempt_0 from source_host_0, then we immediately mark source_attempt_x on source_host_0 as OUTPUT_LOST if an input_read_error comes in blaming source_attempt_x. Please let me know if this makes sense.

> This way, even if multiple task attempts were running on the same host, it would be accounted as a single failure

I guess downstreamBlamingHosts works exactly this way, by storing:

Map<upstreamHost, Set<downstreamHost>>

The number of entries is independent of the number of task attempts.

> To be more specific: is it possible to track downstream hostnames in a set and use that set.size() for computing the fraction that determines whether the src has to be re-executed or not?

I can try. Do you have anything particular in mind? Does "track downstream hostnames" mean all downstream hostnames that reported an input read error? Also, I'm struggling to understand how to make hosts figure into a fraction, i.e. what to divide by what. I need to think this over; please let me know if you have an idea.
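
A tiny runnable illustration (hypothetical host names) of the dedup property described above: repeated reports from different attempts on the same downstream host collapse into a single entry per upstream host:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DownstreamBlamingHostsExample {
  public static void main(String[] args) {
    Map<String, Set<String>> downstreamBlamingHosts = new HashMap<>();

    // Two task attempts running on the same downstream host blame the same upstream host...
    downstreamBlamingHosts.computeIfAbsent("upstream-host-0", k -> new HashSet<>()).add("downstream-host-0");
    downstreamBlamingHosts.computeIfAbsent("upstream-host-0", k -> new HashSet<>()).add("downstream-host-0");

    // ...but they are accounted as a single failing downstream host.
    System.out.println(downstreamBlamingHosts.get("upstream-host-0").size()); // prints 1
  }
}
```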

Contributor replied:

Thought more about this again. Since this map would be populated mainly during error conditions, there would need to be a lot of bad nodes to cause such a case.

Should be OK to have this map. But the other concern is how to choose the right default config. 1 seems very restrictive, as the source will bail out immediately when a second downstream node reports a failure. Set it to a much higher value; it can be adjusted on an as-needed basis.

Can you check whether this value can be adjusted at runtime? If so, it may need to be added to the "confKeys" of the respective inputs/outputs. E.g. please refer to UnorderedKVInput.

Contributor Author replied:

Sure, let me think about it. Honestly, I didn't like this restrictive, absolute config either (even if I tried to rationalize how it could help).

Contributor Author (@abstractdog) replied on Oct 20, 2021:

ContainerLauncherContext.getNumNodes seems to be okay, because it's updated at runtime: every time a task is allocated, the node is marked as seen. Fortunately, it should work for LLAP without a Hive change, because LlapTaskSchedulerService.scheduleTask takes care of updating it via TaskSchedulerManager.taskAllocated (like DagAwareYarnTaskScheduler and YarnTaskSchedulerService in Tez):

getContext().taskAllocated(taskInfo.task, taskInfo.clientCookie, container);

So it doesn't reflect all the nodes, only the ones that are being used. I think I can use this as a max and compute a hostFailureFraction as reportingDownstreamHosts / numNodes. Let me know if it doesn't make sense; I'll try to experiment a bit.

Contributor Author (@abstractdog) replied on Oct 22, 2021:

@rbalamohan: uploaded a new patch here c523ebb

Tested on a cluster; I found that instead of the total number of nodes, we need to take into account active hosts only.
It was an interesting situation: the AM was running while I restarted the Hive LLAP application, and since node removal is not implemented in AMNodeTracker, it turned out we need to consider only ACTIVE nodes. Here is what I saw when I logged the nodes:

2021-10-21 05:34:53,798 [INFO] [Dispatcher thread {Central}] |impl.TaskAttemptImpl|: nodes: {ccycloud-4.hive-runtime-perf.root.hwx.site:40783:0deaa785-cd96-45ad-ae1c-e5f9e44be73a={AMNodeImpl: nodeId: ccycloud-4.hive-runtime-perf.root.hwx.site:40783:0deaa785-cd96-45ad-ae1c-e5f9e44be73a, state: UNHEALTHY, containers: 0, completed containers: 0, healthy: false, blackListed: false}, ccycloud-8.hive-runtime-perf.root.hwx.site:39959:fce0bc31-a1a4-49ea-b40c-53ea84a783f6={AMNodeImpl: nodeId: ccycloud-8.hive-runtime-perf.root.hwx.site:39959:fce0bc31-a1a4-49ea-b40c-53ea84a783f6, state: ACTIVE, containers: 221, completed containers: 179, healthy: true, blackListed: false}, ccycloud-7.hive-runtime-perf.root.hwx.site:38523:1a2b75f4-1d87-4735-bea4-d96790ee5420={AMNodeImpl: nodeId: ccycloud-7.hive-runtime-perf.root.hwx.site:38523:1a2b75f4-1d87-4735-bea4-d96790ee5420, state: ACTIVE, containers: 223, completed containers: 181, healthy: true, blackListed: false}, ccycloud-5.hive-runtime-perf.root.hwx.site:35671:ea352491-39bb-4d81-b9c5-e519b87696b8={AMNodeImpl: nodeId: ccycloud-5.hive-runtime-perf.root.hwx.site:35671:ea352491-39bb-4d81-b9c5-e519b87696b8, state: UNHEALTHY, containers: 0, completed containers: 0, healthy: false, blackListed: false}, ccycloud-4.hive-runtime-perf.root.hwx.site:46577:d8f0caff-790e-453a-96ac-14964d660f7e={AMNodeImpl: nodeId: ccycloud-4.hive-runtime-perf.root.hwx.site:46577:d8f0caff-790e-453a-96ac-14964d660f7e, state: ACTIVE, containers: 236, completed containers: 194, healthy: true, blackListed: false}, ccycloud-3.hive-runtime-perf.root.hwx.site:34353:69d5de51-df04-481b-9aad-a6b88a2a33c9={AMNodeImpl: nodeId: ccycloud-3.hive-runtime-perf.root.hwx.site:34353:69d5de51-df04-481b-9aad-a6b88a2a33c9, state: UNHEALTHY, containers: 0, completed containers: 0, healthy: false, blackListed: false}, ccycloud-6.hive-runtime-perf.root.hwx.site:44728:187f814d-9760-4965-881f-9dac9f4a2ae5={AMNodeImpl: nodeId: ccycloud-6.hive-runtime-perf.root.hwx.site:44728:187f814d-9760-4965-881f-9dac9f4a2ae5, state: UNHEALTHY, containers: 0, completed containers: 0, healthy: false, blackListed: false}, ccycloud-9.hive-runtime-perf.root.hwx.site:42123:dcb2622b-fed3-4f88-a4e2-70d0ebc57a5e={AMNodeImpl: nodeId: ccycloud-9.hive-runtime-perf.root.hwx.site:42123:dcb2622b-fed3-4f88-a4e2-70d0ebc57a5e, state: ACTIVE, containers: 230, completed containers: 188, healthy: true, blackListed: false}}

2021-10-21 05:34:53,798 [INFO] [Dispatcher thread {Central}] |impl.TaskAttemptImpl|: currentNumberOfFailingDownstreamHosts: 1, numNodes: 4, fraction: 0.25, max allowed: 0.2

This log message was produced when numNodes: 4 reflected only state: ACTIVE nodes; via the temporary "nodes:" log message you can see 4 running LLAP daemons and 4 old ones, which are shown as UNHEALTHY instead of ACTIVE.

if (!downstreamBlamingHosts.containsKey(sHost)) {
LOG.info("Host {} is blamed for fetch failure from {} for the first time", sHost, dHost);
downstreamBlamingHosts.put(sHost, new HashSet<String>());
}

downstreamBlamingHosts.get(sHost).add(dHost);
int currentNumberOfFailingDownstreamHosts = downstreamBlamingHosts.get(sHost).size();
int numNodes = getNumNodes(sourceAttempt);
float hostFailureFraction = numNodes > 0 ? ((float) currentNumberOfFailingDownstreamHosts) / numNodes : 0;
double maxAllowedHostFailureFraction = sourceAttempt.getVertex().getVertexConfig()
.getMaxAllowedDownstreamHostFailuresFraction();

if (hostFailureFraction > maxAllowedHostFailureFraction) {
LOG.info("Host will be marked fail: {} because of host failure fraction {} is beyond the limit {}", sHost,
hostFailureFraction, maxAllowedHostFailureFraction);
tooManyDownstreamHostsBlamedTheSameUpstreamHost = true;
}

long time = sourceAttempt.clock.getTime();

Long firstErrReportTime = sourceAttempt.uniquefailedOutputReports.get(failedDestTaId);
if (firstErrReportTime == null) {
attempt.uniquefailedOutputReports.put(failedDestTaId, time);
sourceAttempt.uniquefailedOutputReports.put(failedDestTaId, time);
firstErrReportTime = time;
}

int maxAllowedOutputFailures = attempt.getVertex().getVertexConfig()
int maxAllowedOutputFailures = sourceAttempt.getVertex().getVertexConfig()
.getMaxAllowedOutputFailures();
int maxAllowedTimeForTaskReadErrorSec = attempt.getVertex()
int maxAllowedTimeForTaskReadErrorSec = sourceAttempt.getVertex()
.getVertexConfig().getMaxAllowedTimeForTaskReadErrorSec();
double maxAllowedOutputFailuresFraction = attempt.getVertex()
double maxAllowedOutputFailuresFraction = sourceAttempt.getVertex()
.getVertexConfig().getMaxAllowedOutputFailuresFraction();

int readErrorTimespanSec = (int)((time - firstErrReportTime)/1000);
boolean crossTimeDeadline = readErrorTimespanSec >= maxAllowedTimeForTaskReadErrorSec;

int runningTasks = attempt.appContext.getCurrentDAG().getVertex(
int runningTasks = sourceAttempt.appContext.getCurrentDAG().getVertex(
failedDestTaId.getTaskID().getVertexID()).getRunningTasks();
float failureFraction = runningTasks > 0 ? ((float) attempt.uniquefailedOutputReports.size()) / runningTasks : 0;
float failureFraction =
runningTasks > 0 ? ((float) sourceAttempt.uniquefailedOutputReports.size()) / runningTasks : 0;
boolean withinFailureFractionLimits =
(failureFraction <= maxAllowedOutputFailuresFraction);
boolean withinOutputFailureLimits =
(attempt.uniquefailedOutputReports.size() < maxAllowedOutputFailures);
(sourceAttempt.uniquefailedOutputReports.size() < maxAllowedOutputFailures);

// If needed we can launch a background task without failing this task
// to generate a copy of the output just in case.
// If needed we can consider only running consumer tasks
if (!crossTimeDeadline && withinFailureFractionLimits && withinOutputFailureLimits
&& !(readErrorEvent.isLocalFetch() || readErrorEvent.isDiskErrorAtSource())) {
return attempt.getInternalState();
&& !(readErrorEvent.isLocalFetch() || readErrorEvent.isDiskErrorAtSource())
&& !tooManyDownstreamHostsBlamedTheSameUpstreamHost) {
return sourceAttempt.getInternalState();
}
String message = attempt.getID() + " being failed for too many output errors. "
String message = sourceAttempt.getID() + " being failed for too many output errors. "
+ "failureFraction=" + failureFraction
+ ", MAX_ALLOWED_OUTPUT_FAILURES_FRACTION="
+ maxAllowedOutputFailuresFraction
+ ", uniquefailedOutputReports=" + attempt.uniquefailedOutputReports.size()
+ ", uniquefailedOutputReports=" + sourceAttempt.uniquefailedOutputReports.size()
+ ", MAX_ALLOWED_OUTPUT_FAILURES=" + maxAllowedOutputFailures
+ ", hostFailureFraction=" + hostFailureFraction
+ " (" + currentNumberOfFailingDownstreamHosts + " / " + numNodes + ")"
+ ", MAX_ALLOWED_DOWNSTREAM_HOST_FAILURES_FRACTION="
+ maxAllowedHostFailureFraction
+ ", MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC="
+ maxAllowedTimeForTaskReadErrorSec
+ ", readErrorTimespan=" + readErrorTimespanSec
+ ", isLocalFetch=" + readErrorEvent.isLocalFetch()
+ ", isDiskErrorAtSource=" + readErrorEvent.isDiskErrorAtSource();

LOG.info(message);
attempt.addDiagnosticInfo(message);
sourceAttempt.addDiagnosticInfo(message);
// send input failed event
attempt.sendInputFailedToConsumers();
sourceAttempt.sendInputFailedToConsumers();
// Not checking for leafVertex since a READ_ERROR should only be reported for intermediate tasks.
if (attempt.getInternalState() == TaskAttemptStateInternal.SUCCEEDED) {
if (sourceAttempt.getInternalState() == TaskAttemptStateInternal.SUCCEEDED) {
(new TerminatedAfterSuccessHelper(FAILED_HELPER)).transition(
attempt, event);
sourceAttempt, event);
return TaskAttemptStateInternal.FAILED;
} else {
(new TerminatedWhileRunningTransition(FAILED_HELPER)).transition(
attempt, event);
sourceAttempt, event);
return TaskAttemptStateInternal.FAIL_IN_PROGRESS;
}
// TODO at some point. Nodes may be interested in FetchFailure info.
// Can be used to blacklist nodes.
}

private int getNumNodes(TaskAttemptImpl sourceAttempt) {
Vertex vertex = sourceAttempt.getVertex();
String taskSchedulerName = vertex.getServicePluginInfo().getTaskSchedulerName();
int sourceIndex = vertex.getAppContext().getTaskScheduerIdentifier(taskSchedulerName);
int numActiveNodes = vertex.getAppContext().getNodeTracker().getNumActiveNodes(sourceIndex);
if (LOG.isDebugEnabled()) {
int numAllNodes = vertex.getAppContext().getNodeTracker().getNumNodes(sourceIndex);
LOG.debug("Getting nodes, active/all: {}/{}", numActiveNodes, numAllNodes);
}
return numActiveNodes;
}
}

@VisibleForTesting
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -264,6 +264,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl

final ServicePluginInfo servicePluginInfo;

/*
* Maps every upstream host to the set of unique downstream hostnames that reported
* INPUT_READ_ERROR against it. This map helps to decide whether there is a problem with the
* host that produced the map outputs. The assumption is that if multiple downstream hosts
* report input errors for the same upstream host, then it's likely that the output is to blame
* and needs to be regenerated.
*/
private final Map<String, Set<String>> downstreamBlamingHosts = Maps.newHashMap();

private final float maxFailuresPercent;
private boolean logSuccessDiagnostics = false;
@@ -4833,6 +4840,10 @@ static class VertexConfigImpl implements VertexConfig {
* See tez.am.max.allowed.time-sec.for-read-error.
*/
private final int maxAllowedTimeForTaskReadErrorSec;
/**
* See tez.am.max.allowed.downstream.host.failures.fraction.
*/
private final double maxAllowedDownstreamHostFailuresFraction;

public VertexConfigImpl(Configuration conf) {
this.maxFailedTaskAttempts = conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
@@ -4857,6 +4868,10 @@ public VertexConfigImpl(Configuration conf) {
this.maxAllowedTimeForTaskReadErrorSec = conf.getInt(
TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC,
TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT);

this.maxAllowedDownstreamHostFailuresFraction = conf.getDouble(
TezConfiguration.TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOST_FAILURES_FRACTION,
TezConfiguration.TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOST_FAILURES_FRACTION_DEFAULT);
}

@Override
@@ -4899,8 +4914,20 @@ public boolean getTaskRescheduleRelaxedLocality() {
@Override public int getMaxAllowedTimeForTaskReadErrorSec() {
return maxAllowedTimeForTaskReadErrorSec;
}

/**
* @return maxAllowedDownstreamHostFailuresFraction.
*/
@Override public double getMaxAllowedDownstreamHostFailuresFraction() {
return maxAllowedDownstreamHostFailuresFraction;
}
}

@Override
public AbstractService getSpeculator() { return speculator; }

@Override
public Map<String, Set<String>> getDownstreamBlamingHosts() {
return downstreamBlamingHosts;
}
}
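
Finally, a hedged sketch of tuning the new knob for a small cluster, per the review discussion above (0.25, so that with 4 active hosts a single blaming downstream host is not enough); this uses the plain Hadoop Configuration API:

```java
// import org.apache.hadoop.conf.Configuration;
// import org.apache.tez.dag.api.TezConfiguration;
Configuration conf = new Configuration();
conf.setDouble(TezConfiguration.TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOST_FAILURES_FRACTION, 0.25);
// With 4 active nodes: 1 blaming host -> 0.25 (not above the limit, the attempt survives);
// 2 blaming hosts -> 0.5 (above the limit, the upstream attempt is failed).
```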