
Commit fb23f28
author: Damian Guy (committed)

[SPARK-9340] make SparkSQL work with nested types in parquet-protobuf

1 parent ebfd91c · commit fb23f28

17 files changed: +874 −16 lines

sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala

Lines changed: 6 additions & 1 deletion
@@ -39,7 +39,12 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
       readContext: ReadContext): RecordMaterializer[InternalRow] = {
     log.debug(s"Preparing for read Parquet file with message type: $fileSchema")

-    val toCatalyst = new CatalystSchemaConverter(conf)
+    val toCatalyst = if (keyValueMetaData.containsKey("parquet.proto.class")) {
+      new ProtobufCatalystSchemaConverter(conf)
+    } else {
+      new CatalystSchemaConverter(conf)
+    }
+
     val parquetRequestedSchema = readContext.getRequestedSchema

     val catalystRequestedSchema =
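
The dispatch keys on the "parquet.proto.class" entry that parquet-protobuf's writer stores in the file footer. A minimal standalone sketch for inspecting that footer entry with the parquet-mr API of this era (the file path is a placeholder):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.ParquetFileReader

    object FooterInspect {
      def main(args: Array[String]): Unit = {
        // Placeholder path: point this at a file written by parquet-protobuf.
        val footer = ParquetFileReader.readFooter(new Configuration(), new Path("/tmp/data.parquet"))
        val meta = footer.getFileMetaData.getKeyValueMetaData
        // Files written by the protobuf writer carry the originating proto class here.
        println(s"written by parquet-protobuf: ${meta.containsKey("parquet.proto.class")}")
      }
    }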

sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRowConverter.scala

Lines changed: 110 additions & 4 deletions
@@ -20,12 +20,12 @@ package org.apache.spark.sql.parquet
 import java.math.{BigDecimal, BigInteger}
 import java.nio.ByteOrder

+
 import scala.collection.JavaConversions._
-import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer

 import org.apache.parquet.column.Dictionary
-import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
+import org.apache.parquet.io.api._
 import org.apache.parquet.schema.Type.Repetition
 import org.apache.parquet.schema.{GroupType, PrimitiveType, Type}

@@ -103,16 +103,22 @@ private[parquet] class CatalystRowConverter(
     }.toArray
   }

+  private val needsArrayReset =
+    fieldConverters.filter(converter => converter.isInstanceOf[NeedsResetArray])
   override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex)

   override def end(): Unit = updater.set(currentRow)

+
   override def start(): Unit = {
     var i = 0
     while (i < currentRow.numFields) {
       currentRow.setNullAt(i)
       i += 1
     }
+    needsArrayReset.foreach {
+      converter => converter.asInstanceOf[NeedsResetArray].resetArray()
+    }
   }

   /**
@@ -171,8 +177,20 @@ private[parquet] class CatalystRowConverter(
       }
     }

-      case t: ArrayType =>
-        new CatalystArrayConverter(parquetType.asGroupType(), t, updater)
+      case t: ArrayType => {
+        parquetType.isRepetition(Type.Repetition.REPEATED) match {
+          case true => {
+            t match {
+              case ArrayType(elementType: StructType, _) =>
+                new CatalystRepeatedStructConverter(parquetType.asGroupType(), elementType, updater)
+              case ArrayType(elementType: DataType, _) =>
+                new CatalystRepeatedPrimitiveConverter(parquetType, elementType, updater)
+            }
+          }
+          case false =>
+            new CatalystArrayConverter(parquetType.asGroupType(), t, updater)
+        }
+      }

       case t: MapType =>
         new CatalystMapConverter(parquetType.asGroupType(), t, updater)
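
The branch keys on whether the Parquet field is itself REPEATED: parquet-protobuf of this era writes a proto `repeated` field flat on the message, while the Parquet format spec wraps the element in a three-level LIST group. A minimal standalone sketch of the difference, using parquet-mr's schema parser:

    import org.apache.parquet.schema.MessageTypeParser
    import org.apache.parquet.schema.Type.Repetition

    object RepetitionCheck extends App {
      // parquet-protobuf layout: the repeated field sits directly on the message.
      val protoStyle = MessageTypeParser.parseMessageType(
        "message m { repeated int32 myInt; }")
      // Format-spec layout: a LIST-annotated group wraps the repeated element.
      val specStyle = MessageTypeParser.parseMessageType(
        "message m { optional group myInt (LIST) { repeated group list { optional int32 element; } } }")

      println(protoStyle.getType("myInt").isRepetition(Repetition.REPEATED)) // true
      println(specStyle.getType("myInt").isRepetition(Repetition.REPEATED))  // false
    }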
@@ -378,6 +396,66 @@ private[parquet] class CatalystRowConverter(
     }
   }

+  /**
+   * Support Protobuf native repeated fields. parquet-protobuf does a 1-to-1
+   * conversion, e.g., `repeated int32 myInt;` becomes a flat repeated Parquet field.
+   * @param parquetType
+   * @param myCatalystType
+   * @param updater
+   */
+  private final class CatalystRepeatedPrimitiveConverter(
+      parquetType: Type,
+      myCatalystType: DataType,
+      updater: ParentContainerUpdater)
+    extends PrimitiveConverter with NeedsResetArray {
+
+    private var elements: Int = 0
+    private var buffer: ArrayBuffer[Any] = ArrayBuffer.empty[Any]
+
+    private val stringConverter = new CatalystStringConverter(new ParentContainerUpdater {
+      override def set(value: Any): Unit = addValue(value)
+    })
+
+    override def hasDictionarySupport: Boolean = true
+
+    override def addValueFromDictionary(dictionaryId: Int): Unit =
+      stringConverter.addValueFromDictionary(dictionaryId)
+
+    override def setDictionary(dictionary: Dictionary): Unit =
+      stringConverter.setDictionary(dictionary)
+
+    private def addValue(value: Any): Unit = {
+      buffer += value
+      elements += 1
+      updater.set(new GenericArrayData(buffer.slice(0, elements).toArray))
+    }
+
+    override def addBinary(value: Binary): Unit =
+      myCatalystType match {
+        case StringType => stringConverter.addBinary(value)
+        case _ => addValue(value)
+      }
+
+    override def addDouble(value: Double): Unit = addValue(value)
+
+    override def addInt(value: Int): Unit = {
+      addValue(value)
+    }
+
+    override def addBoolean(value: Boolean): Unit = addValue(value)
+
+    override def addFloat(value: Float): Unit = addValue(value)
+
+    override def addLong(value: Long): Unit = addValue(value)
+
+    override def resetArray(): Unit = {
+      buffer.clear()
+      elements = 0
+    }
+  }
+
   /** Parquet converter for maps */
   private final class CatalystMapConverter(
       parquetType: GroupType,
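
Note that addValue republishes the whole array after every element via updater.set, so the parent row always holds a consistent snapshot of the values seen so far. A minimal standalone sketch of that accumulate-and-republish pattern (names here are illustrative, not from the commit):

    import scala.collection.mutable.ArrayBuffer

    // publish receives a fresh snapshot after each element is added.
    class RepeatedAccumulator[T](publish: Array[Any] => Unit) {
      private val buffer = ArrayBuffer.empty[Any]
      def add(value: T): Unit = {
        buffer += value
        publish(buffer.toArray) // snapshot, so later mutation cannot leak out
      }
      def reset(): Unit = buffer.clear()
    }

    // Usage: prints "1" then "1,2".
    // val acc = new RepeatedAccumulator[Int](a => println(a.mkString(",")))
    // acc.add(1); acc.add(2)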
@@ -446,4 +524,32 @@ private[parquet] class CatalystRowConverter(
       }
     }
   }

+  trait NeedsResetArray {
+    def resetArray(): Unit
+  }
+
+  private class CatalystRepeatedStructConverter(
+      groupType: GroupType,
+      structType: StructType,
+      updater: ParentContainerUpdater)
+    extends CatalystRowConverter(groupType, structType, updater)
+    with NeedsResetArray {
+
+    private var rowBuffer: ArrayBuffer[Any] = ArrayBuffer.empty
+    private var elements = 0
+
+    override def end(): Unit = {
+      rowBuffer += currentRow.copy()
+      elements += 1
+      updater.set(new GenericArrayData(rowBuffer.slice(0, elements).toArray))
+    }
+
+    def resetArray(): Unit = {
+      rowBuffer.clear()
+      elements = 0
+    }
+
+  }
+
 }
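
The reset hooks close a correctness gap: the row converter is reused across records, so without resetArray() one record's repeated values would leak into the next record's array. A minimal standalone sketch of the per-record lifecycle (toy types, not the commit's API):

    trait Resettable { def reset(): Unit }

    class ToyRecordReader(fieldConverters: Seq[AnyRef]) {
      private val resettable = fieldConverters.collect { case r: Resettable => r }
      // Mirrors CatalystRowConverter.start(): clear leftover buffers before each record.
      def startRecord(): Unit = resettable.foreach(_.reset())
    }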

sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala

Lines changed: 1 addition & 1 deletion
@@ -56,7 +56,7 @@ import org.apache.spark.sql.{AnalysisException, SQLConf}
 private[parquet] class CatalystSchemaConverter(
     private val assumeBinaryIsString: Boolean,
     private val assumeInt96IsTimestamp: Boolean,
-    private val followParquetFormatSpec: Boolean) {
+    private val followParquetFormatSpec: Boolean) extends SchemaConverter {

   // Only used when constructing converter for converting Spark SQL schema to Parquet schema, in
   // which case `assumeInt96IsTimestamp` and `assumeBinaryIsString` are irrelevant.
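
The SchemaConverter trait itself is introduced in one of this commit's other files and is not shown in this excerpt. Inferred from how readSchemaFromFooter uses it below, a plausible minimal shape would be (hypothetical reconstruction):

    import org.apache.parquet.schema.MessageType
    import org.apache.spark.sql.types.StructType

    // Assumed shape: both CatalystSchemaConverter and ProtobufCatalystSchemaConverter
    // expose convert(MessageType): StructType, so the trait only abstracts that method.
    private[parquet] trait SchemaConverter {
      def convert(parquetSchema: MessageType): StructType
    }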

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala

Lines changed: 17 additions & 3 deletions
@@ -705,8 +705,22 @@ private[sql] object ParquetRelation extends Logging {
       assumeInt96IsTimestamp = assumeInt96IsTimestamp,
       followParquetFormatSpec = followParquetFormatSpec)

-    footers.map { footer =>
-      ParquetRelation.readSchemaFromFooter(footer, converter)
+    val protobufConverter =
+      new ProtobufCatalystSchemaConverter(
+        assumeBinaryIsString = assumeBinaryIsString,
+        assumeInt96IsTimestamp = assumeInt96IsTimestamp,
+        followParquetFormatSpec = followParquetFormatSpec)
+
+    footers.map { footer => {
+      val myConverter =
+        if (footer.getParquetMetadata.getFileMetaData.
+            getKeyValueMetaData.containsKey("parquet.proto.class")) {
+          protobufConverter
+        } else {
+          converter
+        }
+      ParquetRelation.readSchemaFromFooter(footer, myConverter)
+    }
     }.reduceOption(_ merge _).iterator
   }.collect()

@@ -719,7 +733,7 @@ private[sql] object ParquetRelation extends Logging {
    * a [[StructType]] converted from the [[MessageType]] stored in this footer.
    */
   def readSchemaFromFooter(
-      footer: Footer, converter: CatalystSchemaConverter): StructType = {
+      footer: Footer, converter: SchemaConverter): StructType = {
     val fileMetaData = footer.getParquetMetadata.getFileMetaData
     fileMetaData
       .getKeyValueMetaData