From 803921ab7c94cb239469e4aad6f408e854eae03f Mon Sep 17 00:00:00 2001 From: angerszhu Date: Thu, 26 Mar 2020 20:17:32 +0800 Subject: [PATCH 01/22] Executor metrics in stage level --- .../org/apache/spark/ui/static/stagepage.js | 18 +++++++++++++++++- .../spark/ui/static/stagespage-template.html | 4 ++++ .../spark/status/AppStatusListener.scala | 18 ++++++++++++++++-- .../org/apache/spark/status/LiveEntity.scala | 13 ++++++++++++- .../org/apache/spark/status/api/v1/api.scala | 10 +++++++++- 5 files changed, 58 insertions(+), 5 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js index 2f70b47c80f26..310992500fe3c 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js @@ -454,7 +454,23 @@ $(document).ready(function () { data : function (row, type) { return typeof row.diskBytesSpilled != 'undefined' ? formatBytes(row.diskBytesSpilled, type) : ""; } - } + },{ + data: function(row, type) { + return formatBytes(row.jvmHeapMemory, type) + "/" + formatBytes(row.jvmOffHeapMemory, type); + } + },{ + data: function(row, type) { + return formatBytes(row.onHeapExecutionMemory, type) + "/" + formatBytes(row.offHeapExecutionMemory, type); + } + },{ + data: function(row, type) { + return formatBytes(row.onHeapStorageMemory, type) + "/" + formatBytes(row.offHeapStorageMemory, type); + } + },{ + data: function(row, type) { + return formatBytes(row.directPoolMemory, type) + "/" + formatBytes(row.mappedPoolMemory, type); + } + } ], "order": [[0, "asc"]], "bAutoWidth": false, diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagespage-template.html b/core/src/main/resources/org/apache/spark/ui/static/stagespage-template.html index 77ea70e4ad966..c65c8fb62109b 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/stagespage-template.html +++ b/core/src/main/resources/org/apache/spark/ui/static/stagespage-template.html @@ -59,6 +59,10 @@

Aggregated Metrics by Executor

Shuffle Write Size / Records Spill (Memory) Spill (Disk) + JVMHeapMemory / JVMOffHeapMemory + OnHeapExecutionMemory / OffHeapExecutionMemory + OnHeapStorageMemory / OffHeapStorageMemory + DirectPoolMemory / MappedPoolMemory diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index c3f22f32993a8..9466d369b2ada 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap import org.apache.spark._ -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.CPUS_PER_TASK import org.apache.spark.internal.config.Status._ @@ -605,6 +605,9 @@ private[spark] class AppStatusListener( stage.killedSummary = killedTasksSummary(event.reason, stage.killedSummary) } stage.activeTasksPerExecutor(event.taskInfo.executorId) -= 1 + + val metrics = stage.executorSummary(event.taskInfo.executorId).executorMetrics + metrics.compareAndUpdatePeakValues(event.taskExecutorMetrics) // [SPARK-24415] Wait for all tasks to finish before removing stage from live list val removeStage = stage.activeTasks == 0 && @@ -845,12 +848,23 @@ private[spark] class AppStatusListener( // check if there is a new peak value for any of the executor level memory metrics // for the live UI. SparkListenerExecutorMetricsUpdate events are only processed // for the live UI. - event.executorUpdates.foreach { case (_, peakUpdates) => + event.executorUpdates.foreach { case (stageKey, peakUpdates) => liveExecutors.get(event.execId).foreach { exec => if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(peakUpdates)) { maybeUpdate(exec, now) } } + + Option(liveStages.get(stageKey)).foreach { stage => + if (stageKey == EventLoggingListener.DRIVER_STAGE_KEY || + stageKey == (stage.info.stageId, stage.info.attemptNumber())) { + // If the update came from the driver, stageKey will be the dummy key (-1, -1), + // so record those peaks for all active stages. + // Otherwise, record the peaks for the matching stage. + val metrics = stage.executorSummary(event.execId).executorMetrics + metrics.compareAndUpdatePeakValues(peakUpdates) + } + } } // Flush updates if necessary. Executor heartbeat is an event that happens periodically. Flush // here to ensure the staleness of Spark UI doesn't last more than diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 2714f30de14f0..43024c6aa33d1 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -347,6 +347,8 @@ private class LiveExecutorStageSummary( var metrics = createMetrics(default = 0L) + var executorMetrics = new ExecutorMetrics() + override protected def doUpdate(): Any = { val info = new v1.ExecutorStageSummary( taskTime, @@ -363,7 +365,16 @@ private class LiveExecutorStageSummary( metrics.shuffleWriteMetrics.recordsWritten, metrics.memoryBytesSpilled, metrics.diskBytesSpilled, - isBlacklisted) + isBlacklisted, + executorMetrics.getMetricValue("JVMHeapMemory"), + executorMetrics.getMetricValue("JVMOffHeapMemory"), + executorMetrics.getMetricValue("OnHeapExecutionMemory"), + executorMetrics.getMetricValue("OffHeapExecutionMemory"), + executorMetrics.getMetricValue("OnHeapStorageMemory"), + executorMetrics.getMetricValue("OffHeapStorageMemory"), + executorMetrics.getMetricValue("DirectPoolMemory"), + executorMetrics.getMetricValue("MappedPoolMemory") + ) new ExecutorStageSummaryWrapper(stageId, attemptId, executorId, info) } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 5ec9b36393764..661372500d6fd 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -77,7 +77,15 @@ class ExecutorStageSummary private[spark]( val shuffleWriteRecords : Long, val memoryBytesSpilled : Long, val diskBytesSpilled : Long, - val isBlacklistedForStage: Boolean) + val isBlacklistedForStage: Boolean, + val jvmHeapMemory: Long, + val jvmOffHeapMemory: Long, + val onHeapExecutionMemory: Long, + val offHeapExecutionMemory: Long, + val onHeapStorageMemory: Long, + val offHeapStorageMemory: Long, + val directPoolMemory: Long, + val mappedPoolMemory: Long) class ExecutorSummary private[spark]( val id: String, From 55d520a9891c27107ea892132428622bc1fa9d6c Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sat, 2 May 2020 16:50:04 +0800 Subject: [PATCH 02/22] save --- .../org/apache/spark/ui/static/stagepage.js | 68 +++++++++++++------ .../org/apache/spark/ui/static/webui.css | 16 +++++ 2 files changed, 65 insertions(+), 19 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js index 310992500fe3c..d902ab5b17f3b 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js @@ -255,6 +255,13 @@ function reselectCheckboxesBasedOnTaskTableState() { allChecked = false; } } + for (var k = 0; k < executorOptionalColumns.length; k++) { + if (executorSummaryTableSelector.column(executorOptionalColumns[k]).visible()) { + $("#executor-box-"+optionalColumns[k]).prop('checked', true); + } else { + allChecked = false; + } + } if (allChecked) { $("#box-0").prop('checked', true); } @@ -278,6 +285,9 @@ var taskSummaryMetricsDataTable; var optionalColumns = [11, 12, 13, 14, 15, 16, 17, 21]; var taskTableSelector; +var executorOptionalColumns = [15, 16, 17, 18]; +var executorSummaryTableSelector; + $(document).ready(function () { setDataTableDefaults(); @@ -288,14 +298,18 @@ $(document).ready(function () { "" + "
" + "
Select All
" + - "
Scheduler Delay
" + - "
Task Deserialization Time
" + - "
Shuffle Read Blocked Time
" + - "
Shuffle Remote Reads
" + - "
Shuffle Write Time
" + - "
Result Serialization Time
" + - "
Getting Result Time
" + - "
Peak Execution Memory
" + + "
Scheduler Delay
" + + "
Task Deserialization Time
" + + "
Shuffle Read Blocked Time
" + + "
Shuffle Remote Reads
" + + "
Shuffle Write Time
" + + "
Result Serialization Time
" + + "
Getting Result Time
" + + "
Peak Execution Memory
" + + "
Executor JVMOnHeapMemory / JVMOffHeapMemory
" + + "
Executor OnHeapExecutionMemory / OffHeapExecutionMemory
" + + "
Executor OnHeapStorageMemory / OffHeapStorageMemory
" + + "
Executor DirectPoolMemory / MappedPoolMemory
" + "
"); $('#scheduler_delay').attr("data-toggle", "tooltip") @@ -472,13 +486,20 @@ $(document).ready(function () { } } ], + "columnDefs": [ + { "visible": false, "targets": 15 }, + { "visible": false, "targets": 16 }, + { "visible": false, "targets": 17 }, + { "visible": false, "targets": 18 } + ], + "deferRender": true, "order": [[0, "asc"]], "bAutoWidth": false, "oLanguage": { "sEmptyTable": "No data to show yet" } }; - var executorSummaryTableSelector = + executorSummaryTableSelector = $("#summary-executor-table").DataTable(executorSummaryConf); $('#parent-container [data-toggle="tooltip"]').tooltip(); @@ -930,30 +951,39 @@ $(document).ready(function () { var para = $(this).attr('data-column'); if (para == "0") { var allColumns = taskTableSelector.columns(optionalColumns); + var executorAllColumns = executorSummaryTableSelector.columns(executorOptionalColumns); if ($(this).is(":checked")) { $(".toggle-vis").prop('checked', true); allColumns.visible(true); + executorAllColumns.visible(true); createDataTableForTaskSummaryMetricsTable(taskSummaryMetricsTableArray); } else { $(".toggle-vis").prop('checked', false); allColumns.visible(false); + executorAllColumns.visible(false); var taskSummaryMetricsTableFilteredArray = taskSummaryMetricsTableArray.filter(row => row.checkboxId < 11); createDataTableForTaskSummaryMetricsTable(taskSummaryMetricsTableFilteredArray); } } else { - var column = taskTableSelector.column(para); - // Toggle the visibility - column.visible(!column.visible()); - var taskSummaryMetricsTableFilteredArray = []; - if ($(this).is(":checked")) { - taskSummaryMetricsTableCurrentStateArray.push(taskSummaryMetricsTableArray.filter(row => (row.checkboxId).toString() == para)[0]); - taskSummaryMetricsTableFilteredArray = taskSummaryMetricsTableCurrentStateArray.slice(); + var columnType = $(this).attr("column-type"); + if(columnType == 'task') { + var column = taskTableSelector.column(para); + // Toggle the visibility + column.visible(!column.visible()); + var taskSummaryMetricsTableFilteredArray = []; + if ($(this).is(":checked")) { + taskSummaryMetricsTableCurrentStateArray.push(taskSummaryMetricsTableArray.filter(row => (row.checkboxId).toString() == para)[0]); + taskSummaryMetricsTableFilteredArray = taskSummaryMetricsTableCurrentStateArray.slice(); + } else { + taskSummaryMetricsTableFilteredArray = + taskSummaryMetricsTableCurrentStateArray.filter(row => (row.checkboxId).toString() != para); + } + createDataTableForTaskSummaryMetricsTable(taskSummaryMetricsTableFilteredArray); } else { - taskSummaryMetricsTableFilteredArray = - taskSummaryMetricsTableCurrentStateArray.filter(row => (row.checkboxId).toString() != para); + var column = executorSummaryTableSelector.column(para); + column.visible(!column.visible()); } - createDataTableForTaskSummaryMetricsTable(taskSummaryMetricsTableFilteredArray); } }); diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css index f7f8a0e0e9061..70698ecba8b0a 100755 --- a/core/src/main/resources/org/apache/spark/ui/static/webui.css +++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css @@ -353,6 +353,22 @@ a.expandbutton { width: 180px; } +.executor-jvm-om-off-heap-memory-checkbox-div { + width: 480px; +} + +.executor-on-off-heap-execution-memory-checkbox-div { + width: 480px; +} + +.executor-on-off-heap-storage-memory-checkbox-div { + width: 480px; +} + +.executor-direct-mapped-pool-memory-checkbox-div { + width: 480px; +} + #active-tasks-table th { border-top: 1px solid #dddddd; border-bottom: 1px solid #dddddd; From 6d81362c5a1eb40855f892f97c79debb5412cab4 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sat, 2 May 2020 21:49:29 +0800 Subject: [PATCH 03/22] fix ut --- .../blacklisting_for_stage_expectation.json | 20 +++++++- ...acklisting_node_for_stage_expectation.json | 50 +++++++++++++++++-- .../one_stage_attempt_json_expectation.json | 10 +++- .../one_stage_json_expectation.json | 10 +++- ...age_with_accumulable_json_expectation.json | 10 +++- 5 files changed, 90 insertions(+), 10 deletions(-) diff --git a/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json b/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json index b18b19f7eeffb..aa9978bfcece4 100644 --- a/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json @@ -697,7 +697,15 @@ "shuffleWriteRecords" : 0, "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, - "isBlacklistedForStage" : true + "isBlacklistedForStage" : true, + "jvmHeapMemory" : 0, + "jvmOffHeapMemory" : 0, + "onHeapExecutionMemory" : 0, + "offHeapExecutionMemory" : 0, + "onHeapStorageMemory" : 0, + "offHeapStorageMemory" : 0, + "directPoolMemory" : 0, + "mappedPoolMemory" : 0 }, "1" : { "taskTime" : 708, @@ -714,7 +722,15 @@ "shuffleWriteRecords" : 10, "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, - "isBlacklistedForStage" : false + "isBlacklistedForStage" : false, + "jvmHeapMemory" : 0, + "jvmOffHeapMemory" : 0, + "onHeapExecutionMemory" : 0, + "offHeapExecutionMemory" : 0, + "onHeapStorageMemory" : 0, + "offHeapStorageMemory" : 0, + "directPoolMemory" : 0, + "mappedPoolMemory" : 0 } }, "killedTasksSummary" : { } diff --git a/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json b/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json index 8d11081247913..93f260ad502e1 100644 --- a/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json @@ -805,7 +805,15 @@ "shuffleWriteRecords" : 0, "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, - "isBlacklistedForStage" : true + "isBlacklistedForStage" : true, + "jvmHeapMemory" : 0, + "jvmOffHeapMemory" : 0, + "onHeapExecutionMemory" : 0, + "offHeapExecutionMemory" : 0, + "onHeapStorageMemory" : 0, + "offHeapStorageMemory" : 0, + "directPoolMemory" : 0, + "mappedPoolMemory" : 0 }, "5" : { "taskTime" : 1579, @@ -822,7 +830,15 @@ "shuffleWriteRecords" : 0, "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, - "isBlacklistedForStage" : true + "isBlacklistedForStage" : true, + "jvmHeapMemory" : 0, + "jvmOffHeapMemory" : 0, + "onHeapExecutionMemory" : 0, + "offHeapExecutionMemory" : 0, + "onHeapStorageMemory" : 0, + "offHeapStorageMemory" : 0, + "directPoolMemory" : 0, + "mappedPoolMemory" : 0 }, "1" : { "taskTime" : 2411, @@ -839,7 +855,15 @@ "shuffleWriteRecords" : 12, "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, - "isBlacklistedForStage" : false + "isBlacklistedForStage" : false, + "jvmHeapMemory" : 0, + "jvmOffHeapMemory" : 0, + "onHeapExecutionMemory" : 0, + "offHeapExecutionMemory" : 0, + "onHeapStorageMemory" : 0, + "offHeapStorageMemory" : 0, + "directPoolMemory" : 0, + "mappedPoolMemory" : 0 }, "2" : { "taskTime" : 2446, @@ -856,7 +880,15 @@ "shuffleWriteRecords" : 15, "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, - "isBlacklistedForStage" : false + "isBlacklistedForStage" : false, + "jvmHeapMemory" : 0, + "jvmOffHeapMemory" : 0, + "onHeapExecutionMemory" : 0, + "offHeapExecutionMemory" : 0, + "onHeapStorageMemory" : 0, + "offHeapStorageMemory" : 0, + "directPoolMemory" : 0, + "mappedPoolMemory" : 0 }, "3" : { "taskTime" : 1774, @@ -873,7 +905,15 @@ "shuffleWriteRecords" : 3, "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, - "isBlacklistedForStage" : true + "isBlacklistedForStage" : true, + "jvmHeapMemory" : 0, + "jvmOffHeapMemory" : 0, + "onHeapExecutionMemory" : 0, + "offHeapExecutionMemory" : 0, + "onHeapStorageMemory" : 0, + "offHeapStorageMemory" : 0, + "directPoolMemory" : 0, + "mappedPoolMemory" : 0 } }, "killedTasksSummary" : { } diff --git a/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json index 791907045e500..576e3233d9ab8 100644 --- a/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json @@ -459,7 +459,15 @@ "shuffleWriteRecords" : 0, "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, - "isBlacklistedForStage" : false + "isBlacklistedForStage" : false, + "jvmHeapMemory" : 0, + "jvmOffHeapMemory" : 0, + "onHeapExecutionMemory" : 0, + "offHeapExecutionMemory" : 0, + "onHeapStorageMemory" : 0, + "offHeapStorageMemory" : 0, + "directPoolMemory" : 0, + "mappedPoolMemory" : 0 } }, "killedTasksSummary" : { } diff --git a/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json index 50d3f74ae775f..1b099e9ed0074 100644 --- a/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json @@ -459,7 +459,15 @@ "shuffleWriteRecords" : 0, "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, - "isBlacklistedForStage" : false + "isBlacklistedForStage" : false, + "jvmHeapMemory" : 0, + "jvmOffHeapMemory" : 0, + "onHeapExecutionMemory" : 0, + "offHeapExecutionMemory" : 0, + "onHeapStorageMemory" : 0, + "offHeapStorageMemory" : 0, + "directPoolMemory" : 0, + "mappedPoolMemory" : 0 } }, "killedTasksSummary" : { } diff --git a/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json index 735a8257fc343..66e3c5d79cfa0 100644 --- a/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json @@ -503,7 +503,15 @@ "shuffleWriteRecords" : 0, "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, - "isBlacklistedForStage" : false + "isBlacklistedForStage" : false, + "jvmHeapMemory" : 0, + "jvmOffHeapMemory" : 0, + "onHeapExecutionMemory" : 0, + "offHeapExecutionMemory" : 0, + "onHeapStorageMemory" : 0, + "offHeapStorageMemory" : 0, + "directPoolMemory" : 0, + "mappedPoolMemory" : 0 } }, "killedTasksSummary" : { } From 7dba885022cceea0cc6b1697241e6c09c0de4199 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 6 May 2020 16:43:26 +0800 Subject: [PATCH 04/22] follow comment --- .../org/apache/spark/ui/static/stagepage.js | 10 +++++----- .../resources/org/apache/spark/ui/static/webui.css | 14 +------------- 2 files changed, 6 insertions(+), 18 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js index d902ab5b17f3b..08aae717d4771 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js @@ -257,7 +257,7 @@ function reselectCheckboxesBasedOnTaskTableState() { } for (var k = 0; k < executorOptionalColumns.length; k++) { if (executorSummaryTableSelector.column(executorOptionalColumns[k]).visible()) { - $("#executor-box-"+optionalColumns[k]).prop('checked', true); + $("#executor-box-"+executorOptionalColumns[k]).prop('checked', true); } else { allChecked = false; } @@ -306,10 +306,10 @@ $(document).ready(function () { "
Result Serialization Time
" + "
Getting Result Time
" + "
Peak Execution Memory
" + - "
Executor JVMOnHeapMemory / JVMOffHeapMemory
" + - "
Executor OnHeapExecutionMemory / OffHeapExecutionMemory
" + - "
Executor OnHeapStorageMemory / OffHeapStorageMemory
" + - "
Executor DirectPoolMemory / MappedPoolMemory
" + + "
Executor JVMOnHeapMemory / JVMOffHeapMemory
" + + "
Executor OnHeapExecutionMemory / OffHeapExecutionMemory
" + + "
Executor OnHeapStorageMemory / OffHeapStorageMemory
" + + "
Executor DirectPoolMemory / MappedPoolMemory
" + ""); $('#scheduler_delay').attr("data-toggle", "tooltip") diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css index 70698ecba8b0a..9e86af075566a 100755 --- a/core/src/main/resources/org/apache/spark/ui/static/webui.css +++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css @@ -353,19 +353,7 @@ a.expandbutton { width: 180px; } -.executor-jvm-om-off-heap-memory-checkbox-div { - width: 480px; -} - -.executor-on-off-heap-execution-memory-checkbox-div { - width: 480px; -} - -.executor-on-off-heap-storage-memory-checkbox-div { - width: 480px; -} - -.executor-direct-mapped-pool-memory-checkbox-div { +.executor-jvm-metrics-checkbox-div { width: 480px; } From 22c1e9e5d1bf7fe9771162bc3b6ccb5dddf3fd45 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Thu, 7 May 2020 11:16:57 +0800 Subject: [PATCH 05/22] Update stagepage.js --- core/src/main/resources/org/apache/spark/ui/static/stagepage.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js index 08aae717d4771..d041439c9505e 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js @@ -245,7 +245,7 @@ function createRowMetadataForColumn(colKey, data, checkboxId) { function reselectCheckboxesBasedOnTaskTableState() { var allChecked = true; var taskSummaryMetricsTableCurrentFilteredArray = taskSummaryMetricsTableCurrentStateArray.slice(); - if (typeof taskTableSelector !== 'undefined' && taskSummaryMetricsTableCurrentStateArray.length > 0) { + if (typeof taskTableSelector !== 'undefined' && typeof executorSummaryTableSelector !== 'undefined' && taskSummaryMetricsTableCurrentStateArray.length > 0) { for (var k = 0; k < optionalColumns.length; k++) { if (taskTableSelector.column(optionalColumns[k]).visible()) { $("#box-"+optionalColumns[k]).prop('checked', true); From 30708542e2912d64d262babb322d37c910faec95 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 2 Dec 2020 18:58:48 +0800 Subject: [PATCH 06/22] Update stagepage.js --- .../resources/org/apache/spark/ui/static/stagepage.js | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js index d041439c9505e..335c0f7e7367c 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js @@ -245,16 +245,20 @@ function createRowMetadataForColumn(colKey, data, checkboxId) { function reselectCheckboxesBasedOnTaskTableState() { var allChecked = true; var taskSummaryMetricsTableCurrentFilteredArray = taskSummaryMetricsTableCurrentStateArray.slice(); - if (typeof taskTableSelector !== 'undefined' && typeof executorSummaryTableSelector !== 'undefined' && taskSummaryMetricsTableCurrentStateArray.length > 0) { + if ((typeof taskTableSelector !== 'undefined' && taskSummaryMetricsTableCurrentStateArray.length > 0) || typeof executorSummaryTableSelector !== 'undefined') { for (var k = 0; k < optionalColumns.length; k++) { if (taskTableSelector.column(optionalColumns[k]).visible()) { $("#box-"+optionalColumns[k]).prop('checked', true); - taskSummaryMetricsTableCurrentStateArray.push(taskSummaryMetricsTableArray.filter(row => (row.checkboxId).toString() == optionalColumns[k])[0]); - taskSummaryMetricsTableCurrentFilteredArray = taskSummaryMetricsTableCurrentStateArray.slice(); + var result = taskSummaryMetricsTableArray.filter(row => (row.checkboxId).toString() == optionalColumns[k])[0]; + if (typeof result !== 'undefined') { + taskSummaryMetricsTableCurrentStateArray.push(taskSummaryMetricsTableArray.filter(row => (row.checkboxId).toString() == optionalColumns[k])[0]); + taskSummaryMetricsTableCurrentFilteredArray = taskSummaryMetricsTableCurrentStateArray.slice(); + } } else { allChecked = false; } } + for (var k = 0; k < executorOptionalColumns.length; k++) { if (executorSummaryTableSelector.column(executorOptionalColumns[k]).visible()) { $("#executor-box-"+executorOptionalColumns[k]).prop('checked', true); From 6a32b44d412294fa4315d817b27f22ba5beea428 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 2 Dec 2020 20:08:22 +0800 Subject: [PATCH 07/22] Update stagepage.js --- .../org/apache/spark/ui/static/stagepage.js | 36 ++++++++++--------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js index 335c0f7e7367c..e28844bfa7414 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js @@ -246,30 +246,32 @@ function reselectCheckboxesBasedOnTaskTableState() { var allChecked = true; var taskSummaryMetricsTableCurrentFilteredArray = taskSummaryMetricsTableCurrentStateArray.slice(); if ((typeof taskTableSelector !== 'undefined' && taskSummaryMetricsTableCurrentStateArray.length > 0) || typeof executorSummaryTableSelector !== 'undefined') { - for (var k = 0; k < optionalColumns.length; k++) { - if (taskTableSelector.column(optionalColumns[k]).visible()) { - $("#box-"+optionalColumns[k]).prop('checked', true); - var result = taskSummaryMetricsTableArray.filter(row => (row.checkboxId).toString() == optionalColumns[k])[0]; - if (typeof result !== 'undefined') { + if (typeof taskTableSelector !== 'undefined' && taskSummaryMetricsTableCurrentStateArray.length > 0) { + for (var k = 0; k < optionalColumns.length; k++) { + if (taskTableSelector.column(optionalColumns[k]).visible()) { + $("#box-"+optionalColumns[k]).prop('checked', true); taskSummaryMetricsTableCurrentStateArray.push(taskSummaryMetricsTableArray.filter(row => (row.checkboxId).toString() == optionalColumns[k])[0]); taskSummaryMetricsTableCurrentFilteredArray = taskSummaryMetricsTableCurrentStateArray.slice(); + } else { + allChecked = false; } - } else { - allChecked = false; } - } + } + createDataTableForTaskSummaryMetricsTable(taskSummaryMetricsTableCurrentFilteredArray); - for (var k = 0; k < executorOptionalColumns.length; k++) { - if (executorSummaryTableSelector.column(executorOptionalColumns[k]).visible()) { - $("#executor-box-"+executorOptionalColumns[k]).prop('checked', true); - } else { - allChecked = false; + if (typeof executorSummaryTableSelector !== 'undefined') { + for (var k = 0; k < executorOptionalColumns.length; k++) { + if (executorSummaryTableSelector.column(executorOptionalColumns[k]).visible()) { + $("#executor-box-"+executorOptionalColumns[k]).prop('checked', true); + } else { + allChecked = false; + } } } + if (allChecked) { $("#box-0").prop('checked', true); } - createDataTableForTaskSummaryMetricsTable(taskSummaryMetricsTableCurrentFilteredArray); } } @@ -310,8 +312,8 @@ $(document).ready(function () { "
Result Serialization Time
" + "
Getting Result Time
" + "
Peak Execution Memory
" + - "
Executor JVMOnHeapMemory / JVMOffHeapMemory
" + - "
Executor OnHeapExecutionMemory / OffHeapExecutionMemory
" + + "
Executor JVMOnHeapMemory / JVMOffHeapMemory
" + + "
Executor OnHeapExecutionMemory / OffHeapExecutionMemory
" + "
Executor OnHeapStorageMemory / OffHeapStorageMemory
" + "
Executor DirectPoolMemory / MappedPoolMemory
" + ""); @@ -389,7 +391,7 @@ $(document).ready(function () { if (!dataToShow.showShuffleWriteData) { $('#shuffle_write_time').remove(); - optionalColumns.splice(7, 1) + optionalColumns.splice(optionalColumns.length - 1, 1) } // prepare data for executor summary table From a6dd5ab8c22b209d7cefd5cfc569531fefb88992 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 2 Dec 2020 21:01:15 +0800 Subject: [PATCH 08/22] solve conflicts --- .../org/apache/spark/ui/static/stagepage.js | 52 +++++++++++++------ .../spark/status/AppStatusListener.scala | 3 -- .../org/apache/spark/status/LiveEntity.scala | 2 +- 3 files changed, 37 insertions(+), 20 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js index 55ce24a5833c5..ca37026c67016 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js @@ -483,23 +483,43 @@ $(document).ready(function () { data : function (row, type) { return typeof row.diskBytesSpilled != 'undefined' ? formatBytes(row.diskBytesSpilled, type) : ""; } - },{ - data: function(row, type) { - return formatBytes(row.jvmHeapMemory, type) + "/" + formatBytes(row.jvmOffHeapMemory, type); + }, + { + data : function (row, type) { + if (type !== 'display') + return row.peakMemoryMetrics.JVMHeapMemory; + else + return (formatBytes(row.peakMemoryMetrics.JVMHeapMemory, type) + ' / ' + + formatBytes(row.peakMemoryMetrics.JVMOffHeapMemory, type)); + } + }, + { + data : function (row, type) { + if (type !== 'display') + return row.peakMemoryMetrics.OnHeapExecutionMemory; + else + return (formatBytes(row.peakMemoryMetrics.OnHeapExecutionMemory, type) + ' / ' + + formatBytes(row.peakMemoryMetrics.OffHeapExecutionMemory, type)); + } + }, + { + data : function (row, type) { + if (type !== 'display') + return row.peakMemoryMetrics.OnHeapStorageMemory; + else + return (formatBytes(row.peakMemoryMetrics.OnHeapStorageMemory, type) + ' / ' + + formatBytes(row.peakMemoryMetrics.OffHeapStorageMemory, type)); + } + }, + { + data : function (row, type) { + if (type !== 'display') + return row.peakMemoryMetrics.DirectPoolMemory; + else + return (formatBytes(row.peakMemoryMetrics.DirectPoolMemory, type) + ' / ' + + formatBytes(row.peakMemoryMetrics.MappedPoolMemory, type)); } - },{ - data: function(row, type) { - return formatBytes(row.onHeapExecutionMemory, type) + "/" + formatBytes(row.offHeapExecutionMemory, type); - } - },{ - data: function(row, type) { - return formatBytes(row.onHeapStorageMemory, type) + "/" + formatBytes(row.offHeapStorageMemory, type); - } - },{ - data: function(row, type) { - return formatBytes(row.directPoolMemory, type) + "/" + formatBytes(row.mappedPoolMemory, type); - } - } + } ], "columnDefs": [ { "visible": false, "targets": 15 }, diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 17b0e97d7adc0..5b0c1dc389af0 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -687,9 +687,6 @@ private[spark] class AppStatusListener( stage.killedSummary = killedTasksSummary(event.reason, stage.killedSummary) } stage.activeTasksPerExecutor(event.taskInfo.executorId) -= 1 - - val metrics = stage.executorSummary(event.taskInfo.executorId).executorMetrics - metrics.compareAndUpdatePeakValues(event.taskExecutorMetrics) // [SPARK-24415] Wait for all tasks to finish before removing stage from live list val removeStage = stage.activeTasks == 0 && diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 38f1f25f2fcaa..88594a0dbc4d5 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -386,7 +386,7 @@ private class LiveExecutorStageSummary( metrics.memoryBytesSpilled, metrics.diskBytesSpilled, isExcluded, - Some(peakExecutorMetrics).filter(_.isSet), + Some(peakExecutorMetrics), isExcluded) new ExecutorStageSummaryWrapper(stageId, attemptId, executorId, info) } From 91a1f8e09881a886a834e5133b90451b2b653133 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Thu, 3 Dec 2020 09:52:11 +0800 Subject: [PATCH 09/22] fix UT --- ...xcludeOnFailure_for_stage_expectation.json | 66 +++++-- ...eOnFailure_node_for_stage_expectation.json | 164 +++++++++++++----- .../one_stage_attempt_json_expectation.json | 32 +++- .../one_stage_json_expectation.json | 32 +++- ...age_with_accumulable_json_expectation.json | 32 +++- 5 files changed, 233 insertions(+), 93 deletions(-) diff --git a/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_for_stage_expectation.json b/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_for_stage_expectation.json index e999e650524c5..b3da4d1fccb7e 100644 --- a/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_for_stage_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_for_stage_expectation.json @@ -698,15 +698,29 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : true, - "isExcludedForStage" : false, - "jvmHeapMemory" : 0, - "jvmOffHeapMemory" : 0, - "onHeapExecutionMemory" : 0, - "offHeapExecutionMemory" : 0, - "onHeapStorageMemory" : 0, - "offHeapStorageMemory" : 0, - "directPoolMemory" : 0, - "mappedPoolMemory" : 0 + "peakMemoryMetrics" : { + "JVMHeapMemory" : -1, + "JVMOffHeapMemory" : 0, + "OnHeapExecutionMemory" : 0, + "OffHeapExecutionMemory" : 0, + "OnHeapStorageMemory" : 0, + "OffHeapStorageMemory" : 0, + "OnHeapUnifiedMemory" : 0, + "OffHeapUnifiedMemory" : 0, + "DirectPoolMemory" : 0, + "MappedPoolMemory" : 0, + "ProcessTreeJVMVMemory" : 0, + "ProcessTreeJVMRSSMemory" : 0, + "ProcessTreePythonVMemory" : 0, + "ProcessTreePythonRSSMemory" : 0, + "ProcessTreeOtherVMemory" : 0, + "ProcessTreeOtherRSSMemory" : 0, + "MinorGCCount" : 0, + "MinorGCTime" : 0, + "MajorGCCount" : 0, + "MajorGCTime" : 0 + }, + "isExcludedForStage" : true }, "1" : { "taskTime" : 708, @@ -724,17 +738,31 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : false, - "isExcludedForStage" : false, - "jvmHeapMemory" : 0, - "jvmOffHeapMemory" : 0, - "onHeapExecutionMemory" : 0, - "offHeapExecutionMemory" : 0, - "onHeapStorageMemory" : 0, - "offHeapStorageMemory" : 0, - "directPoolMemory" : 0, - "mappedPoolMemory" : 0 + "peakMemoryMetrics" : { + "JVMHeapMemory" : -1, + "JVMOffHeapMemory" : 0, + "OnHeapExecutionMemory" : 0, + "OffHeapExecutionMemory" : 0, + "OnHeapStorageMemory" : 0, + "OffHeapStorageMemory" : 0, + "OnHeapUnifiedMemory" : 0, + "OffHeapUnifiedMemory" : 0, + "DirectPoolMemory" : 0, + "MappedPoolMemory" : 0, + "ProcessTreeJVMVMemory" : 0, + "ProcessTreeJVMRSSMemory" : 0, + "ProcessTreePythonVMemory" : 0, + "ProcessTreePythonRSSMemory" : 0, + "ProcessTreeOtherVMemory" : 0, + "ProcessTreeOtherRSSMemory" : 0, + "MinorGCCount" : 0, + "MinorGCTime" : 0, + "MajorGCCount" : 0, + "MajorGCTime" : 0 + }, + "isExcludedForStage" : false } }, "killedTasksSummary" : { }, "resourceProfileId" : 0 -} +} \ No newline at end of file diff --git a/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_node_for_stage_expectation.json b/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_node_for_stage_expectation.json index 827eb6e89f783..75edae0548d05 100644 --- a/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_node_for_stage_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_node_for_stage_expectation.json @@ -806,15 +806,29 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : true, - "isExcludedForStage" : false, - "jvmHeapMemory" : 0, - "jvmOffHeapMemory" : 0, - "onHeapExecutionMemory" : 0, - "offHeapExecutionMemory" : 0, - "onHeapStorageMemory" : 0, - "offHeapStorageMemory" : 0, - "directPoolMemory" : 0, - "mappedPoolMemory" : 0 + "peakMemoryMetrics" : { + "JVMHeapMemory" : -1, + "JVMOffHeapMemory" : 0, + "OnHeapExecutionMemory" : 0, + "OffHeapExecutionMemory" : 0, + "OnHeapStorageMemory" : 0, + "OffHeapStorageMemory" : 0, + "OnHeapUnifiedMemory" : 0, + "OffHeapUnifiedMemory" : 0, + "DirectPoolMemory" : 0, + "MappedPoolMemory" : 0, + "ProcessTreeJVMVMemory" : 0, + "ProcessTreeJVMRSSMemory" : 0, + "ProcessTreePythonVMemory" : 0, + "ProcessTreePythonRSSMemory" : 0, + "ProcessTreeOtherVMemory" : 0, + "ProcessTreeOtherRSSMemory" : 0, + "MinorGCCount" : 0, + "MinorGCTime" : 0, + "MajorGCCount" : 0, + "MajorGCTime" : 0 + }, + "isExcludedForStage" : true }, "5" : { "taskTime" : 1579, @@ -832,15 +846,29 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : true, - "isExcludedForStage" : false, - "jvmHeapMemory" : 0, - "jvmOffHeapMemory" : 0, - "onHeapExecutionMemory" : 0, - "offHeapExecutionMemory" : 0, - "onHeapStorageMemory" : 0, - "offHeapStorageMemory" : 0, - "directPoolMemory" : 0, - "mappedPoolMemory" : 0 + "peakMemoryMetrics" : { + "JVMHeapMemory" : -1, + "JVMOffHeapMemory" : 0, + "OnHeapExecutionMemory" : 0, + "OffHeapExecutionMemory" : 0, + "OnHeapStorageMemory" : 0, + "OffHeapStorageMemory" : 0, + "OnHeapUnifiedMemory" : 0, + "OffHeapUnifiedMemory" : 0, + "DirectPoolMemory" : 0, + "MappedPoolMemory" : 0, + "ProcessTreeJVMVMemory" : 0, + "ProcessTreeJVMRSSMemory" : 0, + "ProcessTreePythonVMemory" : 0, + "ProcessTreePythonRSSMemory" : 0, + "ProcessTreeOtherVMemory" : 0, + "ProcessTreeOtherRSSMemory" : 0, + "MinorGCCount" : 0, + "MinorGCTime" : 0, + "MajorGCCount" : 0, + "MajorGCTime" : 0 + }, + "isExcludedForStage" : true }, "1" : { "taskTime" : 2411, @@ -858,15 +886,29 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : false, - "isExcludedForStage" : false, - "jvmHeapMemory" : 0, - "jvmOffHeapMemory" : 0, - "onHeapExecutionMemory" : 0, - "offHeapExecutionMemory" : 0, - "onHeapStorageMemory" : 0, - "offHeapStorageMemory" : 0, - "directPoolMemory" : 0, - "mappedPoolMemory" : 0 + "peakMemoryMetrics" : { + "JVMHeapMemory" : -1, + "JVMOffHeapMemory" : 0, + "OnHeapExecutionMemory" : 0, + "OffHeapExecutionMemory" : 0, + "OnHeapStorageMemory" : 0, + "OffHeapStorageMemory" : 0, + "OnHeapUnifiedMemory" : 0, + "OffHeapUnifiedMemory" : 0, + "DirectPoolMemory" : 0, + "MappedPoolMemory" : 0, + "ProcessTreeJVMVMemory" : 0, + "ProcessTreeJVMRSSMemory" : 0, + "ProcessTreePythonVMemory" : 0, + "ProcessTreePythonRSSMemory" : 0, + "ProcessTreeOtherVMemory" : 0, + "ProcessTreeOtherRSSMemory" : 0, + "MinorGCCount" : 0, + "MinorGCTime" : 0, + "MajorGCCount" : 0, + "MajorGCTime" : 0 + }, + "isExcludedForStage" : false }, "2" : { "taskTime" : 2446, @@ -884,15 +926,29 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : false, - "isExcludedForStage" : false, - "jvmHeapMemory" : 0, - "jvmOffHeapMemory" : 0, - "onHeapExecutionMemory" : 0, - "offHeapExecutionMemory" : 0, - "onHeapStorageMemory" : 0, - "offHeapStorageMemory" : 0, - "directPoolMemory" : 0, - "mappedPoolMemory" : 0 + "peakMemoryMetrics" : { + "JVMHeapMemory" : -1, + "JVMOffHeapMemory" : 0, + "OnHeapExecutionMemory" : 0, + "OffHeapExecutionMemory" : 0, + "OnHeapStorageMemory" : 0, + "OffHeapStorageMemory" : 0, + "OnHeapUnifiedMemory" : 0, + "OffHeapUnifiedMemory" : 0, + "DirectPoolMemory" : 0, + "MappedPoolMemory" : 0, + "ProcessTreeJVMVMemory" : 0, + "ProcessTreeJVMRSSMemory" : 0, + "ProcessTreePythonVMemory" : 0, + "ProcessTreePythonRSSMemory" : 0, + "ProcessTreeOtherVMemory" : 0, + "ProcessTreeOtherRSSMemory" : 0, + "MinorGCCount" : 0, + "MinorGCTime" : 0, + "MajorGCCount" : 0, + "MajorGCTime" : 0 + }, + "isExcludedForStage" : false }, "3" : { "taskTime" : 1774, @@ -910,17 +966,31 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : true, - "isExcludedForStage" : true, - "jvmHeapMemory" : 0, - "jvmOffHeapMemory" : 0, - "onHeapExecutionMemory" : 0, - "offHeapExecutionMemory" : 0, - "onHeapStorageMemory" : 0, - "offHeapStorageMemory" : 0, - "directPoolMemory" : 0, - "mappedPoolMemory" : 0 - } + "peakMemoryMetrics" : { + "JVMHeapMemory" : -1, + "JVMOffHeapMemory" : 0, + "OnHeapExecutionMemory" : 0, + "OffHeapExecutionMemory" : 0, + "OnHeapStorageMemory" : 0, + "OffHeapStorageMemory" : 0, + "OnHeapUnifiedMemory" : 0, + "OffHeapUnifiedMemory" : 0, + "DirectPoolMemory" : 0, + "MappedPoolMemory" : 0, + "ProcessTreeJVMVMemory" : 0, + "ProcessTreeJVMRSSMemory" : 0, + "ProcessTreePythonVMemory" : 0, + "ProcessTreePythonRSSMemory" : 0, + "ProcessTreeOtherVMemory" : 0, + "ProcessTreeOtherRSSMemory" : 0, + "MinorGCCount" : 0, + "MinorGCTime" : 0, + "MajorGCCount" : 0, + "MajorGCTime" : 0 + }, + "isExcludedForStage" : true + } }, "killedTasksSummary" : { }, "resourceProfileId" : 0 -} +} \ No newline at end of file diff --git a/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json index 60d2bdfecb803..6246f96944be0 100644 --- a/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json @@ -460,15 +460,29 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : false, - "isExcludedForStage" : false, - "jvmHeapMemory" : 0, - "jvmOffHeapMemory" : 0, - "onHeapExecutionMemory" : 0, - "offHeapExecutionMemory" : 0, - "onHeapStorageMemory" : 0, - "offHeapStorageMemory" : 0, - "directPoolMemory" : 0, - "mappedPoolMemory" : 0 + "peakMemoryMetrics" : { + "JVMHeapMemory" : -1, + "JVMOffHeapMemory" : 0, + "OnHeapExecutionMemory" : 0, + "OffHeapExecutionMemory" : 0, + "OnHeapStorageMemory" : 0, + "OffHeapStorageMemory" : 0, + "OnHeapUnifiedMemory" : 0, + "OffHeapUnifiedMemory" : 0, + "DirectPoolMemory" : 0, + "MappedPoolMemory" : 0, + "ProcessTreeJVMVMemory" : 0, + "ProcessTreeJVMRSSMemory" : 0, + "ProcessTreePythonVMemory" : 0, + "ProcessTreePythonRSSMemory" : 0, + "ProcessTreeOtherVMemory" : 0, + "ProcessTreeOtherRSSMemory" : 0, + "MinorGCCount" : 0, + "MinorGCTime" : 0, + "MajorGCCount" : 0, + "MajorGCTime" : 0 + }, + "isExcludedForStage" : false } }, "killedTasksSummary" : { }, diff --git a/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json index 9c53df4d51acf..512b3d8acdad3 100644 --- a/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json @@ -460,15 +460,29 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : false, - "isExcludedForStage" : false, - "jvmHeapMemory" : 0, - "jvmOffHeapMemory" : 0, - "onHeapExecutionMemory" : 0, - "offHeapExecutionMemory" : 0, - "onHeapStorageMemory" : 0, - "offHeapStorageMemory" : 0, - "directPoolMemory" : 0, - "mappedPoolMemory" : 0 + "peakMemoryMetrics" : { + "JVMHeapMemory" : -1, + "JVMOffHeapMemory" : 0, + "OnHeapExecutionMemory" : 0, + "OffHeapExecutionMemory" : 0, + "OnHeapStorageMemory" : 0, + "OffHeapStorageMemory" : 0, + "OnHeapUnifiedMemory" : 0, + "OffHeapUnifiedMemory" : 0, + "DirectPoolMemory" : 0, + "MappedPoolMemory" : 0, + "ProcessTreeJVMVMemory" : 0, + "ProcessTreeJVMRSSMemory" : 0, + "ProcessTreePythonVMemory" : 0, + "ProcessTreePythonRSSMemory" : 0, + "ProcessTreeOtherVMemory" : 0, + "ProcessTreeOtherRSSMemory" : 0, + "MinorGCCount" : 0, + "MinorGCTime" : 0, + "MajorGCCount" : 0, + "MajorGCTime" : 0 + }, + "isExcludedForStage" : false } }, "killedTasksSummary" : { }, diff --git a/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json index 4bfa840d11343..6f72488478225 100644 --- a/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json @@ -504,15 +504,29 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : false, - "isExcludedForStage" : false, - "jvmHeapMemory" : 0, - "jvmOffHeapMemory" : 0, - "onHeapExecutionMemory" : 0, - "offHeapExecutionMemory" : 0, - "onHeapStorageMemory" : 0, - "offHeapStorageMemory" : 0, - "directPoolMemory" : 0, - "mappedPoolMemory" : 0 + "peakMemoryMetrics" : { + "JVMHeapMemory" : -1, + "JVMOffHeapMemory" : 0, + "OnHeapExecutionMemory" : 0, + "OffHeapExecutionMemory" : 0, + "OnHeapStorageMemory" : 0, + "OffHeapStorageMemory" : 0, + "OnHeapUnifiedMemory" : 0, + "OffHeapUnifiedMemory" : 0, + "DirectPoolMemory" : 0, + "MappedPoolMemory" : 0, + "ProcessTreeJVMVMemory" : 0, + "ProcessTreeJVMRSSMemory" : 0, + "ProcessTreePythonVMemory" : 0, + "ProcessTreePythonRSSMemory" : 0, + "ProcessTreeOtherVMemory" : 0, + "ProcessTreeOtherRSSMemory" : 0, + "MinorGCCount" : 0, + "MinorGCTime" : 0, + "MajorGCCount" : 0, + "MajorGCTime" : 0 + }, + "isExcludedForStage" : false } }, "killedTasksSummary" : { }, From b8d705bcdc3e3db0eff0921c87915653a5377974 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 4 Dec 2020 14:45:40 +0800 Subject: [PATCH 10/22] Update stagepage.js --- .../org/apache/spark/ui/static/stagepage.js | 63 ++++++++++--------- 1 file changed, 33 insertions(+), 30 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js index ca37026c67016..f6f49d6fca489 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js @@ -243,36 +243,39 @@ function createRowMetadataForColumn(colKey, data, checkboxId) { } function reselectCheckboxesBasedOnTaskTableState() { - var allChecked = true; - var taskSummaryMetricsTableCurrentFilteredArray = taskSummaryMetricsTableCurrentStateArray.slice(); - if ((typeof taskTableSelector !== 'undefined' && taskSummaryMetricsTableCurrentStateArray.length > 0) || typeof executorSummaryTableSelector !== 'undefined') { - if (typeof taskTableSelector !== 'undefined' && taskSummaryMetricsTableCurrentStateArray.length > 0) { - for (var k = 0; k < optionalColumns.length; k++) { - if (taskTableSelector.column(optionalColumns[k]).visible()) { - $("#box-"+optionalColumns[k]).prop('checked', true); - taskSummaryMetricsTableCurrentStateArray.push(taskSummaryMetricsTableArray.filter(row => (row.checkboxId).toString() == optionalColumns[k])[0]); - taskSummaryMetricsTableCurrentFilteredArray = taskSummaryMetricsTableCurrentStateArray.slice(); - } else { - allChecked = false; - } - } - } - createDataTableForTaskSummaryMetricsTable(taskSummaryMetricsTableCurrentFilteredArray); - - if (typeof executorSummaryTableSelector !== 'undefined') { - for (var k = 0; k < executorOptionalColumns.length; k++) { - if (executorSummaryTableSelector.column(executorOptionalColumns[k]).visible()) { - $("#executor-box-"+executorOptionalColumns[k]).prop('checked', true); - } else { - allChecked = false; - } - } - } - - if (allChecked) { - $("#box-0").prop('checked', true); - } - } + var taskSummaryHasSelected = false; + var executorSummaryHasSelected = false; + var allTaskSummaryChecked = true; + var allExecutorSummaryChecked = true; + var taskSummaryMetricsTableCurrentFilteredArray = taskSummaryMetricsTableCurrentStateArray.slice(); + if (typeof taskTableSelector !== 'undefined' && taskSummaryMetricsTableCurrentStateArray.length > 0) { + for (var k = 0; k < optionalColumns.length; k++) { + if (taskTableSelector.column(optionalColumns[k]).visible()) { + taskSummaryHasSelected = true; + $("#box-" + optionalColumns[k]).prop('checked', true); + taskSummaryMetricsTableCurrentStateArray.push(taskSummaryMetricsTableArray.filter(row => (row.checkboxId).toString() == optionalColumns[k])[0]); + taskSummaryMetricsTableCurrentFilteredArray = taskSummaryMetricsTableCurrentStateArray.slice(); + } else { + allTaskSummaryChecked = false; + } + } + createDataTableForTaskSummaryMetricsTable(taskSummaryMetricsTableCurrentFilteredArray); + } + + if (typeof executorSummaryTableSelector !== 'undefined') { + for (var k = 0; k < executorOptionalColumns.length; k++) { + if (executorSummaryTableSelector.column(executorOptionalColumns[k]).visible()) { + executorSummaryHasSelected = true; + $("#executor-box-" + executorOptionalColumns[k]).prop('checked', true); + } else { + allExecutorSummaryChecked = false; + } + } + } + + if ((taskSummaryHasSelected || executorSummaryHasSelected) && allTaskSummaryChecked && allExecutorSummaryChecked) { + $("#box-0").prop('checked', true); + } } function getStageAttemptId() { From cf4d3c4626a902d9531b7c30b832bd9921913c4b Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 4 Dec 2020 16:04:40 +0800 Subject: [PATCH 11/22] Update stagepage.js --- .../org/apache/spark/ui/static/stagepage.js | 97 ++++++++++--------- 1 file changed, 49 insertions(+), 48 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js index f6f49d6fca489..90abc63229d4a 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js @@ -243,39 +243,39 @@ function createRowMetadataForColumn(colKey, data, checkboxId) { } function reselectCheckboxesBasedOnTaskTableState() { - var taskSummaryHasSelected = false; - var executorSummaryHasSelected = false; - var allTaskSummaryChecked = true; - var allExecutorSummaryChecked = true; - var taskSummaryMetricsTableCurrentFilteredArray = taskSummaryMetricsTableCurrentStateArray.slice(); - if (typeof taskTableSelector !== 'undefined' && taskSummaryMetricsTableCurrentStateArray.length > 0) { - for (var k = 0; k < optionalColumns.length; k++) { - if (taskTableSelector.column(optionalColumns[k]).visible()) { - taskSummaryHasSelected = true; - $("#box-" + optionalColumns[k]).prop('checked', true); - taskSummaryMetricsTableCurrentStateArray.push(taskSummaryMetricsTableArray.filter(row => (row.checkboxId).toString() == optionalColumns[k])[0]); - taskSummaryMetricsTableCurrentFilteredArray = taskSummaryMetricsTableCurrentStateArray.slice(); - } else { - allTaskSummaryChecked = false; - } - } - createDataTableForTaskSummaryMetricsTable(taskSummaryMetricsTableCurrentFilteredArray); - } - - if (typeof executorSummaryTableSelector !== 'undefined') { - for (var k = 0; k < executorOptionalColumns.length; k++) { - if (executorSummaryTableSelector.column(executorOptionalColumns[k]).visible()) { - executorSummaryHasSelected = true; - $("#executor-box-" + executorOptionalColumns[k]).prop('checked', true); - } else { - allExecutorSummaryChecked = false; - } - } - } - - if ((taskSummaryHasSelected || executorSummaryHasSelected) && allTaskSummaryChecked && allExecutorSummaryChecked) { - $("#box-0").prop('checked', true); - } + var taskSummaryHasSelected = false; + var executorSummaryHasSelected = false; + var allTaskSummaryChecked = true; + var allExecutorSummaryChecked = true; + var taskSummaryMetricsTableCurrentFilteredArray = taskSummaryMetricsTableCurrentStateArray.slice(); + if (typeof taskTableSelector !== 'undefined' && taskSummaryMetricsTableCurrentStateArray.length > 0) { + for (var k = 0; k < optionalColumns.length; k++) { + if (taskTableSelector.column(optionalColumns[k]).visible()) { + taskSummaryHasSelected = true; + $("#box-" + optionalColumns[k]).prop('checked', true); + taskSummaryMetricsTableCurrentStateArray.push(taskSummaryMetricsTableArray.filter(row => (row.checkboxId).toString() == optionalColumns[k])[0]); + taskSummaryMetricsTableCurrentFilteredArray = taskSummaryMetricsTableCurrentStateArray.slice(); + } else { + allTaskSummaryChecked = false; + } + } + createDataTableForTaskSummaryMetricsTable(taskSummaryMetricsTableCurrentFilteredArray); + } + + if (typeof executorSummaryTableSelector !== 'undefined') { + for (var k = 0; k < executorOptionalColumns.length; k++) { + if (executorSummaryTableSelector.column(executorOptionalColumns[k]).visible()) { + executorSummaryHasSelected = true; + $("#executor-box-" + executorOptionalColumns[k]).prop('checked', true); + } else { + allExecutorSummaryChecked = false; + } + } + } + + if ((taskSummaryHasSelected || executorSummaryHasSelected) && allTaskSummaryChecked && allExecutorSummaryChecked) { + $("#box-0").prop('checked', true); + } } function getStageAttemptId() { @@ -307,18 +307,18 @@ $(document).ready(function () { "" + "
" + "
Select All
" + - "
Scheduler Delay
" + - "
Task Deserialization Time
" + - "
Shuffle Read Blocked Time
" + - "
Shuffle Remote Reads
" + - "
Shuffle Write Time
" + - "
Result Serialization Time
" + - "
Getting Result Time
" + - "
Peak Execution Memory
" + - "
Executor JVMOnHeapMemory / JVMOffHeapMemory
" + - "
Executor OnHeapExecutionMemory / OffHeapExecutionMemory
" + - "
Executor OnHeapStorageMemory / OffHeapStorageMemory
" + - "
Executor DirectPoolMemory / MappedPoolMemory
" + + "
Scheduler Delay
" + + "
Task Deserialization Time
" + + "
Shuffle Read Blocked Time
" + + "
Shuffle Remote Reads
" + + "
Shuffle Write Time
" + + "
Result Serialization Time
" + + "
Getting Result Time
" + + "
Peak Execution Memory
" + + "
Executor JVMOnHeapMemory / JVMOffHeapMemory
" + + "
Executor OnHeapExecutionMemory / OffHeapExecutionMemory
" + + "
Executor OnHeapStorageMemory / OffHeapStorageMemory
" + + "
Executor DirectPoolMemory / MappedPoolMemory
" + "
"); $('#scheduler_delay').attr("data-toggle", "tooltip") @@ -1004,8 +1004,8 @@ $(document).ready(function () { createDataTableForTaskSummaryMetricsTable(taskSummaryMetricsTableFilteredArray); } } else { - var columnType = $(this).attr("column-type"); - if(columnType == 'task') { + var metricsType = $(this).attr("metrics-type"); + if (metricsType === 'task') { var column = taskTableSelector.column(para); // Toggle the visibility column.visible(!column.visible()); @@ -1018,7 +1018,8 @@ $(document).ready(function () { taskSummaryMetricsTableCurrentStateArray.filter(row => (row.checkboxId).toString() != para); } createDataTableForTaskSummaryMetricsTable(taskSummaryMetricsTableFilteredArray); - } else { + } + if (metricsType === "executor") { var column = executorSummaryTableSelector.column(para); column.visible(!column.visible()); } From e678f63e84ff880c2a010d6edfc9ede909552baa Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 4 Dec 2020 16:08:46 +0800 Subject: [PATCH 12/22] Update stagepage.js --- .../main/resources/org/apache/spark/ui/static/stagepage.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js index 90abc63229d4a..b3e167f00e4f9 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js @@ -252,7 +252,7 @@ function reselectCheckboxesBasedOnTaskTableState() { for (var k = 0; k < optionalColumns.length; k++) { if (taskTableSelector.column(optionalColumns[k]).visible()) { taskSummaryHasSelected = true; - $("#box-" + optionalColumns[k]).prop('checked', true); + $("#box-"+optionalColumns[k]).prop('checked', true); taskSummaryMetricsTableCurrentStateArray.push(taskSummaryMetricsTableArray.filter(row => (row.checkboxId).toString() == optionalColumns[k])[0]); taskSummaryMetricsTableCurrentFilteredArray = taskSummaryMetricsTableCurrentStateArray.slice(); } else { @@ -266,7 +266,7 @@ function reselectCheckboxesBasedOnTaskTableState() { for (var k = 0; k < executorOptionalColumns.length; k++) { if (executorSummaryTableSelector.column(executorOptionalColumns[k]).visible()) { executorSummaryHasSelected = true; - $("#executor-box-" + executorOptionalColumns[k]).prop('checked', true); + $("#executor-box-"+executorOptionalColumns[k]).prop('checked', true); } else { allExecutorSummaryChecked = false; } From 141fa27c9abb8b048dc1431455762638821bd070 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 4 Dec 2020 16:50:20 +0800 Subject: [PATCH 13/22] Update webui.css --- core/src/main/resources/org/apache/spark/ui/static/webui.css | 4 ---- 1 file changed, 4 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css index 53dc8de627cac..262cee7b58aff 100755 --- a/core/src/main/resources/org/apache/spark/ui/static/webui.css +++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css @@ -353,10 +353,6 @@ a.expandbutton { width: 180px; } -.executor-jvm-metrics-checkbox-div { - width: 480px; -} - #active-tasks-table th { border-top: 1px solid #dddddd; border-bottom: 1px solid #dddddd; From 92cbcf7256c5b7b3ef96844b71c88ba30eb32c77 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 4 Dec 2020 16:52:52 +0800 Subject: [PATCH 14/22] fix end line --- .../excludeOnFailure_for_stage_expectation.json | 2 +- .../excludeOnFailure_node_for_stage_expectation.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_for_stage_expectation.json b/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_for_stage_expectation.json index b3da4d1fccb7e..d6353d780bbf2 100644 --- a/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_for_stage_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_for_stage_expectation.json @@ -765,4 +765,4 @@ }, "killedTasksSummary" : { }, "resourceProfileId" : 0 -} \ No newline at end of file +} diff --git a/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_node_for_stage_expectation.json b/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_node_for_stage_expectation.json index 75edae0548d05..117a47d0a9e7c 100644 --- a/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_node_for_stage_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_node_for_stage_expectation.json @@ -993,4 +993,4 @@ }, "killedTasksSummary" : { }, "resourceProfileId" : 0 -} \ No newline at end of file +} From e16819372a3ee6542c64624eaf99cfb2b80bc066 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 4 Dec 2020 17:55:24 +0800 Subject: [PATCH 15/22] remove option of peakMemoryMetrics --- .../org/apache/spark/status/LiveEntity.scala | 6 +- .../status/api/v1/PrometheusResource.scala | 56 +++++++++---------- .../org/apache/spark/status/api/v1/api.scala | 38 ++++++------- .../org/apache/spark/ui/jobs/JobPage.scala | 3 +- .../spark/status/AppStatusListenerSuite.scala | 23 ++------ .../status/api/v1/ExecutorSummarySuite.scala | 5 +- .../org/apache/spark/ui/StagePageSuite.scala | 2 +- 7 files changed, 59 insertions(+), 74 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 88594a0dbc4d5..ffce6b0521dd3 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -342,7 +342,7 @@ private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extend executorLogs, memoryMetrics, excludedInStages, - Some(peakExecutorMetrics).filter(_.isSet), + peakExecutorMetrics, attributes, resources, resourceProfileId, @@ -386,7 +386,7 @@ private class LiveExecutorStageSummary( metrics.memoryBytesSpilled, metrics.diskBytesSpilled, isExcluded, - Some(peakExecutorMetrics), + peakExecutorMetrics, isExcluded) new ExecutorStageSummaryWrapper(stageId, attemptId, executorId, info) } @@ -493,7 +493,7 @@ private class LiveStage extends LiveEntity { executorSummary = None, killedTasksSummary = killedSummary, resourceProfileId = info.resourceProfileId, - Some(peakExecutorMetrics).filter(_.isSet)) + peakExecutorMetrics) } override protected def doUpdate(): Any = { diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala index 9658e5e627724..31cfd4d9333e1 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala @@ -75,36 +75,36 @@ private[v1] class PrometheusResource extends ApiRequestContext { sb.append(s"${prefix}totalOffHeapStorageMemory_bytes$labels " + s"${m.totalOffHeapStorageMemory}\n") } - executor.peakMemoryMetrics.foreach { m => - val names = Array( - "JVMHeapMemory", - "JVMOffHeapMemory", - "OnHeapExecutionMemory", - "OffHeapExecutionMemory", - "OnHeapStorageMemory", - "OffHeapStorageMemory", - "OnHeapUnifiedMemory", - "OffHeapUnifiedMemory", - "DirectPoolMemory", - "MappedPoolMemory", - "ProcessTreeJVMVMemory", - "ProcessTreeJVMRSSMemory", - "ProcessTreePythonVMemory", - "ProcessTreePythonRSSMemory", - "ProcessTreeOtherVMemory", - "ProcessTreeOtherRSSMemory" - ) - names.foreach { name => - sb.append(s"$prefix${name}_bytes$labels ${m.getMetricValue(name)}\n") - } - Seq("MinorGCCount", "MajorGCCount").foreach { name => - sb.append(s"$prefix${name}_total$labels ${m.getMetricValue(name)}\n") - } - Seq("MinorGCTime", "MajorGCTime").foreach { name => - sb.append(s"$prefix${name}_seconds_total$labels ${m.getMetricValue(name) * 0.001}\n") - } + val metrics = executor.peakMemoryMetrics + val names = Array( + "JVMHeapMemory", + "JVMOffHeapMemory", + "OnHeapExecutionMemory", + "OffHeapExecutionMemory", + "OnHeapStorageMemory", + "OffHeapStorageMemory", + "OnHeapUnifiedMemory", + "OffHeapUnifiedMemory", + "DirectPoolMemory", + "MappedPoolMemory", + "ProcessTreeJVMVMemory", + "ProcessTreeJVMRSSMemory", + "ProcessTreePythonVMemory", + "ProcessTreePythonRSSMemory", + "ProcessTreeOtherVMemory", + "ProcessTreeOtherRSSMemory" + ) + names.foreach { name => + sb.append(s"$prefix${name}_bytes$labels ${metrics.getMetricValue(name)}\n") + } + Seq("MinorGCCount", "MajorGCCount").foreach { name => + sb.append(s"$prefix${name}_total$labels ${metrics.getMetricValue(name)}\n") + } + Seq("MinorGCTime", "MajorGCTime").foreach { name => + sb.append(s"$prefix${name}_seconds_total$labels ${metrics.getMetricValue(name) * 0.001}\n") } } + sb.toString } } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 96f5b7b5cf27e..9e777fc04963f 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -86,7 +86,7 @@ class ExecutorStageSummary private[spark]( val isBlacklistedForStage: Boolean, @JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer]) @JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer]) - val peakMemoryMetrics: Option[ExecutorMetrics], + val peakMemoryMetrics: ExecutorMetrics, val isExcludedForStage: Boolean) class ExecutorSummary private[spark]( @@ -119,7 +119,7 @@ class ExecutorSummary private[spark]( val blacklistedInStages: Set[Int], @JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer]) @JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer]) - val peakMemoryMetrics: Option[ExecutorMetrics], + val peakMemoryMetrics: ExecutorMetrics, val attributes: Map[String, String], val resources: Map[String, ResourceInformation], val resourceProfileId: Int, @@ -134,40 +134,38 @@ class MemoryMetrics private[spark]( /** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */ private[spark] class ExecutorMetricsJsonDeserializer - extends JsonDeserializer[Option[ExecutorMetrics]] { + extends JsonDeserializer[ExecutorMetrics] { override def deserialize( jsonParser: JsonParser, - deserializationContext: DeserializationContext): Option[ExecutorMetrics] = { - val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]]( - new TypeReference[Option[Map[String, java.lang.Long]]] {}) - metricsMap.map(metrics => new ExecutorMetrics(metrics)) + deserializationContext: DeserializationContext): ExecutorMetrics = { + val metrics = jsonParser.readValueAs[Map[String, Long]]( + new TypeReference[Map[String, java.lang.Long]] {}) + new ExecutorMetrics(metrics) } - override def getNullValue(ctxt: DeserializationContext): Option[ExecutorMetrics] = { - None + override def getNullValue(ctxt: DeserializationContext): ExecutorMetrics = { + null } } /** serializer for peakMemoryMetrics: convert ExecutorMetrics to map with metric name as key */ private[spark] class ExecutorMetricsJsonSerializer - extends JsonSerializer[Option[ExecutorMetrics]] { + extends JsonSerializer[ExecutorMetrics] { override def serialize( - metrics: Option[ExecutorMetrics], + metrics: ExecutorMetrics, jsonGenerator: JsonGenerator, serializerProvider: SerializerProvider): Unit = { - if (metrics.isEmpty) { + if (metrics == null) { jsonGenerator.writeNull() } else { - metrics.foreach { m: ExecutorMetrics => - val metricsMap = ExecutorMetricType.metricToOffset.map { case (metric, _) => - metric -> m.getMetricValue(metric) - } - jsonGenerator.writeObject(metricsMap) + val metricsMap = ExecutorMetricType.metricToOffset.map { case (metric, _) => + metric -> metrics.getMetricValue(metric) } + jsonGenerator.writeObject(metricsMap) } } - override def isEmpty(provider: SerializerProvider, value: Option[ExecutorMetrics]): Boolean = - value.isEmpty + override def isEmpty(provider: SerializerProvider, value: ExecutorMetrics): Boolean = + value == null } class JobData private[spark]( @@ -279,7 +277,7 @@ class StageData private[spark]( val resourceProfileId: Int, @JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer]) @JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer]) - val peakExecutorMetrics: Option[ExecutorMetrics]) + val peakExecutorMetrics: ExecutorMetrics) class TaskData private[spark]( val taskId: Long, diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index c40e1bc248a49..3123233ae642a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -26,6 +26,7 @@ import scala.xml.{Node, NodeSeq, Unparsed, Utility} import org.apache.commons.text.StringEscapeUtils import org.apache.spark.JobExecutionStatus +import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.resource.ResourceProfile import org.apache.spark.status.AppStatusStore import org.apache.spark.status.api.v1 @@ -257,7 +258,7 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP executorSummary = None, killedTasksSummary = Map(), ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID, - peakExecutorMetrics = None) + peakExecutorMetrics = new ExecutorMetrics) } } diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index a251c164a79ca..dc4b5ca69d00c 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -1532,12 +1532,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { expectedValues.foreach { case (id, metrics) => check[ExecutorSummaryWrapper](id) { exec => assert(exec.info.id === id) - exec.info.peakMemoryMetrics match { - case Some(actual) => - checkExecutorMetrics(metrics, actual) - case _ => - assert(false) - } + checkExecutorMetrics(metrics, exec.info.peakMemoryMetrics) } } @@ -1600,12 +1595,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { for ((id, metrics) <- expectedValues) { check[ExecutorSummaryWrapper](id) { exec => assert(exec.info.id === id) - exec.info.peakMemoryMetrics match { - case Some(actual) => - checkExecutorMetrics(metrics, actual) - case _ => - assert(false) - } + checkExecutorMetrics(metrics, exec.info.peakMemoryMetrics) } } @@ -1646,12 +1636,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { // check stage level peak executor metric values for each stage for ((stageId, expectedMetrics) <- expectedStageValues) { check[StageDataWrapper](Array(stageId, 0)) { stage => - stage.info.peakExecutorMetrics match { - case Some(actual) => - checkExecutorMetrics(expectedMetrics.peakExecutorMetrics, actual) - case None => - assert(false) - } + checkExecutorMetrics(expectedMetrics.peakExecutorMetrics, stage.info.peakExecutorMetrics) } } @@ -1661,7 +1646,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { expectedStageValues.get(exec.stageId) match { case Some(stageValue) => (stageValue.executorMetrics.get(exec.executorId), exec.info.peakMemoryMetrics) match { - case (Some(expected), Some(actual)) => + case (Some(expected), actual) => checkExecutorMetrics(expected, actual) case _ => assert(false) diff --git a/core/src/test/scala/org/apache/spark/status/api/v1/ExecutorSummarySuite.scala b/core/src/test/scala/org/apache/spark/status/api/v1/ExecutorSummarySuite.scala index 541a7821a51fb..c551a26fc608c 100644 --- a/core/src/test/scala/org/apache/spark/status/api/v1/ExecutorSummarySuite.scala +++ b/core/src/test/scala/org/apache/spark/status/api/v1/ExecutorSummarySuite.scala @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.spark.SparkFunSuite +import org.apache.spark.executor.ExecutorMetrics class ExecutorSummarySuite extends SparkFunSuite { @@ -33,7 +34,7 @@ class ExecutorSummarySuite extends SparkFunSuite { 0, 0, 1, 100, 1, 100, 100, 10, false, 20, new Date(1600984336352L), - Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1, + Option.empty, Option.empty, Map(), Option.empty, Set(), null, Map(), Map(), 1, false, Set()) val expectedJson = "{\"id\":\"id\",\"hostPort\":\"host:port\",\"isActive\":true," + "\"rddBlocks\":1,\"memoryUsed\":10,\"diskUsed\":10,\"totalCores\":1,\"maxTasks\":1," + @@ -47,7 +48,7 @@ class ExecutorSummarySuite extends SparkFunSuite { val json = mapper.writeValueAsString(executorSummary) assert(expectedJson.equals(json)) val deserializeExecutorSummary = mapper.readValue(json, new TypeReference[ExecutorSummary] {}) - assert(deserializeExecutorSummary.peakMemoryMetrics == None) + assert(deserializeExecutorSummary.peakMemoryMetrics == null) } } diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala index d02d7f862df80..25d9a1581fb20 100644 --- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala @@ -93,7 +93,7 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext { executorSummary = None, killedTasksSummary = Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, - peakExecutorMetrics = None + peakExecutorMetrics = new ExecutorMetrics ) val taskTable = new TaskPagedTable( stageData, From 3cd21cef962919bcd362655ccd201290dcfdac02 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 4 Dec 2020 18:24:55 +0800 Subject: [PATCH 16/22] Revert "remove option of peakMemoryMetrics" This reverts commit e16819372a3ee6542c64624eaf99cfb2b80bc066. --- .../org/apache/spark/status/LiveEntity.scala | 6 +- .../status/api/v1/PrometheusResource.scala | 56 +++++++++---------- .../org/apache/spark/status/api/v1/api.scala | 38 +++++++------ .../org/apache/spark/ui/jobs/JobPage.scala | 3 +- .../spark/status/AppStatusListenerSuite.scala | 23 ++++++-- .../status/api/v1/ExecutorSummarySuite.scala | 5 +- .../org/apache/spark/ui/StagePageSuite.scala | 2 +- 7 files changed, 74 insertions(+), 59 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index ffce6b0521dd3..88594a0dbc4d5 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -342,7 +342,7 @@ private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extend executorLogs, memoryMetrics, excludedInStages, - peakExecutorMetrics, + Some(peakExecutorMetrics).filter(_.isSet), attributes, resources, resourceProfileId, @@ -386,7 +386,7 @@ private class LiveExecutorStageSummary( metrics.memoryBytesSpilled, metrics.diskBytesSpilled, isExcluded, - peakExecutorMetrics, + Some(peakExecutorMetrics), isExcluded) new ExecutorStageSummaryWrapper(stageId, attemptId, executorId, info) } @@ -493,7 +493,7 @@ private class LiveStage extends LiveEntity { executorSummary = None, killedTasksSummary = killedSummary, resourceProfileId = info.resourceProfileId, - peakExecutorMetrics) + Some(peakExecutorMetrics).filter(_.isSet)) } override protected def doUpdate(): Any = { diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala index 31cfd4d9333e1..9658e5e627724 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala @@ -75,36 +75,36 @@ private[v1] class PrometheusResource extends ApiRequestContext { sb.append(s"${prefix}totalOffHeapStorageMemory_bytes$labels " + s"${m.totalOffHeapStorageMemory}\n") } - val metrics = executor.peakMemoryMetrics - val names = Array( - "JVMHeapMemory", - "JVMOffHeapMemory", - "OnHeapExecutionMemory", - "OffHeapExecutionMemory", - "OnHeapStorageMemory", - "OffHeapStorageMemory", - "OnHeapUnifiedMemory", - "OffHeapUnifiedMemory", - "DirectPoolMemory", - "MappedPoolMemory", - "ProcessTreeJVMVMemory", - "ProcessTreeJVMRSSMemory", - "ProcessTreePythonVMemory", - "ProcessTreePythonRSSMemory", - "ProcessTreeOtherVMemory", - "ProcessTreeOtherRSSMemory" - ) - names.foreach { name => - sb.append(s"$prefix${name}_bytes$labels ${metrics.getMetricValue(name)}\n") - } - Seq("MinorGCCount", "MajorGCCount").foreach { name => - sb.append(s"$prefix${name}_total$labels ${metrics.getMetricValue(name)}\n") - } - Seq("MinorGCTime", "MajorGCTime").foreach { name => - sb.append(s"$prefix${name}_seconds_total$labels ${metrics.getMetricValue(name) * 0.001}\n") + executor.peakMemoryMetrics.foreach { m => + val names = Array( + "JVMHeapMemory", + "JVMOffHeapMemory", + "OnHeapExecutionMemory", + "OffHeapExecutionMemory", + "OnHeapStorageMemory", + "OffHeapStorageMemory", + "OnHeapUnifiedMemory", + "OffHeapUnifiedMemory", + "DirectPoolMemory", + "MappedPoolMemory", + "ProcessTreeJVMVMemory", + "ProcessTreeJVMRSSMemory", + "ProcessTreePythonVMemory", + "ProcessTreePythonRSSMemory", + "ProcessTreeOtherVMemory", + "ProcessTreeOtherRSSMemory" + ) + names.foreach { name => + sb.append(s"$prefix${name}_bytes$labels ${m.getMetricValue(name)}\n") + } + Seq("MinorGCCount", "MajorGCCount").foreach { name => + sb.append(s"$prefix${name}_total$labels ${m.getMetricValue(name)}\n") + } + Seq("MinorGCTime", "MajorGCTime").foreach { name => + sb.append(s"$prefix${name}_seconds_total$labels ${m.getMetricValue(name) * 0.001}\n") + } } } - sb.toString } } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 9e777fc04963f..96f5b7b5cf27e 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -86,7 +86,7 @@ class ExecutorStageSummary private[spark]( val isBlacklistedForStage: Boolean, @JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer]) @JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer]) - val peakMemoryMetrics: ExecutorMetrics, + val peakMemoryMetrics: Option[ExecutorMetrics], val isExcludedForStage: Boolean) class ExecutorSummary private[spark]( @@ -119,7 +119,7 @@ class ExecutorSummary private[spark]( val blacklistedInStages: Set[Int], @JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer]) @JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer]) - val peakMemoryMetrics: ExecutorMetrics, + val peakMemoryMetrics: Option[ExecutorMetrics], val attributes: Map[String, String], val resources: Map[String, ResourceInformation], val resourceProfileId: Int, @@ -134,38 +134,40 @@ class MemoryMetrics private[spark]( /** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */ private[spark] class ExecutorMetricsJsonDeserializer - extends JsonDeserializer[ExecutorMetrics] { + extends JsonDeserializer[Option[ExecutorMetrics]] { override def deserialize( jsonParser: JsonParser, - deserializationContext: DeserializationContext): ExecutorMetrics = { - val metrics = jsonParser.readValueAs[Map[String, Long]]( - new TypeReference[Map[String, java.lang.Long]] {}) - new ExecutorMetrics(metrics) + deserializationContext: DeserializationContext): Option[ExecutorMetrics] = { + val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]]( + new TypeReference[Option[Map[String, java.lang.Long]]] {}) + metricsMap.map(metrics => new ExecutorMetrics(metrics)) } - override def getNullValue(ctxt: DeserializationContext): ExecutorMetrics = { - null + override def getNullValue(ctxt: DeserializationContext): Option[ExecutorMetrics] = { + None } } /** serializer for peakMemoryMetrics: convert ExecutorMetrics to map with metric name as key */ private[spark] class ExecutorMetricsJsonSerializer - extends JsonSerializer[ExecutorMetrics] { + extends JsonSerializer[Option[ExecutorMetrics]] { override def serialize( - metrics: ExecutorMetrics, + metrics: Option[ExecutorMetrics], jsonGenerator: JsonGenerator, serializerProvider: SerializerProvider): Unit = { - if (metrics == null) { + if (metrics.isEmpty) { jsonGenerator.writeNull() } else { - val metricsMap = ExecutorMetricType.metricToOffset.map { case (metric, _) => - metric -> metrics.getMetricValue(metric) + metrics.foreach { m: ExecutorMetrics => + val metricsMap = ExecutorMetricType.metricToOffset.map { case (metric, _) => + metric -> m.getMetricValue(metric) + } + jsonGenerator.writeObject(metricsMap) } - jsonGenerator.writeObject(metricsMap) } } - override def isEmpty(provider: SerializerProvider, value: ExecutorMetrics): Boolean = - value == null + override def isEmpty(provider: SerializerProvider, value: Option[ExecutorMetrics]): Boolean = + value.isEmpty } class JobData private[spark]( @@ -277,7 +279,7 @@ class StageData private[spark]( val resourceProfileId: Int, @JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer]) @JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer]) - val peakExecutorMetrics: ExecutorMetrics) + val peakExecutorMetrics: Option[ExecutorMetrics]) class TaskData private[spark]( val taskId: Long, diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index 3123233ae642a..c40e1bc248a49 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -26,7 +26,6 @@ import scala.xml.{Node, NodeSeq, Unparsed, Utility} import org.apache.commons.text.StringEscapeUtils import org.apache.spark.JobExecutionStatus -import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.resource.ResourceProfile import org.apache.spark.status.AppStatusStore import org.apache.spark.status.api.v1 @@ -258,7 +257,7 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP executorSummary = None, killedTasksSummary = Map(), ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID, - peakExecutorMetrics = new ExecutorMetrics) + peakExecutorMetrics = None) } } diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index dc4b5ca69d00c..a251c164a79ca 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -1532,7 +1532,12 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { expectedValues.foreach { case (id, metrics) => check[ExecutorSummaryWrapper](id) { exec => assert(exec.info.id === id) - checkExecutorMetrics(metrics, exec.info.peakMemoryMetrics) + exec.info.peakMemoryMetrics match { + case Some(actual) => + checkExecutorMetrics(metrics, actual) + case _ => + assert(false) + } } } @@ -1595,7 +1600,12 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { for ((id, metrics) <- expectedValues) { check[ExecutorSummaryWrapper](id) { exec => assert(exec.info.id === id) - checkExecutorMetrics(metrics, exec.info.peakMemoryMetrics) + exec.info.peakMemoryMetrics match { + case Some(actual) => + checkExecutorMetrics(metrics, actual) + case _ => + assert(false) + } } } @@ -1636,7 +1646,12 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { // check stage level peak executor metric values for each stage for ((stageId, expectedMetrics) <- expectedStageValues) { check[StageDataWrapper](Array(stageId, 0)) { stage => - checkExecutorMetrics(expectedMetrics.peakExecutorMetrics, stage.info.peakExecutorMetrics) + stage.info.peakExecutorMetrics match { + case Some(actual) => + checkExecutorMetrics(expectedMetrics.peakExecutorMetrics, actual) + case None => + assert(false) + } } } @@ -1646,7 +1661,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { expectedStageValues.get(exec.stageId) match { case Some(stageValue) => (stageValue.executorMetrics.get(exec.executorId), exec.info.peakMemoryMetrics) match { - case (Some(expected), actual) => + case (Some(expected), Some(actual)) => checkExecutorMetrics(expected, actual) case _ => assert(false) diff --git a/core/src/test/scala/org/apache/spark/status/api/v1/ExecutorSummarySuite.scala b/core/src/test/scala/org/apache/spark/status/api/v1/ExecutorSummarySuite.scala index c551a26fc608c..541a7821a51fb 100644 --- a/core/src/test/scala/org/apache/spark/status/api/v1/ExecutorSummarySuite.scala +++ b/core/src/test/scala/org/apache/spark/status/api/v1/ExecutorSummarySuite.scala @@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.spark.SparkFunSuite -import org.apache.spark.executor.ExecutorMetrics class ExecutorSummarySuite extends SparkFunSuite { @@ -34,7 +33,7 @@ class ExecutorSummarySuite extends SparkFunSuite { 0, 0, 1, 100, 1, 100, 100, 10, false, 20, new Date(1600984336352L), - Option.empty, Option.empty, Map(), Option.empty, Set(), null, Map(), Map(), 1, + Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1, false, Set()) val expectedJson = "{\"id\":\"id\",\"hostPort\":\"host:port\",\"isActive\":true," + "\"rddBlocks\":1,\"memoryUsed\":10,\"diskUsed\":10,\"totalCores\":1,\"maxTasks\":1," + @@ -48,7 +47,7 @@ class ExecutorSummarySuite extends SparkFunSuite { val json = mapper.writeValueAsString(executorSummary) assert(expectedJson.equals(json)) val deserializeExecutorSummary = mapper.readValue(json, new TypeReference[ExecutorSummary] {}) - assert(deserializeExecutorSummary.peakMemoryMetrics == null) + assert(deserializeExecutorSummary.peakMemoryMetrics == None) } } diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala index 25d9a1581fb20..d02d7f862df80 100644 --- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala @@ -93,7 +93,7 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext { executorSummary = None, killedTasksSummary = Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, - peakExecutorMetrics = new ExecutorMetrics + peakExecutorMetrics = None ) val taskTable = new TaskPagedTable( stageData, From 13f6db2c2e76df99b04ac703f3a803feb31c2328 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 4 Dec 2020 18:43:52 +0800 Subject: [PATCH 17/22] follow comment --- .../org/apache/spark/ui/static/stagepage.js | 81 ++++++++++++++----- .../org/apache/spark/status/LiveEntity.scala | 2 +- 2 files changed, 62 insertions(+), 21 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js index dd48e719f03de..eea81f92a5bc6 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js @@ -489,38 +489,79 @@ $(document).ready(function () { }, { data : function (row, type) { - if (type !== 'display') - return row.peakMemoryMetrics.JVMHeapMemory; - else - return (formatBytes(row.peakMemoryMetrics.JVMHeapMemory, type) + ' / ' + - formatBytes(row.peakMemoryMetrics.JVMOffHeapMemory, type)); + var peakMemoryMetrics = row.peakMemoryMetrics + if (typeof peakMemoryMetrics !== 'undefined') { + var jvmHeapMemory = peakMemoryMetrics.JVMHeapMemory; + if (jvmHeapMemory < 0) { + jvmHeapMemory = 0; + } + if (type !== 'display') + return jvmHeapMemory; + else + return (formatBytes(jvmHeapMemory, type) + ' / ' + + formatBytes(peakMemoryMetrics.JVMOffHeapMemory, type)); + } else { + if (type !== 'display') { + return 0; + } else { + return '0.0 B / 0.0 B'; + } + } + } }, { data : function (row, type) { - if (type !== 'display') - return row.peakMemoryMetrics.OnHeapExecutionMemory; - else - return (formatBytes(row.peakMemoryMetrics.OnHeapExecutionMemory, type) + ' / ' + - formatBytes(row.peakMemoryMetrics.OffHeapExecutionMemory, type)); + var peakMemoryMetrics = row.peakMemoryMetrics + if (typeof peakMemoryMetrics !== 'undefined') { + if (type !== 'display') + return peakMemoryMetrics.OnHeapExecutionMemory; + else + return (formatBytes(peakMemoryMetrics.OnHeapExecutionMemory, type) + ' / ' + + formatBytes(peakMemoryMetrics.OffHeapExecutionMemory, type)); + } else { + if (type !== 'display') { + return 0; + } else { + return '0.0 B / 0.0 B'; + } + } } }, { data : function (row, type) { - if (type !== 'display') - return row.peakMemoryMetrics.OnHeapStorageMemory; - else - return (formatBytes(row.peakMemoryMetrics.OnHeapStorageMemory, type) + ' / ' + - formatBytes(row.peakMemoryMetrics.OffHeapStorageMemory, type)); + var peakMemoryMetrics = row.peakMemoryMetrics + if (typeof peakMemoryMetrics !== 'undefined') { + if (type !== 'display') + return peakMemoryMetrics.OnHeapStorageMemory; + else + return (formatBytes(peakMemoryMetrics.OnHeapStorageMemory, type) + ' / ' + + formatBytes(row.peakMemoryMetrics.OffHeapStorageMemory, type)); + } else { + if (type !== 'display') { + return 0; + } else { + return '0.0 B / 0.0 B'; + } } + } }, { data : function (row, type) { - if (type !== 'display') - return row.peakMemoryMetrics.DirectPoolMemory; - else - return (formatBytes(row.peakMemoryMetrics.DirectPoolMemory, type) + ' / ' + - formatBytes(row.peakMemoryMetrics.MappedPoolMemory, type)); + var peakMemoryMetrics = row.peakMemoryMetrics + if (typeof peakMemoryMetrics !== 'undefined') { + if (type !== 'display') + return peakMemoryMetrics.DirectPoolMemory; + else + return (formatBytes(peakMemoryMetrics.DirectPoolMemory, type) + ' / ' + + formatBytes(peakMemoryMetrics.MappedPoolMemory, type)); + } else { + if (type !== 'display') { + return 0; + } else { + return '0.0 B / 0.0 B'; + } + } } } ], diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 88594a0dbc4d5..38f1f25f2fcaa 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -386,7 +386,7 @@ private class LiveExecutorStageSummary( metrics.memoryBytesSpilled, metrics.diskBytesSpilled, isExcluded, - Some(peakExecutorMetrics), + Some(peakExecutorMetrics).filter(_.isSet), isExcluded) new ExecutorStageSummaryWrapper(stageId, attemptId, executorId, info) } From 3cc355970954fd57ff17473737eba34612cbf39f Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 4 Dec 2020 20:37:03 +0800 Subject: [PATCH 18/22] follow commnet --- .../org/apache/spark/ui/static/stagepage.js | 30 ++--- ...xcludeOnFailure_for_stage_expectation.json | 44 ------- ...eOnFailure_node_for_stage_expectation.json | 110 ------------------ .../one_stage_attempt_json_expectation.json | 22 ---- .../one_stage_json_expectation.json | 22 ---- ...age_with_accumulable_json_expectation.json | 22 ---- 6 files changed, 15 insertions(+), 235 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js index eea81f92a5bc6..24874b47854e7 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js @@ -307,18 +307,18 @@ $(document).ready(function () { "" + "
" + "
Select All
" + - "
Scheduler Delay
" + - "
Task Deserialization Time
" + - "
Shuffle Read Blocked Time
" + - "
Shuffle Remote Reads
" + - "
Shuffle Write Time
" + - "
Result Serialization Time
" + - "
Getting Result Time
" + - "
Peak Execution Memory
" + - "
Executor JVMOnHeapMemory / JVMOffHeapMemory
" + - "
Executor OnHeapExecutionMemory / OffHeapExecutionMemory
" + - "
Executor OnHeapStorageMemory / OffHeapStorageMemory
" + - "
Executor DirectPoolMemory / MappedPoolMemory
" + + "
Scheduler Delay
" + + "
Task Deserialization Time
" + + "
Shuffle Read Blocked Time
" + + "
Shuffle Remote Reads
" + + "
Shuffle Write Time
" + + "
Result Serialization Time
" + + "
Getting Result Time
" + + "
Peak Execution Memory
" + + "
Executor JVMOnHeapMemory / JVMOffHeapMemory
" + + "
Executor OnHeapExecutionMemory / OffHeapExecutionMemory
" + + "
Executor OnHeapStorageMemory / OffHeapStorageMemory
" + + "
Executor DirectPoolMemory / MappedPoolMemory
" + "
"); $('#scheduler_delay').attr("data-toggle", "tooltip") @@ -1045,8 +1045,8 @@ $(document).ready(function () { createDataTableForTaskSummaryMetricsTable(taskSummaryMetricsTableFilteredArray); } } else { - var metricsType = $(this).attr("metrics-type"); - if (metricsType === 'task') { + var dataMetricsType = $(this).attr("data-metrics-type"); + if (dataMetricsType === 'task') { var column = taskTableSelector.column(para); // Toggle the visibility column.visible(!column.visible()); @@ -1060,7 +1060,7 @@ $(document).ready(function () { } createDataTableForTaskSummaryMetricsTable(taskSummaryMetricsTableFilteredArray); } - if (metricsType === "executor") { + if (dataMetricsType === "executor") { var column = executorSummaryTableSelector.column(para); column.visible(!column.visible()); } diff --git a/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_for_stage_expectation.json b/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_for_stage_expectation.json index d6353d780bbf2..a69940fa5a1a5 100644 --- a/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_for_stage_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_for_stage_expectation.json @@ -698,28 +698,6 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : true, - "peakMemoryMetrics" : { - "JVMHeapMemory" : -1, - "JVMOffHeapMemory" : 0, - "OnHeapExecutionMemory" : 0, - "OffHeapExecutionMemory" : 0, - "OnHeapStorageMemory" : 0, - "OffHeapStorageMemory" : 0, - "OnHeapUnifiedMemory" : 0, - "OffHeapUnifiedMemory" : 0, - "DirectPoolMemory" : 0, - "MappedPoolMemory" : 0, - "ProcessTreeJVMVMemory" : 0, - "ProcessTreeJVMRSSMemory" : 0, - "ProcessTreePythonVMemory" : 0, - "ProcessTreePythonRSSMemory" : 0, - "ProcessTreeOtherVMemory" : 0, - "ProcessTreeOtherRSSMemory" : 0, - "MinorGCCount" : 0, - "MinorGCTime" : 0, - "MajorGCCount" : 0, - "MajorGCTime" : 0 - }, "isExcludedForStage" : true }, "1" : { @@ -738,28 +716,6 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : false, - "peakMemoryMetrics" : { - "JVMHeapMemory" : -1, - "JVMOffHeapMemory" : 0, - "OnHeapExecutionMemory" : 0, - "OffHeapExecutionMemory" : 0, - "OnHeapStorageMemory" : 0, - "OffHeapStorageMemory" : 0, - "OnHeapUnifiedMemory" : 0, - "OffHeapUnifiedMemory" : 0, - "DirectPoolMemory" : 0, - "MappedPoolMemory" : 0, - "ProcessTreeJVMVMemory" : 0, - "ProcessTreeJVMRSSMemory" : 0, - "ProcessTreePythonVMemory" : 0, - "ProcessTreePythonRSSMemory" : 0, - "ProcessTreeOtherVMemory" : 0, - "ProcessTreeOtherRSSMemory" : 0, - "MinorGCCount" : 0, - "MinorGCTime" : 0, - "MajorGCCount" : 0, - "MajorGCTime" : 0 - }, "isExcludedForStage" : false } }, diff --git a/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_node_for_stage_expectation.json b/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_node_for_stage_expectation.json index 117a47d0a9e7c..bda9caedbbe81 100644 --- a/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_node_for_stage_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_node_for_stage_expectation.json @@ -806,28 +806,6 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : true, - "peakMemoryMetrics" : { - "JVMHeapMemory" : -1, - "JVMOffHeapMemory" : 0, - "OnHeapExecutionMemory" : 0, - "OffHeapExecutionMemory" : 0, - "OnHeapStorageMemory" : 0, - "OffHeapStorageMemory" : 0, - "OnHeapUnifiedMemory" : 0, - "OffHeapUnifiedMemory" : 0, - "DirectPoolMemory" : 0, - "MappedPoolMemory" : 0, - "ProcessTreeJVMVMemory" : 0, - "ProcessTreeJVMRSSMemory" : 0, - "ProcessTreePythonVMemory" : 0, - "ProcessTreePythonRSSMemory" : 0, - "ProcessTreeOtherVMemory" : 0, - "ProcessTreeOtherRSSMemory" : 0, - "MinorGCCount" : 0, - "MinorGCTime" : 0, - "MajorGCCount" : 0, - "MajorGCTime" : 0 - }, "isExcludedForStage" : true }, "5" : { @@ -846,28 +824,6 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : true, - "peakMemoryMetrics" : { - "JVMHeapMemory" : -1, - "JVMOffHeapMemory" : 0, - "OnHeapExecutionMemory" : 0, - "OffHeapExecutionMemory" : 0, - "OnHeapStorageMemory" : 0, - "OffHeapStorageMemory" : 0, - "OnHeapUnifiedMemory" : 0, - "OffHeapUnifiedMemory" : 0, - "DirectPoolMemory" : 0, - "MappedPoolMemory" : 0, - "ProcessTreeJVMVMemory" : 0, - "ProcessTreeJVMRSSMemory" : 0, - "ProcessTreePythonVMemory" : 0, - "ProcessTreePythonRSSMemory" : 0, - "ProcessTreeOtherVMemory" : 0, - "ProcessTreeOtherRSSMemory" : 0, - "MinorGCCount" : 0, - "MinorGCTime" : 0, - "MajorGCCount" : 0, - "MajorGCTime" : 0 - }, "isExcludedForStage" : true }, "1" : { @@ -886,28 +842,6 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : false, - "peakMemoryMetrics" : { - "JVMHeapMemory" : -1, - "JVMOffHeapMemory" : 0, - "OnHeapExecutionMemory" : 0, - "OffHeapExecutionMemory" : 0, - "OnHeapStorageMemory" : 0, - "OffHeapStorageMemory" : 0, - "OnHeapUnifiedMemory" : 0, - "OffHeapUnifiedMemory" : 0, - "DirectPoolMemory" : 0, - "MappedPoolMemory" : 0, - "ProcessTreeJVMVMemory" : 0, - "ProcessTreeJVMRSSMemory" : 0, - "ProcessTreePythonVMemory" : 0, - "ProcessTreePythonRSSMemory" : 0, - "ProcessTreeOtherVMemory" : 0, - "ProcessTreeOtherRSSMemory" : 0, - "MinorGCCount" : 0, - "MinorGCTime" : 0, - "MajorGCCount" : 0, - "MajorGCTime" : 0 - }, "isExcludedForStage" : false }, "2" : { @@ -926,28 +860,6 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : false, - "peakMemoryMetrics" : { - "JVMHeapMemory" : -1, - "JVMOffHeapMemory" : 0, - "OnHeapExecutionMemory" : 0, - "OffHeapExecutionMemory" : 0, - "OnHeapStorageMemory" : 0, - "OffHeapStorageMemory" : 0, - "OnHeapUnifiedMemory" : 0, - "OffHeapUnifiedMemory" : 0, - "DirectPoolMemory" : 0, - "MappedPoolMemory" : 0, - "ProcessTreeJVMVMemory" : 0, - "ProcessTreeJVMRSSMemory" : 0, - "ProcessTreePythonVMemory" : 0, - "ProcessTreePythonRSSMemory" : 0, - "ProcessTreeOtherVMemory" : 0, - "ProcessTreeOtherRSSMemory" : 0, - "MinorGCCount" : 0, - "MinorGCTime" : 0, - "MajorGCCount" : 0, - "MajorGCTime" : 0 - }, "isExcludedForStage" : false }, "3" : { @@ -966,28 +878,6 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : true, - "peakMemoryMetrics" : { - "JVMHeapMemory" : -1, - "JVMOffHeapMemory" : 0, - "OnHeapExecutionMemory" : 0, - "OffHeapExecutionMemory" : 0, - "OnHeapStorageMemory" : 0, - "OffHeapStorageMemory" : 0, - "OnHeapUnifiedMemory" : 0, - "OffHeapUnifiedMemory" : 0, - "DirectPoolMemory" : 0, - "MappedPoolMemory" : 0, - "ProcessTreeJVMVMemory" : 0, - "ProcessTreeJVMRSSMemory" : 0, - "ProcessTreePythonVMemory" : 0, - "ProcessTreePythonRSSMemory" : 0, - "ProcessTreeOtherVMemory" : 0, - "ProcessTreeOtherRSSMemory" : 0, - "MinorGCCount" : 0, - "MinorGCTime" : 0, - "MajorGCCount" : 0, - "MajorGCTime" : 0 - }, "isExcludedForStage" : true } }, diff --git a/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json index 6246f96944be0..41e54c68858ad 100644 --- a/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json @@ -460,28 +460,6 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : false, - "peakMemoryMetrics" : { - "JVMHeapMemory" : -1, - "JVMOffHeapMemory" : 0, - "OnHeapExecutionMemory" : 0, - "OffHeapExecutionMemory" : 0, - "OnHeapStorageMemory" : 0, - "OffHeapStorageMemory" : 0, - "OnHeapUnifiedMemory" : 0, - "OffHeapUnifiedMemory" : 0, - "DirectPoolMemory" : 0, - "MappedPoolMemory" : 0, - "ProcessTreeJVMVMemory" : 0, - "ProcessTreeJVMRSSMemory" : 0, - "ProcessTreePythonVMemory" : 0, - "ProcessTreePythonRSSMemory" : 0, - "ProcessTreeOtherVMemory" : 0, - "ProcessTreeOtherRSSMemory" : 0, - "MinorGCCount" : 0, - "MinorGCTime" : 0, - "MajorGCCount" : 0, - "MajorGCTime" : 0 - }, "isExcludedForStage" : false } }, diff --git a/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json index 512b3d8acdad3..7a6685a609523 100644 --- a/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json @@ -460,28 +460,6 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : false, - "peakMemoryMetrics" : { - "JVMHeapMemory" : -1, - "JVMOffHeapMemory" : 0, - "OnHeapExecutionMemory" : 0, - "OffHeapExecutionMemory" : 0, - "OnHeapStorageMemory" : 0, - "OffHeapStorageMemory" : 0, - "OnHeapUnifiedMemory" : 0, - "OffHeapUnifiedMemory" : 0, - "DirectPoolMemory" : 0, - "MappedPoolMemory" : 0, - "ProcessTreeJVMVMemory" : 0, - "ProcessTreeJVMRSSMemory" : 0, - "ProcessTreePythonVMemory" : 0, - "ProcessTreePythonRSSMemory" : 0, - "ProcessTreeOtherVMemory" : 0, - "ProcessTreeOtherRSSMemory" : 0, - "MinorGCCount" : 0, - "MinorGCTime" : 0, - "MajorGCCount" : 0, - "MajorGCTime" : 0 - }, "isExcludedForStage" : false } }, diff --git a/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json index 6f72488478225..066b6a4f884a7 100644 --- a/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json @@ -504,28 +504,6 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : false, - "peakMemoryMetrics" : { - "JVMHeapMemory" : -1, - "JVMOffHeapMemory" : 0, - "OnHeapExecutionMemory" : 0, - "OffHeapExecutionMemory" : 0, - "OnHeapStorageMemory" : 0, - "OffHeapStorageMemory" : 0, - "OnHeapUnifiedMemory" : 0, - "OffHeapUnifiedMemory" : 0, - "DirectPoolMemory" : 0, - "MappedPoolMemory" : 0, - "ProcessTreeJVMVMemory" : 0, - "ProcessTreeJVMRSSMemory" : 0, - "ProcessTreePythonVMemory" : 0, - "ProcessTreePythonRSSMemory" : 0, - "ProcessTreeOtherVMemory" : 0, - "ProcessTreeOtherRSSMemory" : 0, - "MinorGCCount" : 0, - "MinorGCTime" : 0, - "MajorGCCount" : 0, - "MajorGCTime" : 0 - }, "isExcludedForStage" : false } }, From fbd096af411361daae31ea958f346bc2b01ce4cc Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 4 Dec 2020 21:38:34 +0800 Subject: [PATCH 19/22] save --- .../org/apache/spark/ui/static/stagepage.js | 16 ++++++---------- .../spark/ui/static/stagespage-template.html | 8 ++++---- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js index 24874b47854e7..51495c37020bc 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js @@ -315,10 +315,10 @@ $(document).ready(function () { "
Result Serialization Time
" + "
Getting Result Time
" + "
Peak Execution Memory
" + - "
Executor JVMOnHeapMemory / JVMOffHeapMemory
" + - "
Executor OnHeapExecutionMemory / OffHeapExecutionMemory
" + - "
Executor OnHeapStorageMemory / OffHeapStorageMemory
" + - "
Executor DirectPoolMemory / MappedPoolMemory
" + + "
Peak JVM Memory OnHeap / OffHeap
" + + "
Peak Execution Memory OnHeap / OffHeap
" + + "
Peak Storage Memory OnHeap / OffHeap
" + + "
Peak Pool Memory Direct / Mapped
" + ""); $('#scheduler_delay').attr("data-toggle", "tooltip") @@ -491,14 +491,10 @@ $(document).ready(function () { data : function (row, type) { var peakMemoryMetrics = row.peakMemoryMetrics if (typeof peakMemoryMetrics !== 'undefined') { - var jvmHeapMemory = peakMemoryMetrics.JVMHeapMemory; - if (jvmHeapMemory < 0) { - jvmHeapMemory = 0; - } if (type !== 'display') - return jvmHeapMemory; + return peakMemoryMetrics.JVMOnHeapMemory; else - return (formatBytes(jvmHeapMemory, type) + ' / ' + + return (formatBytes(peakMemoryMetrics.JVMOnHeapMemory, type) + ' / ' + formatBytes(peakMemoryMetrics.JVMOffHeapMemory, type)); } else { if (type !== 'display') { diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagespage-template.html b/core/src/main/resources/org/apache/spark/ui/static/stagespage-template.html index d039f456669a3..b938158b77027 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/stagespage-template.html +++ b/core/src/main/resources/org/apache/spark/ui/static/stagespage-template.html @@ -59,10 +59,10 @@

Aggregated Metrics by Executor

Shuffle Write Size / Records Spill (Memory) Spill (Disk) - JVMHeapMemory / JVMOffHeapMemory - OnHeapExecutionMemory / OffHeapExecutionMemory - OnHeapStorageMemory / OffHeapStorageMemory - DirectPoolMemory / MappedPoolMemory + Peak JVM Memory OnHeap / OffHeap + Peak Execution Memory OnHeap / OffHeap + Peak Storage Memory OnHeap / OffHeap + Peak Pool Memory Direct / Mapped From c85fd16af6f35c46bc20cccac9d5d947b7c74af8 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Mon, 7 Dec 2020 13:56:08 +0800 Subject: [PATCH 20/22] update --- .../main/resources/org/apache/spark/ui/static/stagepage.js | 4 ++-- .../scala/org/apache/spark/status/AppStatusListener.scala | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js index 51495c37020bc..f0761a14a6c7d 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js @@ -492,9 +492,9 @@ $(document).ready(function () { var peakMemoryMetrics = row.peakMemoryMetrics if (typeof peakMemoryMetrics !== 'undefined') { if (type !== 'display') - return peakMemoryMetrics.JVMOnHeapMemory; + return peakMemoryMetrics.JVMHeapMemory; else - return (formatBytes(peakMemoryMetrics.JVMOnHeapMemory, type) + ' / ' + + return (formatBytes(peakMemoryMetrics.JVMHeapMemory, type) + ' / ' + formatBytes(peakMemoryMetrics.JVMOffHeapMemory, type)); } else { if (type !== 'display') { diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 5b0c1dc389af0..0722095cc6533 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -687,6 +687,9 @@ private[spark] class AppStatusListener( stage.killedSummary = killedTasksSummary(event.reason, stage.killedSummary) } stage.activeTasksPerExecutor(event.taskInfo.executorId) -= 1 + + stage.executorSummary(event.taskInfo.executorId).peakExecutorMetrics + .compareAndUpdatePeakValues(event.taskExecutorMetrics) // [SPARK-24415] Wait for all tasks to finish before removing stage from live list val removeStage = stage.activeTasks == 0 && From ecd2779f36360f0334adc7a7d636bcfd2fa48c01 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 9 Dec 2020 17:38:46 +0800 Subject: [PATCH 21/22] Update stagepage.js --- .../main/resources/org/apache/spark/ui/static/stagepage.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js index f0761a14a6c7d..336edff509300 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js @@ -489,7 +489,7 @@ $(document).ready(function () { }, { data : function (row, type) { - var peakMemoryMetrics = row.peakMemoryMetrics + var peakMemoryMetrics = row.peakMemoryMetrics; if (typeof peakMemoryMetrics !== 'undefined') { if (type !== 'display') return peakMemoryMetrics.JVMHeapMemory; @@ -532,7 +532,7 @@ $(document).ready(function () { return peakMemoryMetrics.OnHeapStorageMemory; else return (formatBytes(peakMemoryMetrics.OnHeapStorageMemory, type) + ' / ' + - formatBytes(row.peakMemoryMetrics.OffHeapStorageMemory, type)); + formatBytes(peakMemoryMetrics.OffHeapStorageMemory, type)); } else { if (type !== 'display') { return 0; From c21be1157c2fe74aec075e4662aec13d0b66597c Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 9 Dec 2020 21:48:33 +0800 Subject: [PATCH 22/22] fix UT --- ...xcludeOnFailure_for_stage_expectation.json | 44 +++++++ ...eOnFailure_node_for_stage_expectation.json | 110 ++++++++++++++++++ .../one_stage_attempt_json_expectation.json | 22 ++++ .../one_stage_json_expectation.json | 22 ++++ ...age_with_accumulable_json_expectation.json | 22 ++++ 5 files changed, 220 insertions(+) diff --git a/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_for_stage_expectation.json b/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_for_stage_expectation.json index a69940fa5a1a5..ab9a8b7ef885f 100644 --- a/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_for_stage_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_for_stage_expectation.json @@ -698,6 +698,28 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : true, + "peakMemoryMetrics" : { + "JVMHeapMemory" : 0, + "JVMOffHeapMemory" : 0, + "OnHeapExecutionMemory" : 0, + "OffHeapExecutionMemory" : 0, + "OnHeapStorageMemory" : 0, + "OffHeapStorageMemory" : 0, + "OnHeapUnifiedMemory" : 0, + "OffHeapUnifiedMemory" : 0, + "DirectPoolMemory" : 0, + "MappedPoolMemory" : 0, + "ProcessTreeJVMVMemory" : 0, + "ProcessTreeJVMRSSMemory" : 0, + "ProcessTreePythonVMemory" : 0, + "ProcessTreePythonRSSMemory" : 0, + "ProcessTreeOtherVMemory" : 0, + "ProcessTreeOtherRSSMemory" : 0, + "MinorGCCount" : 0, + "MinorGCTime" : 0, + "MajorGCCount" : 0, + "MajorGCTime" : 0 + }, "isExcludedForStage" : true }, "1" : { @@ -716,6 +738,28 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : false, + "peakMemoryMetrics" : { + "JVMHeapMemory" : 0, + "JVMOffHeapMemory" : 0, + "OnHeapExecutionMemory" : 0, + "OffHeapExecutionMemory" : 0, + "OnHeapStorageMemory" : 0, + "OffHeapStorageMemory" : 0, + "OnHeapUnifiedMemory" : 0, + "OffHeapUnifiedMemory" : 0, + "DirectPoolMemory" : 0, + "MappedPoolMemory" : 0, + "ProcessTreeJVMVMemory" : 0, + "ProcessTreeJVMRSSMemory" : 0, + "ProcessTreePythonVMemory" : 0, + "ProcessTreePythonRSSMemory" : 0, + "ProcessTreeOtherVMemory" : 0, + "ProcessTreeOtherRSSMemory" : 0, + "MinorGCCount" : 0, + "MinorGCTime" : 0, + "MajorGCCount" : 0, + "MajorGCTime" : 0 + }, "isExcludedForStage" : false } }, diff --git a/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_node_for_stage_expectation.json b/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_node_for_stage_expectation.json index bda9caedbbe81..1c569c19894fd 100644 --- a/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_node_for_stage_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/excludeOnFailure_node_for_stage_expectation.json @@ -806,6 +806,28 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : true, + "peakMemoryMetrics" : { + "JVMHeapMemory" : 0, + "JVMOffHeapMemory" : 0, + "OnHeapExecutionMemory" : 0, + "OffHeapExecutionMemory" : 0, + "OnHeapStorageMemory" : 0, + "OffHeapStorageMemory" : 0, + "OnHeapUnifiedMemory" : 0, + "OffHeapUnifiedMemory" : 0, + "DirectPoolMemory" : 0, + "MappedPoolMemory" : 0, + "ProcessTreeJVMVMemory" : 0, + "ProcessTreeJVMRSSMemory" : 0, + "ProcessTreePythonVMemory" : 0, + "ProcessTreePythonRSSMemory" : 0, + "ProcessTreeOtherVMemory" : 0, + "ProcessTreeOtherRSSMemory" : 0, + "MinorGCCount" : 0, + "MinorGCTime" : 0, + "MajorGCCount" : 0, + "MajorGCTime" : 0 + }, "isExcludedForStage" : true }, "5" : { @@ -824,6 +846,28 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : true, + "peakMemoryMetrics" : { + "JVMHeapMemory" : 0, + "JVMOffHeapMemory" : 0, + "OnHeapExecutionMemory" : 0, + "OffHeapExecutionMemory" : 0, + "OnHeapStorageMemory" : 0, + "OffHeapStorageMemory" : 0, + "OnHeapUnifiedMemory" : 0, + "OffHeapUnifiedMemory" : 0, + "DirectPoolMemory" : 0, + "MappedPoolMemory" : 0, + "ProcessTreeJVMVMemory" : 0, + "ProcessTreeJVMRSSMemory" : 0, + "ProcessTreePythonVMemory" : 0, + "ProcessTreePythonRSSMemory" : 0, + "ProcessTreeOtherVMemory" : 0, + "ProcessTreeOtherRSSMemory" : 0, + "MinorGCCount" : 0, + "MinorGCTime" : 0, + "MajorGCCount" : 0, + "MajorGCTime" : 0 + }, "isExcludedForStage" : true }, "1" : { @@ -842,6 +886,28 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : false, + "peakMemoryMetrics" : { + "JVMHeapMemory" : 0, + "JVMOffHeapMemory" : 0, + "OnHeapExecutionMemory" : 0, + "OffHeapExecutionMemory" : 0, + "OnHeapStorageMemory" : 0, + "OffHeapStorageMemory" : 0, + "OnHeapUnifiedMemory" : 0, + "OffHeapUnifiedMemory" : 0, + "DirectPoolMemory" : 0, + "MappedPoolMemory" : 0, + "ProcessTreeJVMVMemory" : 0, + "ProcessTreeJVMRSSMemory" : 0, + "ProcessTreePythonVMemory" : 0, + "ProcessTreePythonRSSMemory" : 0, + "ProcessTreeOtherVMemory" : 0, + "ProcessTreeOtherRSSMemory" : 0, + "MinorGCCount" : 0, + "MinorGCTime" : 0, + "MajorGCCount" : 0, + "MajorGCTime" : 0 + }, "isExcludedForStage" : false }, "2" : { @@ -860,6 +926,28 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : false, + "peakMemoryMetrics" : { + "JVMHeapMemory" : 0, + "JVMOffHeapMemory" : 0, + "OnHeapExecutionMemory" : 0, + "OffHeapExecutionMemory" : 0, + "OnHeapStorageMemory" : 0, + "OffHeapStorageMemory" : 0, + "OnHeapUnifiedMemory" : 0, + "OffHeapUnifiedMemory" : 0, + "DirectPoolMemory" : 0, + "MappedPoolMemory" : 0, + "ProcessTreeJVMVMemory" : 0, + "ProcessTreeJVMRSSMemory" : 0, + "ProcessTreePythonVMemory" : 0, + "ProcessTreePythonRSSMemory" : 0, + "ProcessTreeOtherVMemory" : 0, + "ProcessTreeOtherRSSMemory" : 0, + "MinorGCCount" : 0, + "MinorGCTime" : 0, + "MajorGCCount" : 0, + "MajorGCTime" : 0 + }, "isExcludedForStage" : false }, "3" : { @@ -878,6 +966,28 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : true, + "peakMemoryMetrics" : { + "JVMHeapMemory" : 0, + "JVMOffHeapMemory" : 0, + "OnHeapExecutionMemory" : 0, + "OffHeapExecutionMemory" : 0, + "OnHeapStorageMemory" : 0, + "OffHeapStorageMemory" : 0, + "OnHeapUnifiedMemory" : 0, + "OffHeapUnifiedMemory" : 0, + "DirectPoolMemory" : 0, + "MappedPoolMemory" : 0, + "ProcessTreeJVMVMemory" : 0, + "ProcessTreeJVMRSSMemory" : 0, + "ProcessTreePythonVMemory" : 0, + "ProcessTreePythonRSSMemory" : 0, + "ProcessTreeOtherVMemory" : 0, + "ProcessTreeOtherRSSMemory" : 0, + "MinorGCCount" : 0, + "MinorGCTime" : 0, + "MajorGCCount" : 0, + "MajorGCTime" : 0 + }, "isExcludedForStage" : true } }, diff --git a/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json index 41e54c68858ad..b1eab0d7ac196 100644 --- a/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json @@ -460,6 +460,28 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : false, + "peakMemoryMetrics" : { + "JVMHeapMemory" : 0, + "JVMOffHeapMemory" : 0, + "OnHeapExecutionMemory" : 0, + "OffHeapExecutionMemory" : 0, + "OnHeapStorageMemory" : 0, + "OffHeapStorageMemory" : 0, + "OnHeapUnifiedMemory" : 0, + "OffHeapUnifiedMemory" : 0, + "DirectPoolMemory" : 0, + "MappedPoolMemory" : 0, + "ProcessTreeJVMVMemory" : 0, + "ProcessTreeJVMRSSMemory" : 0, + "ProcessTreePythonVMemory" : 0, + "ProcessTreePythonRSSMemory" : 0, + "ProcessTreeOtherVMemory" : 0, + "ProcessTreeOtherRSSMemory" : 0, + "MinorGCCount" : 0, + "MinorGCTime" : 0, + "MajorGCCount" : 0, + "MajorGCTime" : 0 + }, "isExcludedForStage" : false } }, diff --git a/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json index 7a6685a609523..6dfdd27cd7d8f 100644 --- a/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json @@ -460,6 +460,28 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : false, + "peakMemoryMetrics" : { + "JVMHeapMemory" : 0, + "JVMOffHeapMemory" : 0, + "OnHeapExecutionMemory" : 0, + "OffHeapExecutionMemory" : 0, + "OnHeapStorageMemory" : 0, + "OffHeapStorageMemory" : 0, + "OnHeapUnifiedMemory" : 0, + "OffHeapUnifiedMemory" : 0, + "DirectPoolMemory" : 0, + "MappedPoolMemory" : 0, + "ProcessTreeJVMVMemory" : 0, + "ProcessTreeJVMRSSMemory" : 0, + "ProcessTreePythonVMemory" : 0, + "ProcessTreePythonRSSMemory" : 0, + "ProcessTreeOtherVMemory" : 0, + "ProcessTreeOtherRSSMemory" : 0, + "MinorGCCount" : 0, + "MinorGCTime" : 0, + "MajorGCCount" : 0, + "MajorGCTime" : 0 + }, "isExcludedForStage" : false } }, diff --git a/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json index 066b6a4f884a7..a2cfd9d42cc99 100644 --- a/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json @@ -504,6 +504,28 @@ "memoryBytesSpilled" : 0, "diskBytesSpilled" : 0, "isBlacklistedForStage" : false, + "peakMemoryMetrics" : { + "JVMHeapMemory" : 0, + "JVMOffHeapMemory" : 0, + "OnHeapExecutionMemory" : 0, + "OffHeapExecutionMemory" : 0, + "OnHeapStorageMemory" : 0, + "OffHeapStorageMemory" : 0, + "OnHeapUnifiedMemory" : 0, + "OffHeapUnifiedMemory" : 0, + "DirectPoolMemory" : 0, + "MappedPoolMemory" : 0, + "ProcessTreeJVMVMemory" : 0, + "ProcessTreeJVMRSSMemory" : 0, + "ProcessTreePythonVMemory" : 0, + "ProcessTreePythonRSSMemory" : 0, + "ProcessTreeOtherVMemory" : 0, + "ProcessTreeOtherRSSMemory" : 0, + "MinorGCCount" : 0, + "MinorGCTime" : 0, + "MajorGCCount" : 0, + "MajorGCTime" : 0 + }, "isExcludedForStage" : false } },