diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index dee0ddfa3387..1c9a9af07458 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -450,6 +450,7 @@ def __hash__(self):
"pyspark.sql.tests.test_pandas_cogrouped_map",
"pyspark.sql.tests.test_pandas_grouped_map",
"pyspark.sql.tests.test_pandas_map",
+ "pyspark.sql.tests.test_pandas_sqlmetrics",
"pyspark.sql.tests.test_pandas_udf",
"pyspark.sql.tests.test_pandas_udf_grouped_agg",
"pyspark.sql.tests.test_pandas_udf_scalar",
diff --git a/docs/web-ui.md b/docs/web-ui.md
index deaf50fe5797..3389c3f67cbe 100644
--- a/docs/web-ui.md
+++ b/docs/web-ui.md
@@ -404,6 +404,16 @@ Here is the list of SQL metrics:
avg hash probe bucket list iters | the average bucket list iterations per lookup during aggregation | HashAggregate |
data size of build side | the size of built hash map | ShuffledHashJoin |
time to build hash map | the time spent on building hash map | ShuffledHashJoin |
+ time spent executing | time spent executing the Python UDF and sending the results back to the executors. It can also include time spent waiting for input data when sending data to Python is slower than the UDF execution itself | BatchEvalPython, ArrowEvalPython, AggregateInPandas, FlatMapGroupsInPandas, FlatMapCoGroupsInPandas, MapInPandas, WindowInPandas |
+ time spent sending data | time spent sending data to the Python workers; this is part of the Python UDF execution time | BatchEvalPython, ArrowEvalPython, AggregateInPandas, FlatMapGroupsInPandas, FlatMapCoGroupsInPandas, MapInPandas, WindowInPandas |
+ time spent sending code | time spent sending Python code to the Python workers | BatchEvalPython, ArrowEvalPython, AggregateInPandas, FlatMapGroupsInPandas, FlatMapCoGroupsInPandas, MapInPandas, WindowInPandas |
+ bytes of code sent | the number of bytes of serialized Python code sent to the Python workers | BatchEvalPython, ArrowEvalPython, AggregateInPandas, FlatMapGroupsInPandas, FlatMapCoGroupsInPandas, MapInPandas, WindowInPandas |
+ bytes of data returned | the number of bytes of serialized data received back from the Python workers | BatchEvalPython, ArrowEvalPython, AggregateInPandas, FlatMapGroupsInPandas, FlatMapCoGroupsInPandas, MapInPandas, WindowInPandas |
+ bytes of data sent | the number of bytes of serialized data sent to the Python workers | BatchEvalPython, ArrowEvalPython, AggregateInPandas, FlatMapGroupsInPandas, FlatMapCoGroupsInPandas, MapInPandas, WindowInPandas |
+ number of batches returned | the number of data batches received back from the Python workers | BatchEvalPython, ArrowEvalPython, AggregateInPandas, FlatMapGroupsInPandas, FlatMapCoGroupsInPandas, MapInPandas, WindowInPandas |
+ number of batches processed | the number of data batches sent to the Python workers | BatchEvalPython, ArrowEvalPython, AggregateInPandas, FlatMapGroupsInPandas, FlatMapCoGroupsInPandas, MapInPandas, WindowInPandas |
+ number of rows returned | the number of rows returned by the Python workers | BatchEvalPython, ArrowEvalPython, AggregateInPandas, FlatMapGroupsInPandas, FlatMapCoGroupsInPandas, MapInPandas, WindowInPandas |
+ number of rows processed | the number of rows sent to the Python workers | BatchEvalPython, ArrowEvalPython, AggregateInPandas, FlatMapGroupsInPandas, FlatMapCoGroupsInPandas, MapInPandas, WindowInPandas |
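As a quick illustration of how these metrics surface, here is a minimal PySpark sketch (not part of the patch) that runs a pandas UDF and then reads the recorded metric names of the finished query through the SQL status store, mirroring the access path used by the new test added below. The `plus_one` UDF, the `local[2]` master, and the assumption that this is the session's first SQL execution are illustrative.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.master("local[2]").getOrCreate()

@pandas_udf("long")
def plus_one(s):
    return s + 1

# Trigger an ArrowEvalPython node; pandas and pyarrow must be installed.
spark.range(100).select(plus_one("id")).collect()

# Read the metrics recorded for SQL execution id 0 (assumes this was the
# session's first SQL execution), the same check the new test performs.
status_store = spark._jsparkSession.sharedState().statusStore()
execution_metrics = status_store.execution(0).get().metrics().mkString("\n")
print(execution_metrics)  # metric names such as "number of rows processed" appear here
```

The same values are rendered in the SQL tab of the Web UI on the corresponding plan nodes.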
diff --git a/python/pyspark/sql/tests/test_pandas_sqlmetrics.py b/python/pyspark/sql/tests/test_pandas_sqlmetrics.py
new file mode 100644
index 000000000000..b58fa8fbb95c
--- /dev/null
+++ b/python/pyspark/sql/tests/test_pandas_sqlmetrics.py
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from pyspark.sql.functions import pandas_udf
+from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
+ pandas_requirement_message, pyarrow_requirement_message
+
+
+@unittest.skipIf(
+ not have_pandas or not have_pyarrow,
+ pandas_requirement_message or pyarrow_requirement_message) # type: ignore[arg-type]
+class PandasSQLMetrics(ReusedSQLTestCase):
+
+ def test_pandas_sql_metrics_basic(self):
+
+        python_sql_metrics = [
+ "time spent executing",
+ "time spent sending data",
+ "time spent sending code",
+ "bytes of code sent",
+ "bytes of data returned",
+ "bytes of data sent",
+ "number of batches returned",
+ "number of batches processed",
+ "number of rows returned",
+ "number of rows processed"
+ ]
+
+ @pandas_udf("long")
+ def test_pandas(col1):
+ return col1 * col1
+
+        self.spark.range(10).select(test_pandas("id")).collect()
+
+ statusStore = self.spark._jsparkSession.sharedState().statusStore()
+ executionMetrics = statusStore.execution(0).get().metrics().mkString()
+
+        for metric in python_sql_metrics:
+ self.assertIn(metric, executionMetrics)
+
+
+if __name__ == "__main__":
+    from pyspark.sql.tests.test_pandas_sqlmetrics import *  # noqa: F401
+
+ try:
+        import xmlrunner  # type: ignore[import]
+ testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
+ except ImportError:
+ testRunner = None
+ unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
index 69802b143c11..c541a575dd08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
@@ -46,7 +46,7 @@ case class AggregateInPandasExec(
udfExpressions: Seq[PythonUDF],
resultExpressions: Seq[NamedExpression],
child: SparkPlan)
- extends UnaryExecNode {
+ extends UnaryExecNode with PythonSQLMetrics {
override val output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
@@ -162,7 +162,17 @@ case class AggregateInPandasExec(
argOffsets,
aggInputSchema,
sessionLocalTimeZone,
- pythonRunnerConf).compute(projectedRowIter, context.partitionId(), context)
+ pythonRunnerConf,
+ pythonExecTime,
+ pythonDataSerializeTime,
+ pythonCodeSerializeTime,
+ pythonCodeSent,
+ pythonDataReceived,
+ pythonDataSent,
+ pythonNumRowsReceived,
+ pythonNumRowsSent,
+ pythonNumBatchesReceived,
+ pythonNumBatchesSent).compute(projectedRowIter, context.partitionId(), context)
val joinedAttributes =
groupingExpressions.map(_.toAttribute) ++ udfExpressions.map(_.resultAttribute)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
index 096712cf9352..2ffbac421b06 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
@@ -61,7 +61,7 @@ private[spark] class BatchIterator[T](iter: Iterator[T], batchSize: Int)
*/
case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan,
evalType: Int)
- extends EvalPythonExec {
+ extends EvalPythonExec with PythonSQLMetrics {
private val batchSize = conf.arrowMaxRecordsPerBatch
private val sessionLocalTimeZone = conf.sessionLocalTimeZone
@@ -85,7 +85,17 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute]
argOffsets,
schema,
sessionLocalTimeZone,
- pythonRunnerConf).compute(batchIter, context.partitionId(), context)
+ pythonRunnerConf,
+ pythonExecTime,
+ pythonDataSerializeTime,
+ pythonCodeSerializeTime,
+ pythonCodeSent,
+ pythonDataReceived,
+ pythonDataSent,
+ pythonNumRowsReceived,
+ pythonNumRowsSent,
+ pythonNumBatchesReceived,
+ pythonNumBatchesSent).compute(batchIter, context.partitionId(), context)
columnarBatchIter.flatMap { batch =>
val actualDataTypes = (0 until batch.numCols()).map(i => batch.column(i).dataType())
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
index 7171c7f7f974..d550ef327344 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
@@ -27,6 +27,7 @@ import org.apache.spark._
import org.apache.spark.api.python._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.arrow.ArrowWriter
+import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.ArrowUtils
@@ -42,7 +43,17 @@ class ArrowPythonRunner(
argOffsets: Array[Array[Int]],
schema: StructType,
timeZoneId: String,
- conf: Map[String, String])
+ conf: Map[String, String],
+ val pythonExecTime: SQLMetric,
+ pythonSerializeTime: SQLMetric,
+ pythonCodeSerializeTime: SQLMetric,
+ pythonCodeSent: SQLMetric,
+ val pythonDataReceived: SQLMetric,
+ pythonDataSent: SQLMetric,
+ val pythonNumRowsReceived: SQLMetric,
+ pythonNumRowsSent: SQLMetric,
+ val pythonNumBatchesReceived: SQLMetric,
+ pythonNumBatchesSent: SQLMetric)
extends BasePythonRunner[Iterator[InternalRow], ColumnarBatch](funcs, evalType, argOffsets)
with PythonArrowOutput {
@@ -64,6 +75,8 @@ class ArrowPythonRunner(
protected override def writeCommand(dataOut: DataOutputStream): Unit = {
+ val startTime = System.nanoTime()
+ val startData = dataOut.size()
// Write config for the worker as a number of key -> value pairs of strings
dataOut.writeInt(conf.size)
for ((k, v) <- conf) {
@@ -72,6 +85,10 @@ class ArrowPythonRunner(
}
PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
+ val deltaTime = System.nanoTime() - startTime
+ pythonCodeSerializeTime += deltaTime
+ val deltaData = dataOut.size() - startData
+ pythonCodeSent += deltaData
}
protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = {
@@ -85,17 +102,27 @@ class ArrowPythonRunner(
val writer = new ArrowStreamWriter(root, null, dataOut)
writer.start()
+ val startData = dataOut.size()
while (inputIterator.hasNext) {
val nextBatch = inputIterator.next()
while (nextBatch.hasNext) {
+ val startTime = System.nanoTime()
arrowWriter.write(nextBatch.next())
+ val deltaTime = System.nanoTime() - startTime
+ pythonSerializeTime += deltaTime
}
arrowWriter.finish()
writer.writeBatch()
+
+ val rowCount = root.getRowCount
+ pythonNumBatchesSent += 1
+ pythonNumRowsSent += rowCount
arrowWriter.reset()
}
+ val deltaData = dataOut.size() - startData
+ pythonDataSent += deltaData
// end writes footer to the output stream and doesn't clean any resources.
// It could throw exception if the output stream is closed, so it should be
// in the try block.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
index 10f7966b93d1..5e4a7edafabd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.types.{StructField, StructType}
* A physical plan that evaluates a [[PythonUDF]]
*/
case class BatchEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan)
- extends EvalPythonExec {
+ extends EvalPythonExec with PythonSQLMetrics {
protected override def evaluate(
funcs: Seq[ChainedPythonFunctions],
@@ -61,6 +61,7 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute]
// Input iterator to Python: input rows are grouped so we send them in batches to Python.
// For each row, add it to the queue.
val inputIterator = iter.map { row =>
+ pythonNumRowsSent += 1
if (needConversion) {
EvaluatePython.toJava(row, schema)
} else {
@@ -74,10 +75,22 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute]
}
fields
}
- }.grouped(100).map(x => pickle.dumps(x.toArray))
-
+    }.grouped(100).map { x =>
+      pythonNumBatchesSent += 1
+      pickle.dumps(x.toArray)
+    }
// Output iterator for results from Python.
- val outputIterator = new PythonUDFRunner(funcs, PythonEvalType.SQL_BATCHED_UDF, argOffsets)
+ val outputIterator = new PythonUDFRunner(
+ funcs,
+ PythonEvalType.SQL_BATCHED_UDF,
+ argOffsets,
+ pythonExecTime,
+ pythonDataSerializeTime,
+ pythonCodeSerializeTime,
+ pythonCodeSent,
+ pythonDataReceived,
+ pythonDataSent,
+ pythonNumBatchesReceived)
.compute(inputIterator, context.partitionId(), context)
val unpickle = new Unpickler
@@ -94,12 +107,19 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute]
val unpickledBatch = unpickle.loads(pickedResult)
unpickledBatch.asInstanceOf[java.util.ArrayList[Any]].asScala
}.map { result =>
+ pythonNumRowsReceived += 1
+ val startTime = System.nanoTime()
if (udfs.length == 1) {
// fast path for single UDF
mutableRow(0) = fromJava(result)
+ val deltaTime = System.nanoTime() - startTime
+ pythonExecTime += deltaTime
mutableRow
} else {
- fromJava(result).asInstanceOf[InternalRow]
+ val res = fromJava(result).asInstanceOf[InternalRow]
+ val deltaTime = System.nanoTime() - startTime
+ pythonExecTime += deltaTime
+ res
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
index e3d8a943d8cf..80ea20db1178 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
@@ -27,6 +27,7 @@ import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions, PythonRDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.arrow.ArrowWriter
+import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.ArrowUtils
@@ -45,7 +46,17 @@ class CoGroupedArrowPythonRunner(
leftSchema: StructType,
rightSchema: StructType,
timeZoneId: String,
- conf: Map[String, String])
+ conf: Map[String, String],
+ val pythonExecTime: SQLMetric,
+ pythonSerializeTime: SQLMetric,
+ pythonCodeSerializeTime: SQLMetric,
+ pythonCodeSent: SQLMetric,
+ val pythonDataReceived: SQLMetric,
+ pythonDataSent: SQLMetric,
+ val pythonNumRowsReceived: SQLMetric,
+ pythonNumRowsSent: SQLMetric,
+ val pythonNumBatchesReceived: SQLMetric,
+ pythonNumBatchesSent: SQLMetric)
extends BasePythonRunner[
(Iterator[InternalRow], Iterator[InternalRow]), ColumnarBatch](funcs, evalType, argOffsets)
with PythonArrowOutput {
@@ -63,6 +74,8 @@ class CoGroupedArrowPythonRunner(
protected override def writeCommand(dataOut: DataOutputStream): Unit = {
+ val startData = dataOut.size()
+ val startTime = System.nanoTime()
// Write config for the worker as a number of key -> value pairs of strings
dataOut.writeInt(conf.size)
for ((k, v) <- conf) {
@@ -71,11 +84,16 @@ class CoGroupedArrowPythonRunner(
}
PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
+        val deltaTime = System.nanoTime() - startTime
+ pythonCodeSerializeTime += deltaTime
+ val deltaData = dataOut.size() - startData
+ pythonCodeSent += deltaData
}
protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = {
// For each we first send the number of dataframes in each group then send
// first df, then send second df. End of data is marked by sending 0.
+ val startData = dataOut.size()
while (inputIterator.hasNext) {
dataOut.writeInt(2)
val (nextLeft, nextRight) = inputIterator.next()
@@ -83,6 +101,8 @@ class CoGroupedArrowPythonRunner(
writeGroup(nextRight, rightSchema, dataOut, "right")
}
dataOut.writeInt(0)
+ val deltaData = dataOut.size() - startData
+ pythonDataSent += deltaData
}
private def writeGroup(
@@ -101,10 +121,17 @@ class CoGroupedArrowPythonRunner(
writer.start()
while (group.hasNext) {
+ val startTime = System.nanoTime()
arrowWriter.write(group.next())
+ val deltaTime = System.nanoTime() - startTime
+ pythonSerializeTime += deltaTime
}
+
arrowWriter.finish()
writer.writeBatch()
+ val rowCount = root.getRowCount
+ pythonNumBatchesSent += 1
+ pythonNumRowsSent += rowCount
writer.end()
}{
root.close()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPandasExec.scala
index e830ea6b5466..a4d71625bb52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPandasExec.scala
@@ -54,7 +54,7 @@ case class FlatMapCoGroupsInPandasExec(
output: Seq[Attribute],
left: SparkPlan,
right: SparkPlan)
- extends SparkPlan with BinaryExecNode {
+ extends SparkPlan with BinaryExecNode with PythonSQLMetrics {
private val sessionLocalTimeZone = conf.sessionLocalTimeZone
private val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
@@ -97,7 +97,17 @@ case class FlatMapCoGroupsInPandasExec(
StructType.fromAttributes(leftDedup),
StructType.fromAttributes(rightDedup),
sessionLocalTimeZone,
- pythonRunnerConf)
+ pythonRunnerConf,
+ pythonExecTime,
+ pythonDataSerializeTime,
+ pythonCodeSerializeTime,
+ pythonCodeSent,
+ pythonDataReceived,
+ pythonDataSent,
+ pythonNumRowsReceived,
+ pythonNumRowsSent,
+ pythonNumBatchesReceived,
+ pythonNumBatchesSent)
executePython(data, output, runner)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
index 3a3a6022f998..946b5731b460 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
@@ -50,7 +50,7 @@ case class FlatMapGroupsInPandasExec(
func: Expression,
output: Seq[Attribute],
child: SparkPlan)
- extends SparkPlan with UnaryExecNode {
+ extends SparkPlan with UnaryExecNode with PythonSQLMetrics {
private val sessionLocalTimeZone = conf.sessionLocalTimeZone
private val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
@@ -89,7 +89,17 @@ case class FlatMapGroupsInPandasExec(
Array(argOffsets),
StructType.fromAttributes(dedupAttributes),
sessionLocalTimeZone,
- pythonRunnerConf)
+ pythonRunnerConf,
+ pythonExecTime,
+ pythonDataSerializeTime,
+ pythonCodeSerializeTime,
+ pythonCodeSent,
+ pythonDataReceived,
+ pythonDataSent,
+ pythonNumRowsReceived,
+ pythonNumRowsSent,
+ pythonNumBatchesReceived,
+ pythonNumBatchesSent)
executePython(data, output, runner)
}}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala
index 0434710da43a..36e11a69e77c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala
@@ -42,7 +42,7 @@ case class MapInPandasExec(
func: Expression,
output: Seq[Attribute],
child: SparkPlan)
- extends UnaryExecNode {
+ extends UnaryExecNode with PythonSQLMetrics {
private val pandasFunction = func.asInstanceOf[PythonUDF].func
@@ -78,7 +78,17 @@ case class MapInPandasExec(
argOffsets,
StructType(StructField("struct", outputTypes) :: Nil),
sessionLocalTimeZone,
- pythonRunnerConf).compute(batchIter, context.partitionId(), context)
+ pythonRunnerConf,
+ pythonExecTime,
+ pythonDataSerializeTime,
+ pythonCodeSerializeTime,
+ pythonCodeSent,
+ pythonDataReceived,
+ pythonDataSent,
+ pythonNumRowsReceived,
+ pythonNumRowsSent,
+ pythonNumBatchesReceived,
+ pythonNumBatchesSent).compute(batchIter, context.partitionId(), context)
val unsafeProj = UnsafeProjection.create(output, output)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
index 00bab1e9fbf5..52a6c663fca8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
@@ -27,6 +27,7 @@ import org.apache.arrow.vector.ipc.ArrowStreamReader
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.api.python.{BasePythonRunner, SpecialLengths}
+import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector}
@@ -37,6 +38,11 @@ import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, Column
*/
private[python] trait PythonArrowOutput { self: BasePythonRunner[_, ColumnarBatch] =>
+ def pythonDataReceived: SQLMetric
+ def pythonExecTime: SQLMetric
+ def pythonNumRowsReceived: SQLMetric
+ def pythonNumBatchesReceived: SQLMetric
+
protected def newReaderIterator(
stream: DataInputStream,
writerThread: WriterThread,
@@ -73,10 +79,19 @@ private[python] trait PythonArrowOutput { self: BasePythonRunner[_, ColumnarBatc
}
try {
if (reader != null && batchLoaded) {
+ val bytesReadStart = reader.bytesRead()
+ val startTime = System.nanoTime()
batchLoaded = reader.loadNextBatch()
+ val deltaTime = System.nanoTime() - startTime
if (batchLoaded) {
+ pythonExecTime += deltaTime
val batch = new ColumnarBatch(vectors)
- batch.setNumRows(root.getRowCount)
+ val rowCount = root.getRowCount
+ batch.setNumRows(rowCount)
+ val bytesReadEnd = reader.bytesRead()
+ pythonDataReceived += bytesReadEnd - bytesReadStart
+ pythonNumRowsReceived += rowCount
+ pythonNumBatchesReceived += 1
batch
} else {
reader.close(false)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonSQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonSQLMetrics.scala
new file mode 100644
index 000000000000..5c7930700658
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonSQLMetrics.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.SQLMetrics
+
+private[python] trait PythonSQLMetrics { self: SparkPlan =>
+
+ override val metrics = Map(
+ "pythonExecTime" ->
+ SQLMetrics.createNanoTimingMetric(sparkContext, "time spent executing"),
+ "pythonDataSerializeTime" ->
+ SQLMetrics.createNanoTimingMetric(sparkContext, "time spent sending data"),
+    "pythonCodeSerializeTime" ->
+      SQLMetrics.createNanoTimingMetric(sparkContext, "time spent sending code"),
+ "pythonCodeSent" ->
+ SQLMetrics.createSizeMetric(sparkContext, "bytes of code sent"),
+ "pythonDataReceived" ->
+ SQLMetrics.createSizeMetric(sparkContext, "bytes of data returned"),
+ "pythonDataSent" ->
+ SQLMetrics.createSizeMetric(sparkContext, "bytes of data sent"),
+ "pythonNumBatchesReceived" ->
+ SQLMetrics.createMetric(sparkContext, "number of batches returned"),
+ "pythonNumBatchesSent" ->
+ SQLMetrics.createMetric(sparkContext, "number of batches processed"),
+ "pythonNumRowsReceived" ->
+ SQLMetrics.createMetric(sparkContext, "number of rows returned"),
+ "pythonNumRowsSent" ->
+ SQLMetrics.createMetric(sparkContext, "number of rows processed")
+ )
+
+ val pythonNumRowsReceived = longMetric("pythonNumRowsReceived")
+ val pythonNumRowsSent = longMetric("pythonNumRowsSent")
+ val pythonExecTime = longMetric("pythonExecTime")
+ val pythonDataSerializeTime = longMetric("pythonDataSerializeTime")
+ val pythonCodeSerializeTime = longMetric("pythonCodeSerializeTime")
+ val pythonCodeSent = longMetric("pythonCodeSent")
+ val pythonDataReceived = longMetric("pythonDataReceived")
+ val pythonDataSent = longMetric("pythonDataSent")
+ val pythonNumBatchesReceived = longMetric("pythonNumBatchesReceived")
+ val pythonNumBatchesSent = longMetric("pythonNumBatchesSent")
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
index d1109d251c28..af51827c0767 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark._
import org.apache.spark.api.python._
+import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
/**
@@ -31,7 +32,14 @@ import org.apache.spark.sql.internal.SQLConf
class PythonUDFRunner(
funcs: Seq[ChainedPythonFunctions],
evalType: Int,
- argOffsets: Array[Array[Int]])
+ argOffsets: Array[Array[Int]],
+ pythonExecTime: SQLMetric,
+ pythonDataSerializeTime: SQLMetric,
+ pythonCodeSerializeTime: SQLMetric,
+ pythonCodeSent: SQLMetric,
+ pythonDataReceived: SQLMetric,
+ pythonDataSent: SQLMetric,
+ pythonNumBatchesReceived: SQLMetric)
extends BasePythonRunner[Array[Byte], Array[Byte]](
funcs, evalType, argOffsets) {
@@ -46,12 +54,25 @@ class PythonUDFRunner(
new WriterThread(env, worker, inputIterator, partitionIndex, context) {
protected override def writeCommand(dataOut: DataOutputStream): Unit = {
+ val startData = dataOut.size()
+ val startTime = System.nanoTime()
PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
+        val deltaTime = System.nanoTime() - startTime
+ pythonCodeSerializeTime += deltaTime
+ val deltaData = dataOut.size() - startData
+ pythonCodeSent += deltaData
}
protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = {
+ val startData = dataOut.size()
+ val startTime = System.nanoTime()
PythonRDD.writeIteratorToStream(inputIterator, dataOut)
+ val deltaTime = System.nanoTime() - startTime
dataOut.writeInt(SpecialLengths.END_OF_DATA_SECTION)
+
+ pythonDataSerializeTime += deltaTime
+ val deltaData = dataOut.size() - startData
+ pythonDataSent += deltaData
}
}
}
@@ -73,10 +94,15 @@ class PythonUDFRunner(
throw writerThread.exception.get
}
try {
+ val startTime = System.nanoTime()
stream.readInt() match {
case length if length > 0 =>
val obj = new Array[Byte](length)
stream.readFully(obj)
+ val deltaTime = System.nanoTime() - startTime
+ pythonExecTime += deltaTime
+ pythonDataReceived += length
+ pythonNumBatchesReceived += 1
obj
case 0 => Array.emptyByteArray
case SpecialLengths.TIMING_DATA =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
index 07c0aab1b6b7..fd6bd8f13b54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
@@ -85,7 +85,7 @@ case class WindowInPandasExec(
partitionSpec: Seq[Expression],
orderSpec: Seq[SortOrder],
child: SparkPlan)
- extends WindowExecBase {
+ extends WindowExecBase with PythonSQLMetrics {
override def output: Seq[Attribute] =
child.output ++ windowExpression.map(_.toAttribute)
@@ -391,7 +391,17 @@ case class WindowInPandasExec(
argOffsets,
pythonInputSchema,
sessionLocalTimeZone,
- pythonRunnerConf).compute(pythonInput, context.partitionId(), context)
+ pythonRunnerConf,
+ pythonExecTime,
+ pythonDataSerializeTime,
+ pythonCodeSerializeTime,
+ pythonCodeSent,
+ pythonDataReceived,
+ pythonDataSent,
+ pythonNumRowsReceived,
+ pythonNumRowsSent,
+ pythonNumBatchesReceived,
+ pythonNumBatchesSent).compute(pythonInput, context.partitionId(), context)
val joined = new JoinedRow