Conversation

@ivoson
Contributor

@ivoson ivoson commented Jan 5, 2023

What changes were proposed in this pull request?

As described in SPARK-41848, we currently update an executor's free cores based on the executor's resource profile. This is wrong for tasks with a TaskResourceProfile in a standalone cluster: executors with the default resource profile can be reused across different TaskResourceProfiles, so a task's cpus can differ from the value derived from the executor's resource profile.

The changes to fix the issue:

  1. Decrease the executor's free cores by the taskCpus from TaskDescription when launching tasks;
  2. Add taskCpus to StatusUpdate alongside the other resources, so that taskCpus is reported when a task finishes and we can increase the executor's free cores by the reported value.
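
A minimal sketch of the two steps above (hypothetical, simplified stand-ins for TaskDescription, StatusUpdate and the driver-side executor bookkeeping; the real Spark classes carry far more state):

```scala
// Hypothetical, simplified stand-ins -- not the actual Spark classes.
final case class TaskDescription(taskId: Long, cpus: Int)
final case class StatusUpdate(taskId: Long, finished: Boolean, cpus: Int)

// Driver-side view of one executor's free cores.
final class ExecutorData(var freeCores: Int)

// 1. On launch, decrement free cores by the task's own cpus taken from
//    TaskDescription, not by the executor resource profile's task cpus.
def launchTask(exec: ExecutorData, desc: TaskDescription): Unit =
  exec.freeCores -= desc.cpus

// 2. On StatusUpdate the executor reports the task's cpus back, so the
//    driver can return exactly that many cores to the pool.
def handleStatusUpdate(exec: ExecutorData, update: StatusUpdate): Unit =
  if (update.finished) exec.freeCores += update.cpus
```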

Why are the changes needed?

Fixing the bug as described in SPARK-41848

Does this PR introduce any user-facing change?

No

How was this patch tested?

New UTs added.

@github-actions github-actions bot added the CORE label Jan 5, 2023
@ivoson
Contributor Author

ivoson commented Jan 5, 2023

cc @Ngone51 could you please take a look at this PR? Thanks

@AmplabJenkins

Can one of the admins verify this patch?

@Ngone51
Member

Ngone51 commented Jan 6, 2023

cc @tgravescs @mridulm too

@Ngone51 Ngone51 requested review from mridulm and tgravescs January 6, 2023 02:10
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit = {
val resources = taskResources.getOrElse(taskId, Map.empty[String, ResourceInformation])
val msg = StatusUpdate(executorId, taskId, state, data, resources)
val cpus = taskCpus.getOrElse(taskId, 0)
Member

Could we get the task cpus via executor.runningTask(taskId).taskDescription.cpus and get rid of the taskCpus map?

Contributor Author

Thanks, that looks better. Making the change.

Contributor Author

done

}
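
The resolution of the thread above -- reading the cpus off the already-tracked running task instead of keeping a separate taskCpus map -- can be sketched like this (runningTasks and TaskRunner are simplified stand-ins, not the actual Executor internals):

```scala
import scala.collection.concurrent.TrieMap

// Simplified stand-ins for the executor-side bookkeeping.
final case class TaskDescription(taskId: Long, cpus: Int)
final class TaskRunner(val taskDescription: TaskDescription)

// The executor already tracks a runner per running task.
val runningTasks = TrieMap.empty[Long, TaskRunner]

// Look the cpus up on the tracked runner rather than in a parallel map,
// falling back to 0 when the task is unknown.
def cpusFor(taskId: Long): Int =
  runningTasks.get(taskId).map(_.taskDescription.cpus).getOrElse(0)
```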

// this function is for testing only
def getExecutorAvailableCpus(
Member

Could we make it private[spark]?

Contributor Author

done.

val conf = new SparkConf()
.set(EXECUTOR_CORES, execCores)
.set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive during test
.set(EXECUTOR_INSTANCES, 0) // avoid errors about duplicate executor registrations
Member

@Ngone51 Ngone51 Jan 6, 2023

Why could there be "duplicate executor registrations"?

Contributor Author

I kept the comment from other UTs, but after checking the code I didn't see any chance of duplicate registrations. Removing the comment here.

when(mockEndpointRef.send(LaunchTask)).thenAnswer((_: InvocationOnMock) => {})

var executorAddedCount: Int = 0
val infos = scala.collection.mutable.ArrayBuffer[ExecutorInfo]()
Member

Could you move scala.collection.mutable to the import list?

Contributor Author

done

@mridulm
Contributor

mridulm commented Jan 6, 2023

I will need to test this and revisit the code to understand the issue better.
I would expect this info to already be at the driver - not sure why executors have to let the driver know.

}

// To avoid allocating any resources immediately after releasing the resource from the task to
// make sure that `availableAddrs` below won't change
Member

available cpus?

Contributor Author

Thanks, fixed.

@Ngone51
Member

Ngone51 commented Jan 6, 2023

@mridulm Please note that this is a new bug introduced by a recent PR (#37268) that hasn't been rolled out.

@mridulm
Contributor

mridulm commented Jan 6, 2023

Ah ! Thanks for the context @Ngone51 - just checked git blame to see it is part of 3.4 :-)

@tgravescs
Contributor

Overall approach looks good, I think @Ngone51 covered comments I had.

Member

@dongjoon-hyun dongjoon-hyun left a comment

cc @xinrong-meng (Apache Spark 3.4 release manager) since this is filed as a blocker-level JIRA issue.

"Our unexpected executor does not have a request time.")
}

test("SPARK-41848: executor core decrease should base on taskCpus") {
Member

nit. decrease should base on -> should be decreased based on?

Contributor Author

Thanks, updated.

Contributor

@WeichenXu123 WeichenXu123 left a comment

LGTM

@dongjoon-hyun
Member

Merged to master. Thank you, @ivoson , @Ngone51 , @mridulm , @tgravescs , @WeichenXu123 .
Also, cc @sunchao

@mridulm
Contributor

mridulm commented Jan 9, 2023

@Ngone51 Had a query here ... the number of cores for a task is determined in resourceOffers based on the TaskSet - given this, can't we leverage it at the driver given a task id, instead of passing it from the executor?

@dongjoon-hyun
Member

Sorry for missing that, @mridulm .

@mridulm
Contributor

mridulm commented Jan 10, 2023

Oh no, you are good @dongjoon-hyun - I was not actively reviewing this PR !

@Ngone51
Member

Ngone51 commented Jan 10, 2023

@mridulm That might be an alternative; I was thinking about it too. But checking the code, I found that a TaskSet might already have been cleaned up when a task finishes, e.g., when an executor-lost event arrives before the StatusUpdate. TaskSchedulerImpl.statusUpdate() also considers this case:

("Ignoring update with state %s for TID %s because its task set is gone (this is " +

In that case, we'd have trouble knowing the exact task cores used. We might need to maintain an extra structure at the driver to track the resources used by each task if we wanted to take this alternative route.

Instead of passing it from the executor?

The current way is actually consistent with the way we handle custom resources, which are also assigned at the driver and returned with StatusUpdate. Though, custom resources themselves have the same issue with task cores that you raised.
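
The race described above can be pictured with a toy driver-side map (hypothetical shapes; TaskSchedulerImpl tracks far more). Once the executor-lost cleanup removes the entry, a later lookup at StatusUpdate time can no longer recover the task's cpus, which is why carrying them in StatusUpdate is the robust choice:

```scala
import scala.collection.mutable

// Hypothetical stand-in for a task set that knows its per-task cpus.
final case class TaskSetLike(taskCpus: Int)

// Driver-side: taskId -> owning task set, cleaned up on executor loss.
val taskIdToTaskSet = mutable.Map[Long, TaskSetLike](42L -> TaskSetLike(2))

def onExecutorLost(taskIds: Seq[Long]): Unit =
  taskIds.foreach(taskIdToTaskSet.remove)

// Driver-side lookup at StatusUpdate time: None once the task set is gone.
def driverSideCpus(taskId: Long): Option[Int] =
  taskIdToTaskSet.get(taskId).map(_.taskCpus)
```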

@ivoson
Contributor Author

ivoson commented Jan 10, 2023

Thanks for review. @Ngone51 @tgravescs @dongjoon-hyun @WeichenXu123 @mridulm

@ivoson ivoson deleted the SPARK-41848 branch January 10, 2023 05:09
@mridulm
Contributor

mridulm commented Jan 10, 2023

Thanks @Ngone51, that is an excellent case where the approach I asked about won't work!

