Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
2b7f1ef
[SPARK-41423][CORE] Protobuf serializer for StageDataWrapper
panbingkun Dec 23, 2022
0dedace
[SPARK-41423][CORE] Protobuf serializer for StageDataWrapper
panbingkun Dec 24, 2022
f0ba3fc
[SPARK-41423][CORE] Protobuf serializer for StageDataWrapper
panbingkun Dec 24, 2022
65e8e02
[SPARK-41423][CORE] Protobuf serializer for StageDataWrapper
panbingkun Dec 25, 2022
f89b17a
[SPARK-41423][CORE] Protobuf serializer for StageDataWrapper
panbingkun Dec 25, 2022
809d5ff
[SPARK-41423][CORE] Protobuf serializer for StageDataWrapper
panbingkun Dec 25, 2022
21bbfca
[SPARK-41423][CORE] Protobuf serializer for StageDataWrapper
panbingkun Dec 25, 2022
e28a727
[SPARK-41423][CORE] Protobuf serializer for StageDataWrapper
panbingkun Dec 28, 2022
4bd85f8
[SPARK-41423][CORE] Protobuf serializer for StageDataWrapper
panbingkun Dec 28, 2022
59b5749
[SPARK-41423][CORE] Protobuf serializer for StageDataWrapper
panbingkun Dec 29, 2022
fac1115
Merge branch 'master' into SPARK-41423
panbingkun Dec 29, 2022
3def54d
Merge branch 'master' into SPARK-41423
panbingkun Dec 29, 2022
955b22a
[SPARK-41423][CORE] Protobuf serializer for StageDataWrapper
panbingkun Dec 29, 2022
bab6770
[SPARK-41423][CORE] Protobuf serializer for StageDataWrapper
panbingkun Dec 29, 2022
6cb476f
[SPARK-41423][CORE] Protobuf serializer for StageDataWrapper
panbingkun Dec 29, 2022
8aeb9c5
[SPARK-41423][CORE] Protobuf serializer for StageDataWrapper
panbingkun Dec 30, 2022
f8e6162
[SPARK-41423][CORE] Protobuf serializer for StageDataWrapper
panbingkun Dec 30, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -390,3 +390,214 @@ message SQLExecutionUIData {
repeated int64 stages = 11;
map<int64, string> metric_values = 12;
}

message StageDataWrapper {
StageData info = 1;
repeated int64 job_ids = 2;
map<string, int64> locality = 3;
}

message TaskData {
int64 task_id = 1;
int32 index = 2;
int32 attempt = 3;
int32 partition_id = 4;
int64 launch_time = 5;
optional int64 result_fetch_start = 6;
optional int64 duration = 7;
string executor_id = 8;
string host = 9;
string status = 10;
string task_locality = 11;
bool speculative = 12;
repeated AccumulableInfo accumulator_updates = 13;
optional string error_message = 14;
optional TaskMetrics task_metrics = 15;
map<string, string> executor_logs = 16;
int64 scheduler_delay = 17;
int64 getting_result_time = 18;
}

message StageData {
enum StageStatus {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why StageStatus designed as StageData inside enum ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If StageStatus is defined outside, the error message is as follows:
image

Then If StageStatus is defined as follows:
enum StageStatus {
STAGE_STATUS_UNSPECIFIED = 0;
STAGE_STATUS_ACTIVE = 1;
STAGE_STATUS_COMPLETE = 2;
STAGE_STATUS_FAILED = 3;
STAGE_STATUS_PENDING = 4;
STAGE_STATUS_SKIPPED = 5;
}

The Code of Serializer and Deerializer will be very ugly!
Will have to handle the operations of adding prefix and deleting prefix.

@panbingkun panbingkun Dec 28, 2022

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, the enum definition of JobExecutionStatus seems more reasonable in JobData ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fine to me

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JobExecutionStatus is used in SQLExecutionUIData. So it can't be moved into JobData

@LuciferYang LuciferYang Dec 29, 2022

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As described in https://github.com/apache/spark/pull/39270/files, UNSPECIFIED in StageStatus should change to STAGE_STATUS_UNSPECIFIED and moved out of StageData

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New pr for JobExecutionStatus: #39286
@gengliangwang @LuciferYang

UNSPECIFIED = 0;
ACTIVE = 1;
COMPLETE = 2;
FAILED = 3;
PENDING = 4;
SKIPPED = 5;
}

StageStatus status = 1;
int64 stage_id = 2;
int32 attempt_id = 3;
int32 num_tasks = 4;
int32 num_active_tasks = 5;
int32 num_complete_tasks = 6;
int32 num_failed_tasks = 7;
int32 num_killed_tasks = 8;
int32 num_completed_indices = 9;

optional int64 submission_time = 10;
optional int64 first_task_launched_time = 11;
optional int64 completion_time = 12;
optional string failure_reason = 13;

int64 executor_deserialize_time = 14;
int64 executor_deserialize_cpu_time = 15;
int64 executor_run_time = 16;
int64 executor_cpu_time = 17;
int64 result_size = 18;
int64 jvm_gc_time = 19;
int64 result_serialization_time = 20;
int64 memory_bytes_spilled = 21;
int64 disk_bytes_spilled = 22;
int64 peak_execution_memory = 23;
int64 input_bytes = 24;
int64 input_records = 25;
int64 output_bytes = 26;
int64 output_records = 27;
int64 shuffle_remote_blocks_fetched = 28;
int64 shuffle_local_blocks_fetched = 29;
int64 shuffle_fetch_wait_time = 30;
int64 shuffle_remote_bytes_read = 31;
int64 shuffle_remote_bytes_read_to_disk = 32;
int64 shuffle_local_bytes_read = 33;
int64 shuffle_read_bytes = 34;
int64 shuffle_read_records = 35;
int64 shuffle_write_bytes = 36;
int64 shuffle_write_time = 37;
int64 shuffle_write_records = 38;

string name = 39;
optional string description = 40;
string details = 41;
string scheduling_pool = 42;

repeated int64 rdd_ids = 43;
repeated AccumulableInfo accumulator_updates = 44;
map<int64, TaskData> tasks = 45;

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional map is not supported by pb

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, hmm... should we encapsulate this map?

such as

optional TaskMap tasks = 45;

message TaskMap {
  map<int64, TaskData> tasks = 1;
}

also cc @gengliangwang

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simply a map is OK here. An empty map should make no difference with None here.

map<string, ExecutorStageSummary> executor_summary = 46;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

optional SpeculationStageSummary speculation_summary = 47;
map<string, int32> killed_tasks_summary = 48;
int32 resource_profile_id = 49;
optional ExecutorMetrics peak_executor_metrics = 50;
optional TaskMetricDistributions task_metrics_distributions = 51;
optional ExecutorMetricsDistributions executor_metrics_distributions = 52;
}

message TaskMetrics {
int64 executor_deserialize_time = 1;
int64 executor_deserialize_cpu_time = 2;
int64 executor_run_time = 3;
int64 executor_cpu_time = 4;
int64 result_size = 5;
int64 jvm_gc_time = 6;
int64 result_serialization_time = 7;
int64 memory_bytes_spilled = 8;
int64 disk_bytes_spilled = 9;
int64 peak_execution_memory = 10;
InputMetrics input_metrics = 11;
OutputMetrics output_metrics = 12;
ShuffleReadMetrics shuffle_read_metrics = 13;
ShuffleWriteMetrics shuffle_write_metrics = 14;
}

message InputMetrics {
int64 bytes_read = 1;
int64 records_read = 2;
}

message OutputMetrics {
int64 bytes_written = 1;
int64 records_written = 2;
}

message ShuffleReadMetrics {
int64 remote_blocks_fetched = 1;
int64 local_blocks_fetched = 2;
int64 fetch_wait_time = 3;
int64 remote_bytes_read = 4;
int64 remote_bytes_read_to_disk = 5;
int64 local_bytes_read = 6;
int64 records_read = 7;
}

message ShuffleWriteMetrics {
int64 bytes_written = 1;
int64 write_time = 2;
int64 records_written = 3;
}

message TaskMetricDistributions {
repeated double quantiles = 1;
repeated double duration = 2;
repeated double executor_deserialize_time = 3;
repeated double executor_deserialize_cpu_time = 4;
repeated double executor_run_time = 5;
repeated double executor_cpu_time = 6;
repeated double result_size = 7;
repeated double jvm_gc_time = 8;
repeated double result_serialization_time = 9;
repeated double getting_result_time = 10;
repeated double scheduler_delay = 11;
repeated double peak_execution_memory = 12;
repeated double memory_bytes_spilled = 13;
repeated double disk_bytes_spilled = 14;
InputMetricDistributions input_metrics = 15;
OutputMetricDistributions output_metrics = 16;
ShuffleReadMetricDistributions shuffle_read_metrics = 17;
ShuffleWriteMetricDistributions shuffle_write_metrics = 18;
}

message InputMetricDistributions {
repeated double bytes_read = 1;
repeated double records_read = 2;
}

message OutputMetricDistributions {
repeated double bytes_written = 1;
repeated double records_written = 2;
}

message ShuffleReadMetricDistributions {
repeated double read_bytes = 1;
repeated double read_records = 2;
repeated double remote_blocks_fetched = 3;
repeated double local_blocks_fetched = 4;
repeated double fetch_wait_time = 5;
repeated double remote_bytes_read = 6;
repeated double remote_bytes_read_to_disk = 7;
repeated double total_blocks_fetched = 8;
}

message ShuffleWriteMetricDistributions {
repeated double write_bytes = 1;
repeated double write_records = 2;
repeated double write_time = 3;
}

message ExecutorMetricsDistributions {
repeated double quantiles = 1;

repeated double task_time = 2;
repeated double failed_tasks = 3;
repeated double succeeded_tasks = 4;
repeated double killed_tasks = 5;
repeated double input_bytes = 6;
repeated double input_records = 7;
repeated double output_bytes = 8;
repeated double output_records = 9;
repeated double shuffle_read = 10;
repeated double shuffle_read_records = 11;
repeated double shuffle_write = 12;
repeated double shuffle_write_records = 13;
repeated double memory_bytes_spilled = 14;
repeated double disk_bytes_spilled = 15;
ExecutorPeakMetricsDistributions peak_memory_metrics = 16;
}

message ExecutorPeakMetricsDistributions {
repeated double quantiles = 1;
repeated ExecutorMetrics executor_metrics = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ org.apache.spark.status.protobuf.ResourceProfileWrapperSerializer
org.apache.spark.status.protobuf.SpeculationStageSummaryWrapperSerializer
org.apache.spark.status.protobuf.ExecutorSummaryWrapperSerializer
org.apache.spark.status.protobuf.ProcessSummaryWrapperSerializer
org.apache.spark.status.protobuf.StageDataWrapperSerializer
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package org.apache.spark.status.protobuf

import org.apache.spark.status.ExecutorStageSummaryWrapper
import org.apache.spark.status.api.v1.ExecutorStageSummary
import org.apache.spark.status.protobuf.Utils.getOptional

class ExecutorStageSummaryWrapperSerializer extends ProtobufSerDe {

Expand All @@ -29,7 +27,7 @@ class ExecutorStageSummaryWrapperSerializer extends ProtobufSerDe {
serialize(input.asInstanceOf[ExecutorStageSummaryWrapper])

private def serialize(input: ExecutorStageSummaryWrapper): Array[Byte] = {
val info = serializeExecutorStageSummary(input.info)
val info = Utils.serializeExecutorStageSummary(input.info)
val builder = StoreTypes.ExecutorStageSummaryWrapper.newBuilder()
.setStageId(input.stageId.toLong)
.setStageAttemptId(input.stageAttemptId)
Expand All @@ -40,61 +38,11 @@ class ExecutorStageSummaryWrapperSerializer extends ProtobufSerDe {

def deserialize(bytes: Array[Byte]): ExecutorStageSummaryWrapper = {
val binary = StoreTypes.ExecutorStageSummaryWrapper.parseFrom(bytes)
val info = deserializeExecutorStageSummary(binary.getInfo)
val info = Utils.deserializeExecutorStageSummary(binary.getInfo)
new ExecutorStageSummaryWrapper(
stageId = binary.getStageId.toInt,
stageAttemptId = binary.getStageAttemptId,
executorId = binary.getExecutorId,
info = info)
}

private def serializeExecutorStageSummary(
input: ExecutorStageSummary): StoreTypes.ExecutorStageSummary = {
val builder = StoreTypes.ExecutorStageSummary.newBuilder()
.setTaskTime(input.taskTime)
.setFailedTasks(input.failedTasks)
.setSucceededTasks(input.succeededTasks)
.setKilledTasks(input.killedTasks)
.setInputBytes(input.inputBytes)
.setInputRecords(input.inputRecords)
.setOutputBytes(input.outputBytes)
.setOutputRecords(input.outputRecords)
.setShuffleRead(input.shuffleRead)
.setShuffleReadRecords(input.shuffleReadRecords)
.setShuffleWrite(input.shuffleWrite)
.setShuffleWriteRecords(input.shuffleWriteRecords)
.setMemoryBytesSpilled(input.memoryBytesSpilled)
.setDiskBytesSpilled(input.diskBytesSpilled)
.setIsBlacklistedForStage(input.isBlacklistedForStage)
.setIsExcludedForStage(input.isExcludedForStage)
input.peakMemoryMetrics.map { m =>
builder.setPeakMemoryMetrics(ExecutorMetricsSerializer.serialize(m))
}
builder.build()
}

def deserializeExecutorStageSummary(
binary: StoreTypes.ExecutorStageSummary): ExecutorStageSummary = {
val peakMemoryMetrics =
getOptional(binary.hasPeakMemoryMetrics,
() => ExecutorMetricsSerializer.deserialize(binary.getPeakMemoryMetrics))
new ExecutorStageSummary(
taskTime = binary.getTaskTime,
failedTasks = binary.getFailedTasks,
succeededTasks = binary.getSucceededTasks,
killedTasks = binary.getKilledTasks,
inputBytes = binary.getInputBytes,
inputRecords = binary.getInputRecords,
outputBytes = binary.getOutputBytes,
outputRecords = binary.getOutputRecords,
shuffleRead = binary.getShuffleRead,
shuffleReadRecords = binary.getShuffleReadRecords,
shuffleWrite = binary.getShuffleWrite,
shuffleWriteRecords = binary.getShuffleWriteRecords,
memoryBytesSpilled = binary.getMemoryBytesSpilled,
diskBytesSpilled = binary.getDiskBytesSpilled,
isBlacklistedForStage = binary.getIsBlacklistedForStage,
peakMemoryMetrics = peakMemoryMetrics,
isExcludedForStage = binary.getIsExcludedForStage)
}
}
Loading