From 907692275c3c560dc04f7305dc26c664828ed9b5 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 8 Oct 2019 15:10:48 -0700 Subject: [PATCH] [SPARK-29400][CORE] Improve PrometheusResource to use labels --- .../status/api/v1/PrometheusResource.scala | 49 ++++++++++--------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala index 6e52e213bda8..f9fb78e65a3d 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala @@ -40,30 +40,35 @@ private[v1] class PrometheusResource extends ApiRequestContext { def executors(): String = { val sb = new StringBuilder val store = uiRoot.asInstanceOf[SparkUI].store - val appId = store.applicationInfo.id.replaceAll("[^a-zA-Z0-9]", "_") store.executorList(true).foreach { executor => - val prefix = s"metrics_${appId}_${executor.id}_executor_" - sb.append(s"${prefix}rddBlocks_Count ${executor.rddBlocks}\n") - sb.append(s"${prefix}memoryUsed_Count ${executor.memoryUsed}\n") - sb.append(s"${prefix}diskUsed_Count ${executor.diskUsed}\n") - sb.append(s"${prefix}totalCores_Count ${executor.totalCores}\n") - sb.append(s"${prefix}maxTasks_Count ${executor.maxTasks}\n") - sb.append(s"${prefix}activeTasks_Count ${executor.activeTasks}\n") - sb.append(s"${prefix}failedTasks_Count ${executor.failedTasks}\n") - sb.append(s"${prefix}completedTasks_Count ${executor.completedTasks}\n") - sb.append(s"${prefix}totalTasks_Count ${executor.totalTasks}\n") - sb.append(s"${prefix}totalDuration_Value ${executor.totalDuration}\n") - sb.append(s"${prefix}totalGCTime_Value ${executor.totalGCTime}\n") - sb.append(s"${prefix}totalInputBytes_Count ${executor.totalInputBytes}\n") - sb.append(s"${prefix}totalShuffleRead_Count ${executor.totalShuffleRead}\n") - sb.append(s"${prefix}totalShuffleWrite_Count ${executor.totalShuffleWrite}\n") - sb.append(s"${prefix}maxMemory_Count ${executor.maxMemory}\n") + val prefix = "metrics_executor_" + val labels = Seq( + "application_id" -> store.applicationInfo.id, + "application_name" -> store.applicationInfo.name, + "executor_id" -> executor.id + ).map { case (k, v) => s"""$k="$v"""" }.mkString("{", ", ", "}") + sb.append(s"${prefix}rddBlocks_Count$labels ${executor.rddBlocks}\n") + sb.append(s"${prefix}memoryUsed_Count$labels ${executor.memoryUsed}\n") + sb.append(s"${prefix}diskUsed_Count$labels ${executor.diskUsed}\n") + sb.append(s"${prefix}totalCores_Count$labels ${executor.totalCores}\n") + sb.append(s"${prefix}maxTasks_Count$labels ${executor.maxTasks}\n") + sb.append(s"${prefix}activeTasks_Count$labels ${executor.activeTasks}\n") + sb.append(s"${prefix}failedTasks_Count$labels ${executor.failedTasks}\n") + sb.append(s"${prefix}completedTasks_Count$labels ${executor.completedTasks}\n") + sb.append(s"${prefix}totalTasks_Count$labels ${executor.totalTasks}\n") + sb.append(s"${prefix}totalDuration_Value$labels ${executor.totalDuration}\n") + sb.append(s"${prefix}totalGCTime_Value$labels ${executor.totalGCTime}\n") + sb.append(s"${prefix}totalInputBytes_Count$labels ${executor.totalInputBytes}\n") + sb.append(s"${prefix}totalShuffleRead_Count$labels ${executor.totalShuffleRead}\n") + sb.append(s"${prefix}totalShuffleWrite_Count$labels ${executor.totalShuffleWrite}\n") + sb.append(s"${prefix}maxMemory_Count$labels ${executor.maxMemory}\n") executor.executorLogs.foreach { case (k, v) => } executor.memoryMetrics.foreach { m => - sb.append(s"${prefix}usedOnHeapStorageMemory_Count ${m.usedOnHeapStorageMemory}\n") - sb.append(s"${prefix}usedOffHeapStorageMemory_Count ${m.usedOffHeapStorageMemory}\n") - sb.append(s"${prefix}totalOnHeapStorageMemory_Count ${m.totalOnHeapStorageMemory}\n") - sb.append(s"${prefix}totalOffHeapStorageMemory_Count ${m.totalOffHeapStorageMemory}\n") + sb.append(s"${prefix}usedOnHeapStorageMemory_Count$labels ${m.usedOnHeapStorageMemory}\n") + sb.append(s"${prefix}usedOffHeapStorageMemory_Count$labels ${m.usedOffHeapStorageMemory}\n") + sb.append(s"${prefix}totalOnHeapStorageMemory_Count$labels ${m.totalOnHeapStorageMemory}\n") + sb.append(s"${prefix}totalOffHeapStorageMemory_Count$labels " + + s"${m.totalOffHeapStorageMemory}\n") } executor.peakMemoryMetrics.foreach { m => val names = Array( @@ -89,7 +94,7 @@ private[v1] class PrometheusResource extends ApiRequestContext { "MajorGCTime" ) names.foreach { name => - sb.append(s"$prefix${name}_Count ${m.getMetricValue(name)}\n") + sb.append(s"$prefix${name}_Count$labels ${m.getMetricValue(name)}\n") } } }