@@ -21,11 +21,11 @@ import java.math.{BigDecimal, BigInteger}
2121import java .nio .ByteOrder
2222
2323import scala .collection .JavaConversions ._
24- import scala .collection .mutable
2524import scala .collection .mutable .ArrayBuffer
2625
2726import org .apache .parquet .column .Dictionary
2827import org .apache .parquet .io .api .{Binary , Converter , GroupConverter , PrimitiveConverter }
28+ import org .apache .parquet .schema .OriginalType .LIST
2929import org .apache .parquet .schema .Type .Repetition
3030import org .apache .parquet .schema .{GroupType , PrimitiveType , Type }
3131
@@ -42,6 +42,12 @@ import org.apache.spark.unsafe.types.UTF8String
4242 * values to an [[ArrayBuffer ]].
4343 */
4444private [parquet] trait ParentContainerUpdater {
45+ /** Called before a record field is being converted */
46+ def start (): Unit = ()
47+
48+ /** Called after a record field is being converted */
49+ def end (): Unit = ()
50+
4551 def set (value : Any ): Unit = ()
4652 def setBoolean (value : Boolean ): Unit = set(value)
4753 def setByte (value : Byte ): Unit = set(value)
@@ -55,6 +61,32 @@ private[parquet] trait ParentContainerUpdater {
5561/** A no-op updater used for root converter (who doesn't have a parent). */
5662private [parquet] object NoopUpdater extends ParentContainerUpdater
5763
64+ private [parquet] trait HasParentContainerUpdater {
65+ def updater : ParentContainerUpdater
66+ }
67+
68+ /**
69+ * A convenient converter class for Parquet group types with an [[HasParentContainerUpdater ]].
70+ */
71+ private [parquet] abstract class CatalystGroupConverter (val updater : ParentContainerUpdater )
72+ extends GroupConverter with HasParentContainerUpdater
73+
74+ /**
75+ * Parquet converter for Parquet primitive types. Note that not all Spark SQL atomic types
76+ * are handled by this converter. Parquet primitive types are only a subset of those of Spark
77+ * SQL. For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet.
78+ */
79+ private [parquet] class CatalystPrimitiveConverter (val updater : ParentContainerUpdater )
80+ extends PrimitiveConverter with HasParentContainerUpdater {
81+
82+ override def addBoolean (value : Boolean ): Unit = updater.setBoolean(value)
83+ override def addInt (value : Int ): Unit = updater.setInt(value)
84+ override def addLong (value : Long ): Unit = updater.setLong(value)
85+ override def addFloat (value : Float ): Unit = updater.setFloat(value)
86+ override def addDouble (value : Double ): Unit = updater.setDouble(value)
87+ override def addBinary (value : Binary ): Unit = updater.set(value.getBytes)
88+ }
89+
5890/**
5991 * A [[CatalystRowConverter ]] is used to convert Parquet "structs" into Spark SQL [[InternalRow ]]s.
6092 * Since any Parquet record is also a struct, this converter can also be used as root converter.
@@ -70,7 +102,7 @@ private[parquet] class CatalystRowConverter(
70102 parquetType : GroupType ,
71103 catalystType : StructType ,
72104 updater : ParentContainerUpdater )
73- extends GroupConverter {
105+ extends CatalystGroupConverter (updater) {
74106
75107 /**
76108 * Updater used together with field converters within a [[CatalystRowConverter ]]. It propagates
@@ -89,13 +121,11 @@ private[parquet] class CatalystRowConverter(
89121
90122 /**
91123 * Represents the converted row object once an entire Parquet record is converted.
92- *
93- * @todo Uses [[UnsafeRow ]] for better performance.
94124 */
95125 val currentRow = new SpecificMutableRow (catalystType.map(_.dataType))
96126
97127 // Converters for each field.
98- private val fieldConverters : Array [Converter ] = {
128+ private val fieldConverters : Array [Converter with HasParentContainerUpdater ] = {
99129 parquetType.getFields.zip(catalystType).zipWithIndex.map {
100130 case ((parquetFieldType, catalystField), ordinal) =>
101131 // Converted field value should be set to the `ordinal`-th cell of `currentRow`
@@ -105,11 +135,19 @@ private[parquet] class CatalystRowConverter(
105135
106136 override def getConverter (fieldIndex : Int ): Converter = fieldConverters(fieldIndex)
107137
108- override def end (): Unit = updater.set(currentRow)
138+ override def end (): Unit = {
139+ var i = 0
140+ while (i < currentRow.numFields) {
141+ fieldConverters(i).updater.end()
142+ i += 1
143+ }
144+ updater.set(currentRow)
145+ }
109146
110147 override def start (): Unit = {
111148 var i = 0
112149 while (i < currentRow.numFields) {
150+ fieldConverters(i).updater.start()
113151 currentRow.setNullAt(i)
114152 i += 1
115153 }
@@ -122,20 +160,20 @@ private[parquet] class CatalystRowConverter(
122160 private def newConverter (
123161 parquetType : Type ,
124162 catalystType : DataType ,
125- updater : ParentContainerUpdater ): Converter = {
163+ updater : ParentContainerUpdater ): Converter with HasParentContainerUpdater = {
126164
127165 catalystType match {
128166 case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType =>
129167 new CatalystPrimitiveConverter (updater)
130168
131169 case ByteType =>
132- new PrimitiveConverter {
170+ new CatalystPrimitiveConverter (updater) {
133171 override def addInt (value : Int ): Unit =
134172 updater.setByte(value.asInstanceOf [ByteType # InternalType ])
135173 }
136174
137175 case ShortType =>
138- new PrimitiveConverter {
176+ new CatalystPrimitiveConverter (updater) {
139177 override def addInt (value : Int ): Unit =
140178 updater.setShort(value.asInstanceOf [ShortType # InternalType ])
141179 }
@@ -148,7 +186,7 @@ private[parquet] class CatalystRowConverter(
148186
149187 case TimestampType =>
150188 // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
151- new PrimitiveConverter {
189+ new CatalystPrimitiveConverter (updater) {
152190 // Converts nanosecond timestamps stored as INT96
153191 override def addBinary (value : Binary ): Unit = {
154192 assert(
@@ -164,13 +202,23 @@ private[parquet] class CatalystRowConverter(
164202 }
165203
166204 case DateType =>
167- new PrimitiveConverter {
205+ new CatalystPrimitiveConverter (updater) {
168206 override def addInt (value : Int ): Unit = {
169207 // DateType is not specialized in `SpecificMutableRow`, have to box it here.
170208 updater.set(value.asInstanceOf [DateType # InternalType ])
171209 }
172210 }
173211
212+ // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
213+ // annotated by `LIST` or `MAP` should be interpreted as a required list of required
214+ // elements where the element type is the type of the field.
215+ case t : ArrayType if parquetType.getOriginalType != LIST =>
216+ if (parquetType.isPrimitive) {
217+ new RepeatedPrimitiveConverter (parquetType, t.elementType, updater)
218+ } else {
219+ new RepeatedGroupConverter (parquetType, t.elementType, updater)
220+ }
221+
174222 case t : ArrayType =>
175223 new CatalystArrayConverter (parquetType.asGroupType(), t, updater)
176224
@@ -195,27 +243,11 @@ private[parquet] class CatalystRowConverter(
195243 }
196244 }
197245
198- /**
199- * Parquet converter for Parquet primitive types. Note that not all Spark SQL atomic types
200- * are handled by this converter. Parquet primitive types are only a subset of those of Spark
201- * SQL. For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet.
202- */
203- private final class CatalystPrimitiveConverter (updater : ParentContainerUpdater )
204- extends PrimitiveConverter {
205-
206- override def addBoolean (value : Boolean ): Unit = updater.setBoolean(value)
207- override def addInt (value : Int ): Unit = updater.setInt(value)
208- override def addLong (value : Long ): Unit = updater.setLong(value)
209- override def addFloat (value : Float ): Unit = updater.setFloat(value)
210- override def addDouble (value : Double ): Unit = updater.setDouble(value)
211- override def addBinary (value : Binary ): Unit = updater.set(value.getBytes)
212- }
213-
214246 /**
215247 * Parquet converter for strings. A dictionary is used to minimize string decoding cost.
216248 */
217249 private final class CatalystStringConverter (updater : ParentContainerUpdater )
218- extends PrimitiveConverter {
250+ extends CatalystPrimitiveConverter (updater) {
219251
220252 private var expandedDictionary : Array [UTF8String ] = null
221253
@@ -242,7 +274,7 @@ private[parquet] class CatalystRowConverter(
242274 private final class CatalystDecimalConverter (
243275 decimalType : DecimalType ,
244276 updater : ParentContainerUpdater )
245- extends PrimitiveConverter {
277+ extends CatalystPrimitiveConverter (updater) {
246278
247279 // Converts decimals stored as INT32
248280 override def addInt (value : Int ): Unit = {
@@ -306,7 +338,7 @@ private[parquet] class CatalystRowConverter(
306338 parquetSchema : GroupType ,
307339 catalystSchema : ArrayType ,
308340 updater : ParentContainerUpdater )
309- extends GroupConverter {
341+ extends CatalystGroupConverter (updater) {
310342
311343 private var currentArray : ArrayBuffer [Any ] = _
312344
@@ -383,7 +415,7 @@ private[parquet] class CatalystRowConverter(
383415 parquetType : GroupType ,
384416 catalystType : MapType ,
385417 updater : ParentContainerUpdater )
386- extends GroupConverter {
418+ extends CatalystGroupConverter (updater) {
387419
388420 private var currentKeys : ArrayBuffer [Any ] = _
389421 private var currentValues : ArrayBuffer [Any ] = _
@@ -446,4 +478,61 @@ private[parquet] class CatalystRowConverter(
446478 }
447479 }
448480 }
481+
482+ private trait RepeatedConverter {
483+ private var currentArray : ArrayBuffer [Any ] = _
484+
485+ protected def newArrayUpdater (updater : ParentContainerUpdater ) = new ParentContainerUpdater {
486+ override def start (): Unit = currentArray = ArrayBuffer .empty[Any ]
487+ override def end (): Unit = updater.set(new GenericArrayData (currentArray.toArray))
488+ override def set (value : Any ): Unit = currentArray += value
489+ }
490+ }
491+
492+ /**
493+ * A primitive converter for converting unannotated repeated primitive values to required arrays
494+ * of required primitives values.
495+ */
496+ private final class RepeatedPrimitiveConverter (
497+ parquetType : Type ,
498+ catalystType : DataType ,
499+ parentUpdater : ParentContainerUpdater )
500+ extends PrimitiveConverter with RepeatedConverter with HasParentContainerUpdater {
501+
502+ val updater : ParentContainerUpdater = newArrayUpdater(parentUpdater)
503+
504+ private val elementConverter : PrimitiveConverter =
505+ newConverter(parquetType, catalystType, updater).asPrimitiveConverter()
506+
507+ override def addBoolean (value : Boolean ): Unit = elementConverter.addBoolean(value)
508+ override def addInt (value : Int ): Unit = elementConverter.addInt(value)
509+ override def addLong (value : Long ): Unit = elementConverter.addLong(value)
510+ override def addFloat (value : Float ): Unit = elementConverter.addFloat(value)
511+ override def addDouble (value : Double ): Unit = elementConverter.addDouble(value)
512+ override def addBinary (value : Binary ): Unit = elementConverter.addBinary(value)
513+
514+ override def setDictionary (dict : Dictionary ): Unit = elementConverter.setDictionary(dict)
515+ override def hasDictionarySupport : Boolean = elementConverter.hasDictionarySupport
516+ override def addValueFromDictionary (id : Int ): Unit = elementConverter.addValueFromDictionary(id)
517+ }
518+
519+ /**
520+ * A group converter for converting unannotated repeated group values to required arrays of
521+ * required struct values.
522+ */
523+ private final class RepeatedGroupConverter (
524+ parquetType : Type ,
525+ catalystType : DataType ,
526+ parentUpdater : ParentContainerUpdater )
527+ extends GroupConverter with HasParentContainerUpdater with RepeatedConverter {
528+
529+ val updater : ParentContainerUpdater = newArrayUpdater(parentUpdater)
530+
531+ private val elementConverter : GroupConverter =
532+ newConverter(parquetType, catalystType, updater).asGroupConverter()
533+
534+ override def getConverter (field : Int ): Converter = elementConverter.getConverter(field)
535+ override def end (): Unit = elementConverter.end()
536+ override def start (): Unit = elementConverter.start()
537+ }
449538}
0 commit comments