diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java index 97402f5a58e..63a5084b679 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java @@ -490,7 +490,7 @@ private static StructType structFromTypes(DataType[] format) { return new StructType(fields); } - private static StructType structFromAttributes(List format) { + public static StructType structFromAttributes(List format) { StructField[] fields = new StructField[format.size()]; int i = 0; for (Attribute attribute: format) { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchUtils.scala index bf2d2474dfe..92588885be0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchUtils.scala @@ -18,11 +18,15 @@ package com.nvidia.spark.rapids import scala.collection.mutable.ArrayBuffer +import ai.rapids.cudf.Table +import com.nvidia.spark.rapids.Arm.withResource +import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq + import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, StructType} /** * Utility class with methods for calculating various metrics about GPU memory usage - * prior to allocation. + * prior to allocation, along with some operations with batches. */ object GpuBatchUtils { @@ -175,4 +179,37 @@ object GpuBatchUtils { bytes } } + + /** + * Concatenate the input batches into a single one. + * The caller is responsible for closing the returned batch. + * + * @param spillBatches the batches to be concatenated, will be closed after the call + * returns. + * @return the concatenated SpillableColumnarBatch or None if the input is empty. 
+ */ + def concatSpillBatchesAndClose( + spillBatches: Seq[SpillableColumnarBatch]): Option[SpillableColumnarBatch] = { + val retBatch = if (spillBatches.length >= 2) { + // two or more batches, concatenate them + val (concatTable, types) = RmmRapidsRetryIterator.withRetryNoSplit(spillBatches) { _ => + withResource(spillBatches.safeMap(_.getColumnarBatch())) { batches => + val batchTypes = GpuColumnVector.extractTypes(batches.head) + withResource(batches.safeMap(GpuColumnVector.from)) { tables => + (Table.concatenate(tables: _*), batchTypes) + } + } + } + // Make the concatenated table spillable. + withResource(concatTable) { _ => + SpillableColumnarBatch(GpuColumnVector.from(concatTable, types), + SpillPriorities.ACTIVE_BATCHING_PRIORITY) + } + } else if (spillBatches.length == 1) { + // only one batch + spillBatches.head + } else null + + Option(retBatch) + } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubPartitionHashJoin.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubPartitionHashJoin.scala index 33ca8c906f6..b4582b3e0d5 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubPartitionHashJoin.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubPartitionHashJoin.scala @@ -18,8 +18,7 @@ package org.apache.spark.sql.rapids.execution import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import ai.rapids.cudf.Table -import com.nvidia.spark.rapids.{GpuColumnVector, GpuExpression, GpuHashPartitioningBase, GpuMetric, RmmRapidsRetryIterator, SpillableColumnarBatch, SpillPriorities, TaskAutoCloseableResource} +import com.nvidia.spark.rapids.{GpuBatchUtils, GpuColumnVector, GpuExpression, GpuHashPartitioningBase, GpuMetric, SpillableColumnarBatch, SpillPriorities, TaskAutoCloseableResource} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.RapidsPluginImplicits._ @@ -41,27 +40,7 
@@ object GpuSubPartitionHashJoin { */ def concatSpillBatchesAndClose( spillBatches: Seq[SpillableColumnarBatch]): Option[SpillableColumnarBatch] = { - val retBatch = if (spillBatches.length >= 2) { - // two or more batches, concatenate them - val (concatTable, types) = RmmRapidsRetryIterator.withRetryNoSplit(spillBatches) { _ => - withResource(spillBatches.safeMap(_.getColumnarBatch())) { batches => - val batchTypes = GpuColumnVector.extractTypes(batches.head) - withResource(batches.safeMap(GpuColumnVector.from)) { tables => - (Table.concatenate(tables: _*), batchTypes) - } - } - } - // Make the concatenated table spillable. - withResource(concatTable) { _ => - SpillableColumnarBatch(GpuColumnVector.from(concatTable, types), - SpillPriorities.ACTIVE_BATCHING_PRIORITY) - } - } else if (spillBatches.length == 1) { - // only one batch - spillBatches.head - } else null - - Option(retBatch) + GpuBatchUtils.concatSpillBatchesAndClose(spillBatches) } /** diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/BatchGroupUtils.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/BatchGroupUtils.scala index 132c46b9ba7..87bcbab785b 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/BatchGroupUtils.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/BatchGroupUtils.scala @@ -29,8 +29,7 @@ import org.apache.spark.TaskContext import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.rapids.execution.GpuSubPartitionHashJoin -import org.apache.spark.sql.rapids.execution.python.shims.GpuPythonArrowOutput +import org.apache.spark.sql.rapids.execution.python.shims.GpuBasePythonRunner import org.apache.spark.sql.rapids.shims.DataTypeUtilsShim import org.apache.spark.sql.vectorized.ColumnarBatch @@ -197,7 +196,7 @@ 
private[python] object BatchGroupUtils { def executePython[IN]( pyInputIterator: Iterator[IN], output: Seq[Attribute], - pyRunner: GpuPythonRunnerBase[IN], + pyRunner: GpuBasePythonRunner[IN], outputRows: GpuMetric, outputBatches: GpuMetric): Iterator[ColumnarBatch] = { val context = TaskContext.get() @@ -396,7 +395,7 @@ private[python] object BatchGroupedIterator { class CombiningIterator( inputBatchQueue: BatchQueue, pythonOutputIter: Iterator[ColumnarBatch], - pythonArrowReader: GpuPythonArrowOutput, + pythonArrowReader: GpuArrowOutput, numOutputRows: GpuMetric, numOutputBatches: GpuMetric) extends Iterator[ColumnarBatch] { @@ -456,7 +455,7 @@ class CombiningIterator( pendingInput = Some(second) } - val ret = GpuSubPartitionHashJoin.concatSpillBatchesAndClose(buf.toSeq) + val ret = GpuBatchUtils.concatSpillBatchesAndClose(buf.toSeq) // "ret" should be non empty because we checked the buf is not empty ahead. withResource(ret.get) { concatedScb => concatedScb.getColumnarBatch() @@ -596,3 +595,4 @@ class CoGroupedIterator( keyOrdering.compare(leftKeyRow, rightKeyRow) } } + diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowPythonRunner.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowPythonRunner.scala deleted file mode 100644 index 7eb1803bf17..00000000000 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowPythonRunner.scala +++ /dev/null @@ -1,273 +0,0 @@ -/* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.rapids.execution.python - -import java.io.{DataInputStream, DataOutputStream} - -import ai.rapids.cudf._ -import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.Arm.withResource -import org.apache.arrow.vector.VectorSchemaRoot -import org.apache.arrow.vector.ipc.ArrowStreamWriter - -import org.apache.spark.{SparkEnv, TaskContext} -import org.apache.spark.api.python._ -import org.apache.spark.internal.Logging -import org.apache.spark.rapids.shims.api.python.ShimBasePythonRunner -import org.apache.spark.sql.execution.python.PythonUDFRunner -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.rapids.execution.python.shims.GpuPythonArrowOutput -import org.apache.spark.sql.rapids.shims.ArrowUtilsShim -import org.apache.spark.sql.types._ -import org.apache.spark.sql.util.ArrowUtils -import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.Utils - -class BufferToStreamWriter(outputStream: DataOutputStream) extends HostBufferConsumer { - private[this] val tempBuffer = new Array[Byte](128 * 1024) - - override def handleBuffer(hostBuffer: HostMemoryBuffer, length: Long): Unit = { - withResource(hostBuffer) { buffer => - var len = length - var offset: Long = 0 - while(len > 0) { - val toCopy = math.min(tempBuffer.length, len).toInt - buffer.getBytes(tempBuffer, 0, offset, toCopy) - outputStream.write(tempBuffer, 0, toCopy) - len = len - toCopy - offset = offset + toCopy - } - } - } -} - -class StreamToBufferProvider(inputStream: DataInputStream) extends 
HostBufferProvider { - private[this] val tempBuffer = new Array[Byte](128 * 1024) - - override def readInto(hostBuffer: HostMemoryBuffer, length: Long): Long = { - var amountLeft = length - var totalRead : Long = 0 - while (amountLeft > 0) { - val amountToRead = Math.min(tempBuffer.length, amountLeft).toInt - val amountRead = inputStream.read(tempBuffer, 0, amountToRead) - if (amountRead <= 0) { - // Reached EOF - amountLeft = 0 - } else { - amountLeft -= amountRead - hostBuffer.setBytes(totalRead, tempBuffer, 0, amountRead) - totalRead += amountRead - } - } - totalRead - } -} - -/** - * Base class of GPU Python runners who will be mixed with GpuPythonArrowOutput - * to produce columnar batches. - */ -abstract class GpuPythonRunnerBase[IN]( - funcs: Seq[ChainedPythonFunctions], - evalType: Int, - argOffsets: Array[Array[Int]]) - extends ShimBasePythonRunner[IN, ColumnarBatch](funcs, evalType, argOffsets) - -/** - * Similar to `PythonUDFRunner`, but exchange data with Python worker via Arrow stream. - */ -abstract class GpuArrowPythonRunnerBase( - funcs: Seq[ChainedPythonFunctions], - evalType: Int, - argOffsets: Array[Array[Int]], - pythonInSchema: StructType, - timeZoneId: String, - conf: Map[String, String], - batchSize: Long, - pythonOutSchema: StructType = null) - extends GpuPythonRunnerBase[ColumnarBatch](funcs, evalType, argOffsets) - with GpuPythonArrowOutput { - - def toBatch(table: Table): ColumnarBatch = { - GpuColumnVector.from(table, GpuColumnVector.extractTypes(pythonOutSchema)) - } - - override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize - require( - bufferSize >= 4, - "Pandas execution requires more than 4 bytes. Please set higher buffer. 
" + - s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.") - - protected class RapidsWriter( - env: SparkEnv, - inputIterator: Iterator[ColumnarBatch], - partitionIndex: Int, - context: TaskContext) extends Logging { - - private[this] var tableWriter: TableWriter = _ - private[this] lazy val isInputNonEmpty = inputIterator.nonEmpty - - def writeCommand(dataOut: DataOutputStream): Unit = { - // Write config for the worker as a number of key -> value pairs of strings - dataOut.writeInt(conf.size) - for ((k, v) <- conf) { - PythonRDD.writeUTF(k, dataOut) - PythonRDD.writeUTF(v, dataOut) - } - - PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets) - } - - /** - * Write all the batches into stream in one time for two-threaded PythonRunner. - * This will be called only once. - */ - def writeIteratorToStream(dataOut: DataOutputStream): Unit = { - if (isInputNonEmpty) { - initTableWriter(dataOut) - logDebug("GpuPythonRunner starts to write all batches to the stream.") - Utils.tryWithSafeFinally { - while (inputIterator.hasNext) { - writeBatchToStreamAndClose(inputIterator.next()) - } - } { - dataOut.flush() - close() - } - } else { - logDebug("GpuPythonRunner writes nothing to stream because the input is empty.") - writeEmptyIteratorOnCpu(dataOut) - // The iterator can grab the semaphore even on an empty batch - GpuSemaphore.releaseIfNecessary(TaskContext.get()) - } - logDebug("GpuPythonRunner writing is done.") - } - - /** - * Write one batch each time for the singled-threaded PythonRunner. - * This will be called multiple times when returning a true. 
- * See https://issues.apache.org/jira/browse/SPARK-44705 - */ - def writeNextInputToStream(dataOut: DataOutputStream): Boolean = { - if (isInputNonEmpty) { - initTableWriter(dataOut) - try { - if (inputIterator.hasNext) { - logDebug("GpuPythonRunner[single-threaded] write a batch to the stream.") - writeBatchToStreamAndClose(inputIterator.next()) - dataOut.flush() - true - } else { // all batches are written, close the writer - logDebug("GpuPythonRunner[single-threaded] writing is done.") - close() - false - } - } catch { - case t: Throwable => - close() - throw t - } - } else { - logDebug("GpuPythonRunner[single-threaded] writes nothing to stream because" + - " the input is empty.") - writeEmptyIteratorOnCpu(dataOut) - // The iterator can grab the semaphore even on an empty batch - GpuSemaphore.releaseIfNecessary(TaskContext.get()) - false - } - } - - private def initTableWriter(dataOut: DataOutputStream): Unit = { - if (tableWriter == null) { - val builder = ArrowIPCWriterOptions.builder() - builder.withMaxChunkSize(batchSize) - builder.withCallback((table: Table) => { - table.close() - GpuSemaphore.releaseIfNecessary(TaskContext.get()) - }) - // Flatten the names of nested struct columns, required by cudf arrow IPC writer. 
- GpuPythonRunnerUtils.flattenNames(pythonInSchema).foreach { case (name, nullable) => - if (nullable) { - builder.withColumnNames(name) - } else { - builder.withNotNullableColumnNames(name) - } - } - tableWriter = - Table.writeArrowIPCChunked(builder.build(), new BufferToStreamWriter(dataOut)) - } - } - - private def writeBatchToStreamAndClose(batch: ColumnarBatch): Unit = { - val table = withResource(batch) { nextBatch => - GpuColumnVector.from(nextBatch) - } - withResource(new NvtxRange("write python batch", NvtxColor.DARK_GREEN)) { _ => - // The callback will handle closing table and releasing the semaphore - tableWriter.write(table) - } - } - - private def close(): Unit = { - if (tableWriter != null) { - tableWriter.close() - tableWriter = null - } - } - - private def writeEmptyIteratorOnCpu(dataOut: DataOutputStream): Unit = { - // For the case that partition is empty. - // In this case CPU will still send the schema to Python workers by calling - // the "start" API of the Java Arrow writer, but GPU will send out nothing, - // leading to the IPC error. And it is not easy to do as what Spark does on - // GPU, because the C++ Arrow writer used by GPU will only send out the schema - // iff there is some data. Besides, it does not expose a "start" API to do this. - // So here we leverage the Java Arrow writer to do similar things as Spark. - // It is OK because sending out schema has nothing to do with GPU. 
- // most code is copied from Spark - val arrowSchema = ArrowUtilsShim.toArrowSchema(pythonInSchema, timeZoneId) - val allocator = ArrowUtils.rootAllocator.newChildAllocator( - s"stdout writer for empty partition", 0, Long.MaxValue) - val root = VectorSchemaRoot.create(arrowSchema, allocator) - - Utils.tryWithSafeFinally { - val writer = new ArrowStreamWriter(root, null, dataOut) - writer.start() - // No data to write - writer.end() - } { - root.close() - allocator.close() - } - } - } -} - -object GpuPythonRunnerUtils { - def flattenNames(d: DataType, nullable: Boolean = true): Seq[(String, Boolean)] = - d match { - case s: StructType => - s.flatMap(sf => Seq((sf.name, sf.nullable)) ++ flattenNames(sf.dataType, sf.nullable)) - case m: MapType => - flattenNames(m.keyType, nullable) ++ flattenNames(m.valueType, nullable) - case a: ArrayType => flattenNames(a.elementType, nullable) - case _ => Nil - } -} diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowReader.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowReader.scala new file mode 100644 index 00000000000..b31b5de331a --- /dev/null +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowReader.scala @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.rapids.execution.python + +import java.io.DataInputStream + +import ai.rapids.cudf.{ArrowIPCOptions, HostBufferProvider, HostMemoryBuffer, NvtxColor, NvtxRange, StreamedTableReader, Table} +import com.nvidia.spark.rapids.Arm.withResource +import com.nvidia.spark.rapids.GpuSemaphore + +import org.apache.spark.TaskContext +import org.apache.spark.sql.vectorized.ColumnarBatch + + +/** A helper class to read arrow data from the input stream to host buffer. */ +private[rapids] class StreamToBufferProvider( + inputStream: DataInputStream) extends HostBufferProvider { + private[this] val tempBuffer = new Array[Byte](128 * 1024) + + override def readInto(hostBuffer: HostMemoryBuffer, length: Long): Long = { + var amountLeft = length + var totalRead : Long = 0 + while (amountLeft > 0) { + val amountToRead = Math.min(tempBuffer.length, amountLeft).toInt + val amountRead = inputStream.read(tempBuffer, 0, amountToRead) + if (amountRead <= 0) { + // Reached EOF + amountLeft = 0 + } else { + amountLeft -= amountRead + hostBuffer.setBytes(totalRead, tempBuffer, 0, amountRead) + totalRead += amountRead + } + } + totalRead + } +} + +trait GpuArrowOutput { + /** + * Update the expected rows number for next reading. + */ + private[rapids] final def setMinReadTargetNumRows(numRows: Int): Unit = { + minReadTargetNumRows = numRows + } + + /** Convert the table received from the Python side to a batch. */ + protected def toBatch(table: Table): ColumnarBatch + + /** + * Default to `Int.MaxValue` to try to read as many as possible. + * Change it by calling `setMinReadTargetNumRows` before a reading. 
+ */ + private var minReadTargetNumRows: Int = Int.MaxValue + + def newGpuArrowReader: GpuArrowReader = new GpuArrowReader + + class GpuArrowReader extends AutoCloseable { + private[this] var tableReader: StreamedTableReader = _ + private[this] var batchLoaded: Boolean = true + + /** Make the reader ready to read data, should be called before reading any batch */ + final def start(stream: DataInputStream): Unit = { + if (tableReader == null) { + val builder = ArrowIPCOptions.builder().withCallback( + () => GpuSemaphore.acquireIfNecessary(TaskContext.get())) + tableReader = Table.readArrowIPCChunked(builder.build(), new StreamToBufferProvider(stream)) + } + } + + final def isStarted: Boolean = tableReader != null + + final def mayHasNext: Boolean = batchLoaded + + final def readNext(): ColumnarBatch = { + val table = + withResource(new NvtxRange("read python batch", NvtxColor.DARK_GREEN)) { _ => + // The GpuSemaphore is acquired in a callback + tableReader.getNextIfAvailable(minReadTargetNumRows) + } + if (table != null) { + batchLoaded = true + withResource(table)(toBatch) + } else { + batchLoaded = false + null + } + } + + def close(): Unit = { + if (tableReader != null) { + tableReader.close() + tableReader = null + } + } + } +} diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowWriter.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowWriter.scala new file mode 100644 index 00000000000..14e2a57533d --- /dev/null +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowWriter.scala @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.rapids.execution.python + +import java.io.DataOutputStream + +import ai.rapids.cudf.{ArrowIPCWriterOptions, HostBufferConsumer, HostMemoryBuffer, NvtxColor, NvtxRange, Table, TableWriter} +import com.nvidia.spark.rapids.{GpuColumnVector, GpuSemaphore} +import com.nvidia.spark.rapids.Arm.withResource +import org.apache.arrow.vector.VectorSchemaRoot +import org.apache.arrow.vector.ipc.ArrowStreamWriter +import org.apache.arrow.vector.types.pojo.Schema + +import org.apache.spark.TaskContext +import org.apache.spark.api.python.PythonRDD +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} +import org.apache.spark.sql.util.ArrowUtils +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.Utils + +/** A helper class to write arrow data from host buffer to the output stream */ +private[rapids] class BufferToStreamWriter( + outputStream: DataOutputStream) extends HostBufferConsumer { + + private[this] val tempBuffer = new Array[Byte](128 * 1024) + + override def handleBuffer(hostBuffer: HostMemoryBuffer, length: Long): Unit = { + withResource(hostBuffer) { buffer => + var len = length + var offset: Long = 0 + while(len > 0) { + val toCopy = math.min(tempBuffer.length, len).toInt + buffer.getBytes(tempBuffer, 0, offset, toCopy) + outputStream.write(tempBuffer, 0, toCopy) + len = len - toCopy + offset = offset + toCopy + } + } + } +} + +trait GpuArrowWriter extends AutoCloseable { + + protected[this] val inputSchema: StructType + protected[this] val maxBatchSize: Long 
+ + private[this] var tableWriter: TableWriter = _ + private[this] var writerOptions: ArrowIPCWriterOptions = _ + + private def buildWriterOptions: ArrowIPCWriterOptions = { + val builder = ArrowIPCWriterOptions.builder() + builder.withMaxChunkSize(maxBatchSize) + builder.withCallback((table: Table) => { + table.close() + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + }) + // Flatten the names of nested struct columns, required by cudf arrow IPC writer. + GpuArrowWriter.flattenNames(inputSchema).foreach { case (name, nullable) => + if (nullable) { + builder.withColumnNames(name) + } else { + builder.withNotNullableColumnNames(name) + } + } + builder.build() + } + + /** Make the writer ready to write data, should be called before writing any batch */ + final def start(dataOut: DataOutputStream): Unit = { + if (tableWriter == null) { + if (writerOptions == null) { + writerOptions = buildWriterOptions + } + tableWriter = Table.writeArrowIPCChunked(writerOptions, new BufferToStreamWriter(dataOut)) + } + } + + final def writeAndClose(batch: ColumnarBatch): Unit = withResource(batch) { _ => + write(batch) + } + + final def write(batch: ColumnarBatch): Unit = { + withResource(new NvtxRange("write python batch", NvtxColor.DARK_GREEN)) { _ => + // The callback will handle closing table and releasing the semaphore + tableWriter.write(GpuColumnVector.from(batch)) + } + } + + /** This is design to reuse the writer options */ + final def reset(): Unit = { + if (tableWriter != null) { + tableWriter.close() + tableWriter = null + } + } + + def close(): Unit = { + if (tableWriter != null) { + tableWriter.close() + tableWriter = null + writerOptions = null + } + } + +} + +object GpuArrowWriter { + /** + * Create a simple GpuArrowWriter in case you don't want to implement a new one + * by extending from the trait. 
+ */ + def apply(schema: StructType, maxSize: Long): GpuArrowWriter = { + new GpuArrowWriter { + override protected val inputSchema: StructType = schema + override protected val maxBatchSize: Long = maxSize + } + } + + def flattenNames(d: DataType, nullable: Boolean = true): Seq[(String, Boolean)] = + d match { + case s: StructType => + s.flatMap(sf => Seq((sf.name, sf.nullable)) ++ flattenNames(sf.dataType, sf.nullable)) + case m: MapType => + flattenNames(m.keyType, nullable) ++ flattenNames(m.valueType, nullable) + case a: ArrayType => flattenNames(a.elementType, nullable) + case _ => Nil + } +} + +abstract class GpuArrowPythonWriter( + override val inputSchema: StructType, + override val maxBatchSize: Long) extends GpuArrowWriter { + + protected def writeUDFs(dataOut: DataOutputStream): Unit + + def writeCommand(dataOut: DataOutputStream, confs: Map[String, String]): Unit = { + // Write config for the worker as a number of key -> value pairs of strings + dataOut.writeInt(confs.size) + for ((k, v) <- confs) { + PythonRDD.writeUTF(k, dataOut) + PythonRDD.writeUTF(v, dataOut) + } + writeUDFs(dataOut) + } + + /** + * This is for writing the empty partition. + * In this case CPU will still send the schema to Python workers by calling + * the "start" API of the Java Arrow writer, but GPU will send out nothing, + * leading to the IPC error. And it is not easy to do as what Spark does on + * GPU, because the C++ Arrow writer used by GPU will only send out the schema + * iff there is some data. Besides, it does not expose a "start" API to do this. + * So here we leverage the Java Arrow writer to do similar things as Spark. + * It is OK because sending out schema has nothing to do with GPU. 
+ * (Most code is copied from Spark) + */ + final def writeEmptyIteratorOnCpu(dataOut: DataOutputStream, + arrowSchema: Schema): Unit = { + val allocator = ArrowUtils.rootAllocator.newChildAllocator( + s"stdout writer for empty partition", 0, Long.MaxValue) + val root = VectorSchemaRoot.create(arrowSchema, allocator) + Utils.tryWithSafeFinally { + val writer = new ArrowStreamWriter(root, null, dataOut) + writer.start() + // No data to write + writer.end() + } { + root.close() + allocator.close() + } + } + +} diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapGroupsInPandasExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapGroupsInPandasExec.scala index 0baf2d2661f..6c2f716583f 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapGroupsInPandasExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapGroupsInPandasExec.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistrib import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.python.FlatMapGroupsInPandasExec import org.apache.spark.sql.rapids.execution.python.BatchGroupUtils._ -import org.apache.spark.sql.rapids.execution.python.shims._ +import org.apache.spark.sql.rapids.execution.python.shims.GpuGroupedPythonRunnerFactory import org.apache.spark.sql.rapids.shims.DataTypeUtilsShim import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch @@ -122,11 +122,8 @@ case class GpuFlatMapGroupsInPandasExec( val GroupArgs(dedupAttrs, argOffsets, groupingOffsets) = resolveArgOffsets(child, groupingAttributes) - val runnerShims = GpuArrowPythonRunnerShims(conf, - chainedFunc, - Array(argOffsets), - DataTypeUtilsShim.fromAttributes(dedupAttrs), - pythonOutputSchema) + val runnerFactory = GpuGroupedPythonRunnerFactory(conf, 
chainedFunc, Array(argOffsets), + DataTypeUtilsShim.fromAttributes(dedupAttrs), pythonOutputSchema) // Start processing. Map grouped batches to ArrowPythonRunner results. child.executeColumnar().mapPartitionsInternal { inputIter => @@ -142,7 +139,7 @@ case class GpuFlatMapGroupsInPandasExec( if (pyInputIter.hasNext) { // Launch Python workers only when the data is not empty. - val pyRunner = runnerShims.getRunner() + val pyRunner = runnerFactory.getRunner() executePython(pyInputIter, localOutput, pyRunner, mNumOutputRows, mNumOutputBatches) } else { // Empty partition, return it directly diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuMapInBatchExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInBatchExec.scala similarity index 92% rename from sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuMapInBatchExec.scala rename to sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInBatchExec.scala index 2ee51096fcb..d5c3ef50400 100644 --- a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuMapInBatchExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInBatchExec.scala @@ -13,12 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - -/*** spark-rapids-shim-json-lines -{"spark": "341db"} -spark-rapids-shim-json-lines ***/ package org.apache.spark.sql.rapids.execution.python +import scala.collection.JavaConverters.seqAsJavaListConverter + import ai.rapids.cudf import ai.rapids.cudf.Table import com.nvidia.spark.rapids._ @@ -62,6 +60,8 @@ trait GpuMapInBatchExec extends ShimUnaryExecNode with GpuPythonExecBase { val pythonRunnerConf = ArrowUtilsShim.getPythonRunnerConfMap(conf) val isPythonOnGpuEnabled = GpuPythonHelper.isPythonOnGpuEnabled(conf) val localOutput = output + val localBatchSize = batchSize + val localEvalType = pythonEvalType // Start process child.executeColumnar().mapPartitionsInternal { inputIter => @@ -77,8 +77,7 @@ trait GpuMapInBatchExec extends ShimUnaryExecNode with GpuPythonExecBase { } val pyInputIterator = new RebatchingRoundoffIterator(inputIter, pyInputTypes, - batchSize, numInputRows, numInputBatches) - .map { batch => + localBatchSize, numInputRows, numInputBatches).map { batch => // Here we wrap it via another column so that Python sides understand it // as a DataFrame. 
withResource(batch) { b => @@ -88,15 +87,16 @@ trait GpuMapInBatchExec extends ShimUnaryExecNode with GpuPythonExecBase { new ColumnarBatch(Array(gpuColumn), b.numRows()) } } - } + } val pyRunner = new GpuArrowPythonRunner( chainedFunc, - pythonEvalType, + localEvalType, argOffsets, pyInputSchema, sessionLocalTimeZone, pythonRunnerConf, - batchSize) { + localBatchSize, + GpuColumnVector.structFromAttributes(localOutput.asJava)) { override def toBatch(table: Table): ColumnarBatch = { BatchGroupedIterator.extractChildren(table, localOutput) } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuPythonRunnerCommon.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuPythonRunnerCommon.scala new file mode 100644 index 00000000000..3fdd94b61d3 --- /dev/null +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuPythonRunnerCommon.scala @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.rapids.execution.python + +import ai.rapids.cudf.Table +import com.nvidia.spark.rapids.GpuColumnVector + +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.rapids.execution.python.shims.GpuBasePythonRunner +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * A trait to put some common things from Spark for the basic GPU Arrow Python runners + */ +trait GpuPythonRunnerCommon { _: GpuBasePythonRunner[_] => + + protected val pythonOutSchema: StructType + + protected def toBatch(table: Table): ColumnarBatch = { + GpuColumnVector.from(table, GpuColumnVector.extractTypes(pythonOutSchema)) + } + + override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize + require( + bufferSize >= 4, + "Pandas execution requires more than 4 bytes. Please set higher buffer. " + + s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.") +} diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala deleted file mode 100644 index d4aeef00369..00000000000 --- a/sql-plugin/src/main/spark311/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -/*** spark-rapids-shim-json-lines -{"spark": "311"} -{"spark": "312"} -{"spark": "313"} -spark-rapids-shim-json-lines ***/ -package org.apache.spark.rapids.shims.api.python - -import java.io.DataInputStream -import java.net.Socket -import java.util.concurrent.atomic.AtomicBoolean - -import org.apache.spark.{SparkEnv, TaskContext} -import org.apache.spark.api.python.BasePythonRunner - -// pid is not a constructor argument in 30x and 31x -abstract class ShimBasePythonRunner[IN, OUT]( - funcs : scala.Seq[org.apache.spark.api.python.ChainedPythonFunctions], - evalType : scala.Int, argOffsets : scala.Array[scala.Array[scala.Int]] -) extends BasePythonRunner[IN, OUT](funcs, evalType, argOffsets) { - protected abstract class ShimReaderIterator( - stream: DataInputStream, - writerThread: WriterThread, - startTime: Long, - env: SparkEnv, - worker: Socket, - pid: Option[Int], - releasedOrClosed: AtomicBoolean, - context: TaskContext - ) extends ReaderIterator(stream, writerThread, startTime, env, worker, releasedOrClosed, context) -} diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonOutput.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonOutput.scala new file mode 100644 index 00000000000..a6f58095c95 --- /dev/null +++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonOutput.scala @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "311"} +{"spark": "312"} +{"spark": "313"} +spark-rapids-shim-json-lines ***/ +package org.apache.spark.sql.rapids.execution.python.shims + +import java.io.DataInputStream +import java.net.Socket +import java.util.concurrent.atomic.AtomicBoolean + +import com.nvidia.spark.rapids.GpuSemaphore + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.api.python._ +import org.apache.spark.sql.rapids.execution.python.GpuArrowOutput +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * A trait that can be mixed-in with `GpuBasePythonRunner`. It implements the logic from + * Python (Arrow) to GPU/JVM (ColumnarBatch). + */ +trait GpuArrowPythonOutput extends GpuArrowOutput { _: GpuBasePythonRunner[_] => + protected def newReaderIterator( + stream: DataInputStream, + writerThread: WriterThread, + startTime: Long, + env: SparkEnv, + worker: Socket, + releasedOrClosed: AtomicBoolean, + context: TaskContext): Iterator[ColumnarBatch] = { + + new ReaderIterator(stream, writerThread, startTime, env, worker, releasedOrClosed, context) { + val gpuArrowReader = newGpuArrowReader + + protected override def read(): ColumnarBatch = { + if (writerThread.exception.isDefined) { + throw writerThread.exception.get + } + try { + // Because of batching and other things we have to be sure that we release the semaphore + // before any operation that could block. 
This is because we are using multiple threads + // for a single task and the GpuSemaphore might not wake up both threads associated with + // the task, so a reader can be blocked waiting for data, while a writer is waiting on + // the semaphore + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + if (gpuArrowReader.isStarted && gpuArrowReader.mayHasNext) { + val batch = gpuArrowReader.readNext() + if (batch != null) { + batch + } else { + gpuArrowReader.close() // reach the end, close the reader + read() // read the end signal + } + } else { + stream.readInt() match { + case SpecialLengths.START_ARROW_STREAM => + gpuArrowReader.start(stream) + read() + case SpecialLengths.TIMING_DATA => + handleTimingData() + read() + case SpecialLengths.PYTHON_EXCEPTION_THROWN => + throw handlePythonException() + case SpecialLengths.END_OF_DATA_SECTION => + handleEndOfDataSection() + null + } + } + } catch handleException + } + } + } +} diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunner.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunner.scala new file mode 100644 index 00000000000..14931b12d0c --- /dev/null +++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunner.scala @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/*** spark-rapids-shim-json-lines +{"spark": "311"} +{"spark": "312"} +{"spark": "313"} +{"spark": "320"} +{"spark": "321"} +{"spark": "321cdh"} +{"spark": "321db"} +{"spark": "322"} +{"spark": "323"} +{"spark": "324"} +{"spark": "330"} +{"spark": "330cdh"} +{"spark": "330db"} +{"spark": "331"} +{"spark": "332"} +{"spark": "332cdh"} +{"spark": "332db"} +{"spark": "333"} +{"spark": "340"} +{"spark": "341"} +{"spark": "350"} +spark-rapids-shim-json-lines ***/ +package org.apache.spark.sql.rapids.execution.python.shims + +import java.io.DataOutputStream +import java.net.Socket + +import com.nvidia.spark.rapids.GpuSemaphore + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.api.python.ChainedPythonFunctions +import org.apache.spark.sql.execution.python.PythonUDFRunner +import org.apache.spark.sql.rapids.execution.python.{GpuArrowPythonWriter, GpuPythonRunnerCommon} +import org.apache.spark.sql.rapids.shims.ArrowUtilsShim +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.Utils + +/** + * Similar to `PythonUDFRunner`, but exchange data with Python worker via Arrow stream. 
+ */ +class GpuArrowPythonRunner( + funcs: Seq[ChainedPythonFunctions], + evalType: Int, + argOffsets: Array[Array[Int]], + pythonInSchema: StructType, + timeZoneId: String, + conf: Map[String, String], + maxBatchSize: Long, + override val pythonOutSchema: StructType, + jobArtifactUUID: Option[String] = None) + extends GpuBasePythonRunner[ColumnarBatch](funcs, evalType, argOffsets, jobArtifactUUID) + with GpuArrowPythonOutput with GpuPythonRunnerCommon { + + protected override def newWriterThread( + env: SparkEnv, + worker: Socket, + inputIterator: Iterator[ColumnarBatch], + partitionIndex: Int, + context: TaskContext): WriterThread = { + new WriterThread(env, worker, inputIterator, partitionIndex, context) { + + val arrowWriter = new GpuArrowPythonWriter(pythonInSchema, maxBatchSize) { + override protected def writeUDFs(dataOut: DataOutputStream): Unit = { + PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets) + } + } + val isInputNonEmpty = inputIterator.nonEmpty + lazy val arrowSchema = ArrowUtilsShim.toArrowSchema(pythonInSchema, timeZoneId) + + protected override def writeCommand(dataOut: DataOutputStream): Unit = { + arrowWriter.writeCommand(dataOut, conf) + } + + protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = { + if (isInputNonEmpty) { + arrowWriter.start(dataOut) + Utils.tryWithSafeFinally { + while (inputIterator.hasNext) { + arrowWriter.writeAndClose(inputIterator.next()) + } + } { + arrowWriter.close() + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + dataOut.flush() + } + } else { + // The iterator can grab the semaphore even on an empty batch + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + arrowWriter.writeEmptyIteratorOnCpu(dataOut, arrowSchema) + } + } + } + } +} diff --git a/sql-plugin/src/main/spark320/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuBasePythonRunner.scala similarity index 
50% rename from sql-plugin/src/main/spark320/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala rename to sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuBasePythonRunner.scala index fa2f3f3fc72..0f5613289e6 100644 --- a/sql-plugin/src/main/spark320/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala +++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuBasePythonRunner.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,6 +15,9 @@ */ /*** spark-rapids-shim-json-lines +{"spark": "311"} +{"spark": "312"} +{"spark": "313"} {"spark": "320"} {"spark": "321"} {"spark": "321cdh"} @@ -33,28 +36,14 @@ {"spark": "340"} {"spark": "341"} spark-rapids-shim-json-lines ***/ -package org.apache.spark.rapids.shims.api.python +package org.apache.spark.sql.rapids.execution.python.shims -import java.io.DataInputStream -import java.net.Socket -import java.util.concurrent.atomic.AtomicBoolean +import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions} +import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.{SparkEnv, TaskContext} -import org.apache.spark.api.python.BasePythonRunner - -abstract class ShimBasePythonRunner[IN, OUT]( - funcs : scala.Seq[org.apache.spark.api.python.ChainedPythonFunctions], - evalType : scala.Int, argOffsets : scala.Array[scala.Array[scala.Int]] -) extends BasePythonRunner[IN, OUT](funcs, evalType, argOffsets) { - protected abstract class ShimReaderIterator( - stream: DataInputStream, - writerThread: WriterThread, - startTime: Long, - env: SparkEnv, - worker: Socket, - pid: Option[Int], - releasedOrClosed: AtomicBoolean, - context: TaskContext - ) extends ReaderIterator(stream, 
writerThread, startTime, env, worker, pid, - releasedOrClosed, context) -} +abstract class GpuBasePythonRunner[IN]( + funcs: Seq[ChainedPythonFunctions], + evalType: Int, + argOffsets: Array[Array[Int]], + jobArtifactUUID: Option[String] // Introduced after 341 +) extends BasePythonRunner[IN, ColumnarBatch](funcs, evalType, argOffsets) diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuCoGroupedArrowPythonRunner.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuCoGroupedArrowPythonRunner.scala index d5e779011c0..2e3e8b44eef 100644 --- a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuCoGroupedArrowPythonRunner.scala +++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuCoGroupedArrowPythonRunner.scala @@ -42,14 +42,13 @@ package org.apache.spark.sql.rapids.execution.python.shims import java.io.DataOutputStream import java.net.Socket -import ai.rapids.cudf.{ArrowIPCWriterOptions, NvtxColor, NvtxRange, Table} -import com.nvidia.spark.rapids.{GpuColumnVector, GpuSemaphore} import com.nvidia.spark.rapids.Arm.withResource +import com.nvidia.spark.rapids.GpuSemaphore import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRDD} import org.apache.spark.sql.execution.python.PythonUDFRunner -import org.apache.spark.sql.rapids.execution.python._ +import org.apache.spark.sql.rapids.execution.python.{GpuArrowWriter, GpuPythonRunnerCommon} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.Utils @@ -69,9 +68,10 @@ class GpuCoGroupedArrowPythonRunner( timeZoneId: String, conf: Map[String, String], batchSize: Int, - pythonOutSchema: StructType) - extends GpuPythonRunnerBase[(ColumnarBatch, ColumnarBatch)](funcs, evalType, argOffsets) - with GpuPythonArrowOutput { + 
override val pythonOutSchema: StructType, + jobArtifactUUID: Option[String] = None) + extends GpuBasePythonRunner[(ColumnarBatch, ColumnarBatch)](funcs, evalType, + argOffsets, jobArtifactUUID) with GpuArrowPythonOutput with GpuPythonRunnerCommon { protected override def newWriterThread( env: SparkEnv, @@ -82,14 +82,11 @@ class GpuCoGroupedArrowPythonRunner( new WriterThread(env, worker, inputIterator, partitionIndex, context) { protected override def writeCommand(dataOut: DataOutputStream): Unit = { - - // Write config for the worker as a number of key -> value pairs of strings dataOut.writeInt(conf.size) for ((k, v) <- conf) { PythonRDD.writeUTF(k, dataOut) PythonRDD.writeUTF(v, dataOut) } - PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets) } @@ -107,42 +104,20 @@ class GpuCoGroupedArrowPythonRunner( // The iterator can grab the semaphore even on an empty batch GpuSemaphore.releaseIfNecessary(TaskContext.get()) dataOut.writeInt(0) + dataOut.flush() } private def writeGroupBatch(groupBatch: ColumnarBatch, batchSchema: StructType, dataOut: DataOutputStream): Unit = { - val writer = { - val builder = ArrowIPCWriterOptions.builder() - builder.withMaxChunkSize(batchSize) - builder.withCallback((table: Table) => { - table.close() - GpuSemaphore.releaseIfNecessary(TaskContext.get()) - }) - // Flatten the names of nested struct columns, required by cudf arrow IPC writer. 
- GpuPythonRunnerUtils.flattenNames(batchSchema).foreach { case (name, nullable) => - if (nullable) { - builder.withColumnNames(name) - } else { - builder.withNotNullableColumnNames(name) - } - } - Table.writeArrowIPCChunked(builder.build(), new BufferToStreamWriter(dataOut)) - } - + val gpuArrowWriter = GpuArrowWriter(batchSchema, batchSize) Utils.tryWithSafeFinally { - withResource(new NvtxRange("write python batch", NvtxColor.DARK_GREEN)) { _ => - // The callback will handle closing table and releasing the semaphore - writer.write(GpuColumnVector.from(groupBatch)) - } + gpuArrowWriter.start(dataOut) + gpuArrowWriter.write(groupBatch) } { - writer.close() + gpuArrowWriter.reset() dataOut.flush() } } // end of writeGroup } } // end of newWriterThread - - def toBatch(table: Table): ColumnarBatch = { - GpuColumnVector.from(table, GpuColumnVector.extractTypes(pythonOutSchema)) - } } diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunnerShims.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala similarity index 79% rename from sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunnerShims.scala rename to sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala index d7491ed5a9a..6e6f5edd1d9 100644 --- a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunnerShims.scala +++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala @@ -36,22 +36,21 @@ spark-rapids-shim-json-lines ***/ package org.apache.spark.sql.rapids.execution.python.shims -import org.apache.spark.api.python._ -import org.apache.spark.sql.rapids.execution.python._ +import org.apache.spark.api.python.{ChainedPythonFunctions, 
PythonEvalType} import org.apache.spark.sql.rapids.shims.ArrowUtilsShim -import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch -case class GpuArrowPythonRunnerShims( - conf: org.apache.spark.sql.internal.SQLConf, - chainedFunc: Seq[ChainedPythonFunctions], - argOffsets: Array[Array[Int]], - dedupAttrs: StructType, - pythonOutputSchema: StructType) { +case class GpuGroupedPythonRunnerFactory( + conf: org.apache.spark.sql.internal.SQLConf, + chainedFunc: Seq[ChainedPythonFunctions], + argOffsets: Array[Array[Int]], + dedupAttrs: StructType, + pythonOutputSchema: StructType) { val sessionLocalTimeZone = conf.sessionLocalTimeZone val pythonRunnerConf = ArrowUtilsShim.getPythonRunnerConfMap(conf) - def getRunner(): GpuPythonRunnerBase[ColumnarBatch] = { + def getRunner(): GpuBasePythonRunner[ColumnarBatch] = { new GpuArrowPythonRunner( chainedFunc, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuMapInBatchExec.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuMapInBatchExec.scala deleted file mode 100644 index 91dc6d3789f..00000000000 --- a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuMapInBatchExec.scala +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "311"} -{"spark": "312"} -{"spark": "313"} -{"spark": "320"} -{"spark": "321"} -{"spark": "321cdh"} -{"spark": "321db"} -{"spark": "322"} -{"spark": "323"} -{"spark": "324"} -{"spark": "330"} -{"spark": "330cdh"} -{"spark": "330db"} -{"spark": "331"} -{"spark": "332"} -{"spark": "332cdh"} -{"spark": "332db"} -{"spark": "333"} -{"spark": "340"} -{"spark": "341"} -{"spark": "350"} -spark-rapids-shim-json-lines ***/ -package org.apache.spark.sql.rapids.execution.python - -import ai.rapids.cudf -import ai.rapids.cudf.Table -import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.Arm.withResource -import com.nvidia.spark.rapids.python.PythonWorkerSemaphore -import com.nvidia.spark.rapids.shims.ShimUnaryExecNode - -import org.apache.spark.{ContextAwareIterator, TaskContext} -import org.apache.spark.api.python.ChainedPythonFunctions -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression} -import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonRunner -import org.apache.spark.sql.rapids.shims.ArrowUtilsShim -import org.apache.spark.sql.types.{StructField, StructType} -import org.apache.spark.sql.vectorized.ColumnarBatch - -/* - * A relation produced by applying a function that takes an iterator of batches - * such as pandas DataFrame or PyArrow's record batches, and outputs an iterator of them. 
- */ -trait GpuMapInBatchExec extends ShimUnaryExecNode with GpuPythonExecBase { - - protected val func: Expression - protected val pythonEvalType: Int - - private val pandasFunction = func.asInstanceOf[GpuPythonUDF].func - - override def producedAttributes: AttributeSet = AttributeSet(output) - - private val batchSize = conf.arrowMaxRecordsPerBatch - - override def outputPartitioning: Partitioning = child.outputPartitioning - - override def internalDoExecuteColumnar(): RDD[ColumnarBatch] = { - val (numInputRows, numInputBatches, numOutputRows, numOutputBatches) = commonGpuMetrics() - - val pyInputTypes = child.schema - val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction))) - val sessionLocalTimeZone = conf.sessionLocalTimeZone - val pythonRunnerConf = ArrowUtilsShim.getPythonRunnerConfMap(conf) - val isPythonOnGpuEnabled = GpuPythonHelper.isPythonOnGpuEnabled(conf) - val localOutput = output - - // Start process - child.executeColumnar().mapPartitionsInternal { inputIter => - val context = TaskContext.get() - - // Single function with one struct. - val argOffsets = Array(Array(0)) - val pyInputSchema = StructType(StructField("in_struct", pyInputTypes) :: Nil) - - if (isPythonOnGpuEnabled) { - GpuPythonHelper.injectGpuInfo(chainedFunc, isPythonOnGpuEnabled) - PythonWorkerSemaphore.acquireIfNecessary(context) - } - - val contextAwareIter = new ContextAwareIterator(context, inputIter) - - val pyInputIterator = new RebatchingRoundoffIterator(contextAwareIter, pyInputTypes, - batchSize, numInputRows, numInputBatches) - .map { batch => - // Here we wrap it via another column so that Python sides understand it - // as a DataFrame. 
- withResource(batch) { b => - val structColumn = cudf.ColumnVector.makeStruct(GpuColumnVector.extractBases(b): _*) - withResource(structColumn) { stColumn => - val gpuColumn = GpuColumnVector.from(stColumn.incRefCount(), pyInputTypes) - new ColumnarBatch(Array(gpuColumn), b.numRows()) - } - } - } - - val pyRunner = new GpuArrowPythonRunner( - chainedFunc, - pythonEvalType, - argOffsets, - pyInputSchema, - sessionLocalTimeZone, - pythonRunnerConf, - batchSize) { - override def toBatch(table: Table): ColumnarBatch = { - BatchGroupedIterator.extractChildren(table, localOutput) - } - } - - pyRunner.compute(pyInputIterator, context.partitionId(), context) - .map { cb => - numOutputBatches += 1 - numOutputRows += cb.numRows - cb - } - } // end of mapPartitionsInternal - } // end of internalDoExecuteColumnar - -} diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuPythonArrowShims.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuPythonArrowShims.scala deleted file mode 100644 index e4685e48e06..00000000000 --- a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuPythonArrowShims.scala +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Copyright (c) 2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -/*** spark-rapids-shim-json-lines -{"spark": "311"} -{"spark": "312"} -{"spark": "313"} -{"spark": "320"} -{"spark": "321"} -{"spark": "321cdh"} -{"spark": "321db"} -{"spark": "322"} -{"spark": "323"} -{"spark": "324"} -{"spark": "330"} -{"spark": "330cdh"} -{"spark": "330db"} -{"spark": "331"} -{"spark": "332"} -{"spark": "332cdh"} -{"spark": "332db"} -{"spark": "333"} -{"spark": "340"} -{"spark": "341"} -{"spark": "350"} -spark-rapids-shim-json-lines ***/ -package org.apache.spark.sql.rapids.execution.python.shims - -import java.io.{DataInputStream, DataOutputStream} -import java.net.Socket -import java.util.concurrent.atomic.AtomicBoolean - -import ai.rapids.cudf._ -import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.Arm.withResource -import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion - -import org.apache.spark.{SparkEnv, TaskContext} -import org.apache.spark.api.python._ -import org.apache.spark.sql.rapids.execution.python._ -import org.apache.spark.sql.types._ -import org.apache.spark.sql.vectorized.ColumnarBatch - -/** - * A trait that can be mixed-in with `GpuPythonRunnerBase`. It implements the logic from - * Python (Arrow) to GPU/JVM (ColumnarBatch). - */ -trait GpuPythonArrowOutput { _: GpuPythonRunnerBase[_] => - - /** - * Default to `Int.MaxValue` to try to read as many as possible. - * Change it by calling `setMinReadTargetNumRows` before a reading. - */ - private var minReadTargetNumRows: Int = Int.MaxValue - - /** - * Update the expected batch size for next reading. - */ - private[python] final def setMinReadTargetNumRows(numRows: Int): Unit = { - minReadTargetNumRows = numRows - } - - /** Convert the table received from the Python side to a batch. 
*/ - protected def toBatch(table: Table): ColumnarBatch - - protected def newReaderIterator( - stream: DataInputStream, - writerThread: WriterThread, - startTime: Long, - env: SparkEnv, - worker: Socket, - releasedOrClosed: AtomicBoolean, - context: TaskContext - ): Iterator[ColumnarBatch] = { - newReaderIterator(stream, writerThread, startTime, env, worker, None, releasedOrClosed, - context) - } - - protected def newReaderIterator( - stream: DataInputStream, - writerThread: WriterThread, - startTime: Long, - env: SparkEnv, - worker: Socket, - pid: Option[Int], - releasedOrClosed: AtomicBoolean, - context: TaskContext): Iterator[ColumnarBatch] = { - - new ShimReaderIterator(stream, writerThread, startTime, env, worker, pid, releasedOrClosed, - context) { - - private[this] var arrowReader: StreamedTableReader = _ - - onTaskCompletion(context) { - if (arrowReader != null) { - arrowReader.close() - arrowReader = null - } - } - - private var batchLoaded = true - - protected override def read(): ColumnarBatch = { - if (writerThread.exception.isDefined) { - throw writerThread.exception.get - } - try { - // Because of batching and other things we have to be sure that we release the semaphore - // before any operation that could block. 
This is because we are using multiple threads - // for a single task and the GpuSemaphore might not wake up both threads associated with - // the task, so a reader can be blocked waiting for data, while a writer is waiting on - // the semaphore - GpuSemaphore.releaseIfNecessary(TaskContext.get()) - if (arrowReader != null && batchLoaded) { - // The GpuSemaphore is acquired in a callback - val table = - withResource(new NvtxRange("read python batch", NvtxColor.DARK_GREEN)) { _ => - arrowReader.getNextIfAvailable(minReadTargetNumRows) - } - if (table == null) { - batchLoaded = false - arrowReader.close() - arrowReader = null - read() - } else { - withResource(table) { _ => - batchLoaded = true - toBatch(table) - } - } - } else { - stream.readInt() match { - case SpecialLengths.START_ARROW_STREAM => - val builder = ArrowIPCOptions.builder() - builder.withCallback(() => - GpuSemaphore.acquireIfNecessary(TaskContext.get())) - arrowReader = Table.readArrowIPCChunked(builder.build(), - new StreamToBufferProvider(stream)) - read() - case SpecialLengths.TIMING_DATA => - handleTimingData() - read() - case SpecialLengths.PYTHON_EXCEPTION_THROWN => - throw handlePythonException() - case SpecialLengths.END_OF_DATA_SECTION => - handleEndOfDataSection() - null - } - } - } catch handleException - } - } - } -} - -/** - * Similar to `PythonUDFRunner`, but exchange data with Python worker via Arrow stream. 
- */ -class GpuArrowPythonRunner( - funcs: Seq[ChainedPythonFunctions], - evalType: Int, - argOffsets: Array[Array[Int]], - pythonInSchema: StructType, - timeZoneId: String, - conf: Map[String, String], - batchSize: Long, - pythonOutSchema: StructType = null) - extends GpuArrowPythonRunnerBase(funcs, evalType, argOffsets, pythonInSchema, timeZoneId, - conf, batchSize, pythonOutSchema) { - - protected override def newWriterThread( - env: SparkEnv, - worker: Socket, - inputIterator: Iterator[ColumnarBatch], - partitionIndex: Int, - context: TaskContext): WriterThread = { - new WriterThread(env, worker, inputIterator, partitionIndex, context) { - - val workerImpl = new RapidsWriter(env, inputIterator, partitionIndex, context) - - protected override def writeCommand(dataOut: DataOutputStream): Unit = { - workerImpl.writeCommand(dataOut) - } - - protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = { - workerImpl.writeIteratorToStream(dataOut) - } - } - } -} diff --git a/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonOutput.scala b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonOutput.scala new file mode 100644 index 00000000000..0d72dd2888c --- /dev/null +++ b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonOutput.scala @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*** spark-rapids-shim-json-lines
+{"spark": "320"}
+{"spark": "321"}
+{"spark": "321cdh"}
+{"spark": "321db"}
+{"spark": "322"}
+{"spark": "323"}
+{"spark": "324"}
+{"spark": "330"}
+{"spark": "330cdh"}
+{"spark": "330db"}
+{"spark": "331"}
+{"spark": "332"}
+{"spark": "332cdh"}
+{"spark": "332db"}
+{"spark": "333"}
+{"spark": "340"}
+{"spark": "341"}
+{"spark": "350"}
+spark-rapids-shim-json-lines ***/
+package org.apache.spark.sql.rapids.execution.python.shims
+
+import java.io.DataInputStream
+import java.net.Socket
+import java.util.concurrent.atomic.AtomicBoolean
+
+import com.nvidia.spark.rapids.GpuSemaphore
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python._
+import org.apache.spark.sql.rapids.execution.python.GpuArrowOutput
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+/**
+ * A trait that can be mixed-in with `GpuBasePythonRunner`. It implements the logic from
+ * Python (Arrow) to GPU/JVM (ColumnarBatch).
+ */
+trait GpuArrowPythonOutput extends GpuArrowOutput { _: GpuBasePythonRunner[_] =>
+  protected def newReaderIterator(
+      stream: DataInputStream,
+      writerThread: WriterThread,
+      startTime: Long,
+      env: SparkEnv,
+      worker: Socket,
+      pid: Option[Int], // new parameter from Spark 320
+      releasedOrClosed: AtomicBoolean,
+      context: TaskContext): Iterator[ColumnarBatch] = {
+
+    new ReaderIterator(stream, writerThread, startTime, env, worker, pid, releasedOrClosed,
+      context) {
+      val gpuArrowReader = newGpuArrowReader
+
+      protected override def read(): ColumnarBatch = {
+        if (writerThread.exception.isDefined) {
+          throw writerThread.exception.get
+        }
+        try {
+          // Because of batching and other things we have to be sure that we release the semaphore
+          // before any operation that could block. 
This is because we are using multiple threads + // for a single task and the GpuSemaphore might not wake up both threads associated with + // the task, so a reader can be blocked waiting for data, while a writer is waiting on + // the semaphore + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + if (gpuArrowReader.isStarted && gpuArrowReader.mayHasNext) { + val batch = gpuArrowReader.readNext() + if (batch != null) { + batch + } else { + gpuArrowReader.close() // reach the end, close the reader + read() // read the end signal + } + } else { + stream.readInt() match { + case SpecialLengths.START_ARROW_STREAM => + gpuArrowReader.start(stream) + read() + case SpecialLengths.TIMING_DATA => + handleTimingData() + read() + case SpecialLengths.PYTHON_EXCEPTION_THROWN => + throw handlePythonException() + case SpecialLengths.END_OF_DATA_SECTION => + handleEndOfDataSection() + null + } + } + } catch handleException + } + } + } +} diff --git a/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala b/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala index 7eb2bb74b0e..77265121efe 100644 --- a/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala +++ b/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala @@ -27,15 +27,13 @@ package org.apache.spark.sql.rapids.execution.python.shims import java.io.DataOutputStream import java.net.Socket -import ai.rapids.cudf._ -import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.Arm.withResource +import com.nvidia.spark.rapids.GpuSemaphore import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python._ import org.apache.spark.sql.execution.python.PythonUDFRunner -import 
org.apache.spark.sql.rapids.execution.python.{BufferToStreamWriter, GpuPythonRunnerBase, GpuPythonRunnerUtils} -import org.apache.spark.sql.types._ +import org.apache.spark.sql.rapids.execution.python.{GpuArrowPythonWriter, GpuPythonRunnerCommon} +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.Utils @@ -59,10 +57,11 @@ class GpuGroupUDFArrowPythonRunner( pythonInSchema: StructType, timeZoneId: String, conf: Map[String, String], - batchSize: Long, - pythonOutSchema: StructType) - extends GpuPythonRunnerBase[ColumnarBatch](funcs, evalType, argOffsets) - with GpuPythonArrowOutput { + maxBatchSize: Long, + override val pythonOutSchema: StructType, + jobArtifactUUID: Option[String] = None) + extends GpuBasePythonRunner[ColumnarBatch](funcs, evalType, argOffsets, jobArtifactUUID) + with GpuArrowPythonOutput with GpuPythonRunnerCommon { protected override def newWriterThread( env: SparkEnv, @@ -72,64 +71,34 @@ class GpuGroupUDFArrowPythonRunner( context: TaskContext): WriterThread = { new WriterThread(env, worker, inputIterator, partitionIndex, context) { - protected override def writeCommand(dataOut: DataOutputStream): Unit = { - - // Write config for the worker as a number of key -> value pairs of strings - dataOut.writeInt(conf.size) - for ((k, v) <- conf) { - PythonRDD.writeUTF(k, dataOut) - PythonRDD.writeUTF(v, dataOut) + val arrowWriter = new GpuArrowPythonWriter(pythonInSchema, maxBatchSize) { + override protected def writeUDFs(dataOut: DataOutputStream): Unit = { + PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets) } + } - PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets) + protected override def writeCommand(dataOut: DataOutputStream): Unit = { + arrowWriter.writeCommand(dataOut, conf) } protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = { - // write out number of columns Utils.tryWithSafeFinally { - val builder = ArrowIPCWriterOptions.builder() - 
builder.withMaxChunkSize(batchSize) - builder.withCallback((table: Table) => { - table.close() - GpuSemaphore.releaseIfNecessary(TaskContext.get()) - }) - // Flatten the names of nested struct columns, required by cudf Arrow IPC writer. - GpuPythonRunnerUtils.flattenNames(pythonInSchema).foreach { case (name, nullable) => - if (nullable) { - builder.withColumnNames(name) - } else { - builder.withNotNullableColumnNames(name) - } - } while(inputIterator.hasNext) { - val writer = { - // write 1 out to indicate there is more to read - dataOut.writeInt(1) - Table.writeArrowIPCChunked(builder.build(), new BufferToStreamWriter(dataOut)) - } - val table = withResource(inputIterator.next()) { nextBatch => - GpuColumnVector.from(nextBatch) - } - withResource(new NvtxRange("write python batch", NvtxColor.DARK_GREEN)) { _ => - // The callback will handle closing table and releasing the semaphore - writer.write(table) - } - writer.close() + // write 1 out to indicate there is more to read + dataOut.writeInt(1) + arrowWriter.start(dataOut) + arrowWriter.writeAndClose(inputIterator.next()) + arrowWriter.reset() dataOut.flush() } - // indicate not to read more - // The iterator can grab the semaphore even on an empty batch - GpuSemaphore.releaseIfNecessary(TaskContext.get()) } { + arrowWriter.close() // tell serializer we are done dataOut.writeInt(0) dataOut.flush() + GpuSemaphore.releaseIfNecessary(TaskContext.get()) } } } } - - def toBatch(table: Table): ColumnarBatch = { - GpuColumnVector.from(table, GpuColumnVector.extractTypes(pythonOutSchema)) - } } diff --git a/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunnerShims.scala b/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala similarity index 90% rename from sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunnerShims.scala rename to 
sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala index 3da7f7dc99e..ed0d5816f40 100644 --- a/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunnerShims.scala +++ b/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala @@ -21,12 +21,12 @@ spark-rapids-shim-json-lines ***/ package org.apache.spark.sql.rapids.execution.python.shims -import org.apache.spark.api.python._ -import org.apache.spark.sql.rapids.execution.python._ +import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.sql.rapids.shims.ArrowUtilsShim -import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch -case class GpuArrowPythonRunnerShims( + +case class GpuGroupedPythonRunnerFactory( conf: org.apache.spark.sql.internal.SQLConf, chainedFunc: Seq[ChainedPythonFunctions], argOffsets: Array[Array[Int]], @@ -38,7 +38,7 @@ case class GpuArrowPythonRunnerShims( val sessionLocalTimeZone = conf.sessionLocalTimeZone val pythonRunnerConf = ArrowUtilsShim.getPythonRunnerConfMap(conf) - def getRunner(): GpuPythonRunnerBase[ColumnarBatch] = { + def getRunner(): GpuBasePythonRunner[ColumnarBatch] = { if (zeroConfEnabled && maxBytes > 0L) { new GpuGroupUDFArrowPythonRunner( chainedFunc, diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala deleted file mode 100644 index 1cf8abeab2d..00000000000 --- a/sql-plugin/src/main/spark341db/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2023, NVIDIA CORPORATION. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "341db"} -spark-rapids-shim-json-lines ***/ -package org.apache.spark.rapids.shims.api.python - -import java.io.DataInputStream -import java.util.concurrent.atomic.AtomicBoolean - -import org.apache.spark.{SparkEnv, TaskContext} -import org.apache.spark.api.python.{BasePythonRunner, PythonWorker} - -abstract class ShimBasePythonRunner[IN, OUT]( - funcs : scala.Seq[org.apache.spark.api.python.ChainedPythonFunctions], - evalType : scala.Int, argOffsets : scala.Array[scala.Array[scala.Int]] -) extends BasePythonRunner[IN, OUT](funcs, evalType, argOffsets, None) { - protected abstract class ShimReaderIterator( - stream: DataInputStream, - writer: Writer, - startTime: Long, - env: SparkEnv, - worker: PythonWorker, - pid: Option[Int], - releasedOrClosed: AtomicBoolean, - context: TaskContext - ) extends ReaderIterator(stream, writer, startTime, env, worker, pid, - releasedOrClosed, context) -} diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonOutput.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonOutput.scala new file mode 100644 index 00000000000..9c8b180f344 --- /dev/null +++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonOutput.scala @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2023, 
NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "341db"} +spark-rapids-shim-json-lines ***/ +package org.apache.spark.sql.rapids.execution.python.shims + +import java.io.DataInputStream +import java.util.concurrent.atomic.AtomicBoolean + +import com.nvidia.spark.rapids.GpuSemaphore + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.api.python._ +import org.apache.spark.sql.rapids.execution.python.GpuArrowOutput +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * A trait that can be mixed-in with `GpuBasePythonRunner`. It implements the logic from + * Python (Arrow) to GPU/JVM (ColumnarBatch). + */ +trait GpuArrowPythonOutput extends GpuArrowOutput { _: GpuBasePythonRunner[_] => + + protected def newReaderIterator( + stream: DataInputStream, + writer: Writer, + startTime: Long, + env: SparkEnv, + worker: PythonWorker, + pid: Option[Int], + releasedOrClosed: AtomicBoolean, + context: TaskContext): Iterator[ColumnarBatch] = { + + new ReaderIterator(stream, writer, startTime, env, worker, pid, releasedOrClosed, context) { + val gpuArrowReader = newGpuArrowReader + + protected override def read(): ColumnarBatch = { + if (writer.exception.isDefined) { + throw writer.exception.get + } + try { + // Because of batching and other things we have to be sure that we release the semaphore + // before any operation that could block. 
This is because we are using multiple threads + // for a single task and the GpuSemaphore might not wake up both threads associated with + // the task, so a reader can be blocked waiting for data, while a writer is waiting on + // the semaphore + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + if (gpuArrowReader.isStarted && gpuArrowReader.mayHasNext) { + val batch = gpuArrowReader.readNext() + if (batch != null) { + batch + } else { + gpuArrowReader.close() // reach the end, close the reader + read() // read the end signal + } + } else { + stream.readInt() match { + case SpecialLengths.START_ARROW_STREAM => + gpuArrowReader.start(stream) + read() + case SpecialLengths.TIMING_DATA => + handleTimingData() + read() + case SpecialLengths.PYTHON_EXCEPTION_THROWN => + throw handlePythonException() + case SpecialLengths.END_OF_DATA_SECTION => + handleEndOfDataSection() + null + } + } + } catch handleException + } + } + } +} diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunner.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunner.scala new file mode 100644 index 00000000000..2912bff097c --- /dev/null +++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunner.scala @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +/*** spark-rapids-shim-json-lines +{"spark": "341db"} +spark-rapids-shim-json-lines ***/ +package org.apache.spark.sql.rapids.execution.python.shims + +import java.io.DataOutputStream + +import com.nvidia.spark.rapids.GpuSemaphore + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.api.python._ +import org.apache.spark.sql.execution.python.PythonUDFRunner +import org.apache.spark.sql.rapids.execution.python.{GpuArrowPythonWriter, GpuPythonRunnerCommon} +import org.apache.spark.sql.rapids.shims.ArrowUtilsShim +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * Similar to `PythonUDFRunner`, but exchange data with Python worker via Arrow stream. + */ +class GpuArrowPythonRunner( + funcs: Seq[ChainedPythonFunctions], + evalType: Int, + argOffsets: Array[Array[Int]], + pythonInSchema: StructType, + timeZoneId: String, + conf: Map[String, String], + maxBatchSize: Long, + override val pythonOutSchema: StructType, + jobArtifactUUID: Option[String] = None) + extends GpuBasePythonRunner[ColumnarBatch](funcs, evalType, argOffsets, jobArtifactUUID) + with GpuArrowPythonOutput with GpuPythonRunnerCommon { + + protected override def newWriter( + env: SparkEnv, + worker: PythonWorker, + inputIterator: Iterator[ColumnarBatch], + partitionIndex: Int, + context: TaskContext): Writer = { + new Writer(env, worker, inputIterator, partitionIndex, context) { + + val arrowWriter = new GpuArrowPythonWriter(pythonInSchema, maxBatchSize) { + override protected def writeUDFs(dataOut: DataOutputStream): Unit = { + PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets) + } + } + val isInputNonEmpty = inputIterator.nonEmpty + lazy val arrowSchema = ArrowUtilsShim.toArrowSchema(pythonInSchema, timeZoneId) + + protected override def writeCommand(dataOut: DataOutputStream): Unit = { + arrowWriter.writeCommand(dataOut, conf) + } + + override def writeNextInputToStream(dataOut: DataOutputStream): Boolean = { + if 
(isInputNonEmpty) { + arrowWriter.start(dataOut) + try { + if (inputIterator.hasNext) { + arrowWriter.writeAndClose(inputIterator.next()) + dataOut.flush() + true + } else { + arrowWriter.close() // all batches are written, close the writer + false + } + } catch { + case t: Throwable => + arrowWriter.close() + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + throw t + } + } else { + // The iterator can grab the semaphore even on an empty batch + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + arrowWriter.writeEmptyIteratorOnCpu(dataOut, arrowSchema) + false + } + } + } + } +} diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuBasePythonRunner.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuBasePythonRunner.scala new file mode 100644 index 00000000000..81c78db1cfc --- /dev/null +++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuBasePythonRunner.scala @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/*** spark-rapids-shim-json-lines +{"spark": "341db"} +{"spark": "350"} +spark-rapids-shim-json-lines ***/ +package org.apache.spark.sql.rapids.execution.python.shims + +import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions} +import org.apache.spark.sql.vectorized.ColumnarBatch + +abstract class GpuBasePythonRunner[IN]( + funcs: Seq[ChainedPythonFunctions], + evalType: Int, + argOffsets: Array[Array[Int]], + jobArtifactUUID: Option[String] +) extends BasePythonRunner[IN, ColumnarBatch](funcs, evalType, argOffsets, jobArtifactUUID) diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuCoGroupedArrowPythonRunner.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuCoGroupedArrowPythonRunner.scala index a8aa799c484..ec2c8e67664 100644 --- a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuCoGroupedArrowPythonRunner.scala +++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuCoGroupedArrowPythonRunner.scala @@ -21,14 +21,13 @@ package org.apache.spark.sql.rapids.execution.python.shims import java.io.DataOutputStream -import ai.rapids.cudf.{ArrowIPCWriterOptions, NvtxColor, NvtxRange, Table} -import com.nvidia.spark.rapids.{GpuColumnVector, GpuSemaphore} import com.nvidia.spark.rapids.Arm.withResource +import com.nvidia.spark.rapids.GpuSemaphore import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRDD, PythonWorker} import org.apache.spark.sql.execution.python.PythonUDFRunner -import org.apache.spark.sql.rapids.execution.python._ +import org.apache.spark.sql.rapids.execution.python.{GpuArrowWriter, GpuPythonRunnerCommon} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -47,34 +46,33 @@ class GpuCoGroupedArrowPythonRunner( timeZoneId: String, conf: 
Map[String, String], batchSize: Int, - pythonOutSchema: StructType) - extends GpuPythonRunnerBase[(ColumnarBatch, ColumnarBatch)](funcs, evalType, argOffsets) - with GpuPythonArrowOutput { + override val pythonOutSchema: StructType, + jobArtifactUUID: Option[String] = None) + extends GpuBasePythonRunner[(ColumnarBatch, ColumnarBatch)](funcs, evalType, + argOffsets, jobArtifactUUID) with GpuArrowPythonOutput with GpuPythonRunnerCommon { protected override def newWriter( env: SparkEnv, - worker: PythonWorker, + worker: PythonWorker, // Changed from "Socket" to this "PythonWorker" from db341 inputIterator: Iterator[(ColumnarBatch, ColumnarBatch)], partitionIndex: Int, context: TaskContext): Writer = { new Writer(env, worker, inputIterator, partitionIndex, context) { protected override def writeCommand(dataOut: DataOutputStream): Unit = { - // Write config for the worker as a number of key -> value pairs of strings dataOut.writeInt(conf.size) for ((k, v) <- conf) { PythonRDD.writeUTF(k, dataOut) PythonRDD.writeUTF(v, dataOut) } - PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets) } override def writeNextInputToStream(dataOut: DataOutputStream): Boolean = { - // For each we first send the number of dataframes in each group then send - // first df, then send second df. End of data is marked by sending 0. if (inputIterator.hasNext) { + // For each we first send the number of dataframes in each group then send + // first df, then send second df. dataOut.writeInt(2) val (leftGroupBatch, rightGroupBatch) = inputIterator.next() withResource(Seq(leftGroupBatch, rightGroupBatch)) { _ => @@ -85,6 +83,7 @@ class GpuCoGroupedArrowPythonRunner( } else { // The iterator can grab the semaphore even on an empty batch GpuSemaphore.releaseIfNecessary(TaskContext.get()) + // End of data is marked by sending 0. 
dataOut.writeInt(0) false } @@ -92,42 +91,20 @@ class GpuCoGroupedArrowPythonRunner( private def writeGroupBatch(groupBatch: ColumnarBatch, batchSchema: StructType, dataOut: DataOutputStream): Unit = { - val writer = { - val builder = ArrowIPCWriterOptions.builder() - builder.withMaxChunkSize(batchSize) - builder.withCallback((table: Table) => { - table.close() - GpuSemaphore.releaseIfNecessary(TaskContext.get()) - }) - // Flatten the names of nested struct columns, required by cudf arrow IPC writer. - GpuPythonRunnerUtils.flattenNames(batchSchema).foreach { case (name, nullable) => - if (nullable) { - builder.withColumnNames(name) - } else { - builder.withNotNullableColumnNames(name) - } - } - Table.writeArrowIPCChunked(builder.build(), new BufferToStreamWriter(dataOut)) - } + val gpuArrowWriter = GpuArrowWriter(batchSchema, batchSize) try { - withResource(new NvtxRange("write python batch", NvtxColor.DARK_GREEN)) { _ => - // The callback will handle closing table and releasing the semaphore - writer.write(GpuColumnVector.from(groupBatch)) - } + gpuArrowWriter.start(dataOut) + gpuArrowWriter.write(groupBatch) } catch { case t: Throwable => // release the semaphore in case of exception in the middle of writing a batch GpuSemaphore.releaseIfNecessary(TaskContext.get()) throw t } finally { - writer.close() + gpuArrowWriter.reset() dataOut.flush() } - } // end of writeGroup + } // end of writeGroupBatch } - } // end of newWriterThread - - def toBatch(table: Table): ColumnarBatch = { - GpuColumnVector.from(table, GpuColumnVector.extractTypes(pythonOutSchema)) } } diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala index a9c04808879..1a7251b33f1 100644 --- 
a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala +++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala @@ -24,15 +24,13 @@ package org.apache.spark.sql.rapids.execution.python.shims import java.io.DataOutputStream -import ai.rapids.cudf._ -import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.Arm.withResource +import com.nvidia.spark.rapids.GpuSemaphore import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python._ import org.apache.spark.sql.execution.python.PythonUDFRunner -import org.apache.spark.sql.rapids.execution.python._ -import org.apache.spark.sql.types._ +import org.apache.spark.sql.rapids.execution.python.{GpuArrowPythonWriter, GpuPythonRunnerCommon} +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch /** @@ -56,61 +54,36 @@ class GpuGroupUDFArrowPythonRunner( timeZoneId: String, conf: Map[String, String], batchSize: Long, - pythonOutSchema: StructType) - extends GpuPythonRunnerBase[ColumnarBatch](funcs, evalType, argOffsets) - with GpuPythonArrowOutput { + override val pythonOutSchema: StructType, + jobArtifactUUID: Option[String] = None) + extends GpuBasePythonRunner[ColumnarBatch](funcs, evalType, argOffsets, jobArtifactUUID) + with GpuArrowPythonOutput with GpuPythonRunnerCommon { protected override def newWriter( env: SparkEnv, - worker: PythonWorker, + worker: PythonWorker, // From DB341, changed from Socket to PythonWorker inputIterator: Iterator[ColumnarBatch], partitionIndex: Int, context: TaskContext): Writer = { new Writer(env, worker, inputIterator, partitionIndex, context) { - protected override def writeCommand(dataOut: DataOutputStream): Unit = { - - // Write config for the worker as a number of key -> value pairs of strings - dataOut.writeInt(conf.size) - for ((k, v) <- conf) { - PythonRDD.writeUTF(k, dataOut) 
- PythonRDD.writeUTF(v, dataOut) + val arrowWriter = new GpuArrowPythonWriter(pythonInSchema, batchSize) { + override protected def writeUDFs(dataOut: DataOutputStream): Unit = { + PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets) } + } - PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets) + protected override def writeCommand(dataOut: DataOutputStream): Unit = { + arrowWriter.writeCommand(dataOut, conf) } override def writeNextInputToStream(dataOut: DataOutputStream): Boolean = { - // write out number of columns try { if (inputIterator.hasNext) { - val builder = ArrowIPCWriterOptions.builder() - builder.withMaxChunkSize(batchSize) - builder.withCallback((table: Table) => { - table.close() - GpuSemaphore.releaseIfNecessary(TaskContext.get()) - }) - // Flatten the names of nested struct columns, required by cudf Arrow IPC writer. - GpuPythonRunnerUtils.flattenNames(pythonInSchema).foreach { case (name, nullable) => - if (nullable) { - builder.withColumnNames(name) - } else { - builder.withNotNullableColumnNames(name) - } - } - val writer = { - // write 1 out to indicate there is more to read - dataOut.writeInt(1) - Table.writeArrowIPCChunked(builder.build(), new BufferToStreamWriter(dataOut)) - } - val table = withResource(inputIterator.next()) { nextBatch => - GpuColumnVector.from(nextBatch) - } - withResource(new NvtxRange("write python batch", NvtxColor.DARK_GREEN)) { _ => - // The callback will handle closing table and releasing the semaphore - writer.write(table) - } - writer.close() + dataOut.writeInt(1) + arrowWriter.start(dataOut) + arrowWriter.writeAndClose(inputIterator.next()) + arrowWriter.reset() dataOut.flush() true } else { @@ -123,6 +96,7 @@ class GpuGroupUDFArrowPythonRunner( } } catch { case t: Throwable => + arrowWriter.close() // release the semaphore in case of exception in the middle of writing a batch GpuSemaphore.releaseIfNecessary(TaskContext.get()) throw t @@ -130,8 +104,4 @@ class GpuGroupUDFArrowPythonRunner( } } } - - def 
toBatch(table: Table): ColumnarBatch = { - GpuColumnVector.from(table, GpuColumnVector.extractTypes(pythonOutSchema)) - } } diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunnerShims.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala similarity index 93% rename from sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunnerShims.scala rename to sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala index bf05656c861..d3e3415290a 100644 --- a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunnerShims.scala +++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala @@ -20,13 +20,12 @@ spark-rapids-shim-json-lines ***/ package org.apache.spark.sql.rapids.execution.python.shims import org.apache.spark.api.python._ -import org.apache.spark.sql.rapids.execution.python._ import org.apache.spark.sql.rapids.shims.ArrowUtilsShim import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch //TODO is this needed? 
we already have a similar version in spark321db -case class GpuArrowPythonRunnerShims( +case class GpuGroupedPythonRunnerFactory( conf: org.apache.spark.sql.internal.SQLConf, chainedFunc: Seq[ChainedPythonFunctions], argOffsets: Array[Array[Int]], @@ -38,7 +37,7 @@ case class GpuArrowPythonRunnerShims( val sessionLocalTimeZone = conf.sessionLocalTimeZone val pythonRunnerConf = ArrowUtilsShim.getPythonRunnerConfMap(conf) - def getRunner(): GpuPythonRunnerBase[ColumnarBatch] = { + def getRunner(): GpuBasePythonRunner[ColumnarBatch] = { if (zeroConfEnabled && maxBytes > 0L) { new GpuGroupUDFArrowPythonRunner( chainedFunc, diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuPythonArrowShims.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuPythonArrowShims.scala deleted file mode 100644 index ab11083561a..00000000000 --- a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuPythonArrowShims.scala +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Copyright (c) 2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -/*** spark-rapids-shim-json-lines -{"spark": "341db"} -spark-rapids-shim-json-lines ***/ -package org.apache.spark.sql.rapids.execution.python.shims - -import java.io.{DataInputStream, DataOutputStream} -import java.util.concurrent.atomic.AtomicBoolean - -import ai.rapids.cudf._ -import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.Arm.withResource -import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion - -import org.apache.spark.{SparkEnv, TaskContext} -import org.apache.spark.api.python._ -import org.apache.spark.sql.rapids.execution.python._ -import org.apache.spark.sql.types._ -import org.apache.spark.sql.vectorized.ColumnarBatch - -/** - * A trait that can be mixed-in with `GpuPythonRunnerBase`. It implements the logic from - * Python (Arrow) to GPU/JVM (ColumnarBatch). - */ -trait GpuPythonArrowOutput { _: GpuPythonRunnerBase[_] => - - /** - * Default to `Int.MaxValue` to try to read as many as possible. - * Change it by calling `setMinReadTargetNumRows` before a reading. - */ - private var minReadTargetNumRows: Int = Int.MaxValue - - /** - * Update the expected batch size for next reading. - */ - private[python] final def setMinReadTargetNumRows(numRows: Int): Unit = { - minReadTargetNumRows = numRows - } - - /** Convert the table received from the Python side to a batch. 
*/ - protected def toBatch(table: Table): ColumnarBatch - - protected def newReaderIterator( - stream: DataInputStream, - writer: Writer, - startTime: Long, - env: SparkEnv, - worker: PythonWorker, - releasedOrClosed: AtomicBoolean, - context: TaskContext - ): Iterator[ColumnarBatch] = { - newReaderIterator(stream, writer, startTime, env, worker, None, releasedOrClosed, - context) - } - - protected def newReaderIterator( - stream: DataInputStream, - writer: Writer, - startTime: Long, - env: SparkEnv, - worker: PythonWorker, - pid: Option[Int], - releasedOrClosed: AtomicBoolean, - context: TaskContext): Iterator[ColumnarBatch] = { - - new ShimReaderIterator(stream, writer, startTime, env, worker, pid, releasedOrClosed, - context) { - - private[this] var arrowReader: StreamedTableReader = _ - - onTaskCompletion(context) { - if (arrowReader != null) { - arrowReader.close() - arrowReader = null - } - } - - private var batchLoaded = true - - protected override def read(): ColumnarBatch = { - if (writer.exception.isDefined) { - throw writer.exception.get - } - try { - // Because of batching and other things we have to be sure that we release the semaphore - // before any operation that could block. 
This is because we are using multiple threads - // for a single task and the GpuSemaphore might not wake up both threads associated with - // the task, so a reader can be blocked waiting for data, while a writer is waiting on - // the semaphore - GpuSemaphore.releaseIfNecessary(TaskContext.get()) - if (arrowReader != null && batchLoaded) { - // The GpuSemaphore is acquired in a callback - val table = - withResource(new NvtxRange("read python batch", NvtxColor.DARK_GREEN)) { _ => - arrowReader.getNextIfAvailable(minReadTargetNumRows) - } - if (table == null) { - batchLoaded = false - arrowReader.close() - arrowReader = null - read() - } else { - withResource(table) { _ => - batchLoaded = true - toBatch(table) - } - } - } else { - stream.readInt() match { - case SpecialLengths.START_ARROW_STREAM => - val builder = ArrowIPCOptions.builder() - builder.withCallback(() => - GpuSemaphore.acquireIfNecessary(TaskContext.get())) - arrowReader = Table.readArrowIPCChunked(builder.build(), - new StreamToBufferProvider(stream)) - read() - case SpecialLengths.TIMING_DATA => - handleTimingData() - read() - case SpecialLengths.PYTHON_EXCEPTION_THROWN => - throw handlePythonException() - case SpecialLengths.END_OF_DATA_SECTION => - handleEndOfDataSection() - null - } - } - } catch handleException - } - } - } -} - -/** - * Similar to `PythonUDFRunner`, but exchange data with Python worker via Arrow stream. 
- */ -class GpuArrowPythonRunner( - funcs: Seq[ChainedPythonFunctions], - evalType: Int, - argOffsets: Array[Array[Int]], - pythonInSchema: StructType, - timeZoneId: String, - conf: Map[String, String], - batchSize: Long, - pythonOutSchema: StructType = null) - extends GpuArrowPythonRunnerBase(funcs, evalType, argOffsets, pythonInSchema, timeZoneId, - conf, batchSize, pythonOutSchema) { - - protected override def newWriter( - env: SparkEnv, - worker: PythonWorker, - inputIterator: Iterator[ColumnarBatch], - partitionIndex: Int, - context: TaskContext): Writer = { - new Writer(env, worker, inputIterator, partitionIndex, context) { - - val workerImpl = new RapidsWriter(env, inputIterator, partitionIndex, context) - - protected override def writeCommand(dataOut: DataOutputStream): Unit = { - workerImpl.writeCommand(dataOut) - } - - override def writeNextInputToStream(dataOut: DataOutputStream): Boolean = { - workerImpl.writeNextInputToStream(dataOut) - } - } - } -} diff --git a/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/api/python/ShimBasePythonRunner.scala b/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/api/python/ShimBasePythonRunner.scala deleted file mode 100644 index d100d931b9c..00000000000 --- a/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/api/python/ShimBasePythonRunner.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "350"} -spark-rapids-shim-json-lines ***/ -package org.apache.spark.rapids.shims.api.python - -import java.io.DataInputStream -import java.net.Socket -import java.util.concurrent.atomic.AtomicBoolean - -import org.apache.spark.{SparkEnv, TaskContext} -import org.apache.spark.api.python.BasePythonRunner - -abstract class ShimBasePythonRunner[IN, OUT]( - funcs : scala.Seq[org.apache.spark.api.python.ChainedPythonFunctions], - evalType : scala.Int, - argOffsets : scala.Array[scala.Array[scala.Int]], - jobArtifactUUID: Option[String] = None) // TODO shim this - extends BasePythonRunner[IN, OUT](funcs, evalType, argOffsets, jobArtifactUUID) { - protected abstract class ShimReaderIterator( - stream: DataInputStream, - writerThread: WriterThread, - startTime: Long, - env: SparkEnv, - worker: Socket, - pid: Option[Int], - releasedOrClosed: AtomicBoolean, - context: TaskContext - ) extends ReaderIterator(stream, writerThread, startTime, env, worker, pid, - releasedOrClosed, context) -}