1 change: 1 addition & 0 deletions dev/sparktestsupport/modules.py
@@ -450,6 +450,7 @@ def __hash__(self):
"pyspark.sql.tests.test_pandas_cogrouped_map",
"pyspark.sql.tests.test_pandas_grouped_map",
"pyspark.sql.tests.test_pandas_map",
"pyspark.sql.tests.test_pandas_sqlmetrics",
"pyspark.sql.tests.test_pandas_udf",
"pyspark.sql.tests.test_pandas_udf_grouped_agg",
"pyspark.sql.tests.test_pandas_udf_scalar",
10 changes: 10 additions & 0 deletions docs/web-ui.md
@@ -404,6 +404,16 @@ Here is the list of SQL metrics:
<tr><td> <code>avg hash probe bucket list iters</code> </td><td> the average bucket list iterations per lookup during aggregation </td><td> HashAggregate </td></tr>
<tr><td> <code>data size of build side</code> </td><td> the size of built hash map </td><td> ShuffledHashJoin </td></tr>
<tr><td> <code>time to build hash map</code> </td><td> the time spent on building hash map </td><td> ShuffledHashJoin </td></tr>
<tr><td> <code>time spent executing</code> </td><td> time spent executing the Python UDF and sending the results back to the executors. It can include time spent waiting for input data when sending data is slower than the UDF execution itself </td><td> BatchEvalPython, ArrowEvalPython, AggregateInPandas, FlatMapGroupsInPandas, FlatMapCoGroupsInPandas, MapInPandas, WindowInPandas </td></tr>
<tr><td> <code>time spent sending data</code> </td><td> time spent sending data to the Python workers; this is part of the Python UDF execution time </td><td> BatchEvalPython, ArrowEvalPython, AggregateInPandas, FlatMapGroupsInPandas, FlatMapCoGroupsInPandas, MapInPandas, WindowInPandas </td></tr>
<tr><td> <code>time spent sending code</code> </td><td> time spent sending the serialized Python code to the Python workers </td><td> BatchEvalPython, ArrowEvalPython, AggregateInPandas, FlatMapGroupsInPandas, FlatMapCoGroupsInPandas, MapInPandas, WindowInPandas </td></tr>
<tr><td> <code>bytes of code sent</code> </td><td> the number of bytes of serialized Python code sent to the Python workers </td><td> BatchEvalPython, ArrowEvalPython, AggregateInPandas, FlatMapGroupsInPandas, FlatMapCoGroupsInPandas, MapInPandas, WindowInPandas </td></tr>
<tr><td> <code>bytes of data returned</code> </td><td> the number of bytes of serialized data received back from the Python workers </td><td> BatchEvalPython, ArrowEvalPython, AggregateInPandas, FlatMapGroupsInPandas, FlatMapCoGroupsInPandas, MapInPandas, WindowInPandas </td></tr>
<tr><td> <code>bytes of data sent</code> </td><td> the number of bytes of serialized data sent to the Python workers </td><td> BatchEvalPython, ArrowEvalPython, AggregateInPandas, FlatMapGroupsInPandas, FlatMapCoGroupsInPandas, MapInPandas, WindowInPandas </td></tr>
<tr><td> <code>number of batches returned</code> </td><td> the number of data batches received back from the Python workers </td><td> BatchEvalPython, ArrowEvalPython, AggregateInPandas, FlatMapGroupsInPandas, FlatMapCoGroupsInPandas, MapInPandas, WindowInPandas </td></tr>
<tr><td> <code>number of batches processed</code> </td><td> the number of data batches sent to the Python workers </td><td> BatchEvalPython, ArrowEvalPython, AggregateInPandas, FlatMapGroupsInPandas, FlatMapCoGroupsInPandas, MapInPandas, WindowInPandas </td></tr>
<tr><td> <code>number of rows returned</code> </td><td> the number of rows returned by the Python workers </td><td> BatchEvalPython, ArrowEvalPython, AggregateInPandas, FlatMapGroupsInPandas, FlatMapCoGroupsInPandas, MapInPandas, WindowInPandas </td></tr>
<tr><td> <code>number of rows processed</code> </td><td> the number of rows sent to the Python workers </td><td> BatchEvalPython, ArrowEvalPython, AggregateInPandas, FlatMapGroupsInPandas, FlatMapCoGroupsInPandas, MapInPandas, WindowInPandas </td></tr>

</table>
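
As a quick illustration of where these metrics surface, the sketch below (not part of this patch) runs a small pandas UDF query and then lists the metrics registered for that SQL execution through the session's status store, mirroring the new test added in this PR. It assumes pandas and pyarrow are installed and goes through the internal _jsparkSession handle, which is not a stable public API; the metric values themselves are most conveniently viewed in the query details on the SQL tab of the Web UI.

# Hypothetical sketch: run a pandas UDF and list the Python SQL metrics
# registered for the query. Assumes pandas and pyarrow are installed and
# that this is the first SQL execution in the session (execution id 0).
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.master("local[2]").getOrCreate()

@pandas_udf("long")
def square(col1):
    return col1 * col1

spark.range(10).select(square("id")).collect()

# _jsparkSession is an internal handle to the JVM SparkSession; its status
# store exposes the SQL metrics recorded for each execution.
status_store = spark._jsparkSession.sharedState().statusStore()
metrics = status_store.execution(0).get().metrics().mkString("\n")
print(metrics)  # includes entries such as "number of rows processed"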

66 changes: 66 additions & 0 deletions python/pyspark/sql/tests/test_pandas_sqlmetrics.py
@@ -0,0 +1,66 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unittest

from pyspark.sql.functions import pandas_udf
from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
pandas_requirement_message, pyarrow_requirement_message


@unittest.skipIf(
not have_pandas or not have_pyarrow,
pandas_requirement_message or pyarrow_requirement_message) # type: ignore[arg-type]
class PandasSQLMetrics(ReusedSQLTestCase):
Review comment (Member):
Could you add tests on the Scala side (e.g., SQLMetricsSuite), too?

Reply from @LucaCanali (Contributor Author), Jan 29, 2021:
Good point. I am not sure how to do a Scala-side test for a Python UDF, though.


def test_pandas_sql_metrics_basic(self):

PythonSQLMetrics = [
"time spent executing",
"time spent sending data",
"time spent sending code",
"bytes of code sent",
"bytes of data returned",
"bytes of data sent",
"number of batches returned",
"number of batches processed",
"number of rows returned",
"number of rows processed"
]

@pandas_udf("long")
def test_pandas(col1):
return col1 * col1

res = self.spark.range(10).select(test_pandas("id")).collect()

statusStore = self.spark._jsparkSession.sharedState().statusStore()
executionMetrics = statusStore.execution(0).get().metrics().mkString()

for metric in PythonSQLMetrics:
self.assertIn(metric, executionMetrics)


if __name__ == "__main__":
from pyspark.sql.tests.test_pandas_sqlmetrics import * # noqa: F401

try:
import xmlrunner # type: ignore[import]
testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)
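
For reference, once the module is registered in dev/sparktestsupport/modules.py (first diff above), this test can also be run on its own with the PySpark test script, e.g. python/run-tests --testnames pyspark.sql.tests.test_pandas_sqlmetrics, provided pandas and pyarrow are available in the Python environment.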
Expand Up @@ -46,7 +46,7 @@ case class AggregateInPandasExec(
udfExpressions: Seq[PythonUDF],
resultExpressions: Seq[NamedExpression],
child: SparkPlan)
extends UnaryExecNode {
extends UnaryExecNode with PythonSQLMetrics {

override val output: Seq[Attribute] = resultExpressions.map(_.toAttribute)

@@ -162,7 +162,17 @@ case class AggregateInPandasExec(
argOffsets,
aggInputSchema,
sessionLocalTimeZone,
pythonRunnerConf).compute(projectedRowIter, context.partitionId(), context)
pythonRunnerConf,
pythonExecTime,
pythonDataSerializeTime,
pythonCodeSerializeTime,
pythonCodeSent,
pythonDataReceived,
pythonDataSent,
pythonNumRowsReceived,
pythonNumRowsSent,
pythonNumBatchesReceived,
pythonNumBatchesSent).compute(projectedRowIter, context.partitionId(), context)

val joinedAttributes =
groupingExpressions.map(_.toAttribute) ++ udfExpressions.map(_.resultAttribute)
@@ -61,7 +61,7 @@ private[spark] class BatchIterator[T](iter: Iterator[T], batchSize: Int)
*/
case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan,
evalType: Int)
extends EvalPythonExec {
extends EvalPythonExec with PythonSQLMetrics {

private val batchSize = conf.arrowMaxRecordsPerBatch
private val sessionLocalTimeZone = conf.sessionLocalTimeZone
@@ -85,7 +85,17 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute]
argOffsets,
schema,
sessionLocalTimeZone,
pythonRunnerConf).compute(batchIter, context.partitionId(), context)
pythonRunnerConf,
pythonExecTime,
pythonDataSerializeTime,
pythonCodeSerializeTime,
pythonCodeSent,
pythonDataReceived,
pythonDataSent,
pythonNumRowsReceived,
pythonNumRowsSent,
pythonNumBatchesReceived,
pythonNumBatchesSent).compute(batchIter, context.partitionId(), context)

columnarBatchIter.flatMap { batch =>
val actualDataTypes = (0 until batch.numCols()).map(i => batch.column(i).dataType())
@@ -27,6 +27,7 @@ import org.apache.spark._
import org.apache.spark.api.python._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.arrow.ArrowWriter
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.ArrowUtils
@@ -42,7 +43,17 @@ class ArrowPythonRunner(
argOffsets: Array[Array[Int]],
schema: StructType,
timeZoneId: String,
conf: Map[String, String])
conf: Map[String, String],
val pythonExecTime: SQLMetric,
pythonSerializeTime: SQLMetric,
pythonCodeSerializeTime: SQLMetric,
pythonCodeSent: SQLMetric,
val pythonDataReceived: SQLMetric,
pythonDataSent: SQLMetric,
val pythonNumRowsReceived: SQLMetric,
pythonNumRowsSent: SQLMetric,
val pythonNumBatchesReceived: SQLMetric,
pythonNumBatchesSent: SQLMetric)
extends BasePythonRunner[Iterator[InternalRow], ColumnarBatch](funcs, evalType, argOffsets)
with PythonArrowOutput {

@@ -64,6 +75,8 @@ class ArrowPythonRunner(

protected override def writeCommand(dataOut: DataOutputStream): Unit = {

val startTime = System.nanoTime()
val startData = dataOut.size()
// Write config for the worker as a number of key -> value pairs of strings
dataOut.writeInt(conf.size)
for ((k, v) <- conf) {
@@ -72,6 +85,10 @@ }
}

PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
val deltaTime = System.nanoTime() - startTime
pythonCodeSerializeTime += deltaTime
val deltaData = dataOut.size() - startData
pythonCodeSent += deltaData
}

protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = {
@@ -85,17 +102,27 @@
val writer = new ArrowStreamWriter(root, null, dataOut)
writer.start()

val startData = dataOut.size()
while (inputIterator.hasNext) {
val nextBatch = inputIterator.next()

while (nextBatch.hasNext) {
val startTime = System.nanoTime()
arrowWriter.write(nextBatch.next())
val deltaTime = System.nanoTime() - startTime
pythonSerializeTime += deltaTime
}

arrowWriter.finish()
writer.writeBatch()

val rowCount = root.getRowCount
pythonNumBatchesSent += 1
pythonNumRowsSent += rowCount
arrowWriter.reset()
}
val deltaData = dataOut.size() - startData
pythonDataSent += deltaData
// end writes footer to the output stream and doesn't clean any resources.
// It could throw exception if the output stream is closed, so it should be
// in the try block.
@@ -32,7 +32,7 @@ import org.apache.spark.sql.types.{StructField, StructType}
* A physical plan that evaluates a [[PythonUDF]]
*/
case class BatchEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan)
extends EvalPythonExec {
extends EvalPythonExec with PythonSQLMetrics {

protected override def evaluate(
funcs: Seq[ChainedPythonFunctions],
@@ -61,6 +61,7 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute]
// Input iterator to Python: input rows are grouped so we send them in batches to Python.
// For each row, add it to the queue.
val inputIterator = iter.map { row =>
pythonNumRowsSent += 1
if (needConversion) {
EvaluatePython.toJava(row, schema)
} else {
@@ -74,10 +75,22 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute]
}
fields
}
}.grouped(100).map(x => pickle.dumps(x.toArray))

}.grouped(100).map(x => {
pythonNumBatchesSent += 1
pickle.dumps(x.toArray)
})
// Output iterator for results from Python.
val outputIterator = new PythonUDFRunner(funcs, PythonEvalType.SQL_BATCHED_UDF, argOffsets)
val outputIterator = new PythonUDFRunner(
funcs,
PythonEvalType.SQL_BATCHED_UDF,
argOffsets,
pythonExecTime,
pythonDataSerializeTime,
pythonCodeSerializeTime,
pythonCodeSent,
pythonDataReceived,
pythonDataSent,
pythonNumBatchesReceived)
.compute(inputIterator, context.partitionId(), context)

val unpickle = new Unpickler
@@ -94,12 +107,19 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute]
val unpickledBatch = unpickle.loads(pickedResult)
unpickledBatch.asInstanceOf[java.util.ArrayList[Any]].asScala
}.map { result =>
pythonNumRowsReceived += 1
val startTime = System.nanoTime()
if (udfs.length == 1) {
// fast path for single UDF
mutableRow(0) = fromJava(result)
val deltaTime = System.nanoTime() - startTime
pythonExecTime += deltaTime
mutableRow
} else {
fromJava(result).asInstanceOf[InternalRow]
val res = fromJava(result).asInstanceOf[InternalRow]
val deltaTime = System.nanoTime() - startTime
pythonExecTime += deltaTime
Comment on lines +111 to +121
Review comment (Member):

Is this really calculating the exec time? Seems like it's only calculating the time for fromJava(result)?

Reply (Contributor Author):

Correct, this is just one component, and a small one that could be omitted. The most important part of the BatchEvalPythonExec execution time is measured in PythonUDFRunner.

res
}
}
}
@@ -27,6 +27,7 @@ import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions, PythonRDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.arrow.ArrowWriter
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.ArrowUtils
@@ -45,7 +46,17 @@ class CoGroupedArrowPythonRunner(
leftSchema: StructType,
rightSchema: StructType,
timeZoneId: String,
conf: Map[String, String])
conf: Map[String, String],
val pythonExecTime: SQLMetric,
pythonSerializeTime: SQLMetric,
pythonCodeSerializeTime: SQLMetric,
pythonCodeSent: SQLMetric,
val pythonDataReceived: SQLMetric,
pythonDataSent: SQLMetric,
val pythonNumRowsReceived: SQLMetric,
pythonNumRowsSent: SQLMetric,
val pythonNumBatchesReceived: SQLMetric,
pythonNumBatchesSent: SQLMetric)
extends BasePythonRunner[
(Iterator[InternalRow], Iterator[InternalRow]), ColumnarBatch](funcs, evalType, argOffsets)
with PythonArrowOutput {
@@ -63,6 +74,8 @@ class CoGroupedArrowPythonRunner(

protected override def writeCommand(dataOut: DataOutputStream): Unit = {

val startData = dataOut.size()
val startTime = System.nanoTime()
// Write config for the worker as a number of key -> value pairs of strings
dataOut.writeInt(conf.size)
for ((k, v) <- conf) {
@@ -71,18 +84,25 @@ class CoGroupedArrowPythonRunner(
}

PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
val deltaTime = System.nanoTime() - startTime
pythonCodeSerializeTime += deltaTime
val deltaData = dataOut.size() - startData
pythonCodeSent += deltaData
}

protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = {
// For each we first send the number of dataframes in each group then send
// first df, then send second df. End of data is marked by sending 0.
val startData = dataOut.size()
while (inputIterator.hasNext) {
dataOut.writeInt(2)
val (nextLeft, nextRight) = inputIterator.next()
writeGroup(nextLeft, leftSchema, dataOut, "left")
writeGroup(nextRight, rightSchema, dataOut, "right")
}
dataOut.writeInt(0)
val deltaData = dataOut.size() - startData
pythonDataSent += deltaData
}

private def writeGroup(
@@ -101,10 +121,17 @@ class CoGroupedArrowPythonRunner(
writer.start()

while (group.hasNext) {
val startTime = System.nanoTime()
arrowWriter.write(group.next())
val deltaTime = System.nanoTime() - startTime
pythonSerializeTime += deltaTime
}

arrowWriter.finish()
writer.writeBatch()
val rowCount = root.getRowCount
pythonNumBatchesSent += 1
pythonNumRowsSent += rowCount
writer.end()
}{
root.close()
@@ -54,7 +54,7 @@ case class FlatMapCoGroupsInPandasExec(
output: Seq[Attribute],
left: SparkPlan,
right: SparkPlan)
extends SparkPlan with BinaryExecNode {
extends SparkPlan with BinaryExecNode with PythonSQLMetrics {

private val sessionLocalTimeZone = conf.sessionLocalTimeZone
private val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
@@ -97,7 +97,17 @@ case class FlatMapCoGroupsInPandasExec(
StructType.fromAttributes(leftDedup),
StructType.fromAttributes(rightDedup),
sessionLocalTimeZone,
pythonRunnerConf)
pythonRunnerConf,
pythonExecTime,
pythonDataSerializeTime,
pythonCodeSerializeTime,
pythonCodeSent,
pythonDataReceived,
pythonDataSent,
pythonNumRowsReceived,
pythonNumRowsSent,
pythonNumBatchesReceived,
pythonNumBatchesSent)

executePython(data, output, runner)
}