[SPARK-39023] [K8s] Add Executor Pod inter-pod anti-affinity #36358
Conversation
dcoliversun
left a comment
Could you please add a unit test?
Thanks for your review, I'm going to add a unit test later.
martin-g
left a comment
Is this solution generic enough? I.e. would it solve this problem for all applications, or will some applications need customizations/modifications?
One can achieve the same today by using the PodTemplate config. The advantage is that the application can provide a config that is specific to its needs.
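For context, a rough sketch of how this could be expressed today through a pod template (passed via spark.kubernetes.executor.podTemplateFile). The my-app-group label is a hypothetical static label the user would also have to set on executors, e.g. via spark.kubernetes.executor.label.*, because the per-run applicationId is not known when the template is written:

apiVersion: v1
kind: Pod
spec:
  affinity:
    podAntiAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
        - weight: 100
          podAffinityTerm:
            topologyKey: kubernetes.io/hostname
            labelSelector:
              matchLabels:
                my-app-group: my-spark-job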
val sparkPod =
  new AntiAffinityFeatureStep(executorConf).configurePod(SparkPod.initialPod())

assert(sparkPod.pod.getSpec.getAffinity.getPodAntiAffinity != null)
Please cache sparkPod.pod.getSpec.getAffinity.getPodAntiAffinity as a local variable. It will simplify the code a lot!
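For illustration, the suggested refactor could look roughly like this (the variable name is arbitrary):

val podAntiAffinity = sparkPod.pod.getSpec.getAffinity.getPodAntiAffinity
assert(podAntiAffinity != null)
// further assertions can reuse podAntiAffinity instead of repeating the full accessor chain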
Thanks for your review!
This solution is not tied to a specific application. It is meant to alleviate data skew when running on Kubernetes; in essence, the skew is caused by scheduling that does not balance the amount of shuffle data across nodes.
This feature needs the applicationId to keep executor pods of the same application apart, so I chose to add the anti-affinity while building the executor pod instead of using a template YAML. BTW, I'd like to know more about the PodTemplate config approach you mentioned.
https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template
Thanks for your reply, I know this and have used it in other ways. Perhaps I didn't make it clear above: this PR needs to keep the executor anti-affinity at Application granularity, so we need to use the applicationId.
Can one of the admins verify this patch?
You may use
I see. Thank you for your introduction! In my experience, shuffle skew caused by Kubernetes scheduling occurs frequently when running Spark on Kubernetes. With the pod-template approach, users have to already know about this problem, rather than discovering it through the Spark configuration docs; a built-in configuration lets them alleviate the problem by turning on a flag instead of learning and writing pod templates. Relying only on pod templates may make it difficult for users to ease shuffle skew on Kubernetes.
Hi, @zwangsheng. Thank you for making a PR.
However, the Apache Spark community wants to avoid feature duplication like this.
The proposed feature is already delivered to many production environments via PodTemplate and has been used by customers without any problem. Adding another configuration only confuses users.
| .doc("If enable, register executor with anti affinity. This anti affinity will help " + | ||
| "Kubernetes assign executors of the same Application to different nodes " + | ||
| "as much as possible") | ||
| .version("3.2.1") |
In addition, Apache Spark follows the Semantic Versioning policy, which means new features and improvements should use the version of the master branch, currently 3.4.0.
| "as much as possible") | ||
| .version("3.2.1") | ||
| .booleanConf | ||
| .createWithDefault(false) |
The default value is correct because AntiAffinity could hurt the EKS AutoScale feature.
@dongjoon-hyun Thanks for your reply. I understand the above and accept it. Thanks everyone for reviewing this PR!!! I will close this PR and look forward to meeting in another PR.
Thank you so much, @zwangsheng.
What changes were proposed in this pull request?
Add Inter-Pod anti-affinity to Executor Pod.
Why are the changes needed?
Why do we need this?
When Spark on Kubernetes is running, executor pods can cluster on a few nodes under certain conditions (uneven resource allocation in Kubernetes, high load on some nodes and low load on others), causing shuffle data skew. This leads to Spark application failures or performance bottlenecks, such as shuffle fetch timeouts and connection refusals once the connection limit is reached.
How does this PR help?
Add the AntiAffinity feature to Executor Pod to ensure simple anti-affinity scheduling at Application granularity.
Executor Pod Yaml represents:
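A reconstructed sketch of the relevant affinity section (the exact weight and topology key used by this PR are assumptions; spark-app-selector is the label Spark sets to the application id on executor pods):

affinity:
  podAntiAffinity:
    preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 100
        podAffinityTerm:
          topologyKey: kubernetes.io/hostname
          labelSelector:
            matchLabels:
              spark-app-selector: <applicationId>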
Why should we use this?
The functionality mentioned in this PR was tested on a cluster with three Kubernetes nodes (node-1, node-2, node-3).
Whether cluster resources are sufficient or insufficient, the effect is the same with the feature enabled or disabled: Kubernetes assigns pods to nodes with low load based on global resources. When only one node has a low load, Kubernetes will schedule all executor pods onto that node.
Here are the experiment results:
(The experiments show which node the pods are scheduled to.)
Experiment 1:
Three nodes hold a small, equal amount of load.
Enable Feature:
Disable Feature:
Experiment 2:
Node 1 has no idle resources; Node 2 and Node 3 hold a small, equal amount of load.
Enable Feature:
Disable Feature:
If some nodes are busy or the load is unbalanced, leaving the feature off means that Kubernetes keeps picking the node with the lowest load and placing pods there until it is no longer the lowest-load node. With the feature enabled, pods are still allocated to low-load nodes first, but the Application-granularity anti-affinity then steers additional pods toward other low-load nodes that do not already host executors of the same application.
Experiment 3:
Node 1 has no idle resources; Node 2 has a higher load than Node 3.
Enable Feature:
Disable Feature:
The experimental results above show that under normal circumstances, turning the feature on makes no difference compared with leaving it off; in extreme cases, enabling the feature alleviates the accumulation of pods on a single node and prevents performance bottlenecks to a certain extent.
Will this make any difference?
This PR adds a preferredDuringSchedulingIgnoredDuringExecution pod anti-affinity term to the executor pod. It layers an Application-granularity preferred anti-affinity on top of Kubernetes' global resource-oriented scheduling (and any other customized scheduling policies). We hope Kubernetes can spread the executor pods of the same application across nodes as much as possible while still adhering to the original scheduling rules.
Why choose this?
We are concerned with how pods are distributed relative to each other. According to the Kubernetes documentation on Assigning Pods to Nodes, Kubernetes provides requiredDuringSchedulingIgnoredDuringExecution and preferredDuringSchedulingIgnoredDuringExecution:
requiredDuringSchedulingIgnoredDuringExecution: The scheduler can't schedule the Pod unless the rule is met. This functions like nodeSelector, but with a more expressive syntax.
preferredDuringSchedulingIgnoredDuringExecution: The scheduler tries to find a node that meets the rule. If a matching node is not available, the scheduler still schedules the Pod.
It's not hard to see that preferredDuringSchedulingIgnoredDuringExecution is more in line with the issue raised in this PR: we want Kubernetes to spread executor pods as much as possible, but in the worst case, where only one node has resources left, we still need to be able to place executor pods on that node.
We need to apply the anti-affinity to executor pods at the granularity of the Application, so we add it after the applicationId has been generated and before the executor pod is built; a sketch of such a feature step follows below. Because of this applicationId requirement, we cannot pin the rule into pod-template.yaml.
From the perspective of the normal scheduling policy, there is no great negative impact; the experiments and reasoning above show that the new feature does not affect the existing scheduling behavior.
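For illustration only, a minimal sketch of what such a feature step could look like using the fabric8 builders. This is not the exact code of this PR; the weight, the kubernetes.io/hostname topology key, and the use of the spark-app-selector label are assumptions:

package org.apache.spark.deploy.k8s.features

import io.fabric8.kubernetes.api.model.{AffinityBuilder, PodBuilder}

import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}

class AntiAffinityFeatureStep(conf: KubernetesExecutorConf) extends KubernetesFeatureConfigStep {

  override def configurePod(pod: SparkPod): SparkPod = {
    // Soft (preferred) anti-affinity against other pods of the same application,
    // keyed on the spark-app-selector label that Spark sets to the applicationId.
    val affinity = new AffinityBuilder()
      .withNewPodAntiAffinity()
        .addNewPreferredDuringSchedulingIgnoredDuringExecution()
          .withWeight(100)                              // assumed weight
          .withNewPodAffinityTerm()
            .withTopologyKey("kubernetes.io/hostname")  // assumed topology key: spread per node
            .withNewLabelSelector()
              .addToMatchLabels("spark-app-selector", conf.appId)
            .endLabelSelector()
          .endPodAffinityTerm()
        .endPreferredDuringSchedulingIgnoredDuringExecution()
      .endPodAntiAffinity()
      .build()

    // Attach the affinity to the pod spec built so far and return the updated SparkPod.
    val podWithAffinity = new PodBuilder(pod.pod)
      .editOrNewSpec()
        .withAffinity(affinity)
      .endSpec()
      .build()

    SparkPod(podWithAffinity, pod.container)
  }
}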
However, to some extent, spreading out executor pods will affect the locality of shuffle data; that is, the number of blocks read from the local host will be reduced.
Judging from the experiments and results so far, this trade-off is worthwhile.
If we do not spread out the executors, they will gather on a few nodes under certain circumstances, leading to shuffle data skew and significantly degrading task performance. More seriously, once the connection limit is reached, executor block fetches fail, stages fail, and even the whole application can fail.
Spreading out executors increases network connections and traffic at the cluster level. However, the increased consumption is scattered across the cluster rather than concentrated on a few nodes, so the overall stability is acceptable.
The amount of shuffle data on a node is related to the number of executor pods it hosts, so the shuffle skew problem can currently be alleviated by controlling the placement of executor pods.
At present, anti-affinity is only used as an indirect lever; later this may be driven directly by the amount of shuffle data.
Does this PR introduce any user-facing change?
Yes
How was this patch tested?
Local