[SPARK-41423][CORE] Protobuf serializer for StageDataWrapper #39192

Closed

panbingkun wants to merge 17 commits into apache:master from panbingkun:SPARK-41423

Conversation

@panbingkun (Contributor) commented Dec 23, 2022

What changes were proposed in this pull request?

Add Protobuf serializer for StageDataWrapper.

Why are the changes needed?

Support fast and compact serialization/deserialization for StageDataWrapper over RocksDB.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New UT.

github-actions bot added the CORE label Dec 23, 2022
@panbingkun (Contributor, Author)

Waiting for me to add the new UT.

@panbingkun panbingkun changed the title [WIP][SPARK-41423][CORE] Protobuf serializer for StageDataWrapper [SPARK-41423][CORE] Protobuf serializer for StageDataWrapper Dec 24, 2022

repeated int64 rdd_ids = 43;
repeated AccumulableInfo accumulator_updates = 44;
map<int64, TaskData> tasks = 45;
panbingkun (Contributor, Author):

An optional map is not supported by protobuf.

Contributor:

I see, hmm... should we encapsulate this map?

such as

optional TaskMap tasks = 45;

message TaskMap {
  map<int64, TaskData> tasks = 1;
}

also cc @gengliangwang

Member:

Simply a map is OK here. An empty map should be no different from None here.
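The point, sketched (a hypothetical helper, not the PR's code): protobuf serializes an unset map and an empty map identically, so the reader loses nothing by treating them the same.

```scala
import java.util.{Map => JMap}
import scala.collection.JavaConverters._

// None and Some(Map.empty) produce identical bytes on the wire, so an
// empty map on read can safely be mapped back to None (or to Map.empty).
def readOptionalMap[K, V](m: JMap[K, V]): Option[Map[K, V]] =
  if (m.isEmpty) None else Some(m.asScala.toMap)
```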

val description =
  getOptional(binary.hasDescription, () => weakIntern(binary.getDescription))
val accumulatorUpdates = Utils.deserializeAccumulableInfos(binary.getAccumulatorUpdatesList)
val tasks = MapUtils.isEmpty(binary.getTasksMap) match {
panbingkun (Contributor, Author):

An optional map is not supported by protobuf.

@AmplabJenkins

Can one of the admins verify this patch?

@panbingkun (Contributor, Author)

cc @gengliangwang @LuciferYang

@LuciferYang (Contributor)

also cc @techaddict

}

message StageData {
  enum StageStatus {
Contributor:

Why is StageStatus designed as an enum nested inside StageData?

panbingkun (Contributor, Author):

If StageStatus is defined outside, the error message is as follows:
[screenshot of the protoc error]

Then, if StageStatus is defined as follows:
enum StageStatus {
  STAGE_STATUS_UNSPECIFIED = 0;
  STAGE_STATUS_ACTIVE = 1;
  STAGE_STATUS_COMPLETE = 2;
  STAGE_STATUS_FAILED = 3;
  STAGE_STATUS_PENDING = 4;
  STAGE_STATUS_SKIPPED = 5;
}

The serializer and deserializer code would be very ugly! We would have to handle adding the prefix on write and stripping it on read.
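For illustration, a sketch of the prefix juggling that file-level prefixed enum values would force on every conversion (hypothetical code, not part of the PR):

```scala
import org.apache.spark.status.api.v1.StageStatus

// Every write would add the STAGE_STATUS_ prefix and every read would
// strip it to recover the api.v1 enum constant.
def toProtoName(status: StageStatus): String = "STAGE_STATUS_" + status.name()

def fromProtoName(name: String): StageStatus =
  StageStatus.valueOf(name.stripPrefix("STAGE_STATUS_"))
```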

panbingkun (Contributor, Author) commented Dec 28, 2022:

Similarly, wouldn't it be more reasonable to define the JobExecutionStatus enum inside JobData?

Contributor:

Fine by me.

Member:

JobExecutionStatus is used in SQLExecutionUIData, so it can't be moved into JobData.

LuciferYang (Contributor) commented Dec 29, 2022:

As described in https://github.com/apache/spark/pull/39270/files, UNSPECIFIED in StageStatus should be changed to STAGE_STATUS_UNSPECIFIED and moved out of StageData.

panbingkun (Contributor, Author):

New PR for JobExecutionStatus: #39286
@gengliangwang @LuciferYang



repeated int64 rdd_ids = 43;
repeated AccumulableInfo accumulator_updates = 44;
map<int64, TaskData> tasks = 45;
map<string, ExecutorStageSummary> executor_summary = 46;
Contributor:

ditto

stageData.rddIds.foreach(id => stageDataBuilder.addRddIds(id.toLong))
stageData.accumulatorUpdates.foreach { update =>
  stageDataBuilder.addAccumulatorUpdates(Utils.serializeAccumulableInfo(update))
}
Contributor:

I think there are 3 choices for where to define the serializeAccumulableInfo function:

  1. Move it from class TaskDataWrapperSerializer to the companion object TaskDataWrapperSerializer
  2. Move it from class TaskDataWrapperSerializer to object AccumulableInfoSerializer
  3. Keep the status quo and let StageDataWrapperSerializer hold a TaskDataWrapperSerializer instance

Similar suggestions apply to deserializeAccumulableInfo / serializeExecutorStageSummary / deserializeExecutorStageSummary, and I think Utils should hold only more general functions.

Member:

+1 for AccumulableInfoSerializer

panbingkun (Contributor, Author):

Ok, let me do it.

panbingkun (Contributor, Author):

Done

@gengliangwang (Member)

This is a big one. @panbingkun Thanks for working on it!


object AccumulableInfoSerializer {

  private[protobuf] def serializeAccumulableInfo(
Contributor:

serializeAccumulableInfo -> serialize

panbingkun (Contributor, Author):

Done

  builder.build()
}

private[protobuf] def deserializeAccumulableInfos(
Contributor:

deserializeAccumulableInfos -> deserialize

nit: I'd prefer deserialize(info: AccumulableInfo), which looks more generic, but the current form is also OK.

panbingkun (Contributor, Author):

Done


private[protobuf] def deserializeAccumulableInfos(
    updates: JList[StoreTypes.AccumulableInfo]): ArrayBuffer[AccumulableInfo] = {
  val accumulatorUpdates = new ArrayBuffer[AccumulableInfo]()
Contributor:

Initialize it with an initial size?
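i.e., something like this (a sketch; `updates` is the JList parameter from the excerpt above):

```scala
import scala.collection.mutable.ArrayBuffer

// Pre-size the buffer so appending updates.size() elements never triggers
// an internal array growth.
val accumulatorUpdates = new ArrayBuffer[AccumulableInfo](updates.size())
```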

panbingkun (Contributor, Author):

Done


override val supportClass: Class[_] = classOf[StageDataWrapper]

override def serialize(input: Any): Array[Byte] =
Contributor:

We can merge the two serialize methods into one.

panbingkun (Contributor, Author):

Done

val executorSummary = MapUtils.isEmpty(binary.getExecutorSummaryMap) match {
  case true => None
  case _ => Some(binary.getExecutorSummaryMap.asScala.mapValues(
    ExecutorStageSummarySerializer.deserializeExecutorStageSummary(_)).toMap
Contributor:

This can be converted to a method value.
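i.e., roughly this (a sketch using the names from the excerpt above, assuming the method is renamed deserialize):

```scala
// Lambda with a placeholder:
//   .mapValues(ExecutorStageSummarySerializer.deserialize(_))
// Method value via eta-expansion, placeholder dropped:
binary.getExecutorSummaryMap.asScala
  .mapValues(ExecutorStageSummarySerializer.deserialize)
```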

panbingkun (Contributor, Author):

Done

  case _ => Some(binary.getTasksMap.asScala.map(
    entry => (entry._1.toLong, deserializeTaskData(entry._2))).toMap)
}
val executorSummary = MapUtils.isEmpty(binary.getExecutorSummaryMap) match {
Contributor:

Since it's just true and false, I'd prefer if {} else {}.
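i.e. (a sketch using the names from the excerpt above):

```scala
val executorSummary =
  if (MapUtils.isEmpty(binary.getExecutorSummaryMap)) {
    None
  } else {
    Some(binary.getExecutorSummaryMap.asScala
      .mapValues(ExecutorStageSummarySerializer.deserialize).toMap)
  }
```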

panbingkun (Contributor, Author):

Done

new ExecutorPeakMetricsDistributions(
  quantiles = binary.getQuantilesList.asScala.map(_.toDouble).toIndexedSeq,
  executorMetrics = binary.getExecutorMetricsList.asScala.map(
    ExecutorMetricsSerializer.deserialize(_)).toIndexedSeq
Contributor:

This can be converted to a method value, too.

panbingkun (Contributor, Author):

Done

launchTime = new Date(binary.getLaunchTime),
resultFetchStart = resultFetchStart,
duration = duration,
executorId = weakIntern(binary.getExecutorId),
LuciferYang (Contributor) commented Dec 28, 2022:

When should we use weakIntern? It seems not all serializers use it; will this affect performance?

For example, when constructing a new AccumulableInfo in AccumulableInfoSerializer, we didn't use weakIntern.

panbingkun (Contributor, Author):

For consistency, we use weakIntern here, e.g.:

executorId = weakIntern(binary.getExecutorId),
host = weakIntern(binary.getHost),
status = weakIntern(binary.getStatus),
taskLocality = weakIntern(binary.getTaskLocality),

As far as I know, it applies when the field is of type string (not including map<string, ...> keys).
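For context, weakIntern deduplicates equal strings through a weak-reference interner, which pays off for small, highly repeated values such as executorId, host, status, and taskLocality. A minimal sketch of the idea using Guava (Spark's actual helper may differ in detail):

```scala
import com.google.common.collect.Interners

object StringInterning {
  // Equal strings resolve to one shared instance; entries are weakly
  // referenced, so unused strings remain eligible for garbage collection.
  private val interner = Interners.newWeakInterner[String]()

  def weakIntern(s: String): String = interner.intern(s)
}
```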

import org.apache.spark.status.api.v1.AccumulableInfo
import org.apache.spark.status.protobuf.Utils.getOptional

object AccumulableInfoSerializer {
Member:

Let's put the private[protobuf] before the object AccumulableInfoSerializer.

Member:

That way we don't need private[protobuf] before each method.
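A toy sketch of the visibility change (object name and method bodies are placeholders, not the PR's code):

```scala
package org.apache.spark.status.protobuf

// One private[protobuf] on the object replaces a modifier on each method:
// outside the protobuf package the object itself is not visible at all.
private[protobuf] object ExampleSerializer {
  def serialize(input: String): Array[Byte] = input.getBytes("UTF-8")
  def deserialize(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}
```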

panbingkun (Contributor, Author):

Done


object ExecutorStageSummarySerializer {

  private[protobuf] def serialize(input: ExecutorStageSummary): StoreTypes.ExecutorStageSummary = {
Member:

ditto

panbingkun (Contributor, Author):

Done


import org.apache.spark.status.api.v1.StageStatus

object StageStatusSerializer {
Member:

ditto

panbingkun (Contributor, Author):

Done

}
}

private def assert(result: TaskMetrics, input: TaskMetrics): Unit = {
Member:

nit: rename all the assert methods to checkAnswer(result, expected)?
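i.e., something like this (a sketch; the compared fields are an illustrative subset):

```scala
import org.apache.spark.status.api.v1.TaskMetrics

private def checkAnswer(result: TaskMetrics, expected: TaskMetrics): Unit = {
  assert(result.executorRunTime == expected.executorRunTime)
  assert(result.resultSize == expected.resultSize)
}
```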

panbingkun (Contributor, Author):

Done

@gengliangwang (Member) left a review:

LGTM except a few minor comments

@gengliangwang (Member)

@panbingkun Thanks for the work, merging to master

dongjoon-hyun pushed a commit that referenced this pull request Jan 3, 2023
…atorUpdates for Scala 2.13

### What changes were proposed in this pull request?

This PR is a followup of #39192 that excludes `StageData.rddIds` and `StageData.accumulatorUpdates` for Scala 2.13.

### Why are the changes needed?

To recover the Scala 2.13 build. It is currently broken (https://github.com/apache/spark/actions/runs/3824617107/jobs/6506925003):

```
[error] spark-core: Failed binary compatibility check against org.apache.spark:spark-core_2.13:3.3.0! Found 3 potential problems (filtered 997)
[error]  * method rddIds()scala.collection.immutable.Seq in class org.apache.spark.status.api.v1.StageData has a different result type in current version, where it is scala.collection.Seq rather than scala.collection.immutable.Seq
[error]    filter with: ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.StageData.rddIds")
[error]  * method accumulatorUpdates()scala.collection.immutable.Seq in class org.apache.spark.status.api.v1.StageData has a different result type in current version, where it is scala.collection.Seq rather than scala.collection.immutable.Seq
[error]    filter with: ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.StageData.accumulatorUpdates")
[error]  * method this(org.apache.spark.status.api.v1.StageStatus,Int,Int,Int,Int,Int,Int,Int,Int,scala.Option,scala.Option,scala.Option,scala.Option,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,java.lang.String,scala.Option,java.lang.String,java.lang.String,scala.collection.immutable.Seq,scala.collection.immutable.Seq,scala.Option,scala.Option,scala.Option,scala.collection.immutable.Map,Int,scala.Option,scala.Option,scala.Option)Unit in class org.apache.spark.status.api.v1.StageData's type is different in current version, where it is (org.apache.spark.status.api.v1.StageStatus,Int,Int,Int,Int,Int,Int,Int,Int,scala.Option,scala.Option,scala.Option,scala.Option,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,java.lang.String,scala.Option,java.lang.String,java.lang.String,scala.collection.Seq,scala.collection.Seq,scala.Option,scala.Option,scala.Option,scala.collection.immutable.Map,Int,scala.Option,scala.Option,scala.Option)Unit instead of (org.apache.spark.status.api.v1.StageStatus,Int,Int,Int,Int,Int,Int,Int,Int,scala.Option,scala.Option,scala.Option,scala.Option,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,java.lang.String,scala.Option,java.lang.String,java.lang.String,scala.collection.immutable.Seq,scala.collection.immutable.Seq,scala.Option,scala.Option,scala.Option,scala.collection.immutable.Map,Int,scala.Option,scala.Option,scala.Option)Unit
[error]    filter with: ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.StageData.this")
```

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

Manually tested.

Closes #39356 from HyukjinKwon/SPARK-41423.

Lead-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
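For reference, the filters suggested by the MiMa output take this shape when registered in project/MimaExcludes.scala (a sketch of the shape only; the followup's exact change may differ):

```scala
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.StageData.rddIds"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.StageData.accumulatorUpdates"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.StageData.this")
```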
