Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,18 @@ private[parquet] object CatalystReadSupport {
// Only clips array types with nested type as element type.
clipParquetListType(parquetType.asGroupType(), t.elementType)

case t: MapType if !isPrimitiveCatalystType(t.valueType) =>
// Only clips map types with nested type as value type.
case t: MapType
if !isPrimitiveCatalystType(t.keyType) ||
!isPrimitiveCatalystType(t.valueType) =>
// Only clips map types with nested key type or value type
clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)

case t: StructType =>
clipParquetGroup(parquetType.asGroupType(), t)

case _ =>
// UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able
// to be mapped to desired user-space types. So UDTs shouldn't participate in schema merging.
parquetType
}
}
Expand Down Expand Up @@ -204,14 +208,14 @@ private[parquet] object CatalystReadSupport {
}

/**
* Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[MapType]]. The value type
* of the [[MapType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
* [[StructType]]. Note that key type of any [[MapType]] is always a primitive type.
* Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[MapType]]. Either the key type
* or the value type of the [[MapType]] must be a nested type, namely an [[ArrayType]], a
* [[MapType]], or a [[StructType]].
*/
private def clipParquetMapType(
parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = {
// Precondition of this method, should only be called for maps with nested value types.
assert(!isPrimitiveCatalystType(valueType))
// Precondition: this method only handles maps with nested key types or value types.
assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType))

val repeatedGroup = parquetMap.getType(0).asGroupType()
val parquetKeyType = repeatedGroup.getType(0)
Expand All @@ -221,7 +225,7 @@ private[parquet] object CatalystReadSupport {
Types
.repeatedGroup()
.as(repeatedGroup.getOriginalType)
.addField(parquetKeyType)
.addField(clipParquetType(parquetKeyType, keyType))
.addField(clipParquetType(parquetValueType, valueType))
.named(repeatedGroup.getName)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,16 @@ private[parquet] class CatalystRowConverter(
updater: ParentContainerUpdater)
extends CatalystGroupConverter(updater) with Logging {

assert(
parquetType.getFieldCount == catalystType.length,
s"""Field counts of the Parquet schema and the Catalyst schema don't match:
|
|Parquet schema:
|$parquetType
|Catalyst schema:
|${catalystType.prettyJson}
""".stripMargin)

logDebug(
s"""Building row converter for the following schema:
|
Expand Down
Loading