
Conversation

@xwu-intel
Contributor

What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-36699

We propose to optionally change the behavior of stage-level scheduling by reusing compatible executors. Two executors bound to different resource profiles are compatible only when their executorResources (in particular cores, if no custom resources are defined) are the same; their taskResources may differ. When executors are compatible, tasks can be allocated to any of them, even across different profiles. Users defining profiles should make sure the differing taskResources are properly specified against the same executorResources.
A SparkConf option spark.dynamicAllocation.reuseExecutors is defined to change the default behavior, which is not to reuse executors. When this option is turned on, dynamic allocation counts all compatible executors toward the init/min/max executor number restrictions.
This first PR focuses on reusing executors with the same number of cores and no custom resources.
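
A minimal sketch of enabling the proposed behavior; dynamic allocation is required for multiple ResourceProfiles, and spark.dynamicAllocation.reuseExecutors is the flag proposed by this PR (off by default):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  // shuffle tracking lets dynamic allocation work without an external shuffle service
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .set("spark.dynamicAllocation.reuseExecutors", "true")  // proposed in this PR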

Why are the changes needed?

Current stage-level scheduling allocates a separate set of executors for each executor profile. This approach simplifies the implementation, but it wastes executor resources when the existing executors have enough resources to run the subsequent tasks.

The typical scenario is a user who wants a different number of cores per task across stages while keeping the same executor resources. For instance, in a CPU machine-learning workload, to achieve the best performance with the same executor resources, the user allocates 1 core per task (and many tasks) in the ETL stage, then more cores per task (and fewer tasks) in the following CPU training stage. In the existing implementation, two separate profiles and two sets of executors are created. Reusing executors gives better CPU utilization and better performance. A sketch of the two profiles is shown below.
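
The following sketch expresses the scenario above with the existing ResourceProfile API; the resource amounts, input path, and per-stage functions are illustrative. Both profiles request identical executor resources, so under this PR their executors would be compatible and reusable:

import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}
import org.apache.spark.sql.SparkSession

val sc = SparkSession.builder().getOrCreate().sparkContext

// Identical executorResources for both stages: the "compatible" case.
val execReqs = new ExecutorResourceRequests().cores(8).memory("16g")

// ETL stage: 1 core per task, so up to 8 concurrent tasks per executor.
val etlProfile = new ResourceProfileBuilder()
  .require(execReqs)
  .require(new TaskResourceRequests().cpus(1))
  .build()

// Training stage: 8 cores per task, so 1 task per executor.
val trainProfile = new ResourceProfileBuilder()
  .require(execReqs)
  .require(new TaskResourceRequests().cpus(8))
  .build()

val features = sc.textFile("hdfs:///input")   // hypothetical input
  .map(_.split(','))                          // hypothetical ETL transform
  .withResources(etlProfile)

val model = features
  .mapPartitions(rows => Iterator(rows.length))  // stand-in for a CPU training step
  .withResources(trainProfile)

Without this PR, the two profiles get two separate sets of executors even though their executor resources are identical.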

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests

github-actions bot added the CORE label Sep 9, 2021
@AmplabJenkins

Can one of the admins verify this patch?

@xwu-intel
Contributor Author

xwu-intel commented Sep 13, 2021

@tgravescs The main code is in place, but some redundant code has not been cleaned up yet. I will finish it if the idea is accepted.
Related discussion: https://issues.apache.org/jira/browse/SPARK-31437

@tgravescs
Contributor

Thanks for working on this. I'm very busy right now; it will likely be the end of this week or early next week before I can look.

@tgravescs left a comment


What exactly is the plan here? I know you wanted to get feedback, but are you going to add checks for all the resources to say they are compatible? Part of this comes down to other things as well, like memory. I might have large containers with the same number of cores; are they OK to reuse? For instance, I might have large containers that I'm using for ML vs. ETL. So I think we need to define the policy in more detail.

Contributor


I need to look in more detail, perhaps, on how to do this; I don't really like having to add conditionals in so many places.

Contributor Author


Please let me know if there is a better way.

@tgravescs
Contributor

tgravescs commented Sep 30, 2021

A few other concerns here.

  1. How does this work with minimum executors? It seems easy enough to not worry about it here and just reuse executors if they are there.
  2. How does this apply to the executor monitor idle timeout? We don't time out executors when there are more tasks to run; do we take that into account here and keep compatible ones?
  3. Does the UI show proper/useful information here?
  4. I don't think this has taken numBarrierSlotsAvailable into account; see calculateAvailableSlots.

@xwu-intel
Contributor Author

xwu-intel commented Oct 8, 2021

A few other concerns here.

  1. How does this work with minimum executors? It seems easy enough to not worry about it here and just reuse executors if they are there.

The idea is resource reuse, so min/max will be adjusted to take all compatible executors into account (i.e. all compatible executors share one min/max executor number).

https://github.com/apache/spark/blob/67034f284803bd10a487b5c67eb4c552ace950c3/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L558

  2. How does this apply to the executor monitor idle timeout? We don't time out executors when there are more tasks to run; do we take that into account here and keep compatible ones?

Compatible executors are used the same way as executors with the original profile; they also time out if there are no more tasks. As mentioned above, they share the min/max executor number with other compatible profiles.

  3. Does the UI show proper/useful information here?

I will check what UI changes are needed; maybe show some compatibility info under Environment / Resource Profiles.

  4. I don't think this has taken numBarrierSlotsAvailable into account; see calculateAvailableSlots.

I didn't notice this; I will check.

@xwu-intel
Contributor Author

What exactly is the plan here? I know you wanted to get feedback, but are you going to add checks for all the resources to say they are compatible? Part of this comes down to other things as well, like memory. I might have large containers with the same number of cores; are they OK to reuse? For instance, I might have large containers that I'm using for ML vs. ETL. So I think we need to define the policy in more detail.

We can discuss this. The current reuse condition is conservative (cores only) rather than flexible (memory and other resources). We also need to consider whether it is easy for the end user to control the resource sharing (your case, with larger memory and the same cores, would be reused here; it's hard to account for memory, so that is up to the user) and whether it makes sense for real use cases. I found some use cases for reusing cores, but for other resources such as GPUs I don't have a clear picture right now. I will think it over, discuss with you, and welcome input.

@xwu-intel
Contributor Author

xwu-intel commented Dec 2, 2021

@tgravescs

I have been busy for a while, but I am back to this topic. I will find some time to address the code comments above.

For the reuse policy, there are two options in my mind right now:

  1. Strict match: only reuse executors with exactly the same resources (including all third-party resources). There is no resource waste, but less user flexibility.
  2. Reuse larger executors: if there is an executor whose resources are larger than or equal to the current requirements, reuse it. E.g., if you define less memory in the new stage, you can reuse a previous executor with larger memory; or if you define the new stage with no GPU, you can reuse a previous executor with a GPU. In both cases the new stage has smaller resource requirements. But under this policy the user should know there is some resource waste; they need to trade off reusing executors vs. creating new ones.

Do we also allow the user to select the reuse policy at the stage level?

I am not sure all policies can be used in real-world scenarios. How about we implement some policy first to get things working for common scenarios, and leave the policy options open so new policies can be added in the future? A sketch of the two options is below.
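
To make the two options concrete, here is a minimal sketch of what such policies could look like, written against ResourceProfile.executorResources; the trait and object names are illustrative, not this PR's actual API:

import org.apache.spark.resource.ResourceProfile

// Illustrative names only; not part of this PR's actual code.
sealed trait ReusePolicy {
  def compatible(existing: ResourceProfile, requested: ResourceProfile): Boolean
}

// Option 1: strict match on every executor resource, third-party ones included.
object StrictMatch extends ReusePolicy {
  override def compatible(existing: ResourceProfile, requested: ResourceProfile): Boolean =
    existing.executorResources == requested.executorResources
}

// Option 2: reuse any executor whose resources cover the new request,
// accepting some waste (e.g. reusing a GPU executor for a CPU-only stage).
object ReuseLarger extends ReusePolicy {
  override def compatible(existing: ResourceProfile, requested: ResourceProfile): Boolean =
    requested.executorResources.forall { case (name, req) =>
      existing.executorResources.get(name).exists(_.amount >= req.amount)
    }
}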

@tgravescs
Contributor

Sorry, it looks like I missed your previous comments. Yes, I think we should support multiple reuse policies and let the user specify one. If we can make it a pluggable API, that would be good as well; it would let users decide which resources they are comfortable with allowing to be wasted. Maybe they are OK with wasted memory but not wasted GPU, for instance.

github-actions bot added the BUILD label Jan 18, 2022
@tgravescs
Contributor

@xwu99 please let me know when you think this is at a point to review again

@xwu-intel
Contributor Author

@tgravescs I will add more tests since several places have changed, but you can review first and provide feedback. Some updates since our last discussion:

  • There are two ways to enable executor reuse: 1) the config option "spark.scheduler.reuseCompatibleExecutors"; 2) the overloaded "withResources" interface, which specifies what resources should be considered and what the reuse policy is (check the code; a usage sketch follows this list). The second enables a stage-level reuse policy change in case we want more flexibility. We can discuss which one is better and then refine the code.
  • Compatible resource profiles are cached in the resource profile manager when a profile is added.
  • Added ResourceProfileCompatiblePolicy and interfaces for customization; there are two predefined policies: EQUAL_RESOURCES and MORE_RESOURCES.
  • Barrier mode support, with changes in calculateAvailableSlots.
  • Event logging and JSON protocol support, plus WebUI changes; these need polishing and more tests.
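
A hedged usage sketch of the second option above; `rdd` and `profile` are assumed to already exist, and accessing the predefined policy via ResourceProfileCompatiblePolicy.EQUAL_RESOURCES is an assumption about this PR's API:

val reusedRdd = rdd.withResources(
  profile,                            // a previously built ResourceProfile
  reuseResourceNames = Set("cores"),  // only check cores for compatibility
  reusePolicy = ResourceProfileCompatiblePolicy.EQUAL_RESOURCES)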

@xwu-intel
Contributor Author

xwu-intel commented Feb 17, 2022

@tgravescs Do you have time to check this first? I will add more tests later. Thanks!

xwu-intel changed the title from [WIP][SPARK-36699][Core] Reuse compatible executors for stage-level scheduling to [SPARK-36699][Core] Reuse compatible executors for stage-level scheduling Feb 17, 2022
xwu-intel requested a review from tgravescs February 25, 2022 07:51
@tgravescs
Contributor

Sorry, I've been very busy; this is still on my list to review, hopefully in the next week.

@tgravescs left a comment


Sorry, I accidentally submitted this before I was done typing; I'll add more in a follow-up comment.

How does this apply to the executor monitor idle timeout? We don't time out executors when there are more tasks to run; do we take that into account here and keep compatible ones?

Compatible executors are used the same way as executors with the original profile; they also time out if there are no more tasks. As mentioned above, they share the min/max executor number with other compatible profiles.

I'm not sure I follow this. I would have expected ExecutorAllocationManager.removeExecutors to be updated where it checks:

} else if (newExecutorTotal - 1 < numExecutorsTargetPerResourceProfileId(rpId)) {
  logDebug(s"Not removing idle executor $executorIdToBeRemoved because there " +
    s"are only $newExecutorTotal executor(s) left (number of executor " +
    s"target ${numExecutorsTargetPerResourceProfileId(rpId)})")

It should not remove an executor if a task could run on it.

// Ensure that our target fits within adjusted bounds:
val numCompatibleExecutors = numExecutorsTargetsCompatibleProfiles(rpId)
val adjustedMinNumExecutors = math.max(0, minNumExecutors - numCompatibleExecutors)
val adjustedMaxNumExecutors = math.max(1, maxNumExecutors - numCompatibleExecutors)
Contributor


This part doesn't make sense to me on initial reading. If we set the min and max, our target should still fit within those, not the adjusted ones. I get that you are trying to say that including the compatible ones keeps it within that limit, but I think this is hard to read and understand. This would also get more complicated if the reuse policy could change within an application.

Contributor Author


This part doesn't make sense to me on initial reading. If we set the min and max, our target should still fit within those, not the adjusted ones. I get that you are trying to say that including the compatible ones keeps it within that limit, but I think this is hard to read and understand. This would also get more complicated if the reuse policy could change within an application.

Yes, what I mean is the new min and max executor numbers when some executors are reused. What is your suggestion to make this easier to read?

writeLock.lock()
try {
  reuseResourceNames = resourceNames
  reusePolicy = policy
Contributor


I must be missing where this is used? Or did you want feedback first?

Contributor Author

xwu-intel commented Apr 20, 2022


I must be missing where this is used? Or did you want feedback first?

I added an extra param to specify the policy for RDD.withResources; this code saves the reuse policy.

def withResources(rp: ResourceProfile,
    reuseResourceNames: Set[String], reusePolicy: ResourceProfileCompatiblePolicy): this.type = {

But as you suggested, setting a global policy is enough; I am thinking about how to do that.

Contributor Author


I'm not sure I follow this. I would have expected ExecutorAllocationManager.removeExecutors to be updated where it checks:

} else if (newExecutorTotal - 1 < numExecutorsTargetPerResourceProfileId(rpId)) {
  logDebug(s"Not removing idle executor $executorIdToBeRemoved because there " +
    s"are only $newExecutorTotal executor(s) left (number of executor " +
    s"target ${numExecutorsTargetPerResourceProfileId(rpId)})")

It should not remove an executor if a task could run on it.

I didn't change the removeExecutors logic; executors are only removed when they time out.

* Specify a ResourceProfile to use when computing this RDD, reusing existing compatible
* executors.
* @param reuseResourceNames specifies which resources should be checked when reusing executors
* @param reusePolicy specifies the executor reuse policy
Contributor


I would rather see ResourceProfileCompatiblePolicy as a public interface so users could implement their own policy. We can provide a couple of very basic ones, e.g. Equals and AllGreater.
Also, while doing it per stage is more flexible, I think it complicates the allocation strategy for dynamic allocation.
We would also need to know whether this policy would allow use without dynamic allocation, because theoretically, if you are reusing an executor with the same executor profile but different task requirements, you wouldn't need dynamic allocation.

@Since("3.1.0")
case class SparkListenerResourceProfileAdded(resourceProfile: ResourceProfile)
@Since("3.3.0")
case class SparkListenerResourceProfileAdded(resourceProfile: ResourceProfile,
Contributor


I would prefer not to change this interface since it gets used by people. I'd rather create a new one if needed, but I need to look at how it's all used. If it's just for the environment page to show compatibility, I'm not sure it's worth it. We can come back to this once we figure out the main logic.

Contributor Author


I would prefer not to change this interface since it gets used by people. I'd rather create a new one if needed, but I need to look at how it's all used. If it's just for the environment page to show compatibility, I'm not sure it's worth it. We can come back to this once we figure out the main logic.

It's for event logging and for showing compatibility info in the UI. I think it's a natural extension of the ResourceProfileAdded event. We can leave it for now; please let me know if we want a separate event to carry the compatibility info.

@tgravescs
Contributor

I think it would be best to back up and discuss what exactly we want to target for this PR. These are the things I'm thinking:

  • Support a basic reuse policy (perhaps EXEC_CORES_EQUAL, since I think that was your original use case) and allow users to specify their own, i.e. a config that loads all policies, like the spark.plugins config. Users can give a policy a name and reference it by name (a hypothetical config sketch follows this list).
  • I think we can keep the reuse policy specified at the application level rather than the stage level to keep the logic easier for now. We can always add an interface to specify it at the stage level later.
  • Do we want to allow reusing without dynamic allocation? We would have to throw at runtime if we discovered the user specified a resource profile that wasn't compatible. We could keep this PR simple and not support that at first.
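
A hypothetical shape of the first bullet; the config keys and class name below are illustrative, modeled on spark.plugins, and are not existing Spark configs:

// Load user-provided policies by class name, like spark.plugins.
conf.set("spark.scheduler.resourceProfile.reusePolicies", "com.example.MyReusePolicy")
// Then select a policy (built-in or user-provided) by its registered name.
conf.set("spark.scheduler.resourceProfile.reusePolicy", "EXEC_CORES_EQUAL")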

does that sound ok?

@xwu-intel
Contributor Author

I think it would be best to back up and discuss what exactly we want to target for this PR. These are the things I'm thinking:

  • Support a basic reuse policy (perhaps EXEC_CORES_EQUAL, since I think that was your original use case) and allow users to specify their own, i.e. a config that loads all policies, like the spark.plugins config. Users can give a policy a name and reference it by name.
  • I think we can keep the reuse policy specified at the application level rather than the stage level to keep the logic easier for now. We can always add an interface to specify it at the stage level later.

For those two, I agree to keep it simple at first to address the most common scenarios. I will check the other comments in the coming weeks.

  • Do we want to allow reusing without dynamic allocation? We would have to throw at runtime if we discovered the user specified a resource profile that wasn't compatible. We could keep this PR simple and not support that at first.

Since ResourceProfiles are only supported with dynamic allocation right now (SparkException: "ResourceProfiles are only supported on YARN and Kubernetes with dynamic allocation enabled."), maybe a separate PR should address that.

does that sound ok?

@tgravescs
Contributor

Since ResourceProfiles are only supported with dynamic allocation right now (SparkException: "ResourceProfiles are only supported on YARN and Kubernetes with dynamic allocation enabled."), maybe a separate PR should address that.

Yes, I think this is fine to leave off and do separately.

@xwu-intel
Contributor Author

@tgravescs Thanks for the comments. I am too busy to respond right now, but I will address them when time allows.

@xwu-intel
Contributor Author

Support a basic reuse policy (perhaps EXEC_CORES_EQUAL, since I think that was your original use case) and allow users to specify their own, i.e. a config that loads all policies, like the spark.plugins config. Users can give a policy a name and reference it by name.
I think we can keep the reuse policy specified at the application level rather than the stage level to keep the logic easier for now. We can always add an interface to specify it at the stage level later.

I will think over how to implement those two and make changes.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions bot added the Stale label Sep 29, 2022
github-actions bot closed this Sep 30, 2022
