TEZ-4338: Tez should consider node information to realize OUTPUT_LOST as early as possible - upstream(mapper) problems #152
Conversation
    */
    @ConfigurationScope(Scope.AM)
    @ConfigurationProperty(type="integer")
    public static final String TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOSTS_REPORTING_FETCH_FAILURE =
How do we determine this value for a large cluster or a dynamically changing cluster?
    sHost, failedDestTaId, dHost, failedInputIndexOnDestTa);

    boolean tooManyDownstreamHostsBlamedTheSameUpstreamHost = false;
    Map<String, Set<String>> downstreamBlamingHosts = sourceAttempt.getVertex().getDownstreamBlamingHosts();
Wouldn't the downstreamBlamingHosts structure occupy a lot of memory in the AM on large clusters? E.g., think of a job running at 1000-node scale with 10 vertices. This can easily have 1000 tasks x 1000 nodes entries, which can lead to memory pressure on the AM. Add the number of vertices to this and it will be even more memory pressure.
How about tracking the downstream hostnames in a set and using that as an optional dimension in the computation? This way, even if multiple task attempts were running on the same host, it would be accounted as a single failure (note that currently in the master branch it is accounted multiple times).
To be more specific, is it possible to track downstream hostnames in a set and use that set.size() for computing the fraction to determine whether the source has to be re-executed or not?
@rbalamohan: please note that this map is stored at the vertex level...
    sourceAttempt.getVertex().getDownstreamBlamingHosts()
...not at the task attempt level, so the memory occupied is proportional to the number of vertices. Do you think there is still a memory pressure problem?
Btw, the idea behind storing it at the vertex level was that if we have already detected a LOST_OUTPUT for source_attempt_0 on source_host_0, then we immediately mark source_attempt_x on source_host_0 as OUTPUT_LOST when an input read error comes in blaming source_attempt_x. Please let me know if this makes sense.
> This way, even if multiple task attempts were running on the same host, it would be accounted as a single failure
I guess downstreamBlamingHosts already works this way, by storing:
    Map<upstreamHost, Set<downstreamHost>>
so the number of entries is independent of the number of task attempts.
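To make that concrete, here is a minimal illustrative sketch of such a vertex-level structure; the class, the recordBlame method and the maxAllowedHosts threshold are hypothetical names for illustration, not the actual Tez code, and the strict ">" comparison is an assumption:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Hypothetical sketch of the vertex-level blame bookkeeping described above.
    // One map per vertex: upstream host -> distinct downstream hosts that reported
    // fetch failures against it, so duplicate reports from the same downstream
    // host collapse into a single entry.
    public class DownstreamBlamingHostsSketch {
      private final Map<String, Set<String>> blamedBy = new HashMap<>();
      private final int maxAllowedHosts; // e.g. the absolute threshold discussed above

      public DownstreamBlamingHostsSketch(int maxAllowedHosts) {
        this.maxAllowedHosts = maxAllowedHosts;
      }

      // Returns true once enough distinct downstream hosts have blamed this
      // upstream host that its outputs should be treated as lost.
      public synchronized boolean recordBlame(String upstreamHost, String downstreamHost) {
        Set<String> hosts = blamedBy.computeIfAbsent(upstreamHost, k -> new HashSet<>());
        hosts.add(downstreamHost); // Set semantics: the same downstream host counts once
        return hosts.size() > maxAllowedHosts;
      }
    }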
> To be more specific, is it possible to track downstream hostnames in a set and use that set.size() for computing the fraction to determine whether the source has to be re-executed or not?
I can try, do you have any idea in particular? Does "track downstream hostnames" mean all downstream hostnames that reported an input read error? I'm also struggling to understand how to make hosts count towards a fraction, i.e. what to divide by what... I need to think this over; please let me know if you have an idea.
Thought more about this again. Since this map would be populated mainly during error conditions, there would need to be a lot of bad nodes to cause such a case. Should be OK to have this map.
But the other concern is how to choose the right default config. 1 seems very restrictive, as the source will bail out immediately when a second downstream node reports a failure. Set it to a much higher value; it can be adjusted on a need basis.
Can you check whether this value can be adjusted at runtime? If so, it may need to be added to the "confKeys" of the respective inputs/outputs; e.g., please refer to UnorderedKVInput.
Sure, let me think about it.
Honestly, I didn't like this restrictive, absolute config either (even if I tried to rationalize how it can help).
ContainerLauncherContext.getNumNodes seems to be okay, because it's updated at runtime: every time a task is allocated, the node is marked as seen.
Fortunately, it should work for LLAP without a Hive change, because LlapTaskSchedulerService.scheduleTask takes care of updating it via TaskSchedulerManager.taskAllocated (just like DagAwareYarnTaskScheduler and YarnTaskSchedulerService do in Tez):
    getContext().taskAllocated(taskInfo.task, taskInfo.clientCookie, container);
So it doesn't reflect all the nodes, only the ones that are being used. I think I can use this as a maximum and compute a hostFailureFraction as reportingDownstreamHosts / numNodes.
Let me know if it doesn't make sense, I'll try to experiment a bit.
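As a sketch of the fraction idea described above (the class, method and parameter names are made up for illustration; only the formula reportingDownstreamHosts / numNodes and the comparison against the configured maximum come from the discussion):

    // Illustrative sketch of the fraction-based decision described above.
    public final class HostFailureFractionCheck {
      // reportingDownstreamHosts: distinct downstream hosts blaming the same upstream host
      // numNodes: nodes the AM currently knows about (e.g. ContainerLauncherContext.getNumNodes())
      // maxAllowedFraction: configured threshold, e.g. 0.2
      public static boolean shouldMarkOutputLost(int reportingDownstreamHosts, int numNodes,
          double maxAllowedFraction) {
        if (numNodes <= 0) {
          return false; // no node information yet, stay conservative
        }
        double hostFailureFraction = (double) reportingDownstreamHosts / numNodes;
        return hostFailureFraction > maxAllowedFraction;
      }
    }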
@rbalamohan: I uploaded a new patch here: c523ebb
I tested it on a cluster and found that instead of the total number of nodes, we need to take only the active hosts into account.
It was an interesting situation: the AM was running, I restarted the Hive LLAP application, and since node removal is not implemented in AMNodeTracker, it turned out we need to consider only ACTIVE nodes. Here is what I saw when I logged the nodes:
2021-10-21 05:34:53,798 [INFO] [Dispatcher thread {Central}] |impl.TaskAttemptImpl|: nodes: {ccycloud-4.hive-runtime-perf.root.hwx.site:40783:0deaa785-cd96-45ad-ae1c-e5f9e44be73a={AMNodeImpl: nodeId: ccycloud-4.hive-runtime-perf.root.hwx.site:40783:0deaa785-cd96-45ad-ae1c-e5f9e44be73a, state: UNHEALTHY, containers: 0, completed containers: 0, healthy: false, blackListed: false}, ccycloud-8.hive-runtime-perf.root.hwx.site:39959:fce0bc31-a1a4-49ea-b40c-53ea84a783f6={AMNodeImpl: nodeId: ccycloud-8.hive-runtime-perf.root.hwx.site:39959:fce0bc31-a1a4-49ea-b40c-53ea84a783f6, state: ACTIVE, containers: 221, completed containers: 179, healthy: true, blackListed: false}, ccycloud-7.hive-runtime-perf.root.hwx.site:38523:1a2b75f4-1d87-4735-bea4-d96790ee5420={AMNodeImpl: nodeId: ccycloud-7.hive-runtime-perf.root.hwx.site:38523:1a2b75f4-1d87-4735-bea4-d96790ee5420, state: ACTIVE, containers: 223, completed containers: 181, healthy: true, blackListed: false}, ccycloud-5.hive-runtime-perf.root.hwx.site:35671:ea352491-39bb-4d81-b9c5-e519b87696b8={AMNodeImpl: nodeId: ccycloud-5.hive-runtime-perf.root.hwx.site:35671:ea352491-39bb-4d81-b9c5-e519b87696b8, state: UNHEALTHY, containers: 0, completed containers: 0, healthy: false, blackListed: false}, ccycloud-4.hive-runtime-perf.root.hwx.site:46577:d8f0caff-790e-453a-96ac-14964d660f7e={AMNodeImpl: nodeId: ccycloud-4.hive-runtime-perf.root.hwx.site:46577:d8f0caff-790e-453a-96ac-14964d660f7e, state: ACTIVE, containers: 236, completed containers: 194, healthy: true, blackListed: false}, ccycloud-3.hive-runtime-perf.root.hwx.site:34353:69d5de51-df04-481b-9aad-a6b88a2a33c9={AMNodeImpl: nodeId: ccycloud-3.hive-runtime-perf.root.hwx.site:34353:69d5de51-df04-481b-9aad-a6b88a2a33c9, state: UNHEALTHY, containers: 0, completed containers: 0, healthy: false, blackListed: false}, ccycloud-6.hive-runtime-perf.root.hwx.site:44728:187f814d-9760-4965-881f-9dac9f4a2ae5={AMNodeImpl: nodeId: ccycloud-6.hive-runtime-perf.root.hwx.site:44728:187f814d-9760-4965-881f-9dac9f4a2ae5, state: UNHEALTHY, containers: 0, completed containers: 0, healthy: false, blackListed: false}, ccycloud-9.hive-runtime-perf.root.hwx.site:42123:dcb2622b-fed3-4f88-a4e2-70d0ebc57a5e={AMNodeImpl: nodeId: ccycloud-9.hive-runtime-perf.root.hwx.site:42123:dcb2622b-fed3-4f88-a4e2-70d0ebc57a5e, state: ACTIVE, containers: 230, completed containers: 188, healthy: true, blackListed: false}}
2021-10-21 05:34:53,798 [INFO] [Dispatcher thread {Central}] |impl.TaskAttemptImpl|: currentNumberOfFailingDownstreamHosts: 1, numNodes: 4, fraction: 0.25, max allowed: 0.2
This log message was emitted when numNodes: 4 was already counting only state: ACTIVE nodes; in the temporary "nodes:" log message above you can see the 4 running LLAP daemons plus the 4 old ones, which show up as UNHEALTHY instead of ACTIVE.
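A simplified sketch of the "count only ACTIVE nodes" point; the real code works against the AM's node tracker, while here the node view is just a map of node id to state string, purely for illustration:

    import java.util.Map;

    public final class ActiveNodeCount {
      // Count only nodes whose state is ACTIVE, ignoring stale UNHEALTHY entries
      // such as the old LLAP daemons left behind after the restart in the log above.
      public static int countActiveNodes(Map<String, String> nodeIdToState) {
        int active = 0;
        for (String state : nodeIdToState.values()) {
          if ("ACTIVE".equals(state)) {
            active++;
          }
        }
        return active;
      }
    }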
… as early as possible - upstream(mapper) problems Change-Id: I8907507b0527a65e6492fb7d83ff55b0f17bd7a4
…t injectable errors
🎊 +1 overall
This message was automatically generated.
@rbalamohan: is there anything that's needed from this PR to get it committed? thanks in advance
    @ConfigurationProperty(type="integer")
    public static final String TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOST_FAILURES_FRACTION =
        TEZ_AM_PREFIX + "max.allowed.downstream.host.failures.fraction";
    public static final double TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOST_FAILURES_FRACTION_DEFAULT = 0.2;
Mostly looks good. This may not work for small clusters (e.g., < 5 nodes), where a single node failure can cause the source to restart the task. This can be tweaked later.
I see. I'm assuming that on such a small cluster a fraction of 0.25 might work properly (so with 4 hosts, 1 failing downstream host won't make the source restart immediately; at least 2 reporting downstream hosts are needed).
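For reference, a user on such a small cluster could raise the fraction explicitly; a sketch, assuming the constant lives in TezConfiguration as shown in the diff above and that a double-valued setter is acceptable for it:

    import org.apache.tez.dag.api.TezConfiguration;

    public class RaiseHostFailureFraction {
      public static void main(String[] args) {
        TezConfiguration conf = new TezConfiguration();
        // With 4 hosts and a threshold of 0.25, one blaming downstream host gives
        // a fraction of 1/4 = 0.25, which does not exceed the threshold, so at
        // least two distinct downstream hosts are needed before the source reruns.
        conf.setDouble(TezConfiguration.TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOST_FAILURES_FRACTION, 0.25);
      }
    }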
LGTM. +1.
The fix is actually to store downstreamBlamingHosts at the vertex level. This is a map that simply stores, for every upstream host, the set of downstream hosts that reported a fetch failure against it. The idea is to detect upstream node failures better, and I'm trying to achieve this by assuming that if downstream tasks from at least n nodes (n is configurable) reported failures for an upstream host, then the upstream tasks will be marked as OUTPUT_LOST and will rerun. As we're handling this at the vertex level, we can fail a mapper task as soon as the first fetch failure blaming it comes in, if the mapper's host has already been identified as a possible root cause of failures.
Changes in this patch:
more examples in javadoc