
[SPARK-25228][CORE] Add executor CPU time metric. #22218

Closed
wants to merge 7 commits

Conversation

LucaCanali
Contributor

What changes were proposed in this pull request?

Add a new metric to measure the executor's process (JVM) CPU time.

How was this patch tested?

Manually tested on a Spark cluster (see SPARK-25228 for an example screenshot).

// The value is returned in nanoseconds; the method returns -1 if this operation is not supported.
val osMXBean = ManagementFactory.getOperatingSystemMXBean.asInstanceOf[OperatingSystemMXBean]
metricRegistry.register(MetricRegistry.name("executorCPUTime" ), new Gauge[Long] {
  override def getValue: Long = osMXBean.getProcessCpuTime()
})
Member

@maropu maropu Aug 25, 2018

Is this metric useful for users? Isn't the task CPU time metric enough?

Contributor Author

I believe the proposed metric tracking the executor CPU time is useful: it adds information and convenience on top of the task CPU metric implemented in SPARK-22190. A couple of considerations from recent findings and experimentation support this:

  • the process CPU time contains all the CPU consumed by the JVM, notably including the CPU consumed by garbage collection, which can be important in some cases and definitely something we want to measure and analyze
  • the CPU time collected from the tasks is "harder to consume" in a dashboard as the CPU value is only updated at the end of the successful execution of the task, which makes it harder to handle for a dashboard in case of long-running tasks. In contrast, the executor process CPU time "dropwizard gauge" gives an up-to-date value of the CPU consumed by the executor at any time as it takes it directly from the OS.
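To illustrate the second point, here is a minimal, self-contained sketch of such a gauge being polled at an arbitrary moment. The `Gauge` trait below is a hypothetical stand-in for `com.codahale.metrics.Gauge`, so the snippet runs without the Dropwizard dependency:

```scala
import java.lang.management.ManagementFactory
import com.sun.management.OperatingSystemMXBean

// Hypothetical stand-in for com.codahale.metrics.Gauge, to keep the sketch self-contained.
trait Gauge[T] { def getValue: T }

val osMXBean = ManagementFactory.getOperatingSystemMXBean.asInstanceOf[OperatingSystemMXBean]

val processCpuGauge = new Gauge[Long] {
  // Reads straight from the OS, so the value is current whenever it is polled,
  // unlike task CPU metrics, which are only updated when a task finishes.
  override def getValue: Long = osMXBean.getProcessCpuTime()
}

// A dashboard sink can poll at any time, independently of task boundaries.
val before = processCpuGauge.getValue
val after = processCpuGauge.getValue
```

Note that `getProcessCpuTime` itself returns -1 on JVMs where the operation is not supported, which is exactly the portability concern raised in the review below.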

import java.util.concurrent.ThreadPoolExecutor

import scala.collection.JavaConverters._

import com.codahale.metrics.{Gauge, MetricRegistry}
import com.sun.management.OperatingSystemMXBean
Member

Is this com.sun class going to be available in all JDKs? Thinking of OpenJDK and IBM JDKs

Member

Good point.
This class cannot be loaded at least on IBM JDK as reported here.

Contributor Author

Indeed this is a very good point that I had overlooked. I have now checked directly, and this appears to work OK on OpenJDK (and on the Oracle JVM, of course). In addition, I tested manually with IBM JDK (IBM J9 VM, Java 1.8.0_181), where one would indeed suspect incompatibilities, and surprisingly this appears to work in that case too. I believe this may come from recent work by IBM to make com.ibm.lang.management.OperatingSystemMXBean.getProcessCpuTime compatible with com.sun.management.OperatingSystemMXBean.getProcessCpuTime? See also this link

I guess that if this is confirmed, we should be fine with a large fraction of the commonly used JDKs. In addition, we could handle the exception in case getProcessCpuTime is not available on a particular platform where the executor is running, for example returning the value -1 for this gauge in that case. Any thoughts/suggestions on this proposal?

Member

I think it's safest to use a little reflection here to make sure this doesn't cause the whole app to crash every time.
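A hedged sketch of what such a reflective call could look like (the helper name is illustrative, not from the PR): the method is looked up by name, so no `com.sun.management` class is referenced at compile time, and any failure, including the method being absent or inaccessible, falls back to -1:

```scala
import java.lang.management.ManagementFactory
import scala.util.control.NonFatal

// Look up getProcessCpuTime reflectively so that no com.sun.management class
// is referenced at compile time; return -1 on any JVM where this fails.
def processCpuTimeViaReflection(): Long = {
  try {
    val bean = ManagementFactory.getOperatingSystemMXBean
    val method = bean.getClass.getMethod("getProcessCpuTime")
    method.setAccessible(true)
    method.invoke(bean).asInstanceOf[Long]
  } catch {
    case NonFatal(_) => -1L
  }
}
```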

Contributor Author

I have refactored the code with a different approach using the MBeanServer, which should address the comments about the availability of com.sun.management.OperatingSystemMXBean across different JDKs.

@LucaCanali
Contributor Author

I have refactored the code now using the MBeanServer, which should address the comments about the availability of com.sun.management.OperatingSystemMXBean across different JDKs.

// Returns the JVM process CPU time, or -1 if the value is not available.
// Uses proprietary extensions such as com.sun.management.OperatingSystemMXBean or
// com.ibm.lang.management.OperatingSystemMXBean, if available.
def tryToGetJVMProcessCpuTime(): Long = {
Member

Can you just inline this method below?

if (attribute != null) {
attribute.asInstanceOf[Long]
}
else {
Member

Nit: pull onto previous line

@LucaCanali
Contributor Author

I have implemented the changes from the latest comments, namely inlining the method.

@@ -73,6 +75,29 @@ class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends
registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
}

/** Dropwizard metrics gauge measuring the executor's process CPU time.
Member

Nit: the comments should begin on the next line. But this is scaladoc syntax, and inside a code block, normally we just use // block comments.

* It will use proprietary extensions as com.sun.management.OperatingSystemMXBean or
* com.ibm.lang.management.OperatingSystemMXBean if available
*/
val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer
Member

The problem here is that these become fields in the parent object. These should go inside the new Gauge... { I think?

-1L
}
} catch {
case _ : Exception => -1L
Member

case NonFatal(_) => -1?

*/
val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer
val name = new ObjectName("java.lang", "type", "OperatingSystem")
metricRegistry.register(MetricRegistry.name("executorCPUTime" ), new Gauge[Long] {
Member

A little confused with the existing cpuTime. How about jvmCpuTime?

Member

nit: name("executorCPUTime" ) -> name("executorCPUTime")

if (attribute != null) {
attribute.asInstanceOf[Long]
} else {
-1L
Member

Any reason to return -1 instead of 0?

Contributor Author

I took the idea from com.sun.management.OperatingSystemMXBean.getProcessCpuTime; according to the documentation: "Returns: the CPU time used by the process in nanoseconds, or -1 if this operation is not supported."
I think it makes sense to return an invalid value such as -1L if something goes wrong while gathering the CPU time, so that the error condition is evident to the end user of the metric. Returning 0 is also possible, of course.

Member

ok, thanks.

@LucaCanali
Contributor Author

I have implemented the changes as from the latest comments by @maropu and @srowen

// com.ibm.lang.management.OperatingSystemMXBean, if available.
metricRegistry.register(MetricRegistry.name("jvmCpuTime"), new Gauge[Long] {
override def getValue: Long = {
val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer
Member

To be clear, I actually meant to put these inside the anonymous Gauge instance but outside the method, so as to compute them once. That said, I doubt there is much overhead here: getting the bean just returns a field, although constructing the ObjectName is a little non-trivial. I suppose metrics are computed infrequently, so this doesn't matter much.
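A sketch of that placement, with `mBean` and `name` as fields of the anonymous Gauge instance so they are computed once at registration time rather than on every read (again using a hypothetical stand-in for the Dropwizard `Gauge` trait to keep the snippet self-contained):

```scala
import java.lang.management.ManagementFactory
import javax.management.{MBeanServer, ObjectName}
import scala.util.control.NonFatal

// Hypothetical stand-in for com.codahale.metrics.Gauge.
trait Gauge[T] { def getValue: T }

val jvmCpuTimeGauge = new Gauge[Long] {
  // Fields of the anonymous instance: computed once, not on every getValue call.
  val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer
  val name = new ObjectName("java.lang", "type", "OperatingSystem")

  override def getValue: Long = {
    try {
      mBean.getAttribute(name, "ProcessCpuTime").asInstanceOf[Long]
    } catch {
      case NonFatal(_) => -1L  // attribute not available on this JVM
    }
  }
}
```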

@LucaCanali
Contributor Author

Thanks @srowen

// The CPU time value is returned in nanoseconds.
// It will use proprietary extensions such as com.sun.management.OperatingSystemMXBean or
// com.ibm.lang.management.OperatingSystemMXBean, if available.
metricRegistry.register(MetricRegistry.name("jvmCpuTime"), new Gauge[Long] {
Member

So this isn't exposed except through dropwizard... not plumbed through to the driver too like some of the metrics below? just checking that this is all that needs to happen, that the metric can be used by external users but is not otherwise touched by Spark.

Contributor Author

Indeed, this is exposed only through the Dropwizard metrics system and not used otherwise in the Spark code. Another point worth mentioning is that currently the executorSource is not registered when running in local mode.
On a related topic (although maybe for a more general discussion than the scope of this PR), I was wondering if it would make sense to introduce a few SparkConf properties to switch on/off certain families of (Dropwizard) metrics, as the list of available metrics is becoming long in recent versions.

override def getValue: Long = {
try {
val attribute = mBean.getAttribute(name, "ProcessCpuTime")
if (attribute != null) {
Member

Contributor Author

Indeed good point. I'll remove this additional check for null value.

Member

I personally don't mind the defensive checks, because who knows what to really expect from these implementations? but this is OK by me. In case of a bad impl this would still return -1.

@SparkQA

SparkQA commented Sep 3, 2018

Test build #4330 has finished for PR 22218 at commit e72966e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Sep 3, 2018

retest this please

@SparkQA

SparkQA commented Sep 4, 2018

Test build #4331 has finished for PR 22218 at commit e72966e.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@srowen
Member

srowen commented Sep 5, 2018

Merged to master

5 participants