@@ -122,12 +122,18 @@ class ParquetReadSupport(val convertTz: Option[ZoneId],
keyValueMetaData: JMap[String, String],
fileSchema: MessageType,
readContext: ReadContext): RecordMaterializer[InternalRow] = {
val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key,
SQLConf.CASE_SENSITIVE.defaultValue.get)
Member:
I'm not sure why you need to pass caseSensitive across ParquetRecordMaterializer and ParquetRowConverter. Can't we just get it in ParquetRowConverter?

Contributor Author:
Can I get the runtime config in ParquetRowConverter? I don't fully understand its behavior.

Contributor:
SQLConf.get works, even on the executor side; see dd37529

Contributor Author:
Thanks! I'll update it to use SQLConf instead of passing the arguments across classes.
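
For context, a minimal sketch of what reading the flags directly from SQLConf inside the converter could look like (illustrative only, not part of this diff; SketchedConfLookup is a hypothetical name, and the accessors assumed here are the existing SQLConf.get.caseSensitiveAnalysis and SQLConf.get.nestedSchemaPruningEnabled):

import org.apache.spark.sql.internal.SQLConf

object SketchedConfLookup {
  // SQLConf.get resolves the active session's config and also works on the
  // executor side, so the converter could read the flags itself instead of
  // having them threaded through constructor arguments.
  def caseSensitive: Boolean = SQLConf.get.caseSensitiveAnalysis
  def schemaPruningEnabled: Boolean = SQLConf.get.nestedSchemaPruningEnabled
}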

val schemaPruningEnabled = conf.getBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get)
val parquetRequestedSchema = readContext.getRequestedSchema
new ParquetRecordMaterializer(
parquetRequestedSchema,
ParquetReadSupport.expandUDT(catalystRequestedSchema),
new ParquetToSparkSchemaConverter(conf),
convertTz)
convertTz,
caseSensitive,
schemaPruningEnabled)
}
}

@@ -36,11 +36,20 @@ private[parquet] class ParquetRecordMaterializer(
parquetSchema: MessageType,
catalystSchema: StructType,
schemaConverter: ParquetToSparkSchemaConverter,
convertTz: Option[ZoneId])
convertTz: Option[ZoneId],
caseSensitive: Boolean,
schemaPruning: Boolean)
extends RecordMaterializer[InternalRow] {

private val rootConverter =
new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, convertTz, NoopUpdater)
new ParquetRowConverter(
schemaConverter,
parquetSchema,
catalystSchema,
convertTz,
caseSensitive,
schemaPruning,
NoopUpdater)

override def getCurrentRecord: InternalRow = rootConverter.currentRecord

@@ -33,7 +33,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -126,6 +126,8 @@ private[parquet] class ParquetRowConverter(
parquetType: GroupType,
catalystType: StructType,
convertTz: Option[ZoneId],
caseSensitive: Boolean,
schemaPruning: Boolean,
updater: ParentContainerUpdater)
extends ParquetGroupConverter(updater) with Logging {

@@ -176,15 +178,38 @@ private[parquet] class ParquetRowConverter(
*/
def currentRecord: InternalRow = currentRow


// Converters for each field.
private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
parquetType.getFields.asScala.map { parquetField =>
val fieldIndex = catalystType.fieldIndex(parquetField.getName)
val catalystField = catalystType(fieldIndex)
// Converted field value should be set to the `fieldIndex`-th cell of `currentRow`
newConverter(parquetField, catalystField.dataType, new RowUpdater(currentRow, fieldIndex))
}.toArray
}

// (SPARK-31116) The letter-case mismatch only occurs when schema pruning is enabled,
// so the original code path is kept when it is disabled.
if (schemaPruning) {
// (SPARK-31116) Build the name-to-field-index map according to the case sensitivity setting
val catalystFieldNameToIndex = if (caseSensitive) {
catalystType.fieldNames.zipWithIndex.toMap
} else {
CaseInsensitiveMap(catalystType.fieldNames.zipWithIndex.toMap)
}
parquetType.getFields.asScala.map { parquetField =>
val fieldIndex = catalystFieldNameToIndex.getOrElse(parquetField.getName,
throw new IllegalArgumentException(
s"${parquetField.getName} does not exist. " +
s"Available: ${catalystType.fieldNames.mkString(", ")}")
)
val catalystField = catalystType(fieldIndex)
// Converted field value should be set to the `fieldIndex`-th cell of `currentRow`
newConverter(parquetField, catalystField.dataType, new RowUpdater(currentRow, fieldIndex))
}.toArray
} else {
parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {
case ((parquetFieldType, catalystField), ordinal) =>
// Converted field value should be set to the `ordinal`-th cell of `currentRow`
newConverter(
parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))
}.toArray
}
}


// Updaters for each field.
private[this] val fieldUpdaters: Array[ParentContainerUpdater] = fieldConverters.map(_.updater)
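
The name-to-index lookup above relies on CaseInsensitiveMap ignoring the letter case of keys on reads. A small illustrative snippet (the map contents and the object name are made up for the example):

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

object CaseInsensitiveLookupSketch extends App {
  // Keys keep their original spelling, but lookups ignore letter case.
  val nameToIndex = CaseInsensitiveMap(Map("lowercase" -> 0, "camelCase" -> 1))

  println(nameToIndex("CAMELCASE"))   // prints 1
  println(nameToIndex.get("missing")) // prints None; the converter above turns
                                      // this case into an IllegalArgumentException
}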
@@ -348,7 +373,13 @@ private[parquet] class ParquetRowConverter(
}
}
new ParquetRowConverter(
schemaConverter, parquetType.asGroupType(), t, convertTz, wrappedUpdater)
schemaConverter,
parquetType.asGroupType(),
t,
convertTz,
caseSensitive,
schemaPruning,
wrappedUpdater)

case t =>
throw new RuntimeException(
@@ -842,6 +842,162 @@ class FileBasedDataSourceSuite extends QueryTest
}
}
}

test("SPARK-31116: Select simple parquet columns correctly in case insensitive manner") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
withTempPath { dir => {
val path = dir.getCanonicalPath

// Prepare values for testing the specific Parquet record reader
Seq("A").toDF("camelCase").write.parquet(path)

val exactSchema = new StructType().add("camelCase", StringType)
checkAnswer(spark.read.schema(exactSchema).parquet(path), Row("A"))

// In case-insensitive mode, the letter case of Parquet columns is ignored
val caseInsensitiveSchema = new StructType().add("camelcase", StringType)
checkAnswer(spark.read.schema(caseInsensitiveSchema).parquet(path), Row("A"))
}}
}
}

test("SPARK-31116: Select nested parquet columns correctly in case insensitive manner") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
withTempPath { dir => {
val path = dir.getCanonicalPath

// Prepare values for testing nested parquet data
spark
.range(1L)
.selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn")
.write.parquet(path)

val exactSchema = new StructType()
.add(
"StructColumn",
new StructType()
.add("lowercase", LongType)
.add("camelCase", LongType))
checkAnswer(spark.read.schema(exactSchema).parquet(path), Row(Row(0, 1)))

// In case-insensitive mode, the letter case of Parquet columns is ignored
val innerColumnCaseInsensitiveSchema = new StructType()
.add(
"StructColumn",
new StructType()
.add("LowerCase", LongType)
.add("camelcase", LongType))
checkAnswer(
spark.read.schema(innerColumnCaseInsensitiveSchema).parquet(path),
Row(Row(0, 1)))

val rootColumnCaseInsensitiveSchema = new StructType()
.add(
"structColumn",
new StructType()
.add("lowercase", LongType)
.add("camelCase", LongType))
checkAnswer(
spark.read.schema(rootColumnCaseInsensitiveSchema).parquet(path),
Row(Row(0, 1)))
}}
}
}

test("SPARK-31116: Select simple parquet columns correctly in case sensitive manner") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
withTempPath { dir => {
val path = dir.getCanonicalPath

// Prepare values for testing the specific Parquet record reader
Seq("A").toDF("camelCase").write.parquet(path)

val exactSchema = new StructType().add("camelCase", StringType)
checkAnswer(spark.read.schema(exactSchema).parquet(path), Row("A"))

// In case-sensitive mode, a column with a different letter case is not read
val caseInsensitiveSchema = new StructType().add("camelcase", StringType)
checkAnswer(spark.read.schema(caseInsensitiveSchema).parquet(path), Row(null))

// It also works properly with a combined schema
val combinedSchema = new StructType()
.add("camelCase", StringType)
.add("camelcase", StringType)
checkAnswer(spark.read.schema(combinedSchema).parquet(path), Row("A", null))
}}
}
}

test("SPARK-31116: Select nested parquet columns correctly in case sensitive manner") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
withTempPath { dir => {
val path = dir.getCanonicalPath

// Prepare values for testing nested parquet data
spark
.range(1)
.selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn")
.write.parquet(path)

val exactSchema = new StructType()
.add(
"StructColumn",
new StructType()
.add("lowercase", LongType)
.add("camelCase", LongType))
checkAnswer(spark.read.schema(exactSchema).parquet(path), Row(Row(0, 1)))

val innerColumnCaseInsensitiveSchema = new StructType()
.add(
"StructColumn",
new StructType()
.add("LowerCase", LongType)
.add("camelcase", LongType))
checkAnswer(
spark.read.schema(innerColumnCaseInsensitiveSchema).parquet(path),
Row(null))

val innerPartialColumnCaseInsensitiveSchema = new StructType()
.add(
"StructColumn",
new StructType()
.add("lowercase", LongType)
.add("camelcase", LongType))
checkAnswer(
spark.read.schema(innerPartialColumnCaseInsensitiveSchema).parquet(path),
Row(Row(0, null)))

val rootColumnCaseInsensitiveSchema = new StructType()
.add(
"structColumn",
new StructType()
.add("lowercase", LongType)
.add("camelCase", LongType))
checkAnswer(
spark.read.schema(rootColumnCaseInsensitiveSchema).parquet(path),
Row(null))

val combinedSchema = new StructType()
.add(
"StructColumn",
new StructType()
.add("lowercase", LongType)
.add("camelCase", LongType)
.add("LowerCase", LongType)
.add("camelcase", LongType))
.add(
"structColumn",
new StructType()
.add("lowercase", LongType)
.add("camelCase", LongType)
.add("LowerCase", LongType)
.add("camelcase", LongType))
checkAnswer(
spark.read.schema(combinedSchema).parquet(path),
Row(Row(0, 1, null, null), null))
}}
}
}
}

object TestingUDT {