diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala index 40f7b4e8db7c5..3dcfd427c38e7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala @@ -132,6 +132,7 @@ case class ScriptTransformationExec( lazy val unwrappers = outputSoi.getAllStructFieldRefs.asScala.map(unwrapperFor) private def checkFailureAndPropagate(cause: Throwable = null): Unit = { + proc.waitFor() if (writerThread.exception.isDefined) { throw writerThread.exception.get } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala index 7153d3f03cd57..f601c259e2bb5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala @@ -172,6 +172,28 @@ class ScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with Tes assert(uncaughtExceptionHandler.exception.isEmpty) } + + test("SPARK-30973 ScriptTransformationExec should wait for the termination") { + (0 until 10).foreach { index => + assume(TestUtils.testCommandAvailable("/bin/bash")) + + val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") + + val e = intercept[SparkException] { + val plan = + new ScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "some_non_existent_command", + output = Seq(AttributeReference("a", StringType)()), + child = rowsDf.queryExecution.sparkPlan, + ioschema = noSerdeIOSchema) + SparkPlanTest.executePlan(plan, hiveContext) + } + assert(e.getMessage.contains("Subprocess exited with status")) + assert(uncaughtExceptionHandler.exception.isEmpty) + } + } + test("SPARK-24339 verify the result after pruning the unused columns") { val rowsDf = Seq( ("Bob", 16, 176),