diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index ff686d024f11..e4f7d77ac731 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -33,8 +33,9 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String

@@ -178,8 +179,15 @@ private[parquet] class ParquetRowConverter(
   // Converters for each field.
   private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
+    // (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is false
+    // to prevent throwing IllegalArgumentException when searching catalyst type's field index
+    val catalystFieldNameToIndex = if (SQLConf.get.caseSensitiveAnalysis) {
+      catalystType.fieldNames.zipWithIndex.toMap
+    } else {
+      CaseInsensitiveMap(catalystType.fieldNames.zipWithIndex.toMap)
+    }
     parquetType.getFields.asScala.map { parquetField =>
-      val fieldIndex = catalystType.fieldIndex(parquetField.getName)
+      val fieldIndex = catalystFieldNameToIndex(parquetField.getName)
       val catalystField = catalystType(fieldIndex)
       // Converted field value should be set to the `fieldIndex`-th cell of `currentRow`
       newConverter(parquetField, catalystField.dataType, new RowUpdater(currentRow, fieldIndex))
     }.toArray
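For context on the change above: `fieldConverters` used to resolve each Parquet field through `StructType.fieldIndex`, which is case sensitive, so the lookup now goes through a prebuilt name-to-index map that is wrapped in `CaseInsensitiveMap` whenever `spark.sql.caseSensitive` is false. Below is a minimal sketch of that lookup technique, not part of the patch, assuming spark-catalyst is on the classpath; the object name `CaseInsensitiveIndexSketch` is made up for illustration.

// A minimal sketch (not part of the patch) of the lookup technique used in
// fieldConverters above; requires spark-catalyst on the classpath.
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

object CaseInsensitiveIndexSketch {
  def main(args: Array[String]): Unit = {
    // Field names as declared in the requested catalyst schema.
    val fieldNames = Array("lowercase", "camelCase")

    // Case-sensitive index: the behavior when spark.sql.caseSensitive=true.
    val sensitive: Map[String, Int] = fieldNames.zipWithIndex.toMap

    // Case-insensitive wrapper: the behavior when spark.sql.caseSensitive=false.
    val insensitive = CaseInsensitiveMap(sensitive)

    // A Parquet field whose case differs from the requested schema misses the
    // case-sensitive map but resolves through the case-insensitive one.
    assert(sensitive.get("camelcase").isEmpty)
    assert(insensitive("camelcase") == 1)
  }
}

Note that the branch on SQLConf.get.caseSensitiveAnalysis runs once, when the converter is constructed, so the per-record hot path pays no extra configuration lookup.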
+ Seq("orc", "parquet").foreach { format => + Seq("true", "false").foreach { nestedSchemaPruningEnabled => + withSQLConf( + SQLConf.CASE_SENSITIVE.key -> "false", + SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key -> nestedSchemaPruningEnabled) { + withTempPath { dir => + val path = dir.getCanonicalPath + + // Prepare values for testing nested parquet data + spark + .range(1L) + .selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn") + .write + .format(format) + .save(path) + + val exactSchema = "StructColumn struct" + + checkAnswer(spark.read.schema(exactSchema).format(format).load(path), Row(Row(0, 1))) + + // In case insensitive manner, parquet's column cases are ignored + val innerColumnCaseInsensitiveSchema = + "StructColumn struct" + checkAnswer( + spark.read.schema(innerColumnCaseInsensitiveSchema).format(format).load(path), + Row(Row(0, 1))) + + val rootColumnCaseInsensitiveSchema = + "structColumn struct" + checkAnswer( + spark.read.schema(rootColumnCaseInsensitiveSchema).format(format).load(path), + Row(Row(0, 1))) + } + } + } + } + } } object TestingUDT {