@@ -310,6 +310,9 @@ class ParquetFileFormat
hadoopConf.set(
SQLConf.SESSION_LOCAL_TIMEZONE.key,
sparkSession.sessionState.conf.sessionLocalTimeZone)
hadoopConf.setBoolean(
SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
hadoopConf.setBoolean(
SQLConf.CASE_SENSITIVE.key,
sparkSession.sessionState.conf.caseSensitiveAnalysis)
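For context, the new hadoopConf entry above is what lets the executor-side ParquetReadSupport see whether nested schema pruning is on. A minimal usage sketch, assuming the literal key string matches SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key and that the input path is hypothetical:

import org.apache.spark.sql.SparkSession

// Sketch: turn on nested schema pruning for a session and read a single struct leaf.
// The config key string and the path are assumptions for illustration only.
val spark = SparkSession.builder().master("local[*]").appName("nested-pruning-demo").getOrCreate()
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")
spark.read.parquet("/tmp/contacts").select("name.middle").show()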
@@ -424,11 +427,12 @@ class ParquetFileFormat
} else {
logDebug(s"Falling back to parquet-mr")
// ParquetRecordReader returns UnsafeRow
val readSupport = new ParquetReadSupport(convertTz, usingVectorizedReader = false)
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter)
new ParquetRecordReader[UnsafeRow](readSupport, parquetFilter)
} else {
new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
new ParquetRecordReader[UnsafeRow](readSupport)
}
val iter = new RecordReaderIterator(reader)
// SPARK-23457 Register a task completion listener before `initialization`.
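This fallback branch is taken when the vectorized reader cannot be used, for example because the schema contains complex types (see the comments added to ParquetReadSupport below), which is why the read support is now told usingVectorizedReader = false. A rough, self-contained sketch of that kind of gate, mirroring the idea (not the actual code) of ParquetFileFormat.supportBatch:

import org.apache.spark.sql.types._

// Vectorized (batch) reading only applies when no column is a struct, array, or map;
// anything else lands in the parquet-mr branch above.
def hasComplexColumn(schema: StructType): Boolean =
  schema.fields.exists {
    case StructField(_, _: StructType | _: ArrayType | _: MapType, _, _) => true
    case _ => false
  }

// A schema with a nested struct therefore forces the parquet-mr path.
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StructType(Seq(StructField("middle", StringType))))))
println(hasComplexColumn(schema))   // true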
@@ -49,34 +49,80 @@ import org.apache.spark.sql.types._
* Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]]
* to [[prepareForRead()]], but use a private `var` for simplicity.
*/
private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone],
usingVectorizedReader: Boolean)
extends ReadSupport[UnsafeRow] with Logging {
private var catalystRequestedSchema: StructType = _

def this() {
// We need a zero-arg constructor for SpecificParquetRecordReaderBase. But that is only
// used in the vectorized reader, where we get the convertTz value directly, and the value here
// is ignored.
this(None)
this(None, usingVectorizedReader = true)
}

/**
* Called on executor side before [[prepareForRead()]] and instantiating actual Parquet record
* readers. Responsible for figuring out Parquet requested schema used for column pruning.
*/
override def init(context: InitContext): ReadContext = {
val conf = context.getConfiguration
catalystRequestedSchema = {
val conf = context.getConfiguration
val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
assert(schemaString != null, "Parquet requested schema not set.")
StructType.fromString(schemaString)
}

val caseSensitive = context.getConfiguration.getBoolean(SQLConf.CASE_SENSITIVE.key,
val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key,
SQLConf.CASE_SENSITIVE.defaultValue.get)
val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema(
context.getFileSchema, catalystRequestedSchema, caseSensitive)

val schemaPruningEnabled = conf.getBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get)
val parquetFileSchema = context.getFileSchema
val parquetClippedSchema = ParquetReadSupport.clipParquetSchema(parquetFileSchema,
catalystRequestedSchema, caseSensitive)

// As a part of schema clipping, we add fields in catalystRequestedSchema which are missing
// from parquetFileSchema to parquetClippedSchema. However, nested schema pruning requires
// that we ignore unrequested field data when reading from a Parquet file. Therefore we pass two
// schemas to ParquetRecordMaterializer: the schema of the file data we want to read
// (parquetRequestedSchema), and the schema of the rows we want to return
// (catalystRequestedSchema). The reader is responsible for reconciling the differences between
// the two.
//
// Aside from checking whether schema pruning is enabled (schemaPruningEnabled), there
// is an additional complication to constructing parquetRequestedSchema. The manner in which
// Spark's two Parquet readers reconcile the differences between parquetRequestedSchema and
// catalystRequestedSchema differs. Spark's vectorized reader does not (currently) support
// reading Parquet files with complex types in their schema. Further, it assumes that
// parquetRequestedSchema includes all fields requested in catalystRequestedSchema. It includes
// logic in its read path to skip fields in parquetRequestedSchema which are not present in the
// file.
//
// Spark's parquet-mr based reader supports reading Parquet files of any kind of complex
// schema, and it supports nested schema pruning as well. Unlike the vectorized reader, the
// parquet-mr reader requires that parquetRequestedSchema include only those fields present in
// the underlying parquetFileSchema. Therefore, in the case where we use the parquet-mr reader
// we intersect the parquetClippedSchema with the parquetFileSchema to construct the
// parquetRequestedSchema set in the ReadContext.
val parquetRequestedSchema = if (schemaPruningEnabled && !usingVectorizedReader) {
ParquetReadSupport.intersectParquetGroups(parquetClippedSchema, parquetFileSchema)
.map(groupType => new MessageType(groupType.getName, groupType.getFields))
.getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE)
} else {
parquetClippedSchema
}
log.debug {
s"""Going to read the following fields from the Parquet file with the following schema:
|Parquet file schema:
|$parquetFileSchema
|Parquet clipped schema:
|$parquetClippedSchema
|Parquet requested schema:
|$parquetRequestedSchema
|Catalyst requested schema:
|${catalystRequestedSchema.treeString}
""".stripMargin
}
new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
}
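To make the clipping-versus-intersection distinction in the comment above concrete, suppose a query asks for a subfield the file never wrote. clipParquetSchema and intersectParquetGroups are private to ParquetReadSupport, so this sketch only hand-writes the file schema with parquet-mr's MessageTypeParser and states the expected outputs in comments:

import org.apache.parquet.schema.MessageTypeParser

// File on disk: only name.first and name.last were ever written.
val parquetFileSchema = MessageTypeParser.parseMessageType(
  """message spark_schema {
    |  optional group name {
    |    optional binary first (UTF8);
    |    optional binary last (UTF8);
    |  }
    |}""".stripMargin)

// The query requests name.middle. Clipping keeps the requested shape, so the clipped
// schema still mentions `middle` even though the file cannot supply it:
//   message spark_schema { optional group name { optional binary middle (UTF8); } }
// The structural intersection with the file schema (parquet-mr path) drops `middle`;
// since nothing else was requested it collapses to the empty message, the reader asks
// the file for no columns, and the row converter later leaves `middle` null.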

@@ -90,16 +136,15 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
keyValueMetaData: JMap[String, String],
fileSchema: MessageType,
readContext: ReadContext): RecordMaterializer[UnsafeRow] = {
log.debug(s"Preparing for read Parquet file with message type: $fileSchema")
val parquetRequestedSchema = readContext.getRequestedSchema

logInfo {
s"""Going to read the following fields from the Parquet file:
|
|Parquet form:
log.debug {
s"""Going to read the following fields from the Parquet file with the following schema:
|Parquet file schema:
|$fileSchema
|Parquet read schema:
|$parquetRequestedSchema
|Catalyst form:
|$catalystRequestedSchema
|Catalyst read schema:
|${catalystRequestedSchema.treeString}
""".stripMargin
}
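The logging change above swaps the raw Catalyst toString for treeString, which renders the requested schema as an indented tree. A small illustration with a hand-built schema (field names are only examples):

import org.apache.spark.sql.types._

// treeString renders the Catalyst read schema as an indented tree, which is what the
// new debug message embeds.
val catalystReadSchema = StructType(Seq(
  StructField("name", StructType(Seq(StructField("middle", StringType)))),
  StructField("address", StringType)))
println(catalystReadSchema.treeString)
// root
//  |-- name: struct (nullable = true)
//  |    |-- middle: string (nullable = true)
//  |-- address: string (nullable = true)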

@@ -322,6 +367,32 @@ private[parquet] object ParquetReadSupport {
}
}

/**
* Computes the structural intersection of two Parquet group types: the fields of
* `groupType1`, recursively restricted to those also present in `groupType2`.
* Returns `None` when the two groups share no fields.
*/
private def intersectParquetGroups(
groupType1: GroupType, groupType2: GroupType): Option[GroupType] = {
val fields =
groupType1.getFields.asScala
.filter(field => groupType2.containsField(field.getName))
.flatMap {
case field1: GroupType =>
val field2 = groupType2.getType(field1.getName)
if (field2.isPrimitive) {
None
} else {
intersectParquetGroups(field1, field2.asGroupType)
}
case field1 => Some(field1)
}

if (fields.nonEmpty) {
Some(groupType1.withNewFields(fields.asJava))
} else {
None
}
}
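A runnable demonstration of the intersection, using a standalone copy of the helper above (copied only so the demo works outside ParquetReadSupport) and two hand-written schemas:

import scala.collection.JavaConverters._
import org.apache.parquet.schema.{GroupType, MessageType, MessageTypeParser}

// Standalone copy of the private helper above, for illustration only.
def intersectGroups(g1: GroupType, g2: GroupType): Option[GroupType] = {
  val fields = g1.getFields.asScala
    .filter(f => g2.containsField(f.getName))
    .flatMap {
      case f1: GroupType =>
        val f2 = g2.getType(f1.getName)
        if (f2.isPrimitive) None else intersectGroups(f1, f2.asGroupType)
      case f1 => Some(f1)
    }
  if (fields.nonEmpty) Some(g1.withNewFields(fields.asJava)) else None
}

// Clipped schema (what the query asked for) vs. file schema (what was written).
val clipped = MessageTypeParser.parseMessageType(
  "message spark_schema { optional group name { optional binary middle (UTF8); } optional binary address (UTF8); }")
val file = MessageTypeParser.parseMessageType(
  "message spark_schema { optional group name { optional binary first (UTF8); } optional binary address (UTF8); }")

// `name` shares no leaves between the two, so it disappears; `address` survives.
val requested = intersectGroups(clipped, file).map(g => new MessageType(g.getName, g.getFields))
println(requested)   // prints (reformatted): Some(message spark_schema { optional binary address (UTF8); })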

def expandUDT(schema: StructType): StructType = {
def expand(dataType: DataType): DataType = {
dataType match {
@@ -130,8 +130,8 @@ private[parquet] class ParquetRowConverter(
extends ParquetGroupConverter(updater) with Logging {

assert(
parquetType.getFieldCount == catalystType.length,
s"""Field counts of the Parquet schema and the Catalyst schema don't match:
parquetType.getFieldCount <= catalystType.length,
s"""Field count of the Parquet schema is greater than the field count of the Catalyst schema:
|
|Parquet schema:
|$parquetType
@@ -182,18 +182,19 @@ private[parquet] class ParquetRowConverter(

// Converters for each field.
private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {
case ((parquetFieldType, catalystField), ordinal) =>
// Converted field value should be set to the `ordinal`-th cell of `currentRow`
newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))
parquetType.getFields.asScala.map { parquetField =>
val fieldIndex = catalystType.fieldIndex(parquetField.getName)
val catalystField = catalystType(fieldIndex)
// Converted field value should be set to the `fieldIndex`-th cell of `currentRow`
newConverter(parquetField, catalystField.dataType, new RowUpdater(currentRow, fieldIndex))
}.toArray
}

override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex)

override def end(): Unit = {
var i = 0
while (i < currentRow.numFields) {
while (i < fieldConverters.length) {
fieldConverters(i).updater.end()
i += 1
}
@@ -203,10 +204,14 @@
override def start(): Unit = {
var i = 0
while (i < currentRow.numFields) {
fieldConverters(i).updater.start()
currentRow.setNullAt(i)
i += 1
}
i = 0
while (i < fieldConverters.length) {
fieldConverters(i).updater.start()
i += 1
}
}
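The converter changes above handle a Parquet read schema that now omits fields the Catalyst schema still expects: converters are bound to row slots by field name, start() null-initializes every cell, and end() only visits converters that exist. A minimal sketch of the name-to-ordinal matching (field names are illustrative):

import org.apache.spark.sql.types._

// Catalyst asks for three fields, but the pruned Parquet schema only delivers two.
val catalystType = StructType(Seq(
  StructField("first", StringType),
  StructField("middle", StringType),   // requested but absent from the file
  StructField("last", StringType)))

// Matching by name rather than by position keeps each delivered value in the right
// row slot; `middle` (ordinal 1) is never written and stays null after start().
val deliveredParquetFields = Seq("first", "last")
println(deliveredParquetFields.map(catalystType.fieldIndex))   // List(0, 2)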

/**
@@ -135,7 +135,7 @@ abstract class SchemaPruningSuite
Row("X.", 1) :: Row("Y.", 1) :: Row(null, 2) :: Row(null, 2) :: Nil)
}

ignore("partial schema intersection - select missing subfield") {
testSchemaPruning("partial schema intersection - select missing subfield") {
@dongjoon-hyun (Member, Author) commented on Apr 5, 2019:
Note that this test case has recently been exercised in both the Parquet and ORC paths. Before this PR, it failed only in the Parquet path.

val query = sql("select name.middle, address from contacts where p=2")
checkScan(query, "struct<name:struct<middle:string>,address:string>")
checkAnswer(query.orderBy("id"),
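For reference on what the re-enabled test asserts: checkScan compares the scan's pruned physical read schema against the expected struct string, and checkAnswer verifies the returned rows. A hedged way to eyeball the same pruning interactively, inside the suite's fixtures or any session with a matching contacts table (exact explain formatting varies by version):

// With nested schema pruning enabled, the scan's ReadSchema should list only the
// requested leaves, roughly: ReadSchema: struct<name:struct<middle:string>,address:string>
sql("select name.middle, address from contacts where p=2").explain()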