diff --git a/external/avro/benchmarks/AvroReadBenchmark-jdk11-results.txt b/external/avro/benchmarks/AvroReadBenchmark-jdk11-results.txt
index 3c1b5af0d5986..b70b1446f6f64 100644
--- a/external/avro/benchmarks/AvroReadBenchmark-jdk11-results.txt
+++ b/external/avro/benchmarks/AvroReadBenchmark-jdk11-results.txt
@@ -2,121 +2,129 @@
 SQL Single Numeric Column Scan
 ================================================================================================

-OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 SQL Single TINYINT Column Scan:           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum                                                2689           2694           7          5.8         170.9       1.0X
+Sum                                                2872           2936          90          5.5         182.6       1.0X

-OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 SQL Single SMALLINT Column Scan:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum                                                2741           2759          26          5.7         174.2       1.0X
+Sum                                                2810           2838          40          5.6         178.6       1.0X

-OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 SQL Single INT Column Scan:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum                                                2736           2748          17          5.7         173.9       1.0X
+Sum                                                2901           2922          30          5.4         184.4       1.0X

-OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 SQL Single BIGINT Column Scan:            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum                                                3305           3317          17          4.8         210.2       1.0X
+Sum                                                3387           3391           5          4.6         215.4       1.0X

-OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 SQL Single FLOAT Column Scan:             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum                                                2904           2952          68          5.4         184.6       1.0X
+Sum                                                2890           2960          99          5.4         183.7       1.0X

-OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 SQL Single DOUBLE Column Scan:            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum                                                3090           3093           4          5.1         196.5       1.0X
+Sum                                                3067           3088          30          5.1         195.0       1.0X
 ================================================================================================
 Int and String Scan
 ================================================================================================

-OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Int and String Scan:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum of columns                                     5351           5365          20          2.0         510.3       1.0X
+Sum of columns                                     4736           4818         116          2.2         451.7       1.0X

 ================================================================================================
 Partitioned Table Scan
 ================================================================================================

-OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Partitioned Table:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Data column                                        3278           3288          14          4.8         208.4       1.0X
-Partition column                                   3149           3193          62          5.0         200.2       1.0X
-Both columns                                       3198           3204           7          4.9         203.4       1.0X
+Data column                                        3383           3400          23          4.6         215.1       1.0X
+Partition column                                   2949           2959          14          5.3         187.5       1.1X
+Both columns                                       3522           3545          33          4.5         223.9       1.0X

 ================================================================================================
 Repeated String Scan
 ================================================================================================

-OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Repeated String:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum of string length                               3435           3438           5          3.1         327.6       1.0X
+Sum of string length                               3332           3355          32          3.1         317.7       1.0X

 ================================================================================================
 String with Nulls Scan
 ================================================================================================

-OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 String with Nulls Scan (0.0%):            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum of string length                               5634           5650          23          1.9         537.3       1.0X
+Sum of string length                               5588           5652          90          1.9         532.9       1.0X

-OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 String with Nulls Scan (50.0%):           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum of string length                               4725           4752          39          2.2         450.6       1.0X
+Sum of string length                               3858           3865           9          2.7         368.0       1.0X

-OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 String with Nulls Scan (95.0%):           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum of string length                               3550           3566          23          3.0         338.6       1.0X
+Sum of string length                               2562           2571          12          4.1         244.3       1.0X

 ================================================================================================
 Single Column Scan From Wide Columns
 ================================================================================================

-OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Single Column Scan from 100 columns:      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum of single column                               5271           5279          11          0.2        5027.0       1.0X
+Sum of single column                               5241           5243           3          0.2        4998.0       1.0X

-OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Single Column Scan from 200 columns:      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum of single column                              10393          10516         174          0.1        9911.3       1.0X
+Sum of single column                              10178          10185          10          0.1        9706.5       1.0X

-OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Single Column Scan from 300 columns:      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum of single column                              15330          15343          19          0.1       14619.6       1.0X
+Sum of single column                              15201          15232          44          0.1       14496.4       1.0X
+
+OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Filters pushdown:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+w/o filters                                        9614           9669          54          0.1        9614.1       1.0X
+pushdown disabled                                 10077          10141          66          0.1       10077.2       1.0X
+w/ filters                                         4681           4713          29          0.2        4681.5       2.1X
+
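A note on reading these tables: Rate(M/s) and Per Row(ns) are the same measurement in two units, with Per Row(ns) = 1000 / Rate(M/s) = Best Time / row count. In the new "Filters pushdown" table above, the benchmark scans 1000 * 1000 rows (see filtersPushdownBenchmark(rowsNum = 1000 * 1000, numIters = 3) later in this patch), so a best time of 9614 ms works out to 9614.1 ns per row, i.e. roughly 0.1 M rows/s. With pushdown enabled the same filtered scan takes 4681 ms; the 2.1X in the Relative column is 9614 / 4681. The speedup comes from the schema layout the benchmark uses: the filtered column `key` is the first field, so a row that fails the filter is abandoned before its 100 timestamp columns are converted.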
diff --git a/external/avro/benchmarks/AvroReadBenchmark-results.txt b/external/avro/benchmarks/AvroReadBenchmark-results.txt
index 0ab611a0f9a29..3108a9c8e13fe 100644
--- a/external/avro/benchmarks/AvroReadBenchmark-results.txt
+++ b/external/avro/benchmarks/AvroReadBenchmark-results.txt
@@ -2,121 +2,129 @@
 SQL Single Numeric Column Scan
 ================================================================================================

-OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 SQL Single TINYINT Column Scan:           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum                                                3049           3071          32          5.2         193.8       1.0X
+Sum                                                2841           2846           7          5.5         180.6       1.0X

-OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 SQL Single SMALLINT Column Scan:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum                                                2982           2992          13          5.3         189.6       1.0X
+Sum                                                2777           2799          30          5.7         176.6       1.0X

-OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 SQL Single INT Column Scan:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum                                                2984           2989           7          5.3         189.7       1.0X
+Sum                                                2730           2753          33          5.8         173.6       1.0X

-OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 SQL Single BIGINT Column Scan:            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum                                                3262           3353         128          4.8         207.4       1.0X
+Sum                                                3278           3284           9          4.8         208.4       1.0X

-OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 SQL Single FLOAT Column Scan:             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum                                                2716           2723          10          5.8         172.7       1.0X
+Sum                                                2801           2805           6          5.6         178.1       1.0X

-OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 SQL Single DOUBLE Column Scan:            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum                                                2868           2870           3          5.5         182.4       1.0X
+Sum                                                2976           2984          12          5.3         189.2       1.0X

 ================================================================================================
 Int and String Scan
 ================================================================================================

-OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Int and String Scan:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum of columns                                     4714           4739          35          2.2         449.6       1.0X
+Sum of columns                                     4674           4686          17          2.2         445.8       1.0X

 ================================================================================================
 Partitioned Table Scan
 ================================================================================================

-OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Partitioned Table:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Data column                                        3257           3286          41          4.8         207.1       1.0X
-Partition column                                   3258           3277          27          4.8         207.2       1.0X
-Both columns                                       3399           3405           9          4.6         216.1       1.0X
+Data column                                        3273           3284          17          4.8         208.1       1.0X
+Partition column                                   2934           2935           2          5.4         186.6       1.1X
+Both columns                                       3395           3405          14          4.6         215.8       1.0X

 ================================================================================================
 Repeated String Scan
 ================================================================================================

-OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Repeated String:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum of string length                               3292           3316          33          3.2         314.0       1.0X
+Sum of string length                               3340           3353          19          3.1         318.5       1.0X

 ================================================================================================
 String with Nulls Scan
 ================================================================================================

-OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 String with Nulls Scan (0.0%):            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum of string length                               5450           5456           9          1.9         519.7       1.0X
+Sum of string length                               5484           5493          12          1.9         523.0       1.0X

-OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 String with Nulls Scan (50.0%):           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum of string length                               4410           4435          35          2.4         420.6       1.0X
+Sum of string length                               3817           3833          22          2.7         364.0       1.0X

-OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 String with Nulls Scan (95.0%):           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum of string length                               3074           3122          68          3.4         293.2       1.0X
+Sum of string length                               2340           2354          20          4.5         223.2       1.0X

 ================================================================================================
 Single Column Scan From Wide Columns
 ================================================================================================

-OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Single Column Scan from 100 columns:      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum of single column                               5120           5136          23          0.2        4882.7       1.0X
+Sum of single column                               4709           4719          14          0.2        4491.1       1.0X

-OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Single Column Scan from 200 columns:      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum of single column                               9952          10002          71          0.1        9490.7       1.0X
+Sum of single column                               9159           9171          18          0.1        8734.3       1.0X

-OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
+OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Single Column Scan from 300 columns:      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sum of single column                              14973          14978           7          0.1       14279.8       1.0X
+Sum of single column                              13645          13751         151          0.1       13012.8       1.0X
+
+OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Filters pushdown:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+w/o filters                                        9215           9309         146          0.1        9215.2       1.0X
+pushdown disabled                                  9535           9637          96          0.1        9534.9       1.0X
+w/ filters                                         3969           3994          22          0.3        3969.5       2.3X
+
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
index 79c72057c5823..285a30bcd046e 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
@@ -98,7 +98,10 @@ case class AvroDataToCatalyst(
     try {
       decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, decoder)
       result = reader.read(result, decoder)
-      deserializer.deserialize(result)
+      val deserialized = deserializer.deserialize(result)
+      assert(deserialized.isDefined,
+        "Avro deserializer cannot return an empty result because filters are not pushed down")
+      deserialized.get
     } catch {
       // There could be multiple possible exceptions here, e.g. java.io.IOException,
       // AvroRuntimeException, ArrayIndexOutOfBoundsException, etc.
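The assert above leans on the new contract of AvroDeserializer.deserialize introduced in the next file: it now returns Option[Any], where None marks a record rejected by pushed-down filters. AvroDataToCatalyst (the from_avro expression) never pushes filters, so an empty result there would be a bug. A minimal sketch of the contract from a caller's point of view — the record schema and values below are made up for illustration, and everything else is taken from this patch:

    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecordBuilder
    import org.apache.spark.sql.avro.AvroDeserializer
    import org.apache.spark.sql.catalyst.OrderedFilters
    import org.apache.spark.sql.internal.SQLConf
    import org.apache.spark.sql.sources.EqualTo
    import org.apache.spark.sql.types.StructType

    val avroSchema = new Schema.Parser().parse(
      """{"type": "record", "name": "rec",
        | "fields": [{"name": "id", "type": "int"}]}""".stripMargin)
    val record = new GenericRecordBuilder(avroSchema).set("id", 1).build()
    val sqlSchema = new StructType().add("id", "int")

    // No pushed filters (the two-arg constructor falls back to NoopFilters):
    // the result is always defined, which is what AvroDataToCatalyst asserts.
    val plain = new AvroDeserializer(avroSchema, sqlSchema)
    assert(plain.deserialize(record).isDefined)

    // With a pushed filter that rejects the record, deserialize returns None
    // and the scan-side readers simply advance to the next record.
    val filtering = new AvroDeserializer(avroSchema, sqlSchema,
      SQLConf.LegacyBehaviorPolicy.CORRECTED,
      new OrderedFilters(Seq(EqualTo("id", 0)), sqlSchema))
    assert(filtering.deserialize(record).isEmpty)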
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 1d18594fd349c..29385b78e3490 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -30,7 +30,7 @@ import org.apache.avro.Schema.Type._
 import org.apache.avro.generic._
 import org.apache.avro.util.Utf8

-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters}
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
@@ -45,12 +45,15 @@ import org.apache.spark.unsafe.types.UTF8String
 class AvroDeserializer(
     rootAvroType: Schema,
     rootCatalystType: DataType,
-    datetimeRebaseMode: LegacyBehaviorPolicy.Value) {
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value,
+    filters: StructFilters) {

   def this(rootAvroType: Schema, rootCatalystType: DataType) {
-    this(rootAvroType, rootCatalystType,
-      LegacyBehaviorPolicy.withName(
-        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ)))
+    this(
+      rootAvroType,
+      rootCatalystType,
+      LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ)),
+      new NoopFilters)
   }

   private lazy val decimalConversions = new DecimalConversion()
@@ -61,19 +64,20 @@ class AvroDeserializer(
   private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInRead(
     datetimeRebaseMode, "Avro")

-  private val converter: Any => Any = rootCatalystType match {
+  private val converter: Any => Option[Any] = rootCatalystType match {
     // A shortcut for empty schema.
     case st: StructType if st.isEmpty =>
-      (data: Any) => InternalRow.empty
+      (data: Any) => Some(InternalRow.empty)

     case st: StructType =>
       val resultRow = new SpecificInternalRow(st.map(_.dataType))
       val fieldUpdater = new RowUpdater(resultRow)
-      val writer = getRecordWriter(rootAvroType, st, Nil)
+      val applyFilters = filters.skipRow(resultRow, _)
+      val writer = getRecordWriter(rootAvroType, st, Nil, applyFilters)
       (data: Any) => {
         val record = data.asInstanceOf[GenericRecord]
-        writer(fieldUpdater, record)
-        resultRow
+        val skipRow = writer(fieldUpdater, record)
+        if (skipRow) None else Some(resultRow)
       }

     case _ =>
@@ -82,11 +86,11 @@ class AvroDeserializer(
       val writer = newWriter(rootAvroType, rootCatalystType, Nil)
       (data: Any) => {
         writer(fieldUpdater, 0, data)
-        tmpRow.get(0, rootCatalystType)
+        Some(tmpRow.get(0, rootCatalystType))
       }
   }

-  def deserialize(data: Any): Any = converter(data)
+  def deserialize(data: Any): Option[Any] = converter(data)

   /**
    * Creates a writer to write avro values to Catalyst values at the given ordinal with the given
@@ -178,7 +182,9 @@ class AvroDeserializer(
         updater.setDecimal(ordinal, decimal)

       case (RECORD, st: StructType) =>
-        val writeRecord = getRecordWriter(avroType, st, path)
+        // Avro datasource doesn't accept filters with nested attributes. See SPARK-32328.
+        // We can always return `false` from `applyFilters` for nested records.
+        val writeRecord = getRecordWriter(avroType, st, path, applyFilters = _ => false)
         (updater, ordinal, value) =>
           val row = new SpecificInternalRow(st)
           writeRecord(new RowUpdater(row), value.asInstanceOf[GenericRecord])
@@ -315,7 +321,8 @@ class AvroDeserializer(
   private def getRecordWriter(
       avroType: Schema,
       sqlType: StructType,
-      path: List[String]): (CatalystDataUpdater, GenericRecord) => Unit = {
+      path: List[String],
+      applyFilters: Int => Boolean): (CatalystDataUpdater, GenericRecord) => Boolean = {
     val validFieldIndexes = ArrayBuffer.empty[Int]
     val fieldWriters = ArrayBuffer.empty[(CatalystDataUpdater, Any) => Unit]

@@ -350,10 +357,13 @@ class AvroDeserializer(

     (fieldUpdater, record) => {
       var i = 0
-      while (i < validFieldIndexes.length) {
+      var skipRow = false
+      while (i < validFieldIndexes.length && !skipRow) {
         fieldWriters(i)(fieldUpdater, record.get(validFieldIndexes(i)))
+        skipRow = applyFilters(i)
         i += 1
       }
+      skipRow
     }
   }

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index 59d54bc433f8b..fa4b6b829bdde 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -33,7 +33,7 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters}
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FileFormat, OutputWriterFactory, PartitionedFile}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
@@ -122,38 +122,28 @@ private[sql] class AvroFileFormat extends FileFormat
         }

         reader.sync(file.start)
-        val stop = file.start + file.length

         val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
           reader.asInstanceOf[DataFileReader[_]].getMetaString,
           SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))

-        val deserializer = new AvroDeserializer(
-          userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, datetimeRebaseMode)
-
-        new Iterator[InternalRow] {
-          private[this] var completed = false
-
-          override def hasNext: Boolean = {
-            if (completed) {
-              false
-            } else {
-              val r = reader.hasNext && !reader.pastSync(stop)
-              if (!r) {
-                reader.close()
-                completed = true
-              }
-              r
-            }
-          }
+        val avroFilters = if (SQLConf.get.avroFilterPushDown) {
+          new OrderedFilters(filters, requiredSchema)
+        } else {
+          new NoopFilters
+        }

-          override def next(): InternalRow = {
-            if (!hasNext) {
-              throw new NoSuchElementException("next on empty iterator")
-            }
-            val record = reader.next()
-            deserializer.deserialize(record).asInstanceOf[InternalRow]
-          }
+        new Iterator[InternalRow] with AvroUtils.RowReader {
+          override val fileReader = reader
+          override val deserializer = new AvroDeserializer(
+            userProvidedSchema.getOrElse(reader.getSchema),
+            requiredSchema,
+            datetimeRebaseMode,
+            avroFilters)
+          override val stopPosition = file.start + file.length
+
+          override def hasNext: Boolean = hasNextRow
+          override def next(): InternalRow = nextRow
         }
       } else {
         Iterator.empty
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index 70dcd58a600fc..51cc51e70cd18 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql.avro
 import java.io.{FileNotFoundException, IOException}

 import org.apache.avro.Schema
+import org.apache.avro.file.{DataFileReader, FileReader}
 import org.apache.avro.file.DataFileConstants.{BZIP2_CODEC, DEFLATE_CODEC, SNAPPY_CODEC, XZ_CODEC}
-import org.apache.avro.file.DataFileReader
 import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
 import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
 import org.apache.avro.mapreduce.AvroJob
@@ -32,6 +32,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.avro.AvroOptions.ignoreExtensionKey
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.OutputWriterFactory
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -161,4 +162,37 @@ object AvroUtils extends Logging {
         "No Avro files found. If files don't have .avro extension, set ignoreExtension to true")
     }
   }
+
+  // The trait provides an iterator-like interface for reading records from an Avro file,
+  // deserializing and returning them as internal rows.
+  trait RowReader {
+    protected val fileReader: FileReader[GenericRecord]
+    protected val deserializer: AvroDeserializer
+    protected val stopPosition: Long
+
+    private[this] var completed = false
+    private[this] var currentRow: Option[InternalRow] = None
+
+    def hasNextRow: Boolean = {
+      while (!completed && currentRow.isEmpty) {
+        val r = fileReader.hasNext && !fileReader.pastSync(stopPosition)
+        if (!r) {
+          fileReader.close()
+          completed = true
+          currentRow = None
+        } else {
+          val record = fileReader.next()
+          // Deserialize eagerly: the result is None for records rejected by the
+          // pushed filters, in which case the loop advances to the next record.
+          currentRow = deserializer.deserialize(record).asInstanceOf[Option[InternalRow]]
+        }
+      }
+      currentRow.isDefined
+    }
+
+    def nextRow: InternalRow = {
+      if (currentRow.isEmpty) {
+        hasNextRow
+      }
+      val returnRow = currentRow
+      // Consume the buffered row so that repeated hasNextRow calls stay idempotent
+      // and don't silently skip records.
+      currentRow = None
+      returnRow.getOrElse {
+        throw new NoSuchElementException("next on empty iterator")
+      }
+    }
+  }
 }
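Both scan paths below consume this trait the same way: hasNextRow buffers the next surviving row (discarding records the deserializer filtered out), and nextRow hands the buffered row out. A condensed sketch of the intended call pattern, with hypothetical `openedReader`, `avroDeserializer`, and `process` standing in for the patch's real values:

    import org.apache.avro.file.FileReader
    import org.apache.avro.generic.GenericRecord
    import org.apache.spark.sql.avro.{AvroDeserializer, AvroUtils}
    import org.apache.spark.sql.catalyst.InternalRow

    def scan(openedReader: FileReader[GenericRecord],
        avroDeserializer: AvroDeserializer,
        start: Long, length: Long)(process: InternalRow => Unit): Unit = {
      val rows = new Iterator[InternalRow] with AvroUtils.RowReader {
        override val fileReader = openedReader
        override val deserializer = avroDeserializer
        override val stopPosition = start + length
        override def hasNext: Boolean = hasNextRow
        override def next(): InternalRow = nextRow
      }
      // Rows rejected by pushed filters never surface here.
      rows.foreach(process)
    }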
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
index 15918f46a83bb..1e6c382041efb 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
@@ -29,12 +29,13 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.TaskContext
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.avro.{AvroDeserializer, AvroOptions}
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.avro.{AvroDeserializer, AvroOptions, AvroUtils}
+import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters}
 import org.apache.spark.sql.connector.read.PartitionReader
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile}
 import org.apache.spark.sql.execution.datasources.v2.{EmptyPartitionReader, FilePartitionReaderFactory, PartitionReaderWithPartitionValues}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration

@@ -54,7 +55,8 @@ case class AvroPartitionReaderFactory(
     dataSchema: StructType,
     readDataSchema: StructType,
     partitionSchema: StructType,
-    parsedOptions: AvroOptions) extends FilePartitionReaderFactory with Logging {
+    parsedOptions: AvroOptions,
+    filters: Seq[Filter]) extends FilePartitionReaderFactory with Logging {

   override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = {
     val conf = broadcastedConf.value.value
@@ -86,38 +88,28 @@ case class AvroPartitionReaderFactory(
       }

       reader.sync(partitionedFile.start)
-      val stop = partitionedFile.start + partitionedFile.length

       val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
         reader.asInstanceOf[DataFileReader[_]].getMetaString,
         SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))

-      val deserializer = new AvroDeserializer(
-        userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, datetimeRebaseMode)
-
-      val fileReader = new PartitionReader[InternalRow] {
-        private[this] var completed = false
-
-        override def next(): Boolean = {
-          if (completed) {
-            false
-          } else {
-            val r = reader.hasNext && !reader.pastSync(stop)
-            if (!r) {
-              reader.close()
-              completed = true
-            }
-            r
-          }
-        }
+      val avroFilters = if (SQLConf.get.avroFilterPushDown) {
+        new OrderedFilters(filters, readDataSchema)
+      } else {
+        new NoopFilters
+      }

-        override def get(): InternalRow = {
-          if (!next) {
-            throw new NoSuchElementException("next on empty iterator")
-          }
-          val record = reader.next()
-          deserializer.deserialize(record).asInstanceOf[InternalRow]
-        }
+      val fileReader = new PartitionReader[InternalRow] with AvroUtils.RowReader {
+        override val fileReader = reader
+        override val deserializer = new AvroDeserializer(
+          userProvidedSchema.getOrElse(reader.getSchema),
+          readDataSchema,
+          datetimeRebaseMode,
+          avroFilters)
+        override val stopPosition = partitionedFile.start + partitionedFile.length

+        override def next(): Boolean = hasNextRow
+        override def get(): InternalRow = nextRow
         override def close(): Unit = reader.close()
       }
       new PartitionReaderWithPartitionValues(fileReader, readDataSchema,
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala
index fe7315c739296..e94bef2f8bebe 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.connector.read.PartitionReaderFactory
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.execution.datasources.v2.FileScan
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.SerializableConfiguration
@@ -37,6 +38,7 @@ case class AvroScan(
     readDataSchema: StructType,
     readPartitionSchema: StructType,
     options: CaseInsensitiveStringMap,
+    pushedFilters: Array[Filter],
     partitionFilters: Seq[Expression] = Seq.empty,
     dataFilters: Seq[Expression] = Seq.empty) extends FileScan {
   override def isSplitable(path: Path): Boolean = true
@@ -50,8 +52,14 @@ case class AvroScan(
     val parsedOptions = new AvroOptions(caseSensitiveMap, hadoopConf)
     // The partition values are already truncated in `FileScan.partitions`.
     // We should use `readPartitionSchema` as the partition schema here.
-    AvroPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
-      dataSchema, readDataSchema, readPartitionSchema, parsedOptions)
+    AvroPartitionReaderFactory(
+      sparkSession.sessionState.conf,
+      broadcastedConf,
+      dataSchema,
+      readDataSchema,
+      readPartitionSchema,
+      parsedOptions,
+      pushedFilters)
   }

   override def withFilters(
@@ -59,10 +67,18 @@ case class AvroScan(
     this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters)

   override def equals(obj: Any): Boolean = obj match {
-    case a: AvroScan => super.equals(a) && dataSchema == a.dataSchema && options == a.options
-
+    case a: AvroScan => super.equals(a) && dataSchema == a.dataSchema && options == a.options &&
+      equivalentFilters(pushedFilters, a.pushedFilters)
     case _ => false
   }

   override def hashCode(): Int = super.hashCode()
+
+  override def description(): String = {
+    super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]")
+  }
+
+  override def getMetaData(): Map[String, String] = {
+    super.getMetaData() ++ Map("PushedFilters" -> seqToString(pushedFilters))
+  }
 }
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScanBuilder.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScanBuilder.scala
index e36c71ef4b1f7..9420608bb22ce 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScanBuilder.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScanBuilder.scala
@@ -17,9 +17,11 @@
 package org.apache.spark.sql.v2.avro

 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.catalyst.StructFilters
+import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters}
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -29,8 +31,27 @@ class AvroScanBuilder (
     schema: StructType,
     dataSchema: StructType,
     options: CaseInsensitiveStringMap)
-  extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
+  extends FileScanBuilder(sparkSession, fileIndex, dataSchema) with SupportsPushDownFilters {
+
   override def build(): Scan = {
-    AvroScan(sparkSession, fileIndex, dataSchema, readDataSchema(), readPartitionSchema(), options)
+    AvroScan(
+      sparkSession,
+      fileIndex,
+      dataSchema,
+      readDataSchema(),
+      readPartitionSchema(),
+      options,
+      pushedFilters())
   }
+
+  private var _pushedFilters: Array[Filter] = Array.empty
+
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    if (sparkSession.sessionState.conf.avroFilterPushDown) {
+      _pushedFilters = StructFilters.pushedFilters(filters, dataSchema)
+    }
+    filters
+  }
+
+  override def pushedFilters(): Array[Filter] = _pushedFilters
 }
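Note that pushFilters returns every filter back as a post-scan filter: the Avro reader only skips rows early, and Spark still re-evaluates the predicates above the scan. Which candidates survive into _pushedFilters is decided by StructFilters.pushedFilters; a sketch of its contract as used here, assuming (per the SPARK-32328 comment earlier in this patch) that filters referencing nested attributes are rejected:

    import org.apache.spark.sql.catalyst.StructFilters
    import org.apache.spark.sql.sources.{EqualTo, Filter, IsNotNull}
    import org.apache.spark.sql.types._

    val dataSchema = new StructType()
      .add("id", IntegerType)
      .add("s", new StructType().add("a", IntegerType))
    val candidates: Array[Filter] = Array(
      EqualTo("id", 1),   // references the top-level column `id`: pushable
      IsNotNull("s.a"))   // references a nested attribute: not pushable
    val pushed = StructFilters.pushedFilters(candidates, dataSchema)
    // Under the assumption above, pushed contains only EqualTo("id", 1).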
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
index c8a1f670bda9e..2d3209f8daa26 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
@@ -26,11 +26,14 @@ import org.apache.avro.message.{BinaryMessageDecoder, BinaryMessageEncoder}

 import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.sql.{RandomDataGenerator, Row}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, NoopFilters, OrderedFilters, StructFilters}
 import org.apache.spark.sql.catalyst.expressions.{ExpressionEvalHelper, GenericInternalRow, Literal}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.{EqualTo, Not}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String

 class AvroCatalystDataConversionSuite extends SparkFunSuite
   with SharedSparkSession
@@ -272,6 +275,25 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
     assert(message == "Cannot convert Catalyst type StringType to Avro type \"long\".")
   }

+  private def checkDeserialization(
+      schema: Schema,
+      data: GenericData.Record,
+      expected: Option[Any],
+      filters: StructFilters = new NoopFilters): Unit = {
+    val dataType = SchemaConverters.toSqlType(schema).dataType
+    val deserializer = new AvroDeserializer(
+      schema,
+      dataType,
+      SQLConf.LegacyBehaviorPolicy.CORRECTED,
+      filters)
+    val deserialized = deserializer.deserialize(data)
+    expected match {
+      case None => assert(deserialized == None)
+      case Some(d) =>
+        assert(checkResult(d, deserialized.get, dataType, exprNullable = false))
+    }
+  }
+
   test("avro array can be generic java collection") {
     val jsonFormatSchema =
       """
@@ -287,30 +309,53 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
       |}
     """.stripMargin
     val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
-    val dataType = SchemaConverters.toSqlType(avroSchema).dataType
-    val deserializer = new AvroDeserializer(avroSchema, dataType)
-
-    def checkDeserialization(data: GenericData.Record, expected: Any): Unit = {
-      assert(checkResult(
-        expected,
-        deserializer.deserialize(data),
-        dataType, exprNullable = false
-      ))
-    }

     def validateDeserialization(array: java.util.Collection[Integer]): Unit = {
       val data = new GenericRecordBuilder(avroSchema)
         .set("array", array)
         .build()
       val expected = InternalRow(new GenericArrayData(new util.ArrayList[Any](array)))
-      checkDeserialization(data, expected)
+      checkDeserialization(avroSchema, data, Some(expected))

       val reEncoded = new BinaryMessageDecoder[GenericData.Record](new GenericData(), avroSchema)
         .decode(new BinaryMessageEncoder(new GenericData(), avroSchema).encode(data))
-      checkDeserialization(reEncoded, expected)
+      checkDeserialization(avroSchema, reEncoded, Some(expected))
     }

     validateDeserialization(Collections.emptySet())
     validateDeserialization(util.Arrays.asList(1, null, 3))
   }
+
+  test("SPARK-32346: filter pushdown to Avro deserializer") {
+    val schema =
+      """
+        |{
+        |  "type" : "record",
+        |  "name" : "test_schema",
+        |  "fields" : [
+        |    {"name": "Age", "type": "int"},
+        |    {"name": "Name", "type": "string"}
+        |  ]
+        |}
+      """.stripMargin
+    val avroSchema = new Schema.Parser().parse(schema)
+    val sqlSchema = new StructType().add("Age", "int").add("Name", "string")
+    val data = new GenericRecordBuilder(avroSchema)
+      .set("Age", 39)
+      .set("Name", "Maxim")
+      .build()
+    val expectedRow = Some(InternalRow(39, UTF8String.fromString("Maxim")))
+
+    checkDeserialization(avroSchema, data, expectedRow)
+    checkDeserialization(
+      avroSchema,
+      data,
+      expectedRow,
+      new OrderedFilters(Seq(EqualTo("Age", 39)), sqlSchema))
+    checkDeserialization(
+      avroSchema,
+      data,
+      None,
+      new OrderedFilters(Seq(Not(EqualTo("Age", 39))), sqlSchema))
+  }
 }
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 83a7ef0061fb2..46fe9b2c44529 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -1920,6 +1920,7 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper {
            |Format: avro
            |Location: InMemoryFileIndex\\[.*\\]
            |PartitionFilters: \\[isnotnull\\(id#x\\), \\(id#x > 1\\)\\]
+           |PushedFilters: \\[IsNotNull\\(value\\), GreaterThan\\(value,2\\)\\]
            |ReadSchema: struct\\<value:bigint\\>
            |""".stripMargin.trim
       spark.range(10)
@@ -1933,7 +1934,38 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper {
         .format("avro")
         .load(basePath).where($"id" > 1 && $"value" > 2)
       val normalizedOutput = getNormalizedExplain(df, FormattedMode)
-      assert(expected_plan_fragment.r.findAllMatchIn(normalizedOutput).length == 1)
+      assert(expected_plan_fragment.r.findAllMatchIn(normalizedOutput).length == 1,
+        normalizedOutput)
+    }
+  }
+
+  test("SPARK-32346: filters pushdown to Avro datasource v2") {
+    Seq(true, false).foreach { filtersPushdown =>
+      withSQLConf(SQLConf.AVRO_FILTER_PUSHDOWN_ENABLED.key -> filtersPushdown.toString) {
+        withTempPath { dir =>
+          Seq(("a", 1, 2), ("b", 1, 2), ("c", 2, 1))
+            .toDF("value", "p1", "p2")
+            .write
+            .format("avro")
+            .save(dir.getCanonicalPath)
+          val df = spark
+            .read
+            .format("avro")
+            .load(dir.getCanonicalPath)
+            .where("value = 'a'")
+
+          val fileScan = df.queryExecution.executedPlan collectFirst {
+            case BatchScanExec(_, f: AvroScan) => f
+          }
+          assert(fileScan.nonEmpty)
+          if (filtersPushdown) {
+            assert(fileScan.get.pushedFilters.nonEmpty)
+          } else {
+            assert(fileScan.get.pushedFilters.isEmpty)
+          }
+          checkAnswer(df, Row("a", 1, 2))
+        }
+      }
     }
   }
 }
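The new filtersPushdownBenchmark below produced the "Filters pushdown" tables in the two result files at the top of this patch. Spark benchmark result files are conventionally regenerated with SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "avro/test:runMain org.apache.spark.sql.execution.benchmark.AvroReadBenchmark"; the exact invocation is documented in the file's header comment, which the hunks below do not show, so treat this command as the usual convention rather than something this patch specifies.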
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala
index dc9606f405191..fde858e0a7419 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala
@@ -17,11 +17,14 @@
 package org.apache.spark.sql.execution.benchmark

 import java.io.File
+import java.time.Instant

 import scala.util.Random

 import org.apache.spark.benchmark.Benchmark
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{Column, DataFrame}
+import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._

 /**
@@ -36,6 +39,8 @@ import org.apache.spark.sql.types._
  * }}}
  */
 object AvroReadBenchmark extends SqlBasedBenchmark {
+  import spark.implicits._
+
   def withTempTable(tableNames: String*)(f: => Unit): Unit = {
     try f finally tableNames.foreach(spark.catalog.dropTempView)
   }
@@ -186,6 +191,60 @@ object AvroReadBenchmark extends SqlBasedBenchmark {
     }
   }

+  private def filtersPushdownBenchmark(rowsNum: Int, numIters: Int): Unit = {
+    val benchmark = new Benchmark("Filters pushdown", rowsNum, output = output)
+    val colsNum = 100
+    val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", TimestampType))
+    val schema = StructType(StructField("key", LongType) +: fields)
+    def columns(): Seq[Column] = {
+      val ts = Seq.tabulate(colsNum) { i =>
+        lit(Instant.ofEpochSecond(-30610224000L + i * 123456)).as(s"col$i")
+      }
+      ($"id" % 1000).as("key") +: ts
+    }
+    withTempPath { path =>
+      // Write and read timestamps in the LEGACY mode to make timestamp conversions more expensive
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> "LEGACY") {
+        spark.range(rowsNum).select(columns(): _*)
+          .write
+          .format("avro")
+          .save(path.getAbsolutePath)
+      }
+      def readback = {
+        spark.read
+          .schema(schema)
+          .format("avro")
+          .load(path.getAbsolutePath)
+      }
+
+      benchmark.addCase("w/o filters", numIters) { _ =>
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> "LEGACY") {
+          readback.noop()
+        }
+      }
+
+      def withFilter(configEnabled: Boolean): Unit = {
+        withSQLConf(
+          SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> "LEGACY",
+          SQLConf.AVRO_FILTER_PUSHDOWN_ENABLED.key -> configEnabled.toString()) {
+          readback.filter($"key" === 0).noop()
+        }
+      }
+
+      benchmark.addCase("pushdown disabled", numIters) { _ =>
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> "LEGACY") {
+          withFilter(configEnabled = false)
+        }
+      }
+
+      benchmark.addCase("w/ filters", numIters) { _ =>
+        withFilter(configEnabled = true)
+      }
+
+      benchmark.run()
+    }
+  }
+
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
     runBenchmark("SQL Single Numeric Column Scan") {
       Seq(ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType).foreach { dataType =>
@@ -211,5 +270,8 @@ object AvroReadBenchmark extends SqlBasedBenchmark {
       columnsBenchmark(1024 * 1024 * 1, 200)
       columnsBenchmark(1024 * 1024 * 1, 300)
     }
+    // Benchmark pushdown filters that refer to top-level columns.
+    // TODO (SPARK-32328): Add benchmarks for filters with nested column attributes.
+    filtersPushdownBenchmark(rowsNum = 1000 * 1000, numIters = 3)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVFilters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala
similarity index 60%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVFilters.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala
index d2cb2c4d8134a..b7c8a0140ea66 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVFilters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala
@@ -15,23 +15,21 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.catalyst.csv
+package org.apache.spark.sql.catalyst

-import org.apache.spark.sql.catalyst.{InternalRow, StructFilters}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types.StructType

 /**
- * An instance of the class compiles filters to predicates and allows to
- * apply the predicates to an internal row with partially initialized values
- * converted from parsed CSV fields.
+ * An instance of the class compiles filters to predicates and sorts them in
+ * an order that allows applying the predicates to an internal row with partially
+ * initialized values, for instance converted from parsed CSV fields.
  *
- * @param filters The filters pushed down to CSV datasource.
+ * @param filters The filters pushed down to a datasource.
  * @param requiredSchema The schema with only fields requested by the upper layer.
  */
-class CSVFilters(filters: Seq[sources.Filter], requiredSchema: StructType)
+class OrderedFilters(filters: Seq[sources.Filter], requiredSchema: StructType)
   extends StructFilters(filters, requiredSchema) {
   /**
    * Converted filters to predicates and grouped by maximum field index
@@ -48,33 +46,31 @@ class CSVFilters(filters: Seq[sources.Filter], requiredSchema: StructType)
   private val predicates: Array[BasePredicate] = {
     val len = requiredSchema.fields.length
     val groupedPredicates = Array.fill[BasePredicate](len)(null)
-    if (SQLConf.get.csvFilterPushDown) {
-      val groupedFilters = Array.fill(len)(Seq.empty[sources.Filter])
-      for (filter <- filters) {
-        val refs = filter.references
-        val index = if (refs.isEmpty) {
-          // For example, `AlwaysTrue` and `AlwaysFalse` doesn't have any references
-          // Filters w/o refs always return the same result. Taking into account
-          // that predicates are combined via `And`, we can apply such filters only
-          // once at the position 0.
-          0
-        } else {
-          // readSchema must contain attributes of all filters.
-          // Accordingly, `fieldIndex()` returns a valid index always.
-          refs.map(requiredSchema.fieldIndex).max
-        }
-        groupedFilters(index) :+= filter
+    val groupedFilters = Array.fill(len)(Seq.empty[sources.Filter])
+    for (filter <- filters) {
+      val refs = filter.references
+      val index = if (refs.isEmpty) {
+        // For example, `AlwaysTrue` and `AlwaysFalse` don't have any references.
+        // Filters w/o refs always return the same result. Taking into account
+        // that predicates are combined via `And`, we can apply such filters only
+        // once at the position 0.
+        0
+      } else {
+        // readSchema must contain attributes of all filters.
+        // Accordingly, `fieldIndex()` always returns a valid index.
+        refs.map(requiredSchema.fieldIndex).max
       }
-      if (len > 0 && !groupedFilters(0).isEmpty) {
-        // We assume that filters w/o refs like `AlwaysTrue` and `AlwaysFalse`
-        // can be evaluated faster that others. We put them in front of others.
-        val (literals, others) = groupedFilters(0).partition(_.references.isEmpty)
-        groupedFilters(0) = literals ++ others
-      }
-      for (i <- 0 until len) {
-        if (!groupedFilters(i).isEmpty) {
-          groupedPredicates(i) = toPredicate(groupedFilters(i))
-        }
+      groupedFilters(index) :+= filter
+    }
+    if (len > 0 && groupedFilters(0).nonEmpty) {
+      // We assume that filters w/o refs like `AlwaysTrue` and `AlwaysFalse`
+      // can be evaluated faster than others. We put them in front of others.
+      val (literals, others) = groupedFilters(0).partition(_.references.isEmpty)
+      groupedFilters(0) = literals ++ others
+    }
+    for (i <- 0 until len) {
+      if (groupedFilters(i).nonEmpty) {
+        groupedPredicates(i) = toPredicate(groupedFilters(i))
       }
     }
     groupedPredicates
@@ -90,11 +86,13 @@ class CSVFilters(filters: Seq[sources.Filter], requiredSchema: StructType)
    * otherwise `false` if at least one of the filters returns `false`.
    */
   def skipRow(row: InternalRow, index: Int): Boolean = {
+    assert(0 <= index && index < requiredSchema.fields.length,
+      "Index is out of the valid range: it must point to a field of the required schema.")
     val predicate = predicates(index)
     predicate != null && !predicate.eval(row)
   }

-  // CSV filters are applied sequentially, and no need to track which filter references
+  // The filters are applied sequentially, and there is no need to track which filter references
   // point out to already set row values. The `reset()` method is trivial because
   // the filters don't have any states.
   def reset(): Unit = {}
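What "ordered" buys: skipRow(row, i) may be called as soon as field i has been written, and each predicate is grouped at the index of its maximum referenced field, so a failing row can be abandoned before its remaining fields are converted. A small self-contained illustration — the schema and values are made up:

    import org.apache.spark.sql.catalyst.OrderedFilters
    import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
    import org.apache.spark.sql.sources.{EqualTo, GreaterThan}
    import org.apache.spark.sql.types._

    val schema = new StructType().add("a", IntegerType).add("b", IntegerType)
    val filters = new OrderedFilters(Seq(EqualTo("a", 1), GreaterThan("b", 0)), schema)
    val row = new SpecificInternalRow(schema.map(_.dataType))

    row.setInt(0, 2) // write field 0 only
    // EqualTo("a", 1) is grouped at index 0 and already fails here,
    // so a caller can stop materializing the remaining fields.
    assert(filters.skipRow(row, 0))

    filters.reset() // trivial for OrderedFilters, required by the StructFilters API
    row.setInt(0, 1)
    assert(!filters.skipRow(row, 0)) // field 0 passes
    row.setInt(1, 5)
    assert(!filters.skipRow(row, 1)) // GreaterThan("b", 0) runs at index 1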
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 898b963fd0ab5..b5c14a193ddee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -25,10 +25,11 @@ import com.univocity.parsers.csv.CsvParser

 import org.apache.spark.SparkUpgradeException
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters}
 import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -98,7 +99,11 @@ class UnivocityParser(
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true)

-  private val csvFilters = new CSVFilters(filters, requiredSchema)
+  private val csvFilters = if (SQLConf.get.csvFilterPushDown) {
+    new OrderedFilters(filters, requiredSchema)
+  } else {
+    new NoopFilters
+  }

   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f0d0a601ff196..48a43e67ca89a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2549,6 +2549,12 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)

+  val AVRO_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.avro.filterPushdown.enabled")
+    .doc("When true, enables filter pushdown to the Avro datasource.")
+    .version("3.1.0")
+    .booleanConf
+    .createWithDefault(true)
+
   val ADD_PARTITION_BATCH_SIZE = buildConf("spark.sql.addPartitionInBatch.size")
     .internal()
@@ -3263,6 +3269,8 @@ class SQLConf extends Serializable with Logging {

   def jsonFilterPushDown: Boolean = getConf(JSON_FILTER_PUSHDOWN_ENABLED)

+  def avroFilterPushDown: Boolean = getConf(AVRO_FILTER_PUSHDOWN_ENABLED)
+
   def integerGroupingIdEnabled: Boolean = getConf(SQLConf.LEGACY_INTEGER_GROUPING_ID)

   def legacyAllowCastNumericToTimestamp: Boolean =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVFiltersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/OrderedFiltersSuite.scala
similarity index 83%
rename from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVFiltersSuite.scala
rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/OrderedFiltersSuite.scala
index 21bef20d7d4d9..b156cb52e921c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVFiltersSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/OrderedFiltersSuite.scala
@@ -15,14 +15,13 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.catalyst.csv
+package org.apache.spark.sql.catalyst

-import org.apache.spark.sql.catalyst.{StructFilters, StructFiltersSuite}
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types.StructType

-class CSVFiltersSuite extends StructFiltersSuite {
+class OrderedFiltersSuite extends StructFiltersSuite {
   override def createFilters(filters: Seq[sources.Filter], schema: StructType): StructFilters = {
-    new CSVFilters(filters, schema)
+    new OrderedFilters(filters, schema)
   }
 }
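Taken together, the patch wires spark.sql.avro.filterPushdown.enabled (default true) through both the v1 AvroFileFormat path and the v2 AvroScanBuilder/AvroScan path. A final hedged sketch of the user-visible behavior — the path and data are hypothetical, and results are identical with the flag off, since the filters are re-applied above the scan; only the per-record deserialization work changes:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Default is already true; set explicitly only for illustration.
    spark.conf.set("spark.sql.avro.filterPushdown.enabled", "true")

    val path = "/tmp/avro-pushdown-demo" // hypothetical location
    Seq(("a", 1, 2), ("b", 1, 2), ("c", 2, 1)).toDF("value", "p1", "p2")
      .write.format("avro").mode("overwrite").save(path)

    // `value = 'a'` becomes a pushed filter: non-matching records are dropped
    // inside the Avro deserializer before all their fields are converted.
    val df = spark.read.format("avro").load(path).where($"value" === "a")
    // With the v2 reader, the scan node of df.explain("formatted") should list
    // the pushed predicates, e.g. PushedFilters: [IsNotNull(value), EqualTo(value,a)].
    df.show() // returns only the row ("a", 1, 2)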