[SPARK-29400][CORE] Improve PrometheusResource to use labels #26060
Changes from all commits:
```diff
@@ -40,30 +40,35 @@ private[v1] class PrometheusResource extends ApiRequestContext {
   def executors(): String = {
     val sb = new StringBuilder
     val store = uiRoot.asInstanceOf[SparkUI].store
     val appId = store.applicationInfo.id.replaceAll("[^a-zA-Z0-9]", "_")
     store.executorList(true).foreach { executor =>
-      val prefix = s"metrics_${appId}_${executor.id}_executor_"
-      sb.append(s"${prefix}rddBlocks_Count ${executor.rddBlocks}\n")
-      sb.append(s"${prefix}memoryUsed_Count ${executor.memoryUsed}\n")
-      sb.append(s"${prefix}diskUsed_Count ${executor.diskUsed}\n")
-      sb.append(s"${prefix}totalCores_Count ${executor.totalCores}\n")
-      sb.append(s"${prefix}maxTasks_Count ${executor.maxTasks}\n")
-      sb.append(s"${prefix}activeTasks_Count ${executor.activeTasks}\n")
-      sb.append(s"${prefix}failedTasks_Count ${executor.failedTasks}\n")
-      sb.append(s"${prefix}completedTasks_Count ${executor.completedTasks}\n")
-      sb.append(s"${prefix}totalTasks_Count ${executor.totalTasks}\n")
-      sb.append(s"${prefix}totalDuration_Value ${executor.totalDuration}\n")
-      sb.append(s"${prefix}totalGCTime_Value ${executor.totalGCTime}\n")
-      sb.append(s"${prefix}totalInputBytes_Count ${executor.totalInputBytes}\n")
-      sb.append(s"${prefix}totalShuffleRead_Count ${executor.totalShuffleRead}\n")
-      sb.append(s"${prefix}totalShuffleWrite_Count ${executor.totalShuffleWrite}\n")
-      sb.append(s"${prefix}maxMemory_Count ${executor.maxMemory}\n")
+      val prefix = "metrics_executor_"
+      val labels = Seq(
+        "application_id" -> store.applicationInfo.id,
+        "application_name" -> store.applicationInfo.name,
+        "executor_id" -> executor.id
+      ).map { case (k, v) => s"""$k="$v"""" }.mkString("{", ", ", "}")
+      sb.append(s"${prefix}rddBlocks_Count$labels ${executor.rddBlocks}\n")
```
Member

Actually, does it have a bad impact on metrics usage later? Because now all applications are recorded under the same metric names. I am not sure how Prometheus processes this, but naturally I'd think Prometheus needs to search for a specified application id among the metrics of all applications. Previously you had the appId and executor id in the metric name.
Member (Author)

Right. The redundant information is moved to labels.

No, the Prometheus query language supports handling them individually.
Member

Yes. But what I am wondering is: now all numbers from all applications are recorded under the same metric. To retrieve the numbers for a specified application, doesn't Prometheus need to search among all applications' metric values?
Member

I may misunderstand Prometheus's approach. If so, then this might not be a problem.
Member (Author)

For that, here are the references for the details~
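To illustrate the point above: with per-application identity carried in labels, a PromQL label matcher selects one application's series directly, so there is no manual search across applications. A hypothetical query (the application id below is made up for illustration) might look like:

```promql
metrics_executor_memoryUsed_Count{application_id="app-20191010224651-0000"}
```

Prometheus indexes series by their label sets, so this selector returns only the matching application's executors.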
```diff
+      sb.append(s"${prefix}memoryUsed_Count$labels ${executor.memoryUsed}\n")
```
Member

Not related to this PR, but why do they all end with …
Member (Author)

Thank you for the review, @viirya. Yes. Of course, we can rename them freely because we are starting to support them natively.
```diff
+      sb.append(s"${prefix}diskUsed_Count$labels ${executor.diskUsed}\n")
+      sb.append(s"${prefix}totalCores_Count$labels ${executor.totalCores}\n")
+      sb.append(s"${prefix}maxTasks_Count$labels ${executor.maxTasks}\n")
+      sb.append(s"${prefix}activeTasks_Count$labels ${executor.activeTasks}\n")
+      sb.append(s"${prefix}failedTasks_Count$labels ${executor.failedTasks}\n")
+      sb.append(s"${prefix}completedTasks_Count$labels ${executor.completedTasks}\n")
+      sb.append(s"${prefix}totalTasks_Count$labels ${executor.totalTasks}\n")
+      sb.append(s"${prefix}totalDuration_Value$labels ${executor.totalDuration}\n")
+      sb.append(s"${prefix}totalGCTime_Value$labels ${executor.totalGCTime}\n")
+      sb.append(s"${prefix}totalInputBytes_Count$labels ${executor.totalInputBytes}\n")
+      sb.append(s"${prefix}totalShuffleRead_Count$labels ${executor.totalShuffleRead}\n")
+      sb.append(s"${prefix}totalShuffleWrite_Count$labels ${executor.totalShuffleWrite}\n")
+      sb.append(s"${prefix}maxMemory_Count$labels ${executor.maxMemory}\n")
       executor.executorLogs.foreach { case (k, v) => }
       executor.memoryMetrics.foreach { m =>
-        sb.append(s"${prefix}usedOnHeapStorageMemory_Count ${m.usedOnHeapStorageMemory}\n")
-        sb.append(s"${prefix}usedOffHeapStorageMemory_Count ${m.usedOffHeapStorageMemory}\n")
-        sb.append(s"${prefix}totalOnHeapStorageMemory_Count ${m.totalOnHeapStorageMemory}\n")
-        sb.append(s"${prefix}totalOffHeapStorageMemory_Count ${m.totalOffHeapStorageMemory}\n")
+        sb.append(s"${prefix}usedOnHeapStorageMemory_Count$labels ${m.usedOnHeapStorageMemory}\n")
+        sb.append(s"${prefix}usedOffHeapStorageMemory_Count$labels ${m.usedOffHeapStorageMemory}\n")
+        sb.append(s"${prefix}totalOnHeapStorageMemory_Count$labels ${m.totalOnHeapStorageMemory}\n")
+        sb.append(s"${prefix}totalOffHeapStorageMemory_Count$labels " +
+          s"${m.totalOffHeapStorageMemory}\n")
       }
       executor.peakMemoryMetrics.foreach { m =>
         val names = Array(
```
```diff
@@ -89,7 +94,7 @@ private[v1] class PrometheusResource extends ApiRequestContext {
           "MajorGCTime"
         )
         names.foreach { name =>
-          sb.append(s"$prefix${name}_Count ${m.getMetricValue(name)}\n")
+          sb.append(s"$prefix${name}_Count$labels ${m.getMetricValue(name)}\n")
         }
       }
     }
```
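As a rough sketch of what the new code emits, the snippet below reproduces the label-formatting logic from the diff in isolation. The object name `LabelSketch` and all sample values (application id, name, memory figure) are made up for illustration, not taken from a real run:

```scala
// Standalone sketch of the label formatting introduced by this PR.
// All sample values below are hypothetical.
object LabelSketch {
  // Mirrors the `labels` construction in the diff: {k1="v1", k2="v2", ...}
  def formatLabels(pairs: Seq[(String, String)]): String =
    pairs.map { case (k, v) => s"""$k="$v"""" }.mkString("{", ", ", "}")

  def main(args: Array[String]): Unit = {
    val labels = formatLabels(Seq(
      "application_id" -> "app-20191010224651-0000", // hypothetical id
      "application_name" -> "SparkPi",               // hypothetical name
      "executor_id" -> "1"
    ))
    val prefix = "metrics_executor_"
    // One exposition line, in the same shape the endpoint writes:
    println(s"${prefix}memoryUsed_Count$labels 1024")
  }
}
```

Running this prints a single Prometheus text-exposition sample where the metric name is shared across applications and the per-application identity lives entirely in the label set.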
Member
I am not sure if application name is needed, because you have application id already.
Member (Author)
This is the only human-readable field for distinguishing jobs.