diff --git a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
index af67f41e94af..1c5f335e94f6 100644
--- a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
@@ -39,6 +39,7 @@ private[spark] case class ProcfsMetrics(
     otherVmemTotal: Long,
     otherRSSTotal: Long)
 
+
 // Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop
 // project.
 private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends Logging {
@@ -47,6 +48,15 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
   private val pageSize = computePageSize()
   private var isAvailable: Boolean = isProcfsAvailable
   private val pid = computePid()
+  @volatile private var cachedAllMetric = ProcfsMetrics(0, 0, 0, 0, 0, 0)
+  @volatile private var lastTimeMetricsComputed = 0L
+  private val PROCESS_TREE_METRICS_RECOMPUTE_WAIT_MS = if (testing) {
+    0L
+  } else {
+    SparkEnv.get.conf.get(config.PROCESS_TREE_METRICS_RECOMPUTE_WAIT)
+  }
+
+
 
   private lazy val isProcfsAvailable: Boolean = {
     if (testing) {
@@ -58,11 +68,7 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
           logWarning("Exception checking for procfs dir", ioe)
           false
       }
-      val shouldLogStageExecutorMetrics =
-        SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
-      val shouldLogStageExecutorProcessTreeMetrics =
-        SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
-      procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics
+      procDirExists.get
     }
   }
 
@@ -205,21 +211,43 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
     }
   }
 
+  private[spark] def isCacheValid(): Boolean = {
+    val metricsAge = System.currentTimeMillis() - lastTimeMetricsComputed
+    // The wait between recomputations is configurable via spark.metrics.recomputeWait.
+    PROCESS_TREE_METRICS_RECOMPUTE_WAIT_MS > metricsAge
+  }
+
   private[spark] def computeAllMetrics(): ProcfsMetrics = {
     if (!isAvailable) {
-      return ProcfsMetrics(0, 0, 0, 0, 0, 0)
+      lastTimeMetricsComputed = System.currentTimeMillis
+      cachedAllMetric = ProcfsMetrics(0, 0, 0, 0, 0, 0)
+      return cachedAllMetric
     }
-    val pids = computeProcessTree
-    var allMetrics = ProcfsMetrics(0, 0, 0, 0, 0, 0)
-    for (p <- pids) {
-      allMetrics = addProcfsMetricsFromOneProcess(allMetrics, p)
-      // if we had an error getting any of the metrics, we don't want to report partial metrics, as
-      // that would be misleading.
-      if (!isAvailable) {
-        return ProcfsMetrics(0, 0, 0, 0, 0, 0)
+
+    if (!isCacheValid()) {
+      this.synchronized {
+        if (isCacheValid()) {
+          return cachedAllMetric
+        }
+        val pids = computeProcessTree
+        var allMetrics = ProcfsMetrics(0, 0, 0, 0, 0, 0)
+        for (p <- pids) {
+          allMetrics = addProcfsMetricsFromOneProcess(allMetrics, p)
+          // if we had an error getting any of the metrics, we don't
+          // want to report partial metrics, as that would be misleading.
+          if (!isAvailable) {
+            lastTimeMetricsComputed = System.currentTimeMillis
+            cachedAllMetric = ProcfsMetrics(0, 0, 0, 0, 0, 0)
+            return cachedAllMetric
+          }
+        }
+        lastTimeMetricsComputed = System.currentTimeMillis
+        cachedAllMetric = allMetrics
+        allMetrics
       }
+    } else {
+      cachedAllMetric
     }
-    allMetrics
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsSource.scala b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsSource.scala
new file mode 100644
index 000000000000..820a4e26cf2b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsSource.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.config
+import org.apache.spark.metrics.source.Source
+
+private[executor] class ProcfsMetricsSource extends Source {
+  override val sourceName = "procfs"
+  // We use the following var so that computeAllMetrics is only called once per
+  // set of procfs metrics. This is needed because a Metrics system gauge that
+  // returns a whole set of metrics can't be used without significant changes to
+  // ProcfsMetricsGetter.
+  var numOfRequestsToGetProcfs: Int = 0
+  override val metricRegistry = new MetricRegistry()
+  var metrics: Map[String, Long] = Map.empty
+  val shouldAddProcessTreeMetricsToMetricsSet =
+    SparkEnv.get.conf.get(config.METRICS_PROCESS_TREE_METRICS)
+
+  private def getProcfsMetrics: Map[String, Long] = {
+    if (numOfRequestsToGetProcfs == 0) {
+      metrics = Map.empty
+      val p = ProcfsMetricsGetter.pTreeInfo.computeAllMetrics()
+      metrics = Map(
+        "JVMVMemory" -> p.jvmVmemTotal,
+        "JVMRSSMemory" -> p.jvmRSSTotal,
+        "PythonVMemory" -> p.pythonVmemTotal,
+        "PythonRSSMemory" -> p.pythonRSSTotal,
+        "OtherVMemory" -> p.otherVmemTotal,
+        "OtherRSSMemory" -> p.otherRSSTotal)
+    }
+    // We have 6 metrics from procfs, so computeAllMetrics only needs to be
+    // called once for every 6 gauge reads, not every time the Metrics system
+    // asks for the value of a single metric.
+    numOfRequestsToGetProcfs = (numOfRequestsToGetProcfs + 1) % 6
+    metrics
+  }
+
+  private def registerProcfsMetrics(name: String): Unit = {
+    metricRegistry.register(MetricRegistry.name("processTree", name), new Gauge[Long] {
+      override def getValue: Long = getProcfsMetrics(name)
+    })
+  }
+
+  if (shouldAddProcessTreeMetricsToMetricsSet) {
+    registerProcfsMetrics("JVMVMemory")
+    registerProcfsMetrics("JVMRSSMemory")
+    registerProcfsMetrics("PythonVMemory")
+    registerProcfsMetrics("PythonRSSMemory")
+    registerProcfsMetrics("OtherVMemory")
+    registerProcfsMetrics("OtherRSSMemory")
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index f1c1c034df49..48812982e264 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -98,6 +98,16 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val METRICS_PROCESS_TREE_METRICS =
+    ConfigBuilder("spark.metrics.addProcessTreeMetricsToMetricsSet.enabled")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val PROCESS_TREE_METRICS_RECOMPUTE_WAIT =
+    ConfigBuilder("spark.metrics.recomputeWait")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1s")
+
   private[spark] val EVENT_LOG_OVERWRITE =
     ConfigBuilder("spark.eventLog.overwrite").booleanConf.createWithDefault(false)
 
diff --git a/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala b/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala
index 704b36d3118b..3ddc71a81d08 100644
--- a/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala
@@ -21,9 +21,12 @@ import javax.management.ObjectName
 
 import scala.collection.mutable
 
+import org.apache.spark.SparkEnv
 import org.apache.spark.executor.ProcfsMetricsGetter
+import org.apache.spark.internal.config
 import org.apache.spark.memory.MemoryManager
+
 
 /**
  * Executor metric types for executor-level metrics stored in ExecutorMetrics.
 */
@@ -85,16 +88,21 @@ case object ProcessTreeMetrics extends ExecutorMetricType {
     "ProcessTreePythonRSSMemory",
     "ProcessTreeOtherVMemory",
     "ProcessTreeOtherRSSMemory")
-
+  val shouldLogStageExecutorMetrics =
+    SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+    SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
   override private[spark] def getMetricValues(memoryManager: MemoryManager): Array[Long] = {
-    val allMetrics = ProcfsMetricsGetter.pTreeInfo.computeAllMetrics()
     val processTreeMetrics = new Array[Long](names.length)
-    processTreeMetrics(0) = allMetrics.jvmVmemTotal
-    processTreeMetrics(1) = allMetrics.jvmRSSTotal
-    processTreeMetrics(2) = allMetrics.pythonVmemTotal
-    processTreeMetrics(3) = allMetrics.pythonRSSTotal
-    processTreeMetrics(4) = allMetrics.otherVmemTotal
-    processTreeMetrics(5) = allMetrics.otherRSSTotal
+    if (shouldLogStageExecutorMetrics && shouldLogStageExecutorProcessTreeMetrics) {
+      val allMetrics = ProcfsMetricsGetter.pTreeInfo.computeAllMetrics()
+      processTreeMetrics(0) = allMetrics.jvmVmemTotal
+      processTreeMetrics(1) = allMetrics.jvmRSSTotal
+      processTreeMetrics(2) = allMetrics.pythonVmemTotal
+      processTreeMetrics(3) = allMetrics.pythonRSSTotal
+      processTreeMetrics(4) = allMetrics.otherVmemTotal
+      processTreeMetrics(5) = allMetrics.otherRSSTotal
+    }
     processTreeMetrics
   }
 }
@@ -140,7 +148,6 @@ private[spark] object ExecutorMetricType {
     ProcessTreeMetrics
   )
 
-
   val (metricToOffset, numMetrics) = {
     var numberOfMetrics = 0
     val definedMetricsAndOffset = mutable.LinkedHashMap.empty[String, Int]
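
A minimal usage sketch (not part of the patch): turning on the behavior added above from an application. The two config keys are taken from this diff; the values and the SparkConf wiring are only an illustrative example.

  import org.apache.spark.SparkConf

  // Expose the process-tree gauges through the executor metrics system and
  // let the cached /proc scan be recomputed at most once every 5 seconds.
  val conf = new SparkConf()
    .set("spark.metrics.addProcessTreeMetricsToMetricsSet.enabled", "true")
    .set("spark.metrics.recomputeWait", "5s")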