@@ -122,12 +122,15 @@ class ParquetReadSupport(val convertTz: Option[ZoneId],
keyValueMetaData: JMap[String, String],
fileSchema: MessageType,
readContext: ReadContext): RecordMaterializer[InternalRow] = {
val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key,
SQLConf.CASE_SENSITIVE.defaultValue.get)
Member:
I'm not sure why you need to pass caseSensitive across ParquetRecordMaterializer and ParquetRowConverter. Can't we just get it in ParquetRowConverter?

Contributor Author:
Can I get the runtime config in ParquetRowConverter? I don't fully understand its behavior.

Contributor:
SQLConf.get works, even on the executor side; see dd37529.

Contributor Author:
Thanks! I'll update it to use SQLConf instead of passing the argument across classes.
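
As a rough, non-authoritative sketch of the reviewer's suggestion (not the change actually made in this diff): the converter could look up the active session's configuration itself via SQLConf.get instead of having caseSensitive threaded through the constructors. The helper object name below is hypothetical.

```scala
import org.apache.spark.sql.internal.SQLConf

// Hypothetical helper, for illustration only.
object CaseSensitivityFromConf {
  // SQLConf.get returns the active SQLConf; per the comment above it also
  // resolves on the executor side, so a converter could read this directly
  // instead of receiving caseSensitive as a constructor argument.
  def caseSensitive: Boolean = SQLConf.get.caseSensitiveAnalysis
}
```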

val parquetRequestedSchema = readContext.getRequestedSchema
new ParquetRecordMaterializer(
parquetRequestedSchema,
ParquetReadSupport.expandUDT(catalystRequestedSchema),
new ParquetToSparkSchemaConverter(conf),
convertTz)
convertTz,
caseSensitive)
}
}

@@ -36,11 +36,18 @@ private[parquet] class ParquetRecordMaterializer(
parquetSchema: MessageType,
catalystSchema: StructType,
schemaConverter: ParquetToSparkSchemaConverter,
convertTz: Option[ZoneId])
convertTz: Option[ZoneId],
caseSensitive: Boolean)
extends RecordMaterializer[InternalRow] {

private val rootConverter =
new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, convertTz, NoopUpdater)
new ParquetRowConverter(
schemaConverter,
parquetSchema,
catalystSchema,
convertTz,
caseSensitive,
NoopUpdater)

override def getCurrentRecord: InternalRow = rootConverter.currentRecord

@@ -33,7 +33,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -126,6 +126,7 @@ private[parquet] class ParquetRowConverter(
parquetType: GroupType,
catalystType: StructType,
convertTz: Option[ZoneId],
caseSensitive: Boolean,
updater: ParentContainerUpdater)
extends ParquetGroupConverter(updater) with Logging {

@@ -178,8 +179,17 @@

// Converters for each field.
private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
val catalystFieldNameToIndex = if (caseSensitive) {
catalystType.fieldNames.zipWithIndex.toMap
} else {
CaseInsensitiveMap(catalystType.fieldNames.zipWithIndex.toMap)
}
parquetType.getFields.asScala.map { parquetField =>
val fieldIndex = catalystType.fieldIndex(parquetField.getName)
val fieldIndex = catalystFieldNameToIndex.getOrElse(parquetField.getName,
throw new IllegalArgumentException(
s"${parquetField.getName} does not exist. " +
s"Available: ${catalystType.fieldNames.mkString(", ")}")
)
val catalystField = catalystType(fieldIndex)
// Converted field value should be set to the `fieldIndex`-th cell of `currentRow`
newConverter(parquetField, catalystField.dataType, new RowUpdater(currentRow, fieldIndex))
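
For reference, a minimal standalone sketch (not part of this diff) of the case-insensitive lookup the new code relies on, assuming CaseInsensitiveMap's companion apply is accessible from the calling code:

```scala
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

object CaseInsensitiveLookupSketch {
  def main(args: Array[String]): Unit = {
    // Same shape of map the converter builds: catalyst field name -> field index.
    val nameToIndex = CaseInsensitiveMap(Seq("lowercase", "camelCase").zipWithIndex.toMap)
    // Keys resolve regardless of letter case, so Parquet's "camelcase" still maps to index 1.
    println(nameToIndex.getOrElse("camelcase", -1)) // 1
    println(nameToIndex.getOrElse("missing", -1))   // -1
  }
}
```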
@@ -348,7 +358,12 @@ private[parquet] class ParquetRowConverter(
}
}
new ParquetRowConverter(
schemaConverter, parquetType.asGroupType(), t, convertTz, wrappedUpdater)
schemaConverter,
parquetType.asGroupType(),
t,
convertTz,
caseSensitive,
wrappedUpdater)

case t =>
throw new RuntimeException(
@@ -842,6 +842,142 @@ class FileBasedDataSourceSuite extends QueryTest
}
}
}

test("SPARK-31116: Select simple parquet with case insensitive mode") {
Seq("true", "false").foreach { nestedSchemaPruningEnabled =>
withSQLConf(
SQLConf.CASE_SENSITIVE.key -> "false",
SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key -> nestedSchemaPruningEnabled) {
withTempPath { dir =>
val path = dir.getCanonicalPath

// Prepare values for testing specific parquet record reader
Seq("A").toDF("camelCase").write.parquet(path)

val exactSchema = new StructType().add("camelCase", StringType)
checkAnswer(spark.read.schema(exactSchema).parquet(path), Row("A"))

// In case-insensitive mode, the letter case of Parquet column names is ignored
val caseInsensitiveSchema = new StructType().add("camelcase", StringType)
checkAnswer(spark.read.schema(caseInsensitiveSchema).parquet(path), Row("A"))
}
}
}
}

test("SPARK-31116: Select nested parquet with case insensitive mode") {
Seq("true", "false").foreach { nestedSchemaPruningEnabled =>
withSQLConf(
SQLConf.CASE_SENSITIVE.key -> "false",
SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key -> nestedSchemaPruningEnabled) {
withTempPath { dir =>
val path = dir.getCanonicalPath

// Prepare values for testing nested parquet data
spark
.range(1L)
.selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn")
.write.parquet(path)

val exactSchema = "StructColumn struct<lowercase: LONG, camelCase: LONG>"

checkAnswer(spark.read.schema(exactSchema).parquet(path), Row(Row(0, 1)))

// In case-insensitive mode, the letter case of Parquet column names is ignored
val innerColumnCaseInsensitiveSchema =
"StructColumn struct<Lowercase: LONG, camelcase: LONG>"
checkAnswer(
spark.read.schema(innerColumnCaseInsensitiveSchema).parquet(path),
Row(Row(0, 1)))

val rootColumnCaseInsensitiveSchema =
"structColumn struct<lowercase: LONG, camelCase: LONG>"
checkAnswer(
spark.read.schema(rootColumnCaseInsensitiveSchema).parquet(path),
Row(Row(0, 1)))
}
}
}
}

test("SPARK-31116: Select simple parquet with case sensitive mode") {
Seq("true", "false").foreach { nestedSchemaPruningEnabled =>
withSQLConf(
SQLConf.CASE_SENSITIVE.key -> "true",
SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key -> nestedSchemaPruningEnabled) {
withTempPath { dir =>
val path = dir.getCanonicalPath

// Prepare values for testing specific parquet record reader
Seq("A").toDF("camelCase").write.parquet(path)

val exactSchema = new StructType().add("camelCase", StringType)
checkAnswer(spark.read.schema(exactSchema).parquet(path), Row("A"))

// In case-sensitive mode, a column with different letter case is not matched and reads as null
val caseInsensitiveSchema = new StructType().add("camelcase", StringType)
checkAnswer(spark.read.schema(caseInsensitiveSchema).parquet(path), Row(null))

// It also works properly with a combined schema
val combinedSchema = new StructType()
.add("camelCase", StringType)
.add("camelcase", StringType)
checkAnswer(spark.read.schema(combinedSchema).parquet(path), Row("A", null))
}
}
}
}

test("SPARK-31116: Select nested parquet with case sensitive mode") {
Seq("true", "false").foreach { nestedSchemaPruningEnabled =>
withSQLConf(
SQLConf.CASE_SENSITIVE.key -> "true",
SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key -> nestedSchemaPruningEnabled) {
withTempPath { dir =>
val path = dir.getCanonicalPath

// Prepare values for testing nested parquet data
spark
.range(1)
.selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn")
.write.parquet(path)

val exactSchema = "StructColumn struct<lowercase: LONG, camelCase: LONG>"
checkAnswer(spark.read.schema(exactSchema).parquet(path), Row(Row(0, 1)))

val innerColumnCaseInsensitiveSchema =
"StructColumn struct<Lowercase: LONG, camelcase: LONG>"
checkAnswer(
spark.read.schema(innerColumnCaseInsensitiveSchema).parquet(path),
Row(null))

val innerPartialColumnCaseInsensitiveSchema =
"StructColumn struct<lowercase: LONG, camelcase: LONG>"
checkAnswer(
spark.read.schema(innerPartialColumnCaseInsensitiveSchema).parquet(path),
Row(Row(0, null)))

val rootColumnCaseInsensitiveSchema =
"structColumn struct<lowercase: LONG, camelCase: LONG>"
checkAnswer(
spark.read.schema(rootColumnCaseInsensitiveSchema).parquet(path),
Row(null))

val combinedSchema =
"""
|StructColumn
|struct<lowercase: LONG, camelCase: LONG, LowerCase: LONG, camelcase: LONG>,
|structColumn
|struct<lowercase: LONG, camelCase: LONG, LowerCase: LONG, camelcase: LONG>
|""".stripMargin

checkAnswer(
spark.read.schema(combinedSchema).parquet(path),
Row(Row(0, 1, null, null), null))
}
}
}
}
}

object TestingUDT {