diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d68015454de9..fd45a090228f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -50,7 +50,7 @@ import org.apache.spark.internal.config.Tests._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.internal.plugin.PluginContainer
 import org.apache.spark.io.CompressionCodec
-import org.apache.spark.metrics.source.JVMCPUSource
+import org.apache.spark.metrics.source.{JVMCPUSource, PythonMetricsSource}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.resource._
@@ -631,6 +631,9 @@ class SparkContext(config: SparkConf) extends Logging {
     _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
     _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
     _env.metricsSystem.registerSource(new JVMCPUSource())
+    if (isLocal && conf.get(METRICS_PYTHONMETRICS_SOURCE_ENABLED)) {
+      _env.metricsSystem.registerSource(new PythonMetricsSource())
+    }
     _executorMetricsSource.foreach(_.register(_env.metricsSystem))
     _executorAllocationManager.foreach { e =>
       _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonMetrics.scala b/core/src/main/scala/org/apache/spark/api/python/PythonMetrics.scala
new file mode 100644
index 000000000000..daf75fe52a70
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonMetrics.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.util.concurrent.atomic.AtomicLong
+
+private[spark] object PythonMetrics {
+
+  // Counters for serialization/deserialization of data between the JVM and Python workers
+  private val toWorkerWriteTime = new AtomicLong(0L)
+  private val toWorkerBatchCount = new AtomicLong(0L)
+  private val toWorkerBytesWritten = new AtomicLong(0L)
+  private val fromWorkerReadTime = new AtomicLong(0L)
+  private val fromWorkerBatchCount = new AtomicLong(0L)
+  private val fromWorkerBytesRead = new AtomicLong(0L)
+
+  // Counters for Pandas UDF rows transferred
+  private val pandasUDFReadRowCount = new AtomicLong(0L)
+  private val pandasUDFWriteRowCount = new AtomicLong(0L)
+
+  def incToWorkerWriteTime(delta: Long): Unit = {
+    toWorkerWriteTime.getAndAdd(delta)
+  }
+
+  def getToWorkerWriteTime: Long = {
+    toWorkerWriteTime.get
+  }
+
+  def incToWorkerBytesWritten(delta: Long): Unit = {
+    toWorkerBytesWritten.getAndAdd(delta)
+  }
+
+  def getToWorkerBytesWritten: Long = {
+    toWorkerBytesWritten.get
+  }
+
+  def incToWorkerBatchCount(delta: Long): Unit = {
+    toWorkerBatchCount.getAndAdd(delta)
+  }
+
+  def getToWorkerBatchCount: Long = {
+    toWorkerBatchCount.get
+  }
+
+  def incFromWorkerReadTime(delta: Long): Unit = {
+    fromWorkerReadTime.getAndAdd(delta)
+  }
+
+  def getFromWorkerReadTime: Long = {
+    fromWorkerReadTime.get
+  }
+
+  def incFromWorkerBatchCount(delta: Long): Unit = {
+    fromWorkerBatchCount.getAndAdd(delta)
+  }
+
+  def getFromWorkerBatchCount: Long = {
+    fromWorkerBatchCount.get
+  }
+
+  def incFromWorkerBytesRead(delta: Long): Unit = {
+    fromWorkerBytesRead.getAndAdd(delta)
+  }
+
+  def getFromWorkerBytesRead: Long = {
+    fromWorkerBytesRead.get
+  }
+
+  // Pandas UDF row counters
+  def incPandasUDFReadRowCount(step: Long): Unit = {
+    pandasUDFReadRowCount.getAndAdd(step)
+  }
+
+  def getPandasUDFReadRowCount: Long = {
+    pandasUDFReadRowCount.get
+  }
+
+  def incPandasUDFWriteRowCount(step: Long): Unit = {
+    pandasUDFWriteRowCount.getAndAdd(step)
+  }
+
+  def getPandasUDFWriteRowCount: Long = {
+    pandasUDFWriteRowCount.get
+  }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 86a1ac31c084..4515020daf70 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -44,7 +44,6 @@ import org.apache.spark.security.{SocketAuthHelper, SocketAuthServer, SocketFunc
 import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
 import org.apache.spark.util._
 
-
 private[spark] class PythonRDD(
     parent: RDD[_],
     func: PythonFunction,
@@ -304,9 +303,9 @@ private[spark] object PythonRDD extends Logging {
       throw new SparkException("Unexpected element type " + other.getClass)
     }
 
-    iter.foreach(write)
+    val numIterations = iter.map(write).size
+    PythonMetrics.incToWorkerBatchCount(numIterations)
   }
-
   /**
   * Create an RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]],
   * key and value class.
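
For context, a minimal sketch (not part of the patch) of the call pattern the runners below are expected to follow when updating these counters. The wrapper object and method names here are hypothetical; the real instrumentation sites are in the PythonRunner and Arrow runner hunks further down.

```scala
package org.apache.spark.api.python

// Hypothetical helper (not in the patch) showing the intended call pattern:
// time the write of one serialized batch to a Python worker in nanoseconds and
// fold the elapsed time, bytes written and batch count into the shared counters.
private[spark] object PythonMetricsUsageSketch {
  def timeOneBatchWrite(writeBatch: () => Long): Unit = {
    val startTime = System.nanoTime()
    val bytesWritten = writeBatch()  // performs the actual write, returns bytes written
    PythonMetrics.incToWorkerWriteTime(System.nanoTime() - startTime)
    PythonMetrics.incToWorkerBytesWritten(bytesWritten)
    PythonMetrics.incToWorkerBatchCount(1L)
  }
}
```
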
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index d7a09b599794..fbcb3d67917a 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -229,6 +229,9 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
 
     override def run(): Unit = Utils.logUncaughtExceptions {
       try {
+        // Instrumentation of the time spent writing to the Python worker, in nanoseconds
+        val startTime = System.nanoTime()
+
         TaskContext.setTaskContext(context)
         val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
         val dataOut = new DataOutputStream(stream)
@@ -397,6 +400,12 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
 
         dataOut.writeInt(SpecialLengths.END_OF_STREAM)
         dataOut.flush()
+
+        val deltaTime = System.nanoTime() - startTime
+        val deltaBytes = dataOut.size()
+        PythonMetrics.incToWorkerWriteTime(deltaTime)
+        PythonMetrics.incToWorkerBytesWritten(deltaBytes)
+
       } catch {
         case t: Throwable if (NonFatal(t) || t.isInstanceOf[Exception]) =>
           if (context.isCompleted || context.isInterrupted) {
@@ -466,7 +475,10 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
 
    override def hasNext: Boolean = nextObj != null || {
      if (!eos) {
+        val startTime = System.nanoTime()
        nextObj = read()
+        val deltaTime = System.nanoTime() - startTime
+        PythonMetrics.incFromWorkerReadTime(deltaTime)
        hasNext
      } else {
        false
@@ -477,6 +489,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
      if (hasNext) {
        val obj = nextObj
        nextObj = null.asInstanceOf[OUT]
+        PythonMetrics.incFromWorkerBatchCount(1L)
        obj
      } else {
        Iterator.empty.next()
@@ -642,6 +655,7 @@ private[spark] class PythonRunner(funcs: Seq[ChainedPythonFunctions])
            case length if length > 0 =>
              val obj = new Array[Byte](length)
              stream.readFully(obj)
+              PythonMetrics.incFromWorkerBytesRead(length)
              obj
            case 0 => Array.emptyByteArray
            case SpecialLengths.TIMING_DATA =>
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 1a0ad566633d..927f0971fce0 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -42,7 +42,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.plugin.PluginContainer
 import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
-import org.apache.spark.metrics.source.JVMCPUSource
+import org.apache.spark.metrics.source.{JVMCPUSource, PythonMetricsSource}
 import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.scheduler._
@@ -134,6 +134,9 @@ private[spark] class Executor(
     env.metricsSystem.registerSource(executorSource)
     env.metricsSystem.registerSource(new JVMCPUSource())
     executorMetricsSource.foreach(_.register(env.metricsSystem))
+    if (conf.get(METRICS_PYTHONMETRICS_SOURCE_ENABLED)) {
+      env.metricsSystem.registerSource(new PythonMetricsSource())
+    }
     env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
   } else {
     // This enable the registration of the executor source in local mode.
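
Since the source is disabled by default, here is an illustrative (not authoritative) way to switch it on and surface the gauges. It combines the new flag, defined in the config hunk that follows, with Spark's standard `spark.metrics.conf.*` sink properties; the ConsoleSink wiring and the demo object are only one possible setup.

```scala
import org.apache.spark.sql.SparkSession

object PythonMetricsDemo {  // hypothetical driver program, not part of the patch
  def main(args: Array[String]): Unit = {
    // Enable the new source and dump every registered metric to stdout every
    // 10 seconds through Spark's built-in ConsoleSink.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("python-metrics-demo")
      .config("spark.metrics.pythonMetricsSource.enabled", "true")
      .config("spark.metrics.conf.*.sink.console.class",
        "org.apache.spark.metrics.sink.ConsoleSink")
      .config("spark.metrics.conf.*.sink.console.period", "10")
      .config("spark.metrics.conf.*.sink.console.unit", "seconds")
      .getOrCreate()

    // The same settings can equally be passed as --conf options to bin/pyspark or
    // spark-submit; the gauges only move once Python workers actually process data.

    spark.stop()
  }
}
```
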
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 2bb1290963f8..fbfd479b4372 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -871,6 +871,12 @@ package object config {
       .booleanConf
       .createWithDefault(true)
 
+  private[spark] val METRICS_PYTHONMETRICS_SOURCE_ENABLED =
+    ConfigBuilder("spark.metrics.pythonMetricsSource.enabled")
+      .doc("Whether to register the PythonMetrics source with the metrics system.")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val PYSPARK_DRIVER_PYTHON = ConfigBuilder("spark.pyspark.driver.python")
     .version("2.1.0")
     .stringConf
diff --git a/core/src/main/scala/org/apache/spark/metrics/source/PythonMetricsSource.scala b/core/src/main/scala/org/apache/spark/metrics/source/PythonMetricsSource.scala
new file mode 100644
index 000000000000..60619ade9e8c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/source/PythonMetricsSource.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.source
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.api.python.PythonMetrics
+
+private[spark] class PythonMetricsSource extends Source {
+
+  override val metricRegistry = new MetricRegistry()
+  override val sourceName = "PythonMetrics"
+
+  // This instruments the time spent writing/sending serialized data to Python workers.
+  // Includes operations for MapPartition, PythonUDF and PandasUDF.
+  // Time is measured in nanoseconds.
+  metricRegistry.register(MetricRegistry.name("WriteTimeToWorkers"), new Gauge[Long] {
+    override def getValue: Long = PythonMetrics.getToWorkerWriteTime
+  })
+
+  // This instruments the number of data batches sent to Python workers.
+  // Includes operations for MapPartition, PythonUDF and PandasUDF.
+  metricRegistry.register(MetricRegistry.name("NumBatchesToWorkers"), new Gauge[Long] {
+    override def getValue: Long = PythonMetrics.getToWorkerBatchCount
+  })
+
+  // This instruments the number of bytes sent to Python workers.
+  // Includes operations for MapPartition, PythonUDF and PandasUDF.
+  metricRegistry.register(MetricRegistry.name("BytesSentToWorkers"), new Gauge[Long] {
+    override def getValue: Long = PythonMetrics.getToWorkerBytesWritten
+  })
+
+  // This instruments the number of bytes received from Python workers.
+  // Includes operations for MapPartition, PythonUDF and PandasUDF.
+  metricRegistry.register(MetricRegistry.name("BytesReceivedFromWorkers"), new Gauge[Long] {
+    override def getValue: Long = PythonMetrics.getFromWorkerBytesRead
+  })
+
+  // This instruments the time spent reading/receiving data back from Python workers.
+  // It includes read operations for MapPartition, PythonUDF and PandasUDF.
+  // Time is measured in nanoseconds.
+  metricRegistry.register(MetricRegistry.name("FetchResultsTimeFromWorkers"), new Gauge[Long] {
+    override def getValue: Long = PythonMetrics.getFromWorkerReadTime
+  })
+
+  // This instruments the number of data batches received back from Python workers.
+  // Includes operations for MapPartition, PythonUDF and PandasUDF.
+  metricRegistry.register(MetricRegistry.name("NumBatchesFromWorkers"), new Gauge[Long] {
+    override def getValue: Long = PythonMetrics.getFromWorkerBatchCount
+  })
+
+  // This instruments the number of rows received back from Python workers,
+  // for Pandas UDF operations.
+  metricRegistry.register(MetricRegistry.name("PandasUDFReceivedNumRows"), new Gauge[Long] {
+    override def getValue: Long = PythonMetrics.getPandasUDFReadRowCount
+  })
+
+  // This instruments the number of rows sent to Python workers,
+  // for Pandas UDF operations.
+  metricRegistry.register(MetricRegistry.name("PandasUDFSentNumRows"), new Gauge[Long] {
+    override def getValue: Long = PythonMetrics.getPandasUDFWriteRowCount
+  })
+
+}
diff --git a/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala
index 7da1403ecd4b..5f3518466c9e 100644
--- a/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.metrics.source
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.internal.config.{METRICS_EXECUTORMETRICS_SOURCE_ENABLED, METRICS_STATIC_SOURCES_ENABLED}
+import org.apache.spark.internal.config._
 
 class SourceConfigSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -92,4 +92,32 @@ class SourceConfigSuite extends SparkFunSuite with LocalSparkContext {
       sc.stop()
     }
   }
+
+  test("Test configuration for adding PythonMetrics source registration") {
+    val conf = new SparkConf()
+    conf.set(METRICS_PYTHONMETRICS_SOURCE_ENABLED, true)
+    val sc = new SparkContext("local", "test", conf)
+    try {
+      val metricsSystem = sc.env.metricsSystem
+
+      // PythonMetrics source should be registered
+      assert (metricsSystem.getSourcesByName("PythonMetrics").nonEmpty)
+    } finally {
+      sc.stop()
+    }
+  }
+
+  test("Test configuration for skipping PythonMetrics source registration") {
+    val conf = new SparkConf()
+    conf.set(METRICS_PYTHONMETRICS_SOURCE_ENABLED, false)
+    val sc = new SparkContext("local", "test", conf)
+    try {
+      val metricsSystem = sc.env.metricsSystem
+
+      // PythonMetrics source should not be registered
+      assert (metricsSystem.getSourcesByName("PythonMetrics").isEmpty)
+    } finally {
+      sc.stop()
+    }
+  }
 }
diff --git a/docs/monitoring.md b/docs/monitoring.md
index a07a11344598..ecb2d6dbb760 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -1166,6 +1166,14 @@ This is the component with the largest amount of instrumented metrics
   - This source contains memory-related metrics. A full list of available metrics in this
     namespace can be found in the corresponding entry for the Executor component instance.
 
+- namespace=PythonMetrics
+  - **note:** these metrics apply to the driver only when running in local mode and are
+    conditional on the configuration parameter
+    `spark.metrics.pythonMetricsSource.enabled` (default is false).
+  - This source exposes metrics for instrumenting Python UDF execution.
+    A full list of available metrics in this namespace can be found in the corresponding entry
+    for the Executor component instance.
+
 - namespace=plugin.\<Plugin Class Name>
   - Optional namespace(s). Metrics in this namespace are defined by user-supplied code, and
   configured using the Spark plugin API. See "Advanced Instrumentation" below for how to load
@@ -1277,6 +1285,20 @@ These metrics are exposed by Spark executors.
   - hiveClientCalls.count
   - sourceCodeSize (histogram)
 
+- namespace=PythonMetrics
+  - **notes:**
+    - These metrics are conditional on the configuration parameter
+      `spark.metrics.pythonMetricsSource.enabled` (default is false).
+    - Time-based metrics are reported in nanoseconds.
+  - BytesReceivedFromWorkers
+  - BytesSentToWorkers
+  - FetchResultsTimeFromWorkers
+  - NumBatchesFromWorkers
+  - NumBatchesToWorkers
+  - PandasUDFReceivedNumRows
+  - PandasUDFSentNumRows
+  - WriteTimeToWorkers
+
 - namespace=plugin.\<Plugin Class Name>
   - Optional namespace(s). Metrics in this namespace are defined by user-supplied code, and
   configured using the Spark plugin API. See "Advanced Instrumentation" below for how to load
diff --git a/python/pyspark/tests/test_metrics.py b/python/pyspark/tests/test_metrics.py
new file mode 100644
index 000000000000..3e48143de7b9
--- /dev/null
+++ b/python/pyspark/tests/test_metrics.py
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from operator import add
+
+from pyspark.testing.utils import ReusedPySparkTestCase
+
+
+class MetricsTests(ReusedPySparkTestCase):
+
+    def test_metrics(self):
+        count = self.sc.parallelize(range(1, 10000), 1).reduce(add)
+        self.assertEqual(count, sum(range(1, 10000)))
+        read_time = self.sc._jvm.org.apache.spark.api.python.PythonMetrics.getFromWorkerReadTime()
+        self.assertGreater(read_time, 0)
+
+
+if __name__ == "__main__":
+    import unittest
+    from pyspark.tests.test_metrics import *  # noqa: F401
+
+    try:
+        import xmlrunner  # type: ignore[import]
+        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
index b44b13c8de0d..0e18fdb3c6dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
@@ -61,7 +61,6 @@ class ArrowPythonRunner(
     new WriterThread(env, worker, inputIterator, partitionIndex, context) {
 
       protected override def writeCommand(dataOut: DataOutputStream): Unit = {
-
         // Write config for the worker as a number of key -> value pairs of strings
         dataOut.writeInt(conf.size)
         for ((k, v) <- conf) {
@@ -92,6 +91,9 @@ class ArrowPythonRunner(
 
            arrowWriter.finish()
            writer.writeBatch()
+            val rowCount = root.getRowCount
+            PythonMetrics.incToWorkerBatchCount(1L)
+            PythonMetrics.incPandasUDFWriteRowCount(rowCount)
            arrowWriter.reset()
          }
          // end writes footer to the output stream and doesn't clean any resources.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
index 25ce16db264a..82c9842b2a96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
@@ -24,7 +24,7 @@ import org.apache.arrow.vector.VectorSchemaRoot
 import org.apache.arrow.vector.ipc.ArrowStreamWriter
 
 import org.apache.spark.{SparkEnv, TaskContext}
-import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions, PythonRDD}
+import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions, PythonMetrics, PythonRDD}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.arrow.ArrowWriter
 import org.apache.spark.sql.types.StructType
@@ -102,6 +102,9 @@ class CoGroupedArrowPythonRunner(
          }
          arrowWriter.finish()
          writer.writeBatch()
+          val rowCount = root.getRowCount
+          PythonMetrics.incToWorkerBatchCount(1L)
+          PythonMetrics.incPandasUDFWriteRowCount(rowCount)
          writer.end()
        }{
          root.close()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
index bb353062384a..6fdea374b277 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
@@ -26,7 +26,7 @@ import org.apache.arrow.vector.VectorSchemaRoot
 import org.apache.arrow.vector.ipc.ArrowStreamReader
 
 import org.apache.spark.{SparkEnv, TaskContext}
-import org.apache.spark.api.python.{BasePythonRunner, SpecialLengths}
+import org.apache.spark.api.python.{BasePythonRunner, PythonMetrics, SpecialLengths}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector}
@@ -71,10 +71,15 @@ private[python] trait PythonArrowOutput { self: BasePythonRunner[_, ColumnarBatc
        }
        try {
          if (reader != null && batchLoaded) {
+            val bytesReadStart = reader.bytesRead()
            batchLoaded = reader.loadNextBatch()
            if (batchLoaded) {
              val batch = new ColumnarBatch(vectors)
-              batch.setNumRows(root.getRowCount)
+              val rowCount = root.getRowCount
+              batch.setNumRows(rowCount)
+              val bytesReadEnd = reader.bytesRead()
+              PythonMetrics.incFromWorkerBytesRead(bytesReadEnd - bytesReadStart)
+              PythonMetrics.incPandasUDFReadRowCount(rowCount.toLong)
              batch
            } else {
              reader.close(false)
@@ -91,6 +96,8 @@ private[python] trait PythonArrowOutput { self: BasePythonRunner[_, ColumnarBatc
                vectors = root.getFieldVectors().asScala.map { vector =>
                  new ArrowColumnVector(vector)
                }.toArray[ColumnVector]
+                val bytesReadSpecial = reader.bytesRead()
+                PythonMetrics.incFromWorkerBytesRead(bytesReadSpecial)
                read()
              case SpecialLengths.TIMING_DATA =>
                handleTimingData()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
index d341d7019f0a..659aeb817a7e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
@@ -72,6 +72,7 @@ class PythonUDFRunner(
            case length if length > 0 =>
              val obj = new Array[Byte](length)
              stream.readFully(obj)
+              PythonMetrics.incFromWorkerBytesRead(length)
              obj
            case 0 => Array.emptyByteArray
            case SpecialLengths.TIMING_DATA =>
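
To close the loop, a small hedged sketch of how the counters added by this patch can be inspected from driver-side Scala in local mode (where driver and executor share one JVM, which is why the SparkContext registration above is guarded by `isLocal`). The object name and package are hypothetical; the getters are the ones introduced in PythonMetrics.scala, and the new PySpark test reads the same values through the JVM gateway.

```scala
package org.apache.spark.examples  // any package under org.apache.spark, since PythonMetrics is private[spark]

import org.apache.spark.api.python.PythonMetrics

// Hypothetical helper: take a snapshot of every counter exposed through the
// PythonMetrics source and print it, one gauge per line.
object PythonMetricsSnapshot {
  def snapshot(): Map[String, Long] = Map(
    "WriteTimeToWorkers" -> PythonMetrics.getToWorkerWriteTime,
    "BytesSentToWorkers" -> PythonMetrics.getToWorkerBytesWritten,
    "NumBatchesToWorkers" -> PythonMetrics.getToWorkerBatchCount,
    "FetchResultsTimeFromWorkers" -> PythonMetrics.getFromWorkerReadTime,
    "BytesReceivedFromWorkers" -> PythonMetrics.getFromWorkerBytesRead,
    "NumBatchesFromWorkers" -> PythonMetrics.getFromWorkerBatchCount,
    "PandasUDFSentNumRows" -> PythonMetrics.getPandasUDFWriteRowCount,
    "PandasUDFReceivedNumRows" -> PythonMetrics.getPandasUDFReadRowCount)

  def printSnapshot(): Unit =
    snapshot().toSeq.sortBy(_._1).foreach { case (name, value) => println(s"$name = $value") }
}
```
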