Skip to content

Commit 49da38e

Browse files
liancheng authored and davies committed
[SPARK-10301] [SPARK-10428] [SQL] Addresses comments of PR #8583 and #8509 for master
Author: Cheng Lian <[email protected]> Closes #8670 from liancheng/spark-10301/address-pr-comments.
1 parent f892d92 commit 49da38e

File tree

4 files changed

+522
-45
lines changed

4 files changed

+522
-45
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,18 @@ private[parquet] object CatalystReadSupport {
117117
// Only clips array types with nested type as element type.
118118
clipParquetListType(parquetType.asGroupType(), t.elementType)
119119

120-
case t: MapType if !isPrimitiveCatalystType(t.valueType) =>
121-
// Only clips map types with nested type as value type.
120+
case t: MapType
121+
if !isPrimitiveCatalystType(t.keyType) ||
122+
!isPrimitiveCatalystType(t.valueType) =>
123+
// Only clips map types with nested key type or value type
122124
clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)
123125

124126
case t: StructType =>
125127
clipParquetGroup(parquetType.asGroupType(), t)
126128

127129
case _ =>
130+
// UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able
131+
// to be mapped to desired user-space types. So UDTs shouldn't participate in schema merging.
128132
parquetType
129133
}
130134
}
@@ -204,14 +208,14 @@ private[parquet] object CatalystReadSupport {
204208
}
205209

206210
/**
207-
* Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[MapType]]. The value type
208-
* of the [[MapType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
209-
* [[StructType]]. Note that key type of any [[MapType]] is always a primitive type.
211+
* Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[MapType]]. Either key type or
212+
* value type of the [[MapType]] must be a nested type, namely an [[ArrayType]], a [[MapType]], or
213+
* a [[StructType]].
210214
*/
211215
private def clipParquetMapType(
212216
parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = {
213-
// Precondition of this method, should only be called for maps with nested value types.
214-
assert(!isPrimitiveCatalystType(valueType))
217+
// Precondition: this method only handles maps with nested key types or value types.
218+
assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType))
215219

216220
val repeatedGroup = parquetMap.getType(0).asGroupType()
217221
val parquetKeyType = repeatedGroup.getType(0)
@@ -221,7 +225,7 @@ private[parquet] object CatalystReadSupport {
221225
Types
222226
.repeatedGroup()
223227
.as(repeatedGroup.getOriginalType)
224-
.addField(parquetKeyType)
228+
.addField(clipParquetType(parquetKeyType, keyType))
225229
.addField(clipParquetType(parquetValueType, valueType))
226230
.named(repeatedGroup.getName)
227231

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,16 @@ private[parquet] class CatalystRowConverter(
123123
updater: ParentContainerUpdater)
124124
extends CatalystGroupConverter(updater) with Logging {
125125

126+
assert(
127+
parquetType.getFieldCount == catalystType.length,
128+
s"""Field counts of the Parquet schema and the Catalyst schema don't match:
129+
|
130+
|Parquet schema:
131+
|$parquetType
132+
|Catalyst schema:
133+
|${catalystType.prettyJson}
134+
""".stripMargin)
135+
126136
logDebug(
127137
s"""Building row converter for the following schema:
128138
|

0 commit comments

Comments
 (0)