@@ -61,14 +61,12 @@ class ApplyInPandasWithStatePythonRunner(
keySchema: StructType,
outputSchema: StructType,
stateValueSchema: StructType,
pyMetrics: Map[String, SQLMetric],
override val pythonMetrics: Map[String, SQLMetric],
jobArtifactUUID: Option[String])
extends BasePythonRunner[InType, OutType](funcs, evalType, argOffsets, jobArtifactUUID)
with PythonArrowInput[InType]
with PythonArrowOutput[OutType] {

override val pythonMetrics: Option[Map[String, SQLMetric]] = Some(pyMetrics)
Member Author:
All of these changes are essentially a revert of #44305: now that we support metrics here, there is no need to keep them optional anymore. (A minimal sketch of the resulting pattern follows this file's diff.)


override val pythonExec: String =
SQLConf.get.pysparkWorkerPythonExecutable.getOrElse(
funcs.head.funcs.head.pythonExec)
@@ -151,7 +149,7 @@ class ApplyInPandasWithStatePythonRunner(

pandasWriter.finalizeGroup()
val deltaData = dataOut.size() - startData
pythonMetrics.foreach(_("pythonDataSent") += deltaData)
pythonMetrics("pythonDataSent") += deltaData
true
} else {
pandasWriter.finalizeData()
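For readers skimming the diff, here is a minimal illustrative sketch of the pattern the runners above now share after the revert. The class and method names are made up for illustration; only pythonMetrics, the pythonDataSent key, and SQLMetric come from the code in this PR. Since the metrics map is a plain required member again, updates index it directly instead of going through Option.foreach.

import java.io.DataOutputStream

import org.apache.spark.sql.execution.metric.SQLMetric

// Illustrative only, not part of this PR's diff.
class ExampleMetricsRecorder(val pythonMetrics: Map[String, SQLMetric]) {

  // Records how many bytes were written to the Python worker since `startData`.
  def recordDataSent(dataOut: DataOutputStream, startData: Int): Unit = {
    val deltaData = dataOut.size() - startData
    // Direct lookup is safe here because PythonSQLMetrics always registers this key.
    pythonMetrics("pythonDataSent") += deltaData
  }
}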
@@ -70,7 +70,7 @@ case class ArrowEvalPythonUDTFExec(
sessionLocalTimeZone,
largeVarTypes,
pythonRunnerConf,
Some(pythonMetrics),
pythonMetrics,
jobArtifactUUID).compute(batchIter, context.partitionId(), context)

columnarBatchIter.map { batch =>
@@ -35,7 +35,7 @@ abstract class BaseArrowPythonRunner(
_timeZoneId: String,
protected override val largeVarTypes: Boolean,
protected override val workerConf: Map[String, String],
override val pythonMetrics: Option[Map[String, SQLMetric]],
override val pythonMetrics: Map[String, SQLMetric],
jobArtifactUUID: Option[String])
extends BasePythonRunner[Iterator[InternalRow], ColumnarBatch](
funcs, evalType, argOffsets, jobArtifactUUID)
@@ -74,7 +74,7 @@ class ArrowPythonRunner(
_timeZoneId: String,
largeVarTypes: Boolean,
workerConf: Map[String, String],
pythonMetrics: Option[Map[String, SQLMetric]],
pythonMetrics: Map[String, SQLMetric],
jobArtifactUUID: Option[String])
extends BaseArrowPythonRunner(
funcs, evalType, argOffsets, _schema, _timeZoneId, largeVarTypes, workerConf,
@@ -100,7 +100,7 @@ class ArrowPythonWithNamedArgumentRunner(
jobArtifactUUID: Option[String])
extends BaseArrowPythonRunner(
funcs, evalType, argMetas.map(_.map(_.offset)), _schema, _timeZoneId, largeVarTypes, workerConf,
Some(pythonMetrics), jobArtifactUUID) {
pythonMetrics, jobArtifactUUID) {

override protected def writeUDF(dataOut: DataOutputStream): Unit =
PythonUDFRunner.writeUDFs(dataOut, funcs, argMetas)
@@ -39,7 +39,7 @@ class ArrowPythonUDTFRunner(
protected override val timeZoneId: String,
protected override val largeVarTypes: Boolean,
protected override val workerConf: Map[String, String],
override val pythonMetrics: Option[Map[String, SQLMetric]],
override val pythonMetrics: Map[String, SQLMetric],
jobArtifactUUID: Option[String])
extends BasePythonRunner[Iterator[InternalRow], ColumnarBatch](
Seq(ChainedPythonFunctions(Seq(udtf.func))), evalType, Array(argMetas.map(_.offset)),
@@ -46,15 +46,13 @@ class CoGroupedArrowPythonRunner(
rightSchema: StructType,
timeZoneId: String,
conf: Map[String, String],
pyMetrics: Map[String, SQLMetric],
override val pythonMetrics: Map[String, SQLMetric],
jobArtifactUUID: Option[String])
extends BasePythonRunner[
(Iterator[InternalRow], Iterator[InternalRow]), ColumnarBatch](
funcs, evalType, argOffsets, jobArtifactUUID)
with BasicPythonArrowOutput {

override val pythonMetrics: Option[Map[String, SQLMetric]] = Some(pyMetrics)

override val pythonExec: String =
SQLConf.get.pysparkWorkerPythonExecutable.getOrElse(
funcs.head.funcs.head.pythonExec)
@@ -95,7 +93,7 @@ class CoGroupedArrowPythonRunner(
writeGroup(nextRight, rightSchema, dataOut, "right")

val deltaData = dataOut.size() - startData
pythonMetrics.foreach(_("pythonDataSent") += deltaData)
pythonMetrics("pythonDataSent") += deltaData
true
} else {
dataOut.writeInt(0)
@@ -88,7 +88,7 @@ trait FlatMapGroupsInBatchExec extends SparkPlan with UnaryExecNode with PythonS
sessionLocalTimeZone,
largeVarTypes,
pythonRunnerConf,
Some(pythonMetrics),
pythonMetrics,
jobArtifactUUID)

executePython(data, output, runner)
@@ -36,7 +36,7 @@ class MapInBatchEvaluatorFactory(
sessionLocalTimeZone: String,
largeVarTypes: Boolean,
pythonRunnerConf: Map[String, String],
pythonMetrics: Option[Map[String, SQLMetric]],
val pythonMetrics: Map[String, SQLMetric],
jobArtifactUUID: Option[String])
extends PartitionEvaluatorFactory[InternalRow, InternalRow] {

@@ -57,7 +57,7 @@ trait MapInBatchExec extends UnaryExecNode with PythonSQLMetrics {
conf.sessionLocalTimeZone,
conf.arrowUseLargeVarTypes,
pythonRunnerConf,
Some(pythonMetrics),
pythonMetrics,
jobArtifactUUID)

if (isBarrier) {
@@ -46,7 +46,7 @@ private[python] trait PythonArrowInput[IN] { self: BasePythonRunner[IN, _] =>

protected val largeVarTypes: Boolean

protected def pythonMetrics: Option[Map[String, SQLMetric]]
protected def pythonMetrics: Map[String, SQLMetric]

protected def writeNextInputToArrowStream(
root: VectorSchemaRoot,
@@ -132,7 +132,7 @@ private[python] trait BasicPythonArrowInput extends PythonArrowInput[Iterator[In
writer.writeBatch()
arrowWriter.reset()
val deltaData = dataOut.size() - startData
pythonMetrics.foreach(_("pythonDataSent") += deltaData)
pythonMetrics("pythonDataSent") += deltaData
true
} else {
super[PythonArrowInput].close()
@@ -37,7 +37,7 @@ import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, Column
*/
private[python] trait PythonArrowOutput[OUT <: AnyRef] { self: BasePythonRunner[_, OUT] =>

protected def pythonMetrics: Option[Map[String, SQLMetric]]
protected def pythonMetrics: Map[String, SQLMetric]

protected def handleMetadataAfterExec(stream: DataInputStream): Unit = { }

@@ -91,8 +91,8 @@ private[python] trait PythonArrowOutput[OUT <: AnyRef] { self: BasePythonRunner[
val rowCount = root.getRowCount
batch.setNumRows(root.getRowCount)
val bytesReadEnd = reader.bytesRead()
pythonMetrics.foreach(_("pythonNumRowsReceived") += rowCount)
pythonMetrics.foreach(_("pythonDataReceived") += bytesReadEnd - bytesReadStart)
pythonMetrics("pythonNumRowsReceived") += rowCount
pythonMetrics("pythonDataReceived") += bytesReadEnd - bytesReadStart
deserializeColumnarBatch(batch, schema)
} else {
reader.close(false)
@@ -18,18 +18,29 @@
package org.apache.spark.sql.execution.python

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

private[sql] trait PythonSQLMetrics { self: SparkPlan =>
trait PythonSQLMetrics { self: SparkPlan =>
protected val pythonMetrics: Map[String, SQLMetric] = {
PythonSQLMetrics.pythonSizeMetricsDesc.map { case (k, v) =>
k -> SQLMetrics.createSizeMetric(sparkContext, v)
} ++ PythonSQLMetrics.pythonOtherMetricsDesc.map { case (k, v) =>
k -> SQLMetrics.createMetric(sparkContext, v)
}
}

val pythonMetrics = Map(
"pythonDataSent" -> SQLMetrics.createSizeMetric(sparkContext,
"data sent to Python workers"),
"pythonDataReceived" -> SQLMetrics.createSizeMetric(sparkContext,
"data returned from Python workers"),
"pythonNumRowsReceived" -> SQLMetrics.createMetric(sparkContext,
"number of output rows")
)
override lazy val metrics: Map[String, SQLMetric] = pythonMetrics
}

object PythonSQLMetrics {
val pythonSizeMetricsDesc: Map[String, String] = {
Map(
"pythonDataSent" -> "data sent to Python workers",
"pythonDataReceived" -> "data returned from Python workers"
)
}

override lazy val metrics = pythonMetrics
val pythonOtherMetricsDesc: Map[String, String] = {
Map("pythonNumRowsReceived" -> "number of output rows")
}
}
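One note on the restructuring above: moving the metric descriptions into the PythonSQLMetrics companion object is what allows code that is not a SparkPlan, such as the DSv2 partition reader later in this PR, to build its own SQLMetric instances from the same metric names. Below is a small illustrative check of the keys covered by the two description maps; the object name is made up and this is not part of the PR.

package org.apache.spark.sql.execution.python

// Illustrative only: every key the runners look up via pythonMetrics("...")
// must appear in one of the two description maps, otherwise the direct map
// lookup in the runners would fail at runtime.
object PythonSQLMetricsKeysCheck {
  def main(args: Array[String]): Unit = {
    val allMetricKeys =
      PythonSQLMetrics.pythonSizeMetricsDesc.keySet ++
        PythonSQLMetrics.pythonOtherMetricsDesc.keySet
    assert(allMetricKeys ==
      Set("pythonDataSent", "pythonDataReceived", "pythonNumRowsReceived"))
  }
}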
@@ -34,8 +34,10 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BinaryType, DataType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -101,8 +103,10 @@ class PythonTableProvider extends TableProvider {
new PythonPartitionReaderFactory(
source, readerFunc, outputSchema, jobArtifactUUID)
}

override def description: String = "(Python)"

override def supportedCustomMetrics(): Array[CustomMetric] =
source.createPythonMetrics()
}
}

@@ -124,21 +128,78 @@ class PythonPartitionReaderFactory(

override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
new PartitionReader[InternalRow] {
private val outputIter = source.createPartitionReadIteratorInPython(
partition.asInstanceOf[PythonInputPartition],
pickledReadFunc,
outputSchema,
jobArtifactUUID)
// Dummy SQLMetrics. The result is manually reported via DSv2 interface
// via passing the value to `CustomTaskMetric`. Note that `pythonOtherMetricsDesc`
// is not used when it is reported. It is to reuse existing Python runner.
// See also `UserDefinedPythonDataSource.createPythonMetrics`.
private[this] val metrics: Map[String, SQLMetric] = {
PythonSQLMetrics.pythonSizeMetricsDesc.keys
.map(_ -> new SQLMetric("size", -1)).toMap ++
Comment on lines +131 to +137
Contributor:
Just to make sure I understand this part. Are these size metrics automatically updated by the DSv2 framework?

Also, is it possible to support user-defined Python metrics in the future?

Contributor:
The code is a little tricky. In the DSv2 framework, the reader (which runs on the executor side) needs to update and report the current values of its metrics. To reuse the existing code, we use SQLMetric instances here; their values are updated by the runner built in createMapInBatchEvaluatorFactory (which uses MapInBatchEvaluatorFactory). We then take each value from the SQLMetric and report it through the DSv2 framework in currentMetricsValues.

PythonSQLMetrics.pythonOtherMetricsDesc.keys
.map(_ -> new SQLMetric("sum", -1)).toMap
}

private val outputIter = {
val evaluatorFactory = source.createMapInBatchEvaluatorFactory(
pickledReadFunc,
outputSchema,
metrics,
jobArtifactUUID)

val part = partition.asInstanceOf[PythonInputPartition]
evaluatorFactory.createEvaluator().eval(
part.index, Iterator.single(InternalRow(part.pickedPartition)))
}

override def next(): Boolean = outputIter.hasNext

override def get(): InternalRow = outputIter.next()

override def close(): Unit = {}

override def currentMetricsValues(): Array[CustomTaskMetric] = {
source.createPythonTaskMetrics(metrics.map { case (k, v) => k -> v.value})
}
}
}
}

class PythonCustomMetric extends CustomMetric {
private var initName: String = _
private var initDescription: String = _
def initialize(n: String, d: String): Unit = {
Contributor:
The caller always calls initialize right after instantiating PythonCustomMetric; shall we just add a constructor with parameters?

Member Author:
I couldn't, because loading the custom metric class requires a 0-argument constructor:

logWarning(s"Unable to load custom metric object for class `$className`. " +
"Please make sure that the custom metric class is in the classpath and " +
"it has 0-arg constructor.", e)

Member Author:
Oh, actually I think I can just provide multiple constructors. (A sketch of that shape follows at the end of this file's diff.)

initName = n
initDescription = d
}
override def name(): String = {
assert(initName != null)
initName
}
override def description(): String = {
assert(initDescription != null)
initDescription
}
override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
SQLMetrics.stringValue("size", taskMetrics, Array.empty[Long])
}
}

class PythonCustomTaskMetric extends CustomTaskMetric {
private var initName: String = _
private var initValue: Long = -1L
Contributor:
I think this is really the current value? We always create new instances of PythonCustomTaskMetric when reporting the DSv2 metrics. We can optimize that part, though.

Member Author:
Yeah, I can rename it. Actually, we should just call it metricValue to make it less confusing.

def initialize(n: String, v: Long): Unit = {
Contributor:
ditto

initName = n
initValue = v
}
override def name(): String = {
assert(initName != null)
initName
}
override def value(): Long = {
initValue
}
}

/**
* A user-defined Python data source. This is used by the Python API.
* Defines the interaction between Python and JVM.
@@ -179,11 +240,11 @@ case class UserDefinedPythonDataSource(dataSourceCls: PythonFunction) {
/**
* (Executor-side) Create an iterator that reads the input partitions.
*/
def createPartitionReadIteratorInPython(
partition: PythonInputPartition,
def createMapInBatchEvaluatorFactory(
Contributor:
Do we plan to reuse this method further? If not, we can make it return Iterator[InternalRow] to simplify the caller code.

Member Author:
👍

pickledReadFunc: Array[Byte],
outputSchema: StructType,
jobArtifactUUID: Option[String]): Iterator[InternalRow] = {
metrics: Map[String, SQLMetric],
jobArtifactUUID: Option[String]): MapInBatchEvaluatorFactory = {
val readerFunc = createPythonFunction(pickledReadFunc)

val pythonEvalType = PythonEvalType.SQL_MAP_ARROW_ITER_UDF
@@ -199,7 +260,7 @@
val conf = SQLConf.get

val pythonRunnerConf = ArrowPythonRunner.getPythonRunnerConfMap(conf)
val evaluatorFactory = new MapInBatchEvaluatorFactory(
new MapInBatchEvaluatorFactory(
toAttributes(outputSchema),
Seq(ChainedPythonFunctions(Seq(pythonUDF.func))),
inputSchema,
Expand All @@ -208,11 +269,26 @@ case class UserDefinedPythonDataSource(dataSourceCls: PythonFunction) {
conf.sessionLocalTimeZone,
conf.arrowUseLargeVarTypes,
pythonRunnerConf,
None,
metrics,
jobArtifactUUID)
}

def createPythonMetrics(): Array[CustomMetric] = {
// Do not add other metrics such as number of rows,
// that is already included via DSv2.
PythonSQLMetrics.pythonSizeMetricsDesc.map { case (k, v) =>
val m = new PythonCustomMetric()
m.initialize(k, v)
m
}.toArray
}

evaluatorFactory.createEvaluator().eval(
partition.index, Iterator.single(InternalRow(partition.pickedPartition)))
def createPythonTaskMetrics(taskMetrics: Map[String, Long]): Array[CustomTaskMetric] = {
taskMetrics.map { case (k, v) =>
val m = new PythonCustomTaskMetric()
m.initialize(k, v)
m
}.toArray
}

private def createPythonFunction(pickledFunc: Array[Byte]): PythonFunction = {
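Picking up the constructor discussion above, here is a rough sketch, not the code in this PR, of the shape the author suggests at the end of that thread: keep the 0-argument constructor that the custom-metric loader needs for reflective instantiation, and add a parameterized one so the driver side can skip the separate initialize() call. The class name is made up; CustomMetric and SQLMetrics.stringValue are the same APIs already used in the diff.

import org.apache.spark.sql.connector.metric.CustomMetric
import org.apache.spark.sql.execution.metric.SQLMetrics

// Rough sketch of the "multiple constructors" idea from the review thread.
class ExamplePythonCustomMetric(
    private var initName: String,
    private var initDescription: String) extends CustomMetric {

  // Required: the custom metric class is re-instantiated reflectively and must
  // have a 0-arg constructor (see the logWarning quoted in the thread above).
  def this() = this(null, null)

  override def name(): String = {
    assert(initName != null)
    initName
  }

  override def description(): String = {
    assert(initDescription != null)
    initDescription
  }

  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
    // Same aggregation as the PR: render the per-task values as a size metric.
    SQLMetrics.stringValue("size", taskMetrics, Array.empty[Long])
  }
}

On the driver, supportedCustomMetrics() exposes these metrics by name, and the DSv2 framework matches them against the CustomTaskMetric values that currentMetricsValues() reports from the executors, so the names on both sides must line up with the metric keys in PythonSQLMetrics.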