Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,7 @@ class OrcDeserializer(
} else {
new RowUpdater(resultRow)
}
val writer: (Int, WritableComparable[_]) => Unit =
(ordinal, value) =>
if (value == null) {
rowUpdater.setNullAt(ordinal)
} else {
val writerFunc = newWriter(f.dataType, rowUpdater)
writerFunc(ordinal, value)
}
val writer = newWriter(f.dataType, rowUpdater)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A little magical, let me do some investigate too

(value: WritableComparable[_]) => writer(index, value)
}
}.toArray
Expand All @@ -75,7 +68,11 @@ class OrcDeserializer(
while (targetColumnIndex < fieldWriters.length) {
if (fieldWriters(targetColumnIndex) != null) {
val value = orcStruct.getFieldValue(requestedColIds(targetColumnIndex))
fieldWriters(targetColumnIndex)(value)
if (value == null) {
resultRow.setNullAt(targetColumnIndex)
} else {
fieldWriters(targetColumnIndex)(value)
}
}
targetColumnIndex += 1
}
Expand All @@ -88,7 +85,11 @@ class OrcDeserializer(
while (targetColumnIndex < fieldWriters.length) {
if (fieldWriters(targetColumnIndex) != null) {
val value = orcValues(requestedColIds(targetColumnIndex))
fieldWriters(targetColumnIndex)(value)
if (value == null) {
resultRow.setNullAt(targetColumnIndex)
} else {
fieldWriters(targetColumnIndex)(value)
}
}
targetColumnIndex += 1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1679,7 +1679,8 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
Config(
None),
Config(
Some(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false")))),
Some(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false"),
insertNullsToStorage = false))),
TestCase(
dataSource = "parquet",
Seq(
Expand Down Expand Up @@ -1943,7 +1944,11 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
Row(Seq(Row(1, 2)), Seq(Map(false -> "def", true -> "jkl"))),
Seq(Map(true -> "xyz"))),
Row(2,
null,
if (config.dataSource != "orc") {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this PR breaks the DEFAULT value functionality for the Orc data source (as shown by this unit test). If anyone is using this functionality, this PR will make their results incorrect. It would be better if we can fix the performance regression without changing the behavior. Is there some profile to show why the performance regression takes place? For example, is it because this change to the writer function introduces a new level of function call?

image

@dongjoon-hyun dongjoon-hyun Jan 3, 2023

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for review, @dtenedor .

  • Please see [SPARK-41782][TESTS] Regenerate benchmark results #39301 (comment) . We have a benchmark to detect this kind of perf regression. You can run it locally in your environment.
  • This is a partial revert to the original code which is the existing behavior before your PR like the previous Spark. As I mentioned in this PR description, SPARK-39862 should propose a new fix without perf regression.

New feature is good as long as not breaking the old behavior.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for review, @dtenedor .

Thanks @dongjoon-hyun for the benchmark! The Jira simply comprises the title Regenerate benchmark results. Is there some instructions for how to run the benchmark?

  • This is a partial revert to the original code which is the existing behavior before your PR like the previous Spark. As I mentioned in the PR description, SPARK-39862 should propose a fix without perf regression.

New feature is good as long as not breaking the old behavior.

Agree on this. However, that bug fix was merged into Spark 3.3 on Jul. 28, 2022. Is it possible that users could have built pipelines since then using the new feature that would return incorrect results if we merged this PR?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If someone can tell me how to run the benchmark, I can play around with a fix for the perf regression that also does not change the behavior. I suspect that it's due to the extra function call overhead that takes place each time a value is written, but not sure without a profile :)

@dongjoon-hyun dongjoon-hyun Jan 3, 2023

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I copied a wrong link. Here is the exact link, @dtenedor .
#39301 (comment)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, but I filed SPARK-41858 as the blocker of Apache Spark 3.4 because this really blocks Apache Spark 3.4 preparation from my side. If you don't mind, I'd prefer to merge this first and help you cleanly.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dongjoon-hyun that is OK, but we should probably not release Spark 3.4 with a correctness regression either. It is equally important as performance. If we create another Jira blocking the release of 3.4 that covers fixing the correctness bug, it is fine to merge this PR.

At any rate, I am hoping to figure this out today. Then we should be unblocked 🤔

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you so much for your understanding. I'll file another blocker JIRA for that of course.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, let's merge this then

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can assign the new correctness Jira for me to fix.

null
} else {
Row(Seq(Row(1, 2)), Seq(Map(false -> "def", true -> "jkl")))
},
Seq(Map(true -> "xyz"))),
Row(3,
Row(Seq(Row(3, 4)), Seq(Map(false -> "mno", true -> "pqr"))),
Expand Down