5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -50,7 +50,7 @@ import org.apache.spark.internal.config.Tests._
import org.apache.spark.internal.config.UI._
import org.apache.spark.internal.plugin.PluginContainer
import org.apache.spark.io.CompressionCodec
import org.apache.spark.metrics.source.JVMCPUSource
import org.apache.spark.metrics.source.{JVMCPUSource, PythonMetricsSource}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.resource._
@@ -631,6 +631,9 @@ class SparkContext(config: SparkConf) extends Logging {
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_env.metricsSystem.registerSource(new JVMCPUSource())
if (isLocal && conf.get(METRICS_PYTHONMETRICS_SOURCE_ENABLED)) {
_env.metricsSystem.registerSource(new PythonMetricsSource())
}
_executorMetricsSource.foreach(_.register(_env.metricsSystem))
_executorAllocationManager.foreach { e =>
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
101 changes: 101 additions & 0 deletions core/src/main/scala/org/apache/spark/api/python/PythonMetrics.scala
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.python

import java.util.concurrent.atomic.AtomicLong

private[spark] object PythonMetrics {
Member:

So, these metrics are incremental across all JVM-Python and Python UDFs in a single Spark app?

Contributor Author:

Correct. The metric values are cumulative, measured and reported for each executor, in the same spirit as other metrics in the metrics system such as cpuTime.count, runTime.count, etc.


// General metrics on JVM-to-Python serialization/deserialization
private val toWorkerWriteTime = new AtomicLong(0L)
private val toWorkerBatchCount = new AtomicLong(0L)
private val toWorkerBytesWritten = new AtomicLong(0L)
private val fromWorkerReadTime = new AtomicLong(0L)
private val fromWorkerBatchCount = new AtomicLong(0L)
private val fromWorkerBytesRead = new AtomicLong(0L)

// Instrument Pandas_UDF
private val pandasUDFReadRowCount = new AtomicLong(0L)
private val pandasUDFWriteRowCount = new AtomicLong(0L)

def incToWorkerWriteTime(delta: Long): Unit = {
toWorkerWriteTime.getAndAdd(delta)
}

def getToWorkerWriteTime: Long = {
toWorkerWriteTime.get
}

def incToWorkerBytesWritten(delta: Long): Unit = {
toWorkerBytesWritten.getAndAdd(delta)
}

def getToWorkerBytesWritten: Long = {
toWorkerBytesWritten.get
}

def incToWorkerBatchCount(delta: Long): Unit = {
toWorkerBatchCount.getAndAdd(delta)
}

def getToWorkerBatchCount: Long = {
toWorkerBatchCount.get
}

def incFromWorkerReadTime(delta: Long): Unit = {
fromWorkerReadTime.getAndAdd(delta)
}

def getFromWorkerReadTime: Long = {
fromWorkerReadTime.get
}

def incFromWorkerBatchCount(delta: Long): Unit = {
fromWorkerBatchCount.getAndAdd(delta)
}

def getFromWorkerBatchCount: Long = {
fromWorkerBatchCount.get
}

def incFromWorkerBytesRead(delta: Long): Unit = {
fromWorkerBytesRead.getAndAdd(delta)
}

def getFromWorkerBytesRead: Long = {
fromWorkerBytesRead.get
}

// Pandas_UDF
def incPandasUDFReadRowCount(step: Long): Unit = {
pandasUDFReadRowCount.getAndAdd(step)
}

def getPandasUDFReadRowCount: Long = {
pandasUDFReadRowCount.get
}

def incPandasUDFWriteRowCount(step: Long): Unit = {
pandasUDFWriteRowCount.getAndAdd(step)
}

def getPandasUDFWriteRowCount: Long = {
pandasUDFWriteRowCount.get
}

}
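To make the counter semantics above concrete, here is a minimal sketch (not part of this PR) of how executor-side instrumentation code could feed these counters; the `timedWrite` helper is hypothetical, and the calls assume Spark-internal code, since `PythonMetrics` is `private[spark]`.

// Minimal illustrative sketch (not part of this PR).
// Assumes it runs from Spark-internal code, since PythonMetrics is private[spark].
import org.apache.spark.api.python.PythonMetrics

// Hypothetical helper: time a block and fold the elapsed nanoseconds
// into the cumulative write-time counter.
def timedWrite[T](body: => T): T = {
  val start = System.nanoTime()
  val result = body
  PythonMetrics.incToWorkerWriteTime(System.nanoTime() - start)
  result
}

// The counters are cumulative for the lifetime of the JVM (one set per
// executor), so repeated increments simply add up; the gauges registered
// by PythonMetricsSource expose the running totals.
PythonMetrics.incToWorkerBatchCount(1L)
PythonMetrics.incToWorkerBatchCount(1L)
assert(PythonMetrics.getToWorkerBatchCount >= 2L)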
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -44,7 +44,6 @@ import org.apache.spark.security.{SocketAuthHelper, SocketAuthServer, SocketFunc
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
import org.apache.spark.util._


private[spark] class PythonRDD(
parent: RDD[_],
func: PythonFunction,
@@ -304,9 +303,9 @@ private[spark] object PythonRDD extends Logging {
throw new SparkException("Unexpected element type " + other.getClass)
}

iter.foreach(write)
val numIterations = iter.map(write).size
PythonMetrics.incToWorkerBatchCount(numIterations)
}

/**
* Create an RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]],
* key and value class.
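A note on the counting idiom in the hunk above: `iter.map(write).size` works because `map` on an `Iterator` is lazy, so taking `.size` both forces `write` to run for every element and yields the number of elements written. A tiny self-contained sketch, illustrative only:

// Illustration of the iter.map(write).size idiom: map on an Iterator is
// lazy, so .size drives the side-effecting write for each element and
// returns how many elements were processed.
val written = scala.collection.mutable.ArrayBuffer.empty[Int]
def write(x: Int): Unit = written += x

val iter = Iterator(1, 2, 3)
val numIterations = iter.map(write).size
assert(numIterations == 3 && written.size == 3)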
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -229,6 +229,9 @@ private[spark] abstract class BasePythonRunner[IN, OUT](

override def run(): Unit = Utils.logUncaughtExceptions {
try {
// Time instrumentation: record when this writer thread starts sending data to the Python worker
val startTime = System.nanoTime()

TaskContext.setTaskContext(context)
val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
val dataOut = new DataOutputStream(stream)
@@ -397,6 +400,12 @@ private[spark] abstract class BasePythonRunner[IN, OUT](

dataOut.writeInt(SpecialLengths.END_OF_STREAM)
dataOut.flush()

val deltaTime = System.nanoTime() - startTime
val deltaBytes = dataOut.size()
PythonMetrics.incToWorkerWriteTime(deltaTime)
PythonMetrics.incToWorkerBytesWritten(deltaBytes)

} catch {
case t: Throwable if (NonFatal(t) || t.isInstanceOf[Exception]) =>
if (context.isCompleted || context.isInterrupted) {
@@ -466,7 +475,10 @@ private[spark] abstract class BasePythonRunner[IN, OUT](

override def hasNext: Boolean = nextObj != null || {
if (!eos) {
val startTime = System.nanoTime()
nextObj = read()
val deltaTime = System.nanoTime() - startTime
PythonMetrics.incFromWorkerReadTime(deltaTime)
Member:

I have a couple of questions.

  • Is it possible to merge it with SQLMetric? It would be nicer if the UI showed it as well.
  • Is it possible to integrate with the existing Python profiler? The current read time isn't purely Python execution time: it includes socket I/O time, which can potentially be large.

Contributor Author:

  • I like the idea of adding SQLMetrics for Python UDF instrumentation and using them in the Web UI. However, I think that work is a better fit for a separate JIRA/PR. The implementation details and the overhead of SQLMetrics differ from Dropwizard-based metrics, so we would probably want only a limited number of SQLMetrics instrumenting task activities in this area. Also, implementing SQLMetrics for [[PythonUDF]] execution may require significant changes to the current plan evaluation code.

  • It is indeed the case that the "read time from worker", exposed to users via the Dropwizard library as "FetchResultsTimeFromWorkers", includes both socket I/O plus deserialization time and Python UDF execution time. Measuring on the Python side could separate the two time components, but I currently don't see how to make a lightweight implementation of that. The Python profiler can measure on the Python side, as you mentioned, but I see it more as a debugging tool, while the proposed instrumentation is lightweight and intended for production use cases too. Maybe future work can address this if there is a need?

Contributor Author:

@HyukjinKwon I have finally managed to work on your suggestion to instrument Python execution using SQLMetrics, so that users can see the metrics in the Web UI. See [SPARK-34265]. I imagine I could later refactor the work in this PR based on that.

Member:

PR link: #31367

hasNext
} else {
false
@@ -477,6 +489,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
if (hasNext) {
val obj = nextObj
nextObj = null.asInstanceOf[OUT]
PythonMetrics.incFromWorkerBatchCount(1L)
obj
} else {
Iterator.empty.next()
@@ -642,6 +655,7 @@ private[spark] class PythonRunner(funcs: Seq[ChainedPythonFunctions])
case length if length > 0 =>
val obj = new Array[Byte](length)
stream.readFully(obj)
PythonMetrics.incFromWorkerBytesRead(length)
obj
case 0 => Array.emptyByteArray
case SpecialLengths.TIMING_DATA =>
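For reference, the write-side measurement added in this file boils down to the pattern sketched below; `DataOutputStream.size()` reports the number of bytes written through the stream so far, which is what the diff records as `deltaBytes`. This is an illustrative, self-contained snippet, not code from the PR.

// Self-contained sketch of the write-side measurement pattern used above.
import java.io.{ByteArrayOutputStream, DataOutputStream}

val startTime = System.nanoTime()
val dataOut = new DataOutputStream(new ByteArrayOutputStream())
dataOut.writeInt(42)               // stand-in for the real worker protocol traffic
dataOut.flush()
val deltaTime  = System.nanoTime() - startTime
val deltaBytes = dataOut.size()    // 4: bytes written so far through this stream

// In the PR, these deltas feed the cumulative counters:
// PythonMetrics.incToWorkerWriteTime(deltaTime)
// PythonMetrics.incToWorkerBytesWritten(deltaBytes)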
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -42,7 +42,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.plugin.PluginContainer
import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
import org.apache.spark.metrics.source.JVMCPUSource
import org.apache.spark.metrics.source.{JVMCPUSource, PythonMetricsSource}
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.scheduler._
@@ -134,6 +134,9 @@ private[spark] class Executor(
env.metricsSystem.registerSource(executorSource)
env.metricsSystem.registerSource(new JVMCPUSource())
executorMetricsSource.foreach(_.register(env.metricsSystem))
if (conf.get(METRICS_PYTHONMETRICS_SOURCE_ENABLED)) {
env.metricsSystem.registerSource(new PythonMetricsSource())
}
env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
} else {
// This enables the registration of the executor source in local mode.
core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -871,6 +871,12 @@ package object config {
.booleanConf
.createWithDefault(true)

private[spark] val METRICS_PYTHONMETRICS_SOURCE_ENABLED =
ConfigBuilder("spark.metrics.pythonMetricsSource.enabled")
.doc("Whether to register the PythonMetrics source with the metrics system.")
.booleanConf
.createWithDefault(false)

private[spark] val PYSPARK_DRIVER_PYTHON = ConfigBuilder("spark.pyspark.driver.python")
.version("2.1.0")
.stringConf
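A quick sketch of how a user would turn the new source on; the key matches the `ConfigBuilder` entry above and defaults to false (the master and application name are just placeholders). The same flag can be passed with `--conf` on `spark-submit`.

// Sketch: enabling the PythonMetrics source through SparkConf.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[*]")                                    // placeholder master
  .setAppName("python-metrics-demo")                        // placeholder app name
  .set("spark.metrics.pythonMetricsSource.enabled", "true") // default is false
val sc = new SparkContext(conf)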
core/src/main/scala/org/apache/spark/metrics/source/PythonMetricsSource.scala
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.metrics.source

import com.codahale.metrics.{Gauge, MetricRegistry}

import org.apache.spark.api.python.PythonMetrics

private[spark] class PythonMetricsSource extends Source {

override val metricRegistry = new MetricRegistry()
override val sourceName = "PythonMetrics"

// This instruments the time spent to write/send serialized data to Python workers.
// Includes operations for MapPartition, PythonUDF and PandasUDF.
// Time is measured in nanoseconds.
metricRegistry.register(MetricRegistry.name("WriteTimeToWorkers"), new Gauge[Long] {
override def getValue: Long = PythonMetrics.getToWorkerWriteTime
})

// This instruments the number of data batches sent to Python workers.
// Includes operations for MapPartition, PythonUDF and PandasUDF.
metricRegistry.register(MetricRegistry.name("NumBatchesToWorkers"), new Gauge[Long] {
override def getValue: Long = PythonMetrics.getToWorkerBatchCount
})

// This instruments the number of bytes sent to Python workers.
// Includes operations for MapPartition, PythonUDF and PandasUDF.
metricRegistry.register(MetricRegistry.name("BytesSentToWorkers"), new Gauge[Long] {
override def getValue: Long = PythonMetrics.getToWorkerBytesWritten
})

// This instruments the number of bytes received from Python workers.
// Includes operations for MapPartition, PythonUDF and PandasUDF.
metricRegistry.register(MetricRegistry.name("BytesReceivedFromWorkers"), new Gauge[Long] {
override def getValue: Long = PythonMetrics.getFromWorkerBytesRead
})

// This instruments the time spent reading/receiving data back from Python workers.
// It includes read operations for MapPartition, PythonUDF and PandasUDF.
// Time is measured in nanoseconds.
metricRegistry.register(MetricRegistry.name("FetchResultsTimeFromWorkers"), new Gauge[Long] {
override def getValue: Long = PythonMetrics.getFromWorkerReadTime
})

// This instruments the number of data batches received back from Python workers.
// Includes operations for MapPartition, PythonUDF and PandasUDF.
metricRegistry.register(MetricRegistry.name("NumBatchesFromWorkers"), new Gauge[Long] {
override def getValue: Long = PythonMetrics.getFromWorkerBatchCount
})

// This instruments the number of rows received back from Python workers,
// for Pandas UDF operations.
metricRegistry.register(MetricRegistry.name("PandasUDFReceivedNumRows"), new Gauge[Long] {
override def getValue: Long = PythonMetrics.getPandasUDFReadRowCount
})

// This instruments the number of rows sent to Python workers,
// for Pandas UDF operations.
metricRegistry.register(MetricRegistry.name("PandasUDFSentNumRows"), new Gauge[Long] {
override def getValue: Long = PythonMetrics.getPandasUDFWriteRowCount
})

}
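Since the gauges above simply read the shared counters, a registered source can also be inspected programmatically. A rough sketch follows, assuming a SparkContext `sc` with the source enabled and Spark-internal access (SparkEnv and the metrics system are `private[spark]`, which is why the test below lives under `org.apache.spark`).

// Sketch (Spark-internal code only): look up the registered source and
// read one gauge, mirroring what the test suite below does.
val source = sc.env.metricsSystem.getSourcesByName("PythonMetrics").head
val writeTimeNanos = source.metricRegistry.getGauges().get("WriteTimeToWorkers").getValue()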
core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.metrics.source

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.internal.config.{METRICS_EXECUTORMETRICS_SOURCE_ENABLED, METRICS_STATIC_SOURCES_ENABLED}
import org.apache.spark.internal.config._

class SourceConfigSuite extends SparkFunSuite with LocalSparkContext {

@@ -92,4 +92,32 @@ class SourceConfigSuite extends SparkFunSuite with LocalSparkContext {
sc.stop()
}
}

test("Test configuration for adding PythonMetrics source registration") {
val conf = new SparkConf()
conf.set(METRICS_PYTHONMETRICS_SOURCE_ENABLED, true)
val sc = new SparkContext("local", "test", conf)
try {
val metricsSystem = sc.env.metricsSystem

// PythonMetrics source should be registered
assert(metricsSystem.getSourcesByName("PythonMetrics").nonEmpty)
} finally {
sc.stop()
}
}

test("Test configuration for skipping PythonMetrics source registration") {
val conf = new SparkConf()
conf.set(METRICS_PYTHONMETRICS_SOURCE_ENABLED, false)
val sc = new SparkContext("local", "test", conf)
try {
val metricsSystem = sc.env.metricsSystem

// PythonMetrics source should not be registered
assert(metricsSystem.getSourcesByName("PythonMetrics").isEmpty)
} finally {
sc.stop()
}
}
}
22 changes: 22 additions & 0 deletions docs/monitoring.md
@@ -1166,6 +1166,14 @@ This is the component with the largest amount of instrumented metrics
- This source contains memory-related metrics. A full list of available metrics in this
namespace can be found in the corresponding entry for the Executor component instance.

- namespace=PythonMetrics
- **note:** these metrics only apply to the driver when running in local mode and are
conditional on a configuration parameter:
`spark.metrics.pythonMetricsSource.enabled` (default is false).
- This source exposes metrics for instrumenting Python UDF execution.
A full list of available metrics in this namespace can be found in the corresponding entry
for the Executor component instance.

- namespace=plugin.\<Plugin Class Name>
- Optional namespace(s). Metrics in this namespace are defined by user-supplied code, and
configured using the Spark plugin API. See "Advanced Instrumentation" below for how to load
@@ -1277,6 +1285,20 @@ These metrics are exposed by Spark executors.
- hiveClientCalls.count
- sourceCodeSize (histogram)

- namespace=PythonMetrics
- **notes:**
- These metrics are conditional on a configuration parameter:
`spark.metrics.pythonMetricsSource.enabled` (default is false).
- Time-based metrics are reported in nanoseconds.
- BytesReceivedFromWorkers
- BytesSentToWorkers
- FetchResultsTimeFromWorkers
- NumBatchesFromWorkers
- NumBatchesToWorkers
- PandasUDFReceivedNumRows
- PandasUDFSentNumRows
- WriteTimeToWorkers

- namespace=plugin.\<Plugin Class Name>
- Optional namespace(s). Metrics in this namespace are defined by user-supplied code, and
configured using the Spark plugin API. See "Advanced Instrumentation" below for how to load