From a5eb54f77f0589f1562a80d87cf7abf640920728 Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Wed, 15 Aug 2018 16:48:25 -0700 Subject: [PATCH 1/9] Ensure we pass a compatible pruned schema to ParquetRowConverter --- .../parquet/ParquetFileFormat.scala | 8 +- .../parquet/ParquetReadSupport.scala | 96 ++++++++++++++++--- .../parquet/ParquetRowConverter.scala | 22 +++-- .../datasources/SchemaPruningSuite.scala | 2 +- 4 files changed, 104 insertions(+), 24 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index efa4f3f166d9..dc885c07fa7e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -310,6 +310,9 @@ class ParquetFileFormat hadoopConf.set( SQLConf.SESSION_LOCAL_TIMEZONE.key, sparkSession.sessionState.conf.sessionLocalTimeZone) + hadoopConf.setBoolean( + SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, + sparkSession.sessionState.conf.nestedSchemaPruningEnabled) hadoopConf.setBoolean( SQLConf.CASE_SENSITIVE.key, sparkSession.sessionState.conf.caseSensitiveAnalysis) @@ -424,11 +427,12 @@ class ParquetFileFormat } else { logDebug(s"Falling back to parquet-mr") // ParquetRecordReader returns UnsafeRow + val readSupport = new ParquetReadSupport(convertTz, usingVectorizedReader = false) val reader = if (pushed.isDefined && enableRecordFilter) { val parquetFilter = FilterCompat.get(pushed.get, null) - new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter) + new ParquetRecordReader[UnsafeRow](readSupport, parquetFilter) } else { - new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz)) + new ParquetRecordReader[UnsafeRow](readSupport) } val iter = new RecordReaderIterator(reader) // SPARK-23457 Register a task completion lister before `initialization`. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index 3319e73f2b31..4694689fff45 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -49,7 +49,8 @@ import org.apache.spark.sql.types._ * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]] * to [[prepareForRead()]], but use a private `var` for simplicity. */ -private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) +private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], + usingVectorizedReader: Boolean) extends ReadSupport[UnsafeRow] with Logging { private var catalystRequestedSchema: StructType = _ @@ -57,7 +58,7 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) // We need a zero-arg constructor for SpecificParquetRecordReaderBase. But that is only // used in the vectorized reader, where we get the convertTz value directly, and the value here // is ignored. - this(None) + this(None, usingVectorizedReader = true) } /** @@ -65,18 +66,65 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) * readers. 
Responsible for figuring out Parquet requested schema used for column pruning. */ override def init(context: InitContext): ReadContext = { + val conf = context.getConfiguration catalystRequestedSchema = { - val conf = context.getConfiguration val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA) assert(schemaString != null, "Parquet requested schema not set.") StructType.fromString(schemaString) } - val caseSensitive = context.getConfiguration.getBoolean(SQLConf.CASE_SENSITIVE.key, + val schemaPruningEnabled = conf.getBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, + SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get) + val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key, SQLConf.CASE_SENSITIVE.defaultValue.get) - val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema( - context.getFileSchema, catalystRequestedSchema, caseSensitive) - + val parquetFileSchema = context.getFileSchema + val parquetClippedSchema = ParquetReadSupport.clipParquetSchema(parquetFileSchema, + catalystRequestedSchema, caseSensitive) + + // As part of schema clipping, we add fields in catalystRequestedSchema which are missing + // from parquetFileSchema to parquetClippedSchema. However, nested schema pruning requires + // we ignore unrequested field data when reading from a Parquet file. Therefore we pass two + // schema to ParquetRecordMaterializer: the schema of the file data we want to read + // (parquetRequestedSchema), and the schema of the rows we want to return + // (catalystRequestedSchema). The reader is responsible for reconciling the differences between + // the two. + // + // Aside from checking whether schema pruning is enabled (schemaPruningEnabled), there + // is an additional complication to constructing parquetRequestedSchema. The manner in which + // Spark's two Parquet readers reconcile the differences between parquetRequestedSchema and + // catalystRequestedSchema differ. Spark's vectorized reader does not (currently) support + // reading Parquet files with complex types in their schema. Further, it assumes that + // parquetRequestedSchema includes all fields requested in catalystRequestedSchema. It includes + // logic in its read path to skip fields in parquetRequestedSchema which are not present in the + // file. + // + // Spark's parquet-mr based reader supports reading Parquet files of any kind of complex + // schema, and it supports nested schema pruning as well. Unlike the vectorized reader, the + // parquet-mr reader requires that parquetRequestedSchema include only those fields present in + // the underlying parquetFileSchema. Therefore, in the case where we use the parquet-mr reader + // we intersect the parquetClippedSchema with the parquetFileSchema to construct the + // parquetRequestedSchema set in the ReadContext. 
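+    // ------------------------------------------------------------------------
+    // Editorial sketch (not part of the patch): a concrete view of the three schemas
+    // discussed above, assuming a query that selects `name.first` and `name.middle`
+    // from a file that only stores `name.first` and `name.last`. The field names are
+    // invented, parquet-mr's MessageTypeParser is only used to spell the schemas out,
+    // and the shapes shown are the expected results, not literal clipParquetSchema
+    // output.
+    //
+    //   import org.apache.parquet.schema.MessageTypeParser
+    //
+    //   // What the file actually contains.
+    //   val exampleFileSchema = MessageTypeParser.parseMessageType(
+    //     """message spark_schema {
+    //       |  optional group name {
+    //       |    optional binary first (UTF8);
+    //       |    optional binary last (UTF8);
+    //       |  }
+    //       |}""".stripMargin)
+    //
+    //   // Clipping keeps the requested-but-missing `middle`, so the Catalyst side can
+    //   // still be padded with nulls later.
+    //   val exampleClippedSchema = MessageTypeParser.parseMessageType(
+    //     """message spark_schema {
+    //       |  optional group name {
+    //       |    optional binary first (UTF8);
+    //       |    optional binary middle (UTF8);
+    //       |  }
+    //       |}""".stripMargin)
+    //
+    //   // Intersecting the clipped schema with the file schema drops `middle` again,
+    //   // which is the shape the parquet-mr reader is asked to read.
+    //   val exampleRequestedSchema = MessageTypeParser.parseMessageType(
+    //     """message spark_schema {
+    //       |  optional group name {
+    //       |    optional binary first (UTF8);
+    //       |  }
+    //       |}""".stripMargin)
+    // ------------------------------------------------------------------------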
+ val parquetRequestedSchema = + if (schemaPruningEnabled && !usingVectorizedReader) { + ParquetReadSupport.intersectParquetGroups(parquetClippedSchema, parquetFileSchema) + .map(intersectionGroup => + new MessageType(intersectionGroup.getName, intersectionGroup.getFields)) + .getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE) + } else { + parquetClippedSchema + } + log.info { + s"""Going to read the following fields from the Parquet file with the following schema: + |Parquet file schema: + |$parquetFileSchema + |Parquet clipped schema: + |$parquetClippedSchema + |Parquet requested schema: + |$parquetRequestedSchema + |Catalyst requested schema: + |${catalystRequestedSchema.treeString} + """.stripMargin + } new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava) } @@ -93,13 +141,14 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) log.debug(s"Preparing for read Parquet file with message type: $fileSchema") val parquetRequestedSchema = readContext.getRequestedSchema - logInfo { - s"""Going to read the following fields from the Parquet file: - | - |Parquet form: + log.info { + s"""Going to read the following fields from the Parquet file with the following schema: + |Parquet file schema: + |$fileSchema + |Parquet read schema: |$parquetRequestedSchema - |Catalyst form: - |$catalystRequestedSchema + |Catalyst read schema: + |${catalystRequestedSchema.treeString} """.stripMargin } @@ -322,6 +371,27 @@ private[parquet] object ParquetReadSupport { } } + /** + * Computes the structural intersection between two Parquet group types. + */ + private def intersectParquetGroups( + groupType1: GroupType, groupType2: GroupType): Option[GroupType] = { + val fields = + groupType1.getFields.asScala + .filter(field => groupType2.containsField(field.getName)) + .flatMap { + case field1: GroupType => + intersectParquetGroups(field1, groupType2.getType(field1.getName).asGroupType) + case field1 => Some(field1) + } + + if (fields.nonEmpty) { + Some(groupType1.withNewFields(fields.asJava)) + } else { + None + } + } + def expandUDT(schema: StructType): StructType = { def expand(dataType: DataType): DataType = { dataType match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 004a96d13413..973ef8605aad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -130,8 +130,8 @@ private[parquet] class ParquetRowConverter( extends ParquetGroupConverter(updater) with Logging { assert( - parquetType.getFieldCount == catalystType.length, - s"""Field counts of the Parquet schema and the Catalyst schema don't match: + parquetType.getFieldCount <= catalystType.length, + s"""Field count of the Parquet schema is greater than the field count of the Catalyst schema: | |Parquet schema: |$parquetType @@ -182,10 +182,12 @@ private[parquet] class ParquetRowConverter( // Converters for each field. 
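  // --------------------------------------------------------------------------
  // Editorial sketch (not part of the patch): how a pruned Parquet schema maps onto a
  // wider Catalyst row. Field names are invented; only Spark's public StructType API
  // is used.
  //
  //   import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
  //
  //   // The Catalyst row being filled has three cells...
  //   val exampleCatalystType = StructType(Seq(
  //     StructField("id", LongType),
  //     StructField("name", StringType),
  //     StructField("address", StringType)))
  //
  //   // ...but a pruned Parquet schema may deliver only one of those fields.
  //   val examplePrunedParquetFields = Seq("name")
  //
  //   // With the by-name lookup introduced just below, each Parquet field finds its
  //   // target cell by name instead of by position.
  //   val exampleTargetOrdinals =
  //     examplePrunedParquetFields.map(exampleCatalystType.fieldIndex)  // Seq(1)
  //
  // Consequently only fieldConverters.length converters exist, start() nulls out every
  // cell of currentRow (including cells no converter will touch), and end() finalizes
  // only the converters that actually received data -- which is why the end() and
  // start() loops further below iterate over fieldConverters while the null-filling
  // still covers every cell of the row.
  // --------------------------------------------------------------------------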
private val fieldConverters: Array[Converter with HasParentContainerUpdater] = { - parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map { - case ((parquetFieldType, catalystField), ordinal) => - // Converted field value should be set to the `ordinal`-th cell of `currentRow` - newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal)) + parquetType.getFields.asScala.map { + case parquetField => + val fieldIndex = catalystType.fieldIndex(parquetField.getName) + val catalystField = catalystType(fieldIndex) + // Converted field value should be set to the `fieldIndex`-th cell of `currentRow` + newConverter(parquetField, catalystField.dataType, new RowUpdater(currentRow, fieldIndex)) }.toArray } @@ -193,7 +195,7 @@ private[parquet] class ParquetRowConverter( override def end(): Unit = { var i = 0 - while (i < currentRow.numFields) { + while (i < fieldConverters.length) { fieldConverters(i).updater.end() i += 1 } @@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter( override def start(): Unit = { var i = 0 - while (i < currentRow.numFields) { + while (i < fieldConverters.length) { fieldConverters(i).updater.start() currentRow.setNullAt(i) i += 1 } + while (i < currentRow.numFields) { + currentRow.setNullAt(i) + i += 1 + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala index b0314e621e6e..22317fe8d13a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala @@ -135,7 +135,7 @@ abstract class SchemaPruningSuite Row("X.", 1) :: Row("Y.", 1) :: Row(null, 2) :: Row(null, 2) :: Nil) } - ignore("partial schema intersection - select missing subfield") { + testSchemaPruning("partial schema intersection - select missing subfield") { val query = sql("select name.middle, address from contacts where p=2") checkScan(query, "struct,address:string>") checkAnswer(query.orderBy("id"), From c7cbb29ff95f66547d1236493e038a4ce0bf4837 Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Tue, 30 Oct 2018 12:28:33 -0700 Subject: [PATCH 2/9] Replace an unnecessarily partial function with a "total" function --- .../sql/execution/datasources/parquet/ParquetRowConverter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 973ef8605aad..3fc2eef12c50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -183,7 +183,7 @@ private[parquet] class ParquetRowConverter( // Converters for each field. 
private val fieldConverters: Array[Converter with HasParentContainerUpdater] = { parquetType.getFields.asScala.map { - case parquetField => + parquetField => val fieldIndex = catalystType.fieldIndex(parquetField.getName) val catalystField = catalystType(fieldIndex) // Converted field value should be set to the `fieldIndex`-th cell of `currentRow` From 8b67c796a774a9b1bd61f00d0c9183d02f7848b6 Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Tue, 6 Nov 2018 11:09:01 -0800 Subject: [PATCH 3/9] Extract all calls to `currentRow.setNullAt(i)` in ParquetRowConverter.start() into their own loop for clarity --- .../sql/execution/datasources/parquet/ParquetRowConverter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 3fc2eef12c50..efe122ed039e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -206,9 +206,9 @@ private[parquet] class ParquetRowConverter( var i = 0 while (i < fieldConverters.length) { fieldConverters(i).updater.start() - currentRow.setNullAt(i) i += 1 } + i = 0 while (i < currentRow.numFields) { currentRow.setNullAt(i) i += 1 From c305da4ebc3cea95a1e996f6e70440378d7fd269 Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Tue, 13 Nov 2018 11:04:33 -0800 Subject: [PATCH 4/9] Change some log levels and make a stylistic change --- .../datasources/parquet/ParquetReadSupport.scala | 6 ++---- .../datasources/parquet/ParquetRowConverter.scala | 11 +++++------ 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index 4694689fff45..a5ff94298180 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -113,7 +113,7 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], } else { parquetClippedSchema } - log.info { + log.debug { s"""Going to read the following fields from the Parquet file with the following schema: |Parquet file schema: |$parquetFileSchema @@ -138,10 +138,8 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], keyValueMetaData: JMap[String, String], fileSchema: MessageType, readContext: ReadContext): RecordMaterializer[UnsafeRow] = { - log.debug(s"Preparing for read Parquet file with message type: $fileSchema") val parquetRequestedSchema = readContext.getRequestedSchema - - log.info { + log.debug { s"""Going to read the following fields from the Parquet file with the following schema: |Parquet file schema: |$fileSchema diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index efe122ed039e..4d3d30e73669 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -182,12 +182,11 @@ private[parquet] class ParquetRowConverter( // Converters for each field. private val fieldConverters: Array[Converter with HasParentContainerUpdater] = { - parquetType.getFields.asScala.map { - parquetField => - val fieldIndex = catalystType.fieldIndex(parquetField.getName) - val catalystField = catalystType(fieldIndex) - // Converted field value should be set to the `fieldIndex`-th cell of `currentRow` - newConverter(parquetField, catalystField.dataType, new RowUpdater(currentRow, fieldIndex)) + parquetType.getFields.asScala.map { parquetField => + val fieldIndex = catalystType.fieldIndex(parquetField.getName) + val catalystField = catalystType(fieldIndex) + // Converted field value should be set to the `fieldIndex`-th cell of `currentRow` + newConverter(parquetField, catalystField.dataType, new RowUpdater(currentRow, fieldIndex)) }.toArray } From 29c0495dd987b3e3123abb7a7110c6a5d4a27ddf Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Fri, 5 Apr 2019 15:57:42 -0700 Subject: [PATCH 5/9] Update --- .../parquet/ParquetReadSupport.scala | 22 +++++++++---------- .../parquet/ParquetRowConverter.scala | 8 +++---- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index a5ff94298180..9193dc30567c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -73,15 +73,15 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], StructType.fromString(schemaString) } - val schemaPruningEnabled = conf.getBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, - SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get) val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key, SQLConf.CASE_SENSITIVE.defaultValue.get) + val schemaPruningEnabled = conf.getBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, + SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get) val parquetFileSchema = context.getFileSchema val parquetClippedSchema = ParquetReadSupport.clipParquetSchema(parquetFileSchema, catalystRequestedSchema, caseSensitive) - // As part of schema clipping, we add fields in catalystRequestedSchema which are missing + // As a part of schema clipping, we add fields in catalystRequestedSchema which are missing // from parquetFileSchema to parquetClippedSchema. However, nested schema pruning requires // we ignore unrequested field data when reading from a Parquet file. Therefore we pass two // schema to ParquetRecordMaterializer: the schema of the file data we want to read @@ -104,15 +104,13 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], // the underlying parquetFileSchema. Therefore, in the case where we use the parquet-mr reader // we intersect the parquetClippedSchema with the parquetFileSchema to construct the // parquetRequestedSchema set in the ReadContext. 
- val parquetRequestedSchema = - if (schemaPruningEnabled && !usingVectorizedReader) { - ParquetReadSupport.intersectParquetGroups(parquetClippedSchema, parquetFileSchema) - .map(intersectionGroup => - new MessageType(intersectionGroup.getName, intersectionGroup.getFields)) - .getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE) - } else { - parquetClippedSchema - } + val parquetRequestedSchema = if (schemaPruningEnabled && !usingVectorizedReader) { + ParquetReadSupport.intersectParquetGroups(parquetClippedSchema, parquetFileSchema) + .map(groupType => new MessageType(groupType.getName, groupType.getFields)) + .getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE) + } else { + parquetClippedSchema + } log.debug { s"""Going to read the following fields from the Parquet file with the following schema: |Parquet file schema: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 4d3d30e73669..b772b6b77d1c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -203,13 +203,13 @@ private[parquet] class ParquetRowConverter( override def start(): Unit = { var i = 0 - while (i < fieldConverters.length) { - fieldConverters(i).updater.start() + while (i < currentRow.numFields) { + currentRow.setNullAt(i) i += 1 } i = 0 - while (i < currentRow.numFields) { - currentRow.setNullAt(i) + while (i < fieldConverters.length) { + fieldConverters(i).updater.start() i += 1 } } From 8ad3032ca97be914f63dfe615254d1c8cd88e4f4 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Fri, 5 Apr 2019 16:26:05 -0700 Subject: [PATCH 6/9] Prevent ClassCastException. --- .../execution/datasources/parquet/ParquetReadSupport.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index 9193dc30567c..f4de29654c83 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -377,7 +377,12 @@ private[parquet] object ParquetReadSupport { .filter(field => groupType2.containsField(field.getName)) .flatMap { case field1: GroupType => - intersectParquetGroups(field1, groupType2.getType(field1.getName).asGroupType) + val field2 = groupType2.getType(field1.getName) + if (field2.isPrimitive) { + None + } else { + intersectParquetGroups(field1, field2.asGroupType) + } case field1 => Some(field1) } From 1da3ba360d4376ebabd32f61b3effa90aec37bb4 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sun, 7 Apr 2019 18:10:09 -0700 Subject: [PATCH 7/9] Remove log.debug. 
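An editorial illustration of the guard added in PATCH 6 above (schemas invented; only
parquet-mr's public schema API is used): intersectParquetGroups must not call
asGroupType on a field that is a group on one side but a primitive on the other.

    import org.apache.parquet.schema.MessageTypeParser

    // `f` is a group in one schema but a primitive in the other.
    val left = MessageTypeParser.parseMessageType(
      "message left { optional group f { optional int32 x; } }")
    val right = MessageTypeParser.parseMessageType(
      "message right { optional int64 f; }")

    val mismatched = right.getType("f")  // a PrimitiveType
    // mismatched.asGroupType()          // would throw ClassCastException
    // With the isPrimitive check, such a field simply contributes nothing (None) to
    // the structural intersection instead of failing the read.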
--- .../datasources/parquet/ParquetReadSupport.scala | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index f4de29654c83..bc10916122b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -137,17 +137,6 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], fileSchema: MessageType, readContext: ReadContext): RecordMaterializer[UnsafeRow] = { val parquetRequestedSchema = readContext.getRequestedSchema - log.debug { - s"""Going to read the following fields from the Parquet file with the following schema: - |Parquet file schema: - |$fileSchema - |Parquet read schema: - |$parquetRequestedSchema - |Catalyst read schema: - |${catalystRequestedSchema.treeString} - """.stripMargin - } - new ParquetRecordMaterializer( parquetRequestedSchema, ParquetReadSupport.expandUDT(catalystRequestedSchema), From 9e88cf3a1a131293510e1c705f0f5db5413e742a Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sun, 7 Apr 2019 18:52:36 -0700 Subject: [PATCH 8/9] Address comments --- .../parquet/ParquetFileFormat.scala | 2 +- .../parquet/ParquetReadSupport.scala | 39 ++++++------------- 2 files changed, 13 insertions(+), 28 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index dc885c07fa7e..1e38739e0101 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -427,7 +427,7 @@ class ParquetFileFormat } else { logDebug(s"Falling back to parquet-mr") // ParquetRecordReader returns UnsafeRow - val readSupport = new ParquetReadSupport(convertTz, usingVectorizedReader = false) + val readSupport = new ParquetReadSupport(convertTz, isMR = true) val reader = if (pushed.isDefined && enableRecordFilter) { val parquetFilter = FilterCompat.get(pushed.get, null) new ParquetRecordReader[UnsafeRow](readSupport, parquetFilter) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index bc10916122b3..7546a7c86969 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -49,8 +49,7 @@ import org.apache.spark.sql.types._ * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]] * to [[prepareForRead()]], but use a private `var` for simplicity. 
*/ -private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], - usingVectorizedReader: Boolean) +private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], isMR: Boolean) extends ReadSupport[UnsafeRow] with Logging { private var catalystRequestedSchema: StructType = _ @@ -58,7 +57,7 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], // We need a zero-arg constructor for SpecificParquetRecordReaderBase. But that is only // used in the vectorized reader, where we get the convertTz value directly, and the value here // is ignored. - this(None, usingVectorizedReader = true) + this(None, isMR = false) } /** @@ -81,34 +80,20 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], val parquetClippedSchema = ParquetReadSupport.clipParquetSchema(parquetFileSchema, catalystRequestedSchema, caseSensitive) - // As a part of schema clipping, we add fields in catalystRequestedSchema which are missing - // from parquetFileSchema to parquetClippedSchema. However, nested schema pruning requires - // we ignore unrequested field data when reading from a Parquet file. Therefore we pass two - // schema to ParquetRecordMaterializer: the schema of the file data we want to read - // (parquetRequestedSchema), and the schema of the rows we want to return - // (catalystRequestedSchema). The reader is responsible for reconciling the differences between - // the two. - // - // Aside from checking whether schema pruning is enabled (schemaPruningEnabled), there - // is an additional complication to constructing parquetRequestedSchema. The manner in which - // Spark's two Parquet readers reconcile the differences between parquetRequestedSchema and - // catalystRequestedSchema differ. Spark's vectorized reader does not (currently) support - // reading Parquet files with complex types in their schema. Further, it assumes that - // parquetRequestedSchema includes all fields requested in catalystRequestedSchema. It includes - // logic in its read path to skip fields in parquetRequestedSchema which are not present in the - // file. - // - // Spark's parquet-mr based reader supports reading Parquet files of any kind of complex - // schema, and it supports nested schema pruning as well. Unlike the vectorized reader, the - // parquet-mr reader requires that parquetRequestedSchema include only those fields present in - // the underlying parquetFileSchema. Therefore, in the case where we use the parquet-mr reader - // we intersect the parquetClippedSchema with the parquetFileSchema to construct the - // parquetRequestedSchema set in the ReadContext. - val parquetRequestedSchema = if (schemaPruningEnabled && !usingVectorizedReader) { + // We pass two schema to ParquetRecordMaterializer: + // - parquetRequestedSchema: the schema of the file data we want to read + // - catalystRequestedSchema: the schema of the rows we want to return + // The reader is responsible for reconciling the differences between the two. + val parquetRequestedSchema = if (isMR && schemaPruningEnabled) { + // Parquet-MR reader requires that parquetRequestedSchema + // include only those fields present in the underlying parquetFileSchema. 
Therefore, + // we intersect the parquetClippedSchema with the parquetFileSchema ParquetReadSupport.intersectParquetGroups(parquetClippedSchema, parquetFileSchema) .map(groupType => new MessageType(groupType.getName, groupType.getFields)) .getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE) } else { + // Spark's vectorized reader only support atomic types currently. It also skip fields + // in parquetRequestedSchema which are not present in the file. parquetClippedSchema } log.debug { From 95e1c630ac32922704900207909e1adef1190b73 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sun, 7 Apr 2019 20:28:39 -0700 Subject: [PATCH 9/9] Address comments --- .../parquet/ParquetFileFormat.scala | 2 +- .../parquet/ParquetReadSupport.scala | 23 +++++++++++-------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 1e38739e0101..e37f2283e00c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -427,7 +427,7 @@ class ParquetFileFormat } else { logDebug(s"Falling back to parquet-mr") // ParquetRecordReader returns UnsafeRow - val readSupport = new ParquetReadSupport(convertTz, isMR = true) + val readSupport = new ParquetReadSupport(convertTz, enableVectorizedReader = false) val reader = if (pushed.isDefined && enableRecordFilter) { val parquetFilter = FilterCompat.get(pushed.get, null) new ParquetRecordReader[UnsafeRow](readSupport, parquetFilter) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index 7546a7c86969..df7766520290 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -49,15 +49,16 @@ import org.apache.spark.sql.types._ * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]] * to [[prepareForRead()]], but use a private `var` for simplicity. */ -private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], isMR: Boolean) - extends ReadSupport[UnsafeRow] with Logging { +private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], + enableVectorizedReader: Boolean) + extends ReadSupport[UnsafeRow] with Logging { private var catalystRequestedSchema: StructType = _ def this() { // We need a zero-arg constructor for SpecificParquetRecordReaderBase. But that is only // used in the vectorized reader, where we get the convertTz value directly, and the value here // is ignored. - this(None, isMR = false) + this(None, enableVectorizedReader = true) } /** @@ -84,10 +85,10 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], isMR: // - parquetRequestedSchema: the schema of the file data we want to read // - catalystRequestedSchema: the schema of the rows we want to return // The reader is responsible for reconciling the differences between the two. 
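    // ------------------------------------------------------------------------
    // Editorial sketch (not part of the patch): the getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE)
    // fallback kept below covers the case where the clipped schema and the file schema
    // share no fields at all. Schemas are invented; MessageTypeParser is only used to
    // spell them out.
    //
    //   import org.apache.parquet.schema.MessageTypeParser
    //
    //   // None of the requested fields exist in this file...
    //   val exampleClipped = MessageTypeParser.parseMessageType(
    //     "message spark_schema { optional binary middle (UTF8); }")
    //   val exampleFile = MessageTypeParser.parseMessageType(
    //     "message spark_schema { optional int64 id; }")
    //
    //   // ...so intersectParquetGroups(exampleClipped, exampleFile) returns None, the
    //   // requested schema falls back to EMPTY_MESSAGE (a MessageType with no fields),
    //   // and the row converter later null-fills every requested column.
    // ------------------------------------------------------------------------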
- val parquetRequestedSchema = if (isMR && schemaPruningEnabled) { - // Parquet-MR reader requires that parquetRequestedSchema - // include only those fields present in the underlying parquetFileSchema. Therefore, - // we intersect the parquetClippedSchema with the parquetFileSchema + val parquetRequestedSchema = if (schemaPruningEnabled && !enableVectorizedReader) { + // Parquet-MR reader requires that parquetRequestedSchema include only those fields present + // in the underlying parquetFileSchema. Therefore, we intersect the parquetClippedSchema + // with the parquetFileSchema ParquetReadSupport.intersectParquetGroups(parquetClippedSchema, parquetFileSchema) .map(groupType => new MessageType(groupType.getName, groupType.getFields)) .getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE) @@ -96,7 +97,7 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], isMR: // in parquetRequestedSchema which are not present in the file. parquetClippedSchema } - log.debug { + logDebug( s"""Going to read the following fields from the Parquet file with the following schema: |Parquet file schema: |$parquetFileSchema @@ -106,8 +107,7 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], isMR: |$parquetRequestedSchema |Catalyst requested schema: |${catalystRequestedSchema.treeString} - """.stripMargin - } + """.stripMargin) new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava) } @@ -343,6 +343,9 @@ private[parquet] object ParquetReadSupport { /** * Computes the structural intersection between two Parquet group types. + * This is used to create a requestedSchema for ReadContext of Parquet-MR reader. + * Parquet-MR reader does not support the nested field access to non-existent field + * while parquet library does support to read the non-existent field by regular field access. */ private def intersectParquetGroups( groupType1: GroupType, groupType2: GroupType): Option[GroupType] = {