@@ -318,10 +318,32 @@ private[parquet] class ParquetRowConverter(
         new ParquetMapConverter(parquetType.asGroupType(), t, updater)

       case t: StructType =>
+        val wrappedUpdater = {

Member

@JoshRosen, no big deal at all but how about we put the JIRA ID somewhere in the comment?

Contributor Author

Good idea: I added a JIRA reference in e6945e8

+          if (updater.isInstanceOf[RowUpdater]) {
+            // `updater` is a RowUpdater, implying that the parent container is a struct.
+            // We do NOT need to perform defensive copying here because either:
+            //
+            // 1. The path from the schema root to this field consists only of nested

Member

When we have a deeply nested struct inside an array, is that the first case here?

I think it is fine, because the element converter for the top-level struct inside each array element does the defensive copying. The nested struct converter then sees a RowUpdater from its parent struct, so it doesn't need to copy defensively either.

It might be good to also mention this in the doc comment.

Contributor

Yes, that's right. After thinking about this some more, I think I've come up with a clearer explanation and have updated the code comment: 4651b2f
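
To make the exchange above concrete, here is a minimal sketch of the aliasing problem and of the updater dispatch for an array of struct of struct. It does not use Spark: `MutableRow`, `ArrayElementUpdater`, `wrap`, and `readArrayOfNestedStructs` are hypothetical stand-ins, and only the names `RowUpdater` and `ParentContainerUpdater` are borrowed from the diff. It assumes, as the copying discussion implies, that each converter reuses one mutable row across invocations.

```scala
import scala.collection.mutable.ArrayBuffer

object UpdaterSketch {
  // Hypothetical stand-ins; none of these are Spark's real classes.
  trait ParentContainerUpdater { def set(value: Any): Unit }

  final class MutableRow(val values: Array[Any]) {
    // Deep copy: nested rows are copied recursively.
    def copy(): MutableRow =
      new MutableRow(Array.tabulate[Any](values.length) { i =>
        values(i) match {
          case nested: MutableRow => nested.copy()
          case other => other
        }
      })
  }

  // The parent is a struct: store the child in a field of the parent's row.
  final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater {
    def set(value: Any): Unit = { row.values(ordinal) = value }
  }

  // The parent is an array: append the child to the element buffer.
  final class ArrayElementUpdater(buffer: ArrayBuffer[Any]) extends ParentContainerUpdater {
    def set(value: Any): Unit = { buffer += value }
  }

  // Same shape as `wrappedUpdater` in the diff: copy only when the parent is not a struct.
  def wrap(updater: ParentContainerUpdater): ParentContainerUpdater = updater match {
    case _: RowUpdater => updater
    case _ =>
      new ParentContainerUpdater {
        def set(value: Any): Unit = updater.set(value.asInstanceOf[MutableRow].copy())
      }
  }

  // Simulates reading array<struct<inner: struct<s: string>>> with reused rows.
  def readArrayOfNestedStructs(inputs: List[String], defensiveCopy: Boolean): List[String] = {
    val buffer = ArrayBuffer.empty[Any]
    val element = new MutableRow(new Array[Any](1)) // reused for every array element
    val inner = new MutableRow(new Array[Any](1))   // reused for every nested struct

    val arrayUpdater = new ArrayElementUpdater(buffer)
    val elementUpdater = if (defensiveCopy) wrap(arrayUpdater) else arrayUpdater
    val innerUpdater = wrap(new RowUpdater(element, 0)) // parent is a struct: no copy

    inputs.foreach { s =>
      inner.values(0) = s
      innerUpdater.set(inner)
      elementUpdater.set(element)
    }
    buffer.toList.map { row =>
      val nested = row.asInstanceOf[MutableRow].values(0).asInstanceOf[MutableRow]
      nested.values(0).asInstanceOf[String]
    }
  }
}
```

In this sketch, `readArrayOfNestedStructs(List("a", "b"), defensiveCopy = true)` returns `List("a", "b")`, even though the nested struct is never copied on its own: the deep copy at the array element covers it. With `defensiveCopy = false`, both entries alias the same reused row and the result is `List("b", "b")`.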

+            // structs, so this converter will only be invoked once per record and
+            // we don't need to copy because copying will be done in the final
+            // UnsafeProjection, or
+            // 2. The path from the schema root to this field contains a map or array,
+            // in which case we will perform a recursive defensive copy via the

Contributor

Correctness relies on the copy actually being a deep copy. Looking elsewhere in this file, we have comments like

    // NOTE: We can't reuse the mutable Map here and must instantiate a new `Map` for the next
    // value.  `Row.copy()` only copies row cells, it doesn't do deep copy to objects stored in row
    // cells.

which suggest that certain copying might be shallow, so it's important to double-check and make sure that the copies are indeed deep.

Here, the state being copied is an InternalRow. To be more specific, it's actually a SpecificInternalRow (I'll update the .asInstanceOf cast below to reflect this). SpecificInternalRow extends BaseGenericInternalRow, and #18483 changed that class to implement a deep copy, recursively copying maps, arrays, and structs.
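
The shallow versus deep distinction this comment relies on can be illustrated with a small sketch in plain Scala. It is not Spark's implementation: `Cells`, `shallowCopy`, `deepCopy`, and `sizesAfterMutation` are hypothetical names, and the only assumption carried over from the discussion is that the source object keeps mutating its nested containers after the copy is taken.

```scala
import scala.collection.mutable.ArrayBuffer

object CopyDepthSketch {
  // Hypothetical row type: cells may hold nested rows or mutable buffers.
  final class Cells(val cells: Array[Any]) {
    // Copies the cell array only; nested containers stay shared.
    def shallowCopy(): Cells = new Cells(cells.clone())

    // Recursively copies nested rows and buffers.
    def deepCopy(): Cells =
      new Cells(Array.tabulate[Any](cells.length)(i => deepValue(cells(i))))
  }

  private def deepValue(value: Any): Any = value match {
    case row: Cells => row.deepCopy()
    case buffer: ArrayBuffer[_] =>
      val out = ArrayBuffer.empty[Any]
      buffer.foreach(item => out += deepValue(item))
      out
    case other => other
  }

  // Returns the nested buffer sizes seen by (shallow copy, deep copy)
  // after the original's nested buffer is mutated.
  def sizesAfterMutation(): (Int, Int) = {
    val nested = ArrayBuffer[Any]("x")
    val original = new Cells(Array[Any](1, nested))
    val shallow = original.shallowCopy()
    val deep = original.deepCopy()
    nested += "y" // simulate a converter reusing its mutable buffer
    def size(row: Cells): Int = row.cells(1).asInstanceOf[ArrayBuffer[Any]].size
    (size(shallow), size(deep))
  }
}
```

Here `sizesAfterMutation()` returns `(2, 1)`: the shallow copy observes the later mutation, while the deep copy does not. That is why skipping the per-struct copy is only safe if the copy taken higher up is a deep one.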

Contributor

I think the existing comment about Row.copy() is outdated, so we might be able to optimize those other parts of the code, too; I'm going to defer that to future work / another PR, though.

Contributor Author

Update: in #27089 I'm removing these other unnecessary ArrayBuffer copies.

+            // `else` branch below.
+            updater
+          } else {
+            // `updater` is NOT a RowUpdater, implying that the parent container is not a struct.
+            // Therefore, the parent container must be a map or array. We need to copy the row
+            // because this converter might be invoked multiple times per Parquet input record.
+            new ParentContainerUpdater {
+              override def set(value: Any): Unit = {
+                updater.set(value.asInstanceOf[SpecificInternalRow].copy())
+              }
+            }
+          }
+        }
         new ParquetRowConverter(
-          schemaConverter, parquetType.asGroupType(), t, convertTz, new ParentContainerUpdater {
-            override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
-          })
+          schemaConverter, parquetType.asGroupType(), t, convertTz, wrappedUpdater)

       case t =>
         throw new RuntimeException(
@@ -204,6 +204,23 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }

+  testStandardAndLegacyModes("array of struct") {

Member

Do we have a test for array of struct of struct?

Contributor

I added a new test case for this in 0f1af94
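
The test added in 0f1af94 is not shown in this excerpt. As an assumption about what input for an array of struct of struct could look like in the style of the surrounding tests, the sketch below builds such data from nested `Tuple1` values and converts it to the expected nested-row form. It runs without Spark: `FakeRow` is a hypothetical stand-in for `Row`, and `convert`, `data`, and `expected` are illustrative names.

```scala
object NestedStructDataSketch {
  // Hypothetical stand-in for Spark's `Row`, so the sketch runs without Spark.
  final case class FakeRow(fields: List[Any])

  // Tuples become rows, sequences stay sequences, everything else is a leaf.
  // The Seq case must come first because List is itself a Product.
  def convert(value: Any): Any = value match {
    case seq: Seq[_] => seq.map(convert).toList
    case struct: Product => FakeRow(struct.productIterator.map(convert).toList)
    case leaf => leaf
  }

  // One column holding array<struct<struct<string>>>.
  def data(n: Int): List[Tuple1[Seq[Tuple1[Tuple1[String]]]]] =
    (1 to n).toList.map { i =>
      Tuple1(
        Seq(
          Tuple1(Tuple1(s"1st_val_$i")),
          Tuple1(Tuple1(s"2nd_val_$i"))
        )
      )
    }

  // Expected answer: every struct level is converted to a row.
  def expected(n: Int): List[Any] = data(n).map(convert)
}
```

In a real suite, the conversion step would presumably be replaced by building `Row` values and passing them to `checkAnswer`, as the tests in this diff do.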

+    val data = (1 to 4).map { i =>
+      Tuple1(
+        Seq(
+          Tuple1(s"1st_val_$i"),
+          Tuple1(s"2nd_val_$i")
+        )
+      )
+    }
+    withParquetDataFrame(data) { df =>
+      // Structs are converted to `Row`s
+      checkAnswer(df, data.map { case Tuple1(array) =>
+        Row(array.map(struct => Row(struct.productIterator.toSeq: _*)))
+      })
+    }
+  }
+
   testStandardAndLegacyModes("nested struct with array of array as field") {
     val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i")))))
     withParquetDataFrame(data) { df =>
@@ -214,9 +231,34 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }

+  testStandardAndLegacyModes("nested map with struct as key type") {
+    val data = (1 to 4).map { i =>
+      Tuple1(
+        Map(
+          (i, s"kA_$i") -> s"vA_$i",
+          (i, s"kB_$i") -> s"vB_$i"
+        )
+      )
+    }
+    withParquetDataFrame(data) { df =>
+      // Structs are converted to `Row`s
+      checkAnswer(df, data.map { case Tuple1(m) =>
+        Row(m.map { case (k, v) => Row(k.productIterator.toSeq: _*) -> v })
+      })
+    }
+  }
+
   testStandardAndLegacyModes("nested map with struct as value type") {
-    val data = (1 to 4).map(i => Tuple1(Map(i -> ((i, s"val_$i")))))
+    val data = (1 to 4).map { i =>
+      Tuple1(
+        Map(
+          s"kA_$i" -> ((i, s"vA_$i")),
+          s"kB_$i" -> ((i, s"vB_$i"))
+        )
+      )
+    }
     withParquetDataFrame(data) { df =>
       // Structs are converted to `Row`s
       checkAnswer(df, data.map { case Tuple1(m) =>
         Row(m.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
       })