Commit 727cb25

ueshin authored and marmbrus committed
[SPARK-3036][SPARK-3037][SQL] Add MapType/ArrayType containing null value support to Parquet.
JIRA:
- https://issues.apache.org/jira/browse/SPARK-3036
- https://issues.apache.org/jira/browse/SPARK-3037

Currently this uses the following Parquet schema for `MapType` when `valueContainsNull` is `true`:

```
message root {
  optional group a (MAP) {
    repeated group map (MAP_KEY_VALUE) {
      required int32 key;
      optional int32 value;
    }
  }
}
```

and for `ArrayType` when `containsNull` is `true`:

```
message root {
  optional group a (LIST) {
    repeated group bag {
      optional int32 array;
    }
  }
}
```

We have to think about compatibility with older versions of Spark, Hive, and the others I mentioned in the JIRA issues.

Notice: This PR is based on #1963 and #1889. Please check them first.

/cc marmbrus, yhuai

Author: Takuya UESHIN <[email protected]>

Closes #2032 from ueshin/issues/SPARK-3036_3037 and squashes the following commits:

4e8e9e7 [Takuya UESHIN] Add ArrayType containing null value support to Parquet.
013c2ca [Takuya UESHIN] Add MapType containing null value support to Parquet.
62989de [Takuya UESHIN] Merge branch 'issues/SPARK-2969' into issues/SPARK-3036_3037
8e38b53 [Takuya UESHIN] Merge branch 'issues/SPARK-3063' into issues/SPARK-3036_3037
1 parent 73b3089 · commit 727cb25

File tree: 4 files changed, +167 -40 lines changed
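Before the per-file diffs, a minimal end-to-end sketch of what the commit enables, written against the Spark 1.1-era `SchemaRDD` API that the modified test suite also uses. The `Record` case class, app name, and output path are illustrative, not part of the patch:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Illustrative record type: a Seq with null elements (containsNull = true)
// and a Map with null values (valueContainsNull = true).
case class Record(arr: Seq[Option[Int]], m: Map[Int, Option[Int]])

object NullContainersExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("null-containers").setMaster("local"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD // implicit RDD[Product] -> SchemaRDD conversion

    val rdd = sc.parallelize(Seq(
      Record(Seq(Some(1), None, Some(3)), Map(1 -> Some(10), 2 -> None))))
    // Writes the LIST/"bag" and MAP schemas shown in the commit message above.
    rdd.saveAsParquetFile("/tmp/null-containers.parquet")
    // Null elements and null map values now survive the round trip.
    sqlContext.parquetFile("/tmp/null-containers.parquet").collect().foreach(println)
    sc.stop()
  }
}
```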

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala

Lines changed: 83 additions & 0 deletions

```diff
@@ -58,6 +58,7 @@ private[sql] object CatalystConverter {
   // This is mostly Parquet convention (see, e.g., `ConversionPatterns`).
   // Note that "array" for the array elements is chosen by ParquetAvro.
   // Using a different value will result in Parquet silently dropping columns.
+  val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag"
   val ARRAY_ELEMENTS_SCHEMA_NAME = "array"
   val MAP_KEY_SCHEMA_NAME = "key"
   val MAP_VALUE_SCHEMA_NAME = "value"
@@ -82,6 +83,9 @@ private[sql] object CatalystConverter {
       case ArrayType(elementType: DataType, false) => {
         new CatalystArrayConverter(elementType, fieldIndex, parent)
       }
+      case ArrayType(elementType: DataType, true) => {
+        new CatalystArrayContainsNullConverter(elementType, fieldIndex, parent)
+      }
       case StructType(fields: Seq[StructField]) => {
         new CatalystStructConverter(fields.toArray, fieldIndex, parent)
       }
@@ -567,6 +571,85 @@ private[parquet] class CatalystNativeArrayConverter(
   }
 }
 
+/**
+ * A `parquet.io.api.GroupConverter` that converts single-element groups that
+ * match the characteristics of an array containing null values (see
+ * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an
+ * [[org.apache.spark.sql.catalyst.types.ArrayType]].
+ *
+ * @param elementType The type of the array elements (complex or primitive)
+ * @param index The position of this (array) field inside its parent converter
+ * @param parent The parent converter
+ * @param buffer A data buffer
+ */
+private[parquet] class CatalystArrayContainsNullConverter(
+    val elementType: DataType,
+    val index: Int,
+    protected[parquet] val parent: CatalystConverter,
+    protected[parquet] var buffer: Buffer[Any])
+  extends CatalystConverter {
+
+  def this(elementType: DataType, index: Int, parent: CatalystConverter) =
+    this(
+      elementType,
+      index,
+      parent,
+      new ArrayBuffer[Any](CatalystArrayConverter.INITIAL_ARRAY_SIZE))
+
+  protected[parquet] val converter: Converter = new CatalystConverter {
+
+    private var current: Any = null
+
+    val converter = CatalystConverter.createConverter(
+      new CatalystConverter.FieldType(
+        CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+        elementType,
+        false),
+      fieldIndex = 0,
+      parent = this)
+
+    override def getConverter(fieldIndex: Int): Converter = converter
+
+    override def end(): Unit = parent.updateField(index, current)
+
+    override def start(): Unit = {
+      current = null
+    }
+
+    override protected[parquet] val size: Int = 1
+    override protected[parquet] val index: Int = 0
+    override protected[parquet] val parent = CatalystArrayContainsNullConverter.this
+
+    override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
+      current = value
+    }
+
+    override protected[parquet] def clearBuffer(): Unit = {}
+  }
+
+  override def getConverter(fieldIndex: Int): Converter = converter
+
+  // arrays have only one (repeated) field, which is its elements
+  override val size = 1
+
+  override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
+    buffer += value
+  }
+
+  override protected[parquet] def clearBuffer(): Unit = {
+    buffer.clear()
+  }
+
+  override def start(): Unit = {}
+
+  override def end(): Unit = {
+    assert(parent != null)
+    // here we need to make sure to use ArrayScalaType
+    parent.updateField(index, buffer.toArray.toSeq)
+    clearBuffer()
+  }
+}
+
 /**
  * This converter is for multi-element groups of primitive or complex types
  * that have repetition level optional or required (so struct fields).
```
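To make the new converter's control flow concrete, here is a dependency-free sketch of the callback sequence Parquet replays for the LIST-with-"bag" layout: one start/end pair on the inner converter per repeated "bag" group, with the element callback fired only when the optional field is present, so an empty group materializes as a null element. The `startBag`/`addElement`/`endBag` names are illustrative stand-ins, not Spark or Parquet methods:

```scala
object BagConverterSketch {
  private var current: Any = null                 // inner converter state for one bag group
  private val buffer = scala.collection.mutable.ArrayBuffer.empty[Any]

  def startBag(): Unit = { current = null }       // like the inner converter's start(): reset
  def addElement(v: Any): Unit = { current = v }  // like the element converter's updateField
  def endBag(): Unit = { buffer += current }      // like end(): push the value, or null if unset

  def main(args: Array[String]): Unit = {
    startBag(); addElement(1); endBag()
    startBag(); endBag()                          // empty bag group encodes a null element
    startBag(); addElement(3); endBag()
    println(buffer.mkString("[", ", ", "]"))      // [1, null, 3]
  }
}
```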

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala

Lines changed: 33 additions & 21 deletions

```diff
@@ -173,7 +173,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
   private[parquet] def writeValue(schema: DataType, value: Any): Unit = {
     if (value != null) {
       schema match {
-        case t @ ArrayType(_, false) => writeArray(
+        case t @ ArrayType(_, _) => writeArray(
           t,
           value.asInstanceOf[CatalystConverter.ArrayScalaType[_]])
         case t @ MapType(_, _, _) => writeMap(
@@ -228,45 +228,57 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
     }
   }
 
-  // TODO: support null values, see
-  // https://issues.apache.org/jira/browse/SPARK-1649
   private[parquet] def writeArray(
       schema: ArrayType,
       array: CatalystConverter.ArrayScalaType[_]): Unit = {
     val elementType = schema.elementType
     writer.startGroup()
     if (array.size > 0) {
-      writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
-      var i = 0
-      while(i < array.size) {
-        writeValue(elementType, array(i))
-        i = i + 1
+      if (schema.containsNull) {
+        writer.startField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0)
+        var i = 0
+        while (i < array.size) {
+          writer.startGroup()
+          if (array(i) != null) {
+            writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
+            writeValue(elementType, array(i))
+            writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
+          }
+          writer.endGroup()
+          i = i + 1
+        }
+        writer.endField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0)
+      } else {
+        writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
+        var i = 0
+        while (i < array.size) {
+          writeValue(elementType, array(i))
+          i = i + 1
+        }
+        writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
       }
-      writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
     }
     writer.endGroup()
   }
 
-  // TODO: support null values, see
-  // https://issues.apache.org/jira/browse/SPARK-1649
   private[parquet] def writeMap(
       schema: MapType,
       map: CatalystConverter.MapScalaType[_, _]): Unit = {
     writer.startGroup()
     if (map.size > 0) {
       writer.startField(CatalystConverter.MAP_SCHEMA_NAME, 0)
-      writer.startGroup()
-      writer.startField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0)
-      for(key <- map.keys) {
+      for ((key, value) <- map) {
+        writer.startGroup()
+        writer.startField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0)
         writeValue(schema.keyType, key)
+        writer.endField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0)
+        if (value != null) {
+          writer.startField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1)
+          writeValue(schema.valueType, value)
+          writer.endField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1)
+        }
+        writer.endGroup()
       }
-      writer.endField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0)
-      writer.startField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1)
-      for(value <- map.values) {
-        writeValue(schema.valueType, value)
-      }
-      writer.endField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1)
-      writer.endGroup()
       writer.endField(CatalystConverter.MAP_SCHEMA_NAME, 0)
     }
     writer.endGroup()
```
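On the write side, the reworked `writeArray` mirrors the read layout: each element of a `containsNull` array gets its own repeated group, and a null element is encoded as an empty group. A standalone trace of the call sequence for `Seq(1, null, 3)`; the `writer` object here is a logging stand-in for Parquet's record consumer (not the real API), and the `"bag"`/`"array"` names follow the constants in `CatalystConverter`:

```scala
object WriteArrayTrace {
  object writer { // minimal logging stand-in for the Parquet record consumer
    def startGroup(): Unit = println("startGroup")
    def endGroup(): Unit = println("endGroup")
    def startField(name: String, i: Int): Unit = println(s"startField($name, $i)")
    def endField(name: String, i: Int): Unit = println(s"endField($name, $i)")
    def addInteger(v: Int): Unit = println(s"addInteger($v)")
  }

  def writeArray(array: Seq[Any]): Unit = {
    writer.startGroup()
    if (array.nonEmpty) {
      writer.startField("bag", 0)
      for (elem <- array) {
        writer.startGroup()              // one repeated "bag" group per element
        if (elem != null) {              // a null element becomes an empty group
          writer.startField("array", 0)
          writer.addInteger(elem.asInstanceOf[Int])
          writer.endField("array", 0)
        }
        writer.endGroup()
      }
      writer.endField("bag", 0)
    }
    writer.endGroup()
  }

  def main(args: Array[String]): Unit = writeArray(Seq(1, null, 3))
}
```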

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala

Lines changed: 39 additions & 15 deletions

```diff
@@ -119,7 +119,13 @@ private[parquet] object ParquetTypesConverter extends Logging {
       case ParquetOriginalType.LIST => { // TODO: check enums!
         assert(groupType.getFieldCount == 1)
         val field = groupType.getFields.apply(0)
-        ArrayType(toDataType(field, isBinaryAsString), containsNull = false)
+        if (field.getName == CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME) {
+          val bag = field.asGroupType()
+          assert(bag.getFieldCount == 1)
+          ArrayType(toDataType(bag.getFields.apply(0), isBinaryAsString), containsNull = true)
+        } else {
+          ArrayType(toDataType(field, isBinaryAsString), containsNull = false)
+        }
       }
       case ParquetOriginalType.MAP => {
         assert(
@@ -129,28 +135,32 @@ private[parquet] object ParquetTypesConverter extends Logging {
         assert(
           keyValueGroup.getFieldCount == 2,
           "Parquet Map type malformatted: nested group should have 2 (key, value) fields!")
-        val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
         assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
+
+        val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
         val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString)
-        assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
-        // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true
-        // at here.
-        MapType(keyType, valueType)
+        MapType(keyType, valueType,
+          keyValueGroup.getFields.apply(1).getRepetition != Repetition.REQUIRED)
       }
       case _ => {
         // Note: the order of these checks is important!
         if (correspondsToMap(groupType)) { // MapType
           val keyValueGroup = groupType.getFields.apply(0).asGroupType()
-          val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
           assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
+
+          val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
           val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString)
-          assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
-          // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true
-          // at here.
-          MapType(keyType, valueType)
+          MapType(keyType, valueType,
+            keyValueGroup.getFields.apply(1).getRepetition != Repetition.REQUIRED)
         } else if (correspondsToArray(groupType)) { // ArrayType
-          val elementType = toDataType(groupType.getFields.apply(0), isBinaryAsString)
-          ArrayType(elementType, containsNull = false)
+          val field = groupType.getFields.apply(0)
+          if (field.getName == CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME) {
+            val bag = field.asGroupType()
+            assert(bag.getFieldCount == 1)
+            ArrayType(toDataType(bag.getFields.apply(0), isBinaryAsString), containsNull = true)
+          } else {
+            ArrayType(toDataType(field, isBinaryAsString), containsNull = false)
+          }
         } else { // everything else: StructType
           val fields = groupType
             .getFields
@@ -249,13 +259,27 @@ private[parquet] object ParquetTypesConverter extends Logging {
           inArray = true)
         ConversionPatterns.listType(repetition, name, parquetElementType)
       }
+      case ArrayType(elementType, true) => {
+        val parquetElementType = fromDataType(
+          elementType,
+          CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+          nullable = true,
+          inArray = false)
+        ConversionPatterns.listType(
+          repetition,
+          name,
+          new ParquetGroupType(
+            Repetition.REPEATED,
+            CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME,
+            parquetElementType))
+      }
       case StructType(structFields) => {
         val fields = structFields.map {
           field => fromDataType(field.dataType, field.name, field.nullable, inArray = false)
         }
         new ParquetGroupType(repetition, name, fields)
       }
-      case MapType(keyType, valueType, _) => {
+      case MapType(keyType, valueType, valueContainsNull) => {
         val parquetKeyType =
           fromDataType(
             keyType,
@@ -266,7 +290,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
           fromDataType(
             valueType,
             CatalystConverter.MAP_VALUE_SCHEMA_NAME,
-            nullable = false,
+            nullable = valueContainsNull,
            inArray = false)
         ConversionPatterns.mapType(
           repetition,
```
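The two inference rules this file adds to `toDataType` can be stated compactly: an array whose single LIST field is a one-field group named `"bag"` implies `containsNull = true`, and a map value whose repetition is not `REQUIRED` implies `valueContainsNull = true`. A sketch over a simplified schema model; the `PType` ADT below is hypothetical, not Parquet's `GroupType` API:

```scala
object SchemaInference {
  sealed trait PType
  case class Primitive(name: String, required: Boolean) extends PType
  case class Group(name: String, required: Boolean, fields: List[PType]) extends PType

  // LIST whose single field is a one-field group named "bag" => containsNull = true.
  def arrayContainsNull(listField: PType): Boolean = listField match {
    case Group("bag", _, _ :: Nil) => true
    case _                         => false // legacy layout: repeated element directly
  }

  // Map value repetition other than REQUIRED => valueContainsNull = true.
  def mapValueContainsNull(keyValue: Group): Boolean = keyValue.fields match {
    case _ :: value :: Nil => value match {
      case Primitive(_, required) => !required
      case Group(_, required, _)  => !required
    }
    case _ => sys.error("MAP_KEY_VALUE group must have exactly (key, value) fields")
  }

  def main(args: Array[String]): Unit = {
    val bag = Group("bag", required = false, List(Primitive("array", required = false)))
    println(arrayContainsNull(bag))                                 // true
    println(arrayContainsNull(Primitive("array", required = true))) // false
    val kv = Group("map", required = false,
      List(Primitive("key", required = true), Primitive("value", required = false)))
    println(mapValueContainsNull(kv))                               // true
  }
}
```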

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala

Lines changed: 12 additions & 4 deletions

```diff
@@ -78,7 +78,9 @@ case class AllDataTypesWithNonPrimitiveType(
     booleanField: Boolean,
     binaryField: Array[Byte],
     array: Seq[Int],
-    map: Map[Int, String],
+    arrayContainsNull: Seq[Option[Int]],
+    map: Map[Int, Long],
+    mapValueContainsNull: Map[Int, Option[Long]],
     data: Data)
 
 class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
@@ -287,7 +289,11 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
       .map(x => AllDataTypesWithNonPrimitiveType(
         s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0,
         (0 to x).map(_.toByte).toArray,
-        (0 until x), (0 until x).map(i => i -> s"$i").toMap, Data((0 until x), Nested(x, s"$x"))))
+        (0 until x),
+        (0 until x).map(Option(_).filter(_ % 3 == 0)),
+        (0 until x).map(i => i -> i.toLong).toMap,
+        (0 until x).map(i => i -> Option(i.toLong)).toMap + (x -> None),
+        Data((0 until x), Nested(x, s"$x"))))
       .saveAsParquetFile(tempDir)
     val result = parquetFile(tempDir).collect()
     range.foreach {
@@ -302,8 +308,10 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
       assert(result(i).getBoolean(7) === (i % 2 == 0))
       assert(result(i)(8) === (0 to i).map(_.toByte).toArray)
       assert(result(i)(9) === (0 until i))
-      assert(result(i)(10) === (0 until i).map(i => i -> s"$i").toMap)
-      assert(result(i)(11) === new GenericRow(Array[Any]((0 until i), new GenericRow(Array[Any](i, s"$i")))))
+      assert(result(i)(10) === (0 until i).map(i => if (i % 3 == 0) i else null))
+      assert(result(i)(11) === (0 until i).map(i => i -> i.toLong).toMap)
+      assert(result(i)(12) === (0 until i).map(i => i -> i.toLong).toMap + (i -> null))
+      assert(result(i)(13) === new GenericRow(Array[Any]((0 until i), new GenericRow(Array[Any](i, s"$i")))))
     }
   }
```
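As a sanity check on the new assertions: the suite writes `Option` values but reads back raw values or `null` (Parquet has no `Option`), so the expected sequence in the `containsNull` assertion is just the written `Option` expression with `None` flattened to `null`. A plain-Scala check, no Spark needed (`i = 7` is an arbitrary sample):

```scala
object NullArrayShapes {
  def main(args: Array[String]): Unit = {
    val i = 7
    // What the suite writes for arrayContainsNull at row i:
    val written = (0 until i).map(Option(_).filter(_ % 3 == 0))
    // What the suite expects to read back (null, not None):
    val expected = (0 until i).map(j => if (j % 3 == 0) j else null)
    assert(written.map(_.getOrElse(null)) == expected)
    println(expected.mkString("[", ", ", "]")) // [0, null, null, 3, null, null, 6]
  }
}
```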
