diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala
index 2d1dfbf185830..832463e2512b5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala
@@ -64,7 +64,7 @@ private[hive] case class HiveScriptTransformationExec(
       outputSoi: StructObjectInspector,
       hadoopConf: Configuration): Iterator[InternalRow] = {
     new Iterator[InternalRow] with HiveInspectors {
-      var curLine: String = null
+      private var completed = false
       val scriptOutputStream = new DataInputStream(inputStream)
 
       val scriptOutputReader =
@@ -78,6 +78,9 @@ private[hive] case class HiveScriptTransformationExec(
       lazy val unwrappers = outputSoi.getAllStructFieldRefs.asScala.map(unwrapperFor)
 
       override def hasNext: Boolean = {
+        if (completed) {
+          return false
+        }
         try {
           if (scriptOutputWritable == null) {
             scriptOutputWritable = reusedWritableObject
@@ -85,6 +88,7 @@
             if (scriptOutputReader != null) {
               if (scriptOutputReader.next(scriptOutputWritable) <= 0) {
                 checkFailureAndPropagate(writerThread, null, proc, stderrBuffer)
+                completed = true
                 return false
               }
             } else {
@@ -97,6 +101,7 @@
                   // there can be a lag between EOF being written out and the process
                   // being terminated. So explicitly waiting for the process to be done.
                   checkFailureAndPropagate(writerThread, null, proc, stderrBuffer)
+                  completed = true
                   return false
               }
             }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala
index 1018ae5b68895..9982dec7c1c60 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.scalatest.exceptions.TestFailedException
 
 import org.apache.spark.{SparkException, TestUtils}
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.functions._
@@ -438,4 +439,21 @@ class HiveScriptTransformationSuite extends BaseScriptTransformationSuite with T
       assert(e2.contains("array cannot be converted to Hive TypeInfo"))
     }
   }
+
+  test("SPARK-38075: ORDER BY with LIMIT should not add fake rows") {
+    withTempView("v") {
+      val df = Seq((1), (2), (3)).toDF("a")
+      df.createTempView("v")
+      checkAnswer(sql(
+        """
+          |SELECT TRANSFORM(a)
+          |  USING 'cat' AS (a)
+          |FROM v
+          |ORDER BY a
+          |LIMIT 10
+          |""".stripMargin),
+        identity,
+        Row("1") :: Row("2") :: Row("3") :: Nil)
+    }
+  }
 }
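
For reviewers, the mechanics of the bug: `hasNext` assigns `scriptOutputWritable = reusedWritableObject` *before* attempting a read, so once it hits EOF and returns false, the field is left non-null and still holds the previous row's bytes. If the plan calls `hasNext` again on the exhausted iterator (the regression test's `ORDER BY ... LIMIT 10` exercises this via `TakeOrderedAndProjectExec`), the non-null check short-circuits to `true` and `next()` replays the last row as a fake row. The `completed` latch makes `hasNext` idempotent. Below is a minimal, self-contained sketch of that failure mode and the latch, outside Spark; `ReusableRecord` and `RecordIterator` are illustrative stand-ins for the Hadoop `Writable` machinery, not part of the patch.

```scala
import java.io.{ByteArrayInputStream, DataInputStream, EOFException, InputStream}

// Hypothetical reusable record object standing in for a Hadoop Writable.
class ReusableRecord {
  var value: Int = 0
  // DataInputStream.readInt() throws EOFException once the stream is drained.
  def readFields(in: DataInputStream): Unit = { value = in.readInt() }
}

class RecordIterator(stream: InputStream) extends Iterator[Int] {
  private val in = new DataInputStream(stream)
  private val reused = new ReusableRecord
  private var record: ReusableRecord = null
  private var completed = false

  override def hasNext: Boolean = {
    if (completed) {
      return false // the latch: terminal state, never re-enter the read path
    }
    if (record == null) {
      record = reused // points at the reused object *before* the read succeeds
      try {
        record.readFields(in)
      } catch {
        case _: EOFException =>
          // Without the latch, `record` is now non-null but holds the previous
          // row's data, so a second hasNext call would skip the read, return
          // true, and next() would replay the last row as a fake row.
          completed = true
          return false
      }
    }
    true
  }

  override def next(): Int = {
    if (!hasNext) throw new NoSuchElementException("end of script output")
    val v = record.value
    record = null // mark the buffered record as consumed
    v
  }
}

object RecordIteratorDemo extends App {
  val bytes = java.nio.ByteBuffer.allocate(8).putInt(1).putInt(2).array()
  val it = new RecordIterator(new ByteArrayInputStream(bytes))
  assert(it.toList == List(1, 2))
  assert(!it.hasNext && !it.hasNext) // idempotent after exhaustion: no fake row
}
```

Resetting `record = null` on EOF would also mask the duplicate, but the latch is the stronger fix: it keeps repeated `hasNext` calls from touching the drained stream or re-running `checkFailureAndPropagate` at all.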