Skip to content

Commit 66d5a00

Browse files
bersprocketsgengliangwang
authored andcommitted
[SPARK-35817][SQL] Restore performance of queries against wide Avro tables
### What changes were proposed in this pull request? When creating a record writer in an AvroDeserializer, or creating a struct converter in an AvroSerializer, look up Avro fields using a map rather than scanning the entire list of Avro fields. ### Why are the changes needed? A query against an Avro table can be quite slow when all are true: * There are many columns in the Avro file * The query contains a wide projection * There are many splits in the input * Some of the splits are read serially (e.g., less executors than there are tasks) A write to an Avro table can be quite slow when all are true: * There are many columns in the new rows * The operation is creating many files For example, a single-threaded query against a 6000 column Avro data set with 50K rows and 20 files takes less than a minute with Spark 3.0.1 but over 7 minutes with Spark 3.2.0-SNAPSHOT. This PR restores the faster time. For the 1000 column read benchmark: Before patch: 108447 ms After patch: 35925 ms percent improvement: 66% For the 1000 column write benchmark: Before patch: 123307 After patch: 42313 percent improvement: 65% ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? * Ran existing unit tests * Added new unit tests * Added new benchmarks Closes #32969 from bersprockets/SPARK-35817. Authored-by: Bruce Robbins <[email protected]> Signed-off-by: Gengliang Wang <[email protected]>
1 parent 7f93773 commit 66d5a00

File tree

8 files changed

+239
-80
lines changed

8 files changed

+239
-80
lines changed

external/avro/benchmarks/AvroReadBenchmark-results.txt

Lines changed: 63 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -2,129 +2,140 @@
22
SQL Single Numeric Column Scan
33
================================================================================================
44

5-
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
6-
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
5+
OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
6+
Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
77
SQL Single TINYINT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
88
------------------------------------------------------------------------------------------------------------------------
9-
Sum 2802 2826 34 5.6 178.1 1.0X
9+
Sum 2648 2658 15 5.9 168.3 1.0X
1010

11-
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
12-
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
11+
OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
12+
Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
1313
SQL Single SMALLINT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
1414
------------------------------------------------------------------------------------------------------------------------
15-
Sum 2786 2810 35 5.6 177.1 1.0X
15+
Sum 2584 2624 56 6.1 164.3 1.0X
1616

17-
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
18-
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
17+
OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
18+
Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
1919
SQL Single INT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
2020
------------------------------------------------------------------------------------------------------------------------
21-
Sum 2808 2817 13 5.6 178.5 1.0X
21+
Sum 2611 2612 2 6.0 166.0 1.0X
2222

23-
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
24-
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
23+
OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
24+
Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
2525
SQL Single BIGINT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
2626
------------------------------------------------------------------------------------------------------------------------
27-
Sum 3222 3224 3 4.9 204.9 1.0X
27+
Sum 2861 2866 7 5.5 181.9 1.0X
2828

29-
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
30-
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
29+
OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
30+
Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
3131
SQL Single FLOAT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
3232
------------------------------------------------------------------------------------------------------------------------
33-
Sum 2827 2844 24 5.6 179.7 1.0X
33+
Sum 2519 2528 13 6.2 160.1 1.0X
3434

35-
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
36-
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
35+
OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
36+
Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
3737
SQL Single DOUBLE Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
3838
------------------------------------------------------------------------------------------------------------------------
39-
Sum 2910 2924 20 5.4 185.0 1.0X
39+
Sum 2584 2589 7 6.1 164.3 1.0X
4040

4141

4242
================================================================================================
4343
Int and String Scan
4444
================================================================================================
4545

46-
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
47-
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
46+
OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
47+
Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
4848
Int and String Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
4949
------------------------------------------------------------------------------------------------------------------------
50-
Sum of columns 4575 4580 7 2.3 436.3 1.0X
50+
Sum of columns 4097 4098 1 2.6 390.7 1.0X
5151

5252

5353
================================================================================================
5454
Partitioned Table Scan
5555
================================================================================================
5656

57-
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
58-
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
57+
OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
58+
Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
5959
Partitioned Table: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
6060
------------------------------------------------------------------------------------------------------------------------
61-
Data column 3252 3271 27 4.8 206.8 1.0X
62-
Partition column 2905 2907 3 5.4 184.7 1.1X
63-
Both columns 3385 3398 18 4.6 215.2 1.0X
61+
Data column 2918 2920 3 5.4 185.5 1.0X
62+
Partition column 2603 2605 2 6.0 165.5 1.1X
63+
Both columns 2949 2953 5 5.3 187.5 1.0X
6464

6565

6666
================================================================================================
6767
Repeated String Scan
6868
================================================================================================
6969

70-
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
71-
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
70+
OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
71+
Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
7272
Repeated String: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
7373
------------------------------------------------------------------------------------------------------------------------
74-
Sum of string length 3275 3278 3 3.2 312.4 1.0X
74+
Sum of string length 2759 2763 6 3.8 263.1 1.0X
7575

7676

7777
================================================================================================
7878
String with Nulls Scan
7979
================================================================================================
8080

81-
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
82-
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
81+
OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
82+
Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
8383
String with Nulls Scan (0.0%): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
8484
------------------------------------------------------------------------------------------------------------------------
85-
Sum of string length 5202 5219 24 2.0 496.1 1.0X
85+
Sum of string length 4444 4449 7 2.4 423.8 1.0X
8686

87-
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
88-
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
87+
OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
88+
Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
8989
String with Nulls Scan (50.0%): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
9090
------------------------------------------------------------------------------------------------------------------------
91-
Sum of string length 3360 3381 29 3.1 320.5 1.0X
91+
Sum of string length 2892 2894 3 3.6 275.8 1.0X
9292

93-
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
94-
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
93+
OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
94+
Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
9595
String with Nulls Scan (95.0%): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
9696
------------------------------------------------------------------------------------------------------------------------
97-
Sum of string length 1917 1936 28 5.5 182.8 1.0X
97+
Sum of string length 1693 1696 5 6.2 161.4 1.0X
98+
99+
100+
================================================================================================
101+
Select All From Wide Columns
102+
================================================================================================
103+
104+
OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
105+
Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
106+
Wide Column Scan from 1000 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
107+
------------------------------------------------------------------------------------------------------------------------
108+
Select of all columns 35653 35925 384 0.0 71306.7 1.0X
98109

99110

100111
================================================================================================
101112
Single Column Scan From Wide Columns
102113
================================================================================================
103114

104-
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
105-
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
115+
OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
116+
Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
106117
Single Column Scan from 100 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
107118
------------------------------------------------------------------------------------------------------------------------
108-
Sum of single column 4348 4424 107 0.2 4146.5 1.0X
119+
Sum of single column 4102 4103 2 0.3 3911.6 1.0X
109120

110-
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
111-
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
121+
OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
122+
Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
112123
Single Column Scan from 200 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
113124
------------------------------------------------------------------------------------------------------------------------
114-
Sum of single column 8799 8806 10 0.1 8391.2 1.0X
125+
Sum of single column 8014 8074 85 0.1 7642.4 1.0X
115126

116-
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
117-
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
127+
OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
128+
Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
118129
Single Column Scan from 300 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
119130
------------------------------------------------------------------------------------------------------------------------
120-
Sum of single column 12956 12990 49 0.1 12355.5 1.0X
131+
Sum of single column 11980 11990 14 0.1 11425.5 1.0X
121132

122133

123-
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
124-
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
134+
OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
135+
Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
125136
Filters pushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
126137
------------------------------------------------------------------------------------------------------------------------
127-
w/o filters 9208 9269 63 0.1 9207.5 1.0X
128-
pushdown disabled 9073 9111 59 0.1 9072.7 1.0X
129-
w/ filters 3929 3947 18 0.3 3928.8 2.3X
138+
w/o filters 9014 9033 23 0.1 9014.2 1.0X
139+
pushdown disabled 8878 8900 23 0.1 8877.8 1.0X
140+
w/ filters 3700 3707 9 0.3 3699.7 2.4X
130141

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
1-
OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
2-
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
1+
OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
2+
Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
33
Avro writer benchmark: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
44
------------------------------------------------------------------------------------------------------------------------
5-
Output Single Int Column 2478 2537 83 6.3 157.6 1.0X
6-
Output Single Double Column 2636 2652 21 6.0 167.6 0.9X
7-
Output Int and String Column 5922 6039 166 2.7 376.5 0.4X
8-
Output Partitions 4158 4305 207 3.8 264.3 0.6X
9-
Output Buckets 5486 5534 68 2.9 348.8 0.5X
5+
Output Single Int Column 2767 2813 65 5.7 175.9 1.0X
6+
Output Single Double Column 2973 2975 2 5.3 189.0 0.9X
7+
Output Int and String Column 6024 6036 16 2.6 383.0 0.5X
8+
Output Partitions 4610 4709 140 3.4 293.1 0.6X
9+
Output Buckets 6177 6209 45 2.5 392.7 0.4X
10+
11+
OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
12+
Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
13+
Write wide rows into 20 files: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
14+
------------------------------------------------------------------------------------------------------------------------
15+
Write wide rows 40838 40936 139 0.0 81675.4 1.0X
1016

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,11 +338,12 @@ private[sql] class AvroDeserializer(
338338
val validFieldIndexes = ArrayBuffer.empty[Int]
339339
val fieldWriters = ArrayBuffer.empty[(CatalystDataUpdater, Any) => Unit]
340340

341+
val avroSchemaHelper = new AvroUtils.AvroSchemaHelper(avroType, avroPath)
341342
val length = catalystType.length
342343
var i = 0
343344
while (i < length) {
344345
val catalystField = catalystType.fields(i)
345-
AvroUtils.getAvroFieldByName(avroType, catalystField.name, avroPath) match {
346+
avroSchemaHelper.getFieldByName(catalystField.name) match {
346347
case Some(avroField) =>
347348
validFieldIndexes += avroField.pos()
348349

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,11 +250,11 @@ private[sql] class AvroSerializer(
250250
s"Avro $avroPathStr schema length (${avroFields.size}) doesn't match " +
251251
s"SQL ${toFieldStr(catalystPath)} schema length (${catalystStruct.length})")
252252
}
253+
val avroSchemaHelper = new AvroUtils.AvroSchemaHelper(avroStruct, avroPath)
253254

254255
val (avroIndices: Array[Int], fieldConverters: Array[Converter]) =
255256
catalystStruct.map { catalystField =>
256-
val avroField = AvroUtils
257-
.getAvroFieldByName(avroStruct, catalystField.name, avroPath) match {
257+
val avroField = avroSchemaHelper.getFieldByName(catalystField.name) match {
258258
case Some(f) => f
259259
case None => throw new IncompatibleSchemaException(s"Cannot find " +
260260
s"${toFieldStr(catalystPath :+ catalystField.name)} in Avro schema at $avroPathStr")

0 commit comments

Comments
 (0)