From e0b77efc3e11c2d4230888a511f922ec9ab836fc Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Wed, 23 Aug 2023 20:41:28 -0700
Subject: [PATCH] [SPARK-44097][SPARK-44229][SQL][TESTS] Reenable PandasUDF and
 o.a.s.sql.execution.arrow tests in Java 21

---
 .../spark/sql/IntegratedUDFTestUtils.scala    |  4 +-
 .../arrow/ArrowConvertersSuite.scala          | 59 -------------------
 2 files changed, 1 insertion(+), 62 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
index 209df0aa287f0..508818c2e501d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.execution.python.{UserDefinedPythonFunction, UserDefinedPythonTableFunction}
 import org.apache.spark.sql.expressions.SparkUserDefinedFunction
 import org.apache.spark.sql.types.{DataType, IntegerType, NullType, StringType, StructType}
-import org.apache.spark.util.Utils

 /**
  * This object targets to integrate various UDF test cases so that Scalar UDF, Python UDF,
@@ -306,9 +305,8 @@ object IntegratedUDFTestUtils extends SQLHelper {

   lazy val shouldTestPythonUDFs: Boolean = isPythonAvailable && isPySparkAvailable

-  // TODO(SPARK-44097) Renable PandasUDF Tests in Java 21
   lazy val shouldTestPandasUDFs: Boolean =
-    isPythonAvailable && isPandasAvailable && isPyArrowAvailable && !Utils.isJavaVersionAtLeast21
+    isPythonAvailable && isPandasAvailable && isPyArrowAvailable

   /**
    * A base trait for various UDFs defined in this object.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
index 06fb93054dcd4..6f2ebceddfa81 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
@@ -27,7 +27,6 @@ import org.apache.arrow.memory.RootAllocator
 import org.apache.arrow.vector.{VectorLoader, VectorSchemaRoot}
 import org.apache.arrow.vector.ipc.JsonFileReader
 import org.apache.arrow.vector.util.{ByteArrayReadableSeekableByteChannel, Validator}
-import org.apache.commons.lang3.{JavaVersion, SystemUtils}

 import org.apache.spark.{SparkException, SparkUnsupportedOperationException, TaskContext}
 import org.apache.spark.sql.{DataFrame, Row}
@@ -53,8 +52,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("collect to arrow record batch") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val indexData = (1 to 6).toDF("i")
     val arrowBatches = indexData.toArrowBatchRdd.collect()
     assert(arrowBatches.nonEmpty)
@@ -69,8 +66,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("short conversion") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val json =
       s"""
          |{
@@ -120,8 +115,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("int conversion") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val json =
       s"""
          |{
@@ -171,8 +164,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("long conversion") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val json =
       s"""
          |{
@@ -222,8 +213,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("float conversion") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val json =
       s"""
          |{
@@ -271,8 +260,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("double conversion") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val json =
       s"""
          |{
@@ -320,8 +307,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("decimal conversion") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val json =
       s"""
          |{
@@ -386,8 +371,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("index conversion") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val data = List[Int](1, 2, 3, 4, 5, 6)
     val json =
       s"""
@@ -421,8 +404,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("mixed numeric type conversion") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val json =
       s"""
          |{
@@ -514,8 +495,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("string type conversion") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val json =
       s"""
          |{
@@ -578,8 +557,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("boolean type conversion") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val json =
       s"""
          |{
@@ -609,8 +586,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("byte type conversion") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val json =
       s"""
          |{
@@ -643,8 +618,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("binary type conversion") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val json =
       s"""
          |{
@@ -679,8 +652,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("date type conversion") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val json =
       s"""
          |{
@@ -718,8 +689,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("timestamp type conversion") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") {
       val json =
         s"""
@@ -762,8 +731,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("floating-point NaN") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val json =
       s"""
          |{
@@ -811,8 +778,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("array type conversion") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val json =
       s"""
          |{
@@ -960,8 +925,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("struct type conversion") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val json =
       s"""
          |{
@@ -1111,8 +1074,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("null type conversion") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val json =
       s"""
          |{
@@ -1168,8 +1129,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("partitioned DataFrame") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val json1 =
       s"""
          |{
@@ -1266,8 +1225,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("empty frame collect") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val arrowBatches = spark.emptyDataFrame.toArrowBatchRdd.collect()
     assert(arrowBatches.isEmpty)

@@ -1277,8 +1234,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("empty partition collect") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val emptyPart = spark.sparkContext.parallelize(Seq(1), 2).toDF("i")
     val arrowBatches = emptyPart.toArrowBatchRdd.collect()
     assert(arrowBatches.length === 1)
@@ -1290,8 +1245,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("max records in batch conf") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val totalRecords = 10
     val maxRecordsPerBatch = 3
     spark.conf.set(SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key, maxRecordsPerBatch)
@@ -1324,8 +1277,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("test Arrow Validator") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val json =
       s"""
          |{
@@ -1423,8 +1374,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("roundtrip arrow batches") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val inputRows = (0 until 9).map { i =>
       InternalRow(i)
     } :+ InternalRow(null)
@@ -1449,8 +1398,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("ArrowBatchStreamWriter roundtrip") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val inputRows = (0 until 9).map(InternalRow(_)) :+ InternalRow(null)

     val schema = StructType(Seq(StructField("int", IntegerType, nullable = true)))
@@ -1484,8 +1431,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("roundtrip arrow batches with complex schema") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val rows = (0 until 9).map { i =>
       InternalRow(i, UTF8String.fromString(s"str-$i"), InternalRow(i))
     }
@@ -1517,8 +1462,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("roundtrip empty arrow batches") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val schema = StructType(Seq(StructField("int", IntegerType, nullable = true)))
     val ctx = TaskContext.empty()
     val batchIter =
@@ -1530,8 +1473,6 @@ class ArrowConvertersSuite extends SharedSparkSession {
   }

   test("two batches with different schema") {
-    // TODO(SPARK-44229) Renable 'o.a.s.sql.execution.arrow' tests in Java 21
-    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     val schema1 = StructType(Seq(StructField("field1", IntegerType, nullable = true)))
     val inputRows1 = Array(InternalRow(1)).map { row =>
       val proj = UnsafeProjection.create(schema1)
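
--
For context, the guards deleted above are what previously skipped these suites on
Java 21: `shouldTestPandasUDFs` was ANDed with `!Utils.isJavaVersionAtLeast21`, and
each Arrow test began with `assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))`.
In ScalaTest, `assume` cancels (skips) a test rather than failing it when its
condition is false. Below is a minimal, self-contained sketch of that guard
pattern, not part of the patch; the suite and test names are hypothetical, while
the commons-lang3 and ScalaTest calls are the ones the deleted code used.

  import org.apache.commons.lang3.{JavaVersion, SystemUtils}
  import org.scalatest.funsuite.AnyFunSuite

  // Hypothetical suite illustrating the JVM-version guard this patch removes.
  class JavaVersionGuardSuite extends AnyFunSuite {
    test("runs only on Java 17 or earlier") {
      // When the condition is false (e.g. on Java 21), assume() throws
      // TestCanceledException and the test is reported as canceled, not failed.
      assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
      assert(1 + 1 === 2) // body is reached only on JDK <= 17
    }
  }

Removing the guards re-enables the suites unconditionally, so they now run (and
must pass) on Java 21 as well.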