From b5101d20850b7c3ddc03cece8088a7b34b683084 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Mon, 18 Jan 2021 15:01:55 +0800 Subject: [PATCH 1/7] Upgrade Parquet to 1.11.1 --- dev/deps/spark-deps-hadoop-2.7-hive-2.3 | 12 +++++----- dev/deps/spark-deps-hadoop-3.2-hive-2.3 | 12 +++++----- pom.xml | 6 ++++- .../parquet/ParquetSchemaSuite.scala | 22 +++++++++---------- .../spark/sql/streaming/StreamSuite.scala | 2 +- .../spark/sql/hive/StatisticsSuite.scala | 2 +- 6 files changed, 30 insertions(+), 26 deletions(-) diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 b/dev/deps/spark-deps-hadoop-2.7-hive-2.3 index 1a89da42d6adf..d8ef490a8ae2c 100644 --- a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2.7-hive-2.3 @@ -202,12 +202,12 @@ orc-shims/1.6.6//orc-shims-1.6.6.jar oro/2.0.8//oro-2.0.8.jar osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar paranamer/2.8//paranamer-2.8.jar -parquet-column/1.10.1//parquet-column-1.10.1.jar -parquet-common/1.10.1//parquet-common-1.10.1.jar -parquet-encoding/1.10.1//parquet-encoding-1.10.1.jar -parquet-format/2.4.0//parquet-format-2.4.0.jar -parquet-hadoop/1.10.1//parquet-hadoop-1.10.1.jar -parquet-jackson/1.10.1//parquet-jackson-1.10.1.jar +parquet-column/1.11.1//parquet-column-1.11.1.jar +parquet-common/1.11.1//parquet-common-1.11.1.jar +parquet-encoding/1.11.1//parquet-encoding-1.11.1.jar +parquet-format-structures/1.11.1//parquet-format-structures-1.11.1.jar +parquet-hadoop/1.11.1//parquet-hadoop-1.11.1.jar +parquet-jackson/1.11.1//parquet-jackson-1.11.1.jar protobuf-java/2.5.0//protobuf-java-2.5.0.jar py4j/0.10.9.1//py4j-0.10.9.1.jar pyrolite/4.30//pyrolite-4.30.jar diff --git a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 b/dev/deps/spark-deps-hadoop-3.2-hive-2.3 index 562a436425ba4..d52ae2476684a 100644 --- a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3.2-hive-2.3 @@ -172,12 +172,12 @@ orc-shims/1.6.6//orc-shims-1.6.6.jar oro/2.0.8//oro-2.0.8.jar osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar paranamer/2.8//paranamer-2.8.jar -parquet-column/1.10.1//parquet-column-1.10.1.jar -parquet-common/1.10.1//parquet-common-1.10.1.jar -parquet-encoding/1.10.1//parquet-encoding-1.10.1.jar -parquet-format/2.4.0//parquet-format-2.4.0.jar -parquet-hadoop/1.10.1//parquet-hadoop-1.10.1.jar -parquet-jackson/1.10.1//parquet-jackson-1.10.1.jar +parquet-column/1.11.1//parquet-column-1.11.1.jar +parquet-common/1.11.1//parquet-common-1.11.1.jar +parquet-encoding/1.11.1//parquet-encoding-1.11.1.jar +parquet-format-structures/1.11.1//parquet-format-structures-1.11.1.jar +parquet-hadoop/1.11.1//parquet-hadoop-1.11.1.jar +parquet-jackson/1.11.1//parquet-jackson-1.11.1.jar protobuf-java/2.5.0//protobuf-java-2.5.0.jar py4j/0.10.9.1//py4j-0.10.9.1.jar pyrolite/4.30//pyrolite-4.30.jar diff --git a/pom.xml b/pom.xml index 3a204279666b7..f667d33e0b8c8 100644 --- a/pom.xml +++ b/pom.xml @@ -136,7 +136,7 @@ 2.6.0 10.14.2.0 - 1.10.1 + 1.11.1 1.6.6 9.4.34.v20201102 4.0.3 @@ -2318,6 +2318,10 @@ commons-pool commons-pool + + javax.annotation + javax.annotation-api + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index e97c6cd29709c..fcc08ee16e805 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -251,7 +251,7 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { """ |message root { | optional group _1 (MAP) { - | repeated group map (MAP_KEY_VALUE) { + | repeated group key_value (MAP_KEY_VALUE) { | required int32 key; | optional binary value (UTF8); | } @@ -267,7 +267,7 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { """ |message root { | optional group _1 (MAP) { - | repeated group map (MAP_KEY_VALUE) { + | repeated group key_value (MAP_KEY_VALUE) { | required group key { | optional binary _1 (UTF8); | optional binary _2 (UTF8); @@ -300,7 +300,7 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { """ |message root { | optional group _1 (MAP_KEY_VALUE) { - | repeated group map { + | repeated group key_value { | required int32 key; | optional group value { | optional binary _1 (UTF8); @@ -740,7 +740,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { nullable = true))), """message root { | optional group f1 (MAP_KEY_VALUE) { - | repeated group map { + | repeated group key_value { | required int32 num; | required binary str (UTF8); | } @@ -759,7 +759,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { nullable = true))), """message root { | optional group f1 (MAP) { - | repeated group map (MAP_KEY_VALUE) { + | repeated group key_value (MAP_KEY_VALUE) { | required int32 key; | required binary value (UTF8); | } @@ -797,7 +797,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { nullable = true))), """message root { | optional group f1 (MAP_KEY_VALUE) { - | repeated group map { + | repeated group key_value { | required int32 num; | optional binary str (UTF8); | } @@ -816,7 +816,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { nullable = true))), """message root { | optional group f1 (MAP) { - | repeated group map (MAP_KEY_VALUE) { + | repeated group key_value (MAP_KEY_VALUE) { | required int32 key; | optional binary value (UTF8); | } @@ -857,7 +857,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { nullable = true))), """message root { | optional group f1 (MAP) { - | repeated group map (MAP_KEY_VALUE) { + | repeated group key_value (MAP_KEY_VALUE) { | required int32 key; | required binary value (UTF8); | } @@ -893,7 +893,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { nullable = true))), """message root { | optional group f1 (MAP) { - | repeated group map (MAP_KEY_VALUE) { + | repeated group key_value (MAP_KEY_VALUE) { | required int32 key; | optional binary value (UTF8); | } @@ -1447,7 +1447,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { parquetSchema = """message root { | required group f0 (MAP) { - | repeated group map (MAP_KEY_VALUE) { + | repeated group key_value (MAP_KEY_VALUE) { | required int32 key; | required group value { | required int32 value_f0; @@ -1472,7 +1472,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { expectedSchema = """message root { | required group f0 (MAP) { - | repeated group map (MAP_KEY_VALUE) { + | repeated group key_value (MAP_KEY_VALUE) { | required int32 key; | required group value { | required int64 value_f1; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 440fe997ae133..25a0796bba439 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -214,7 +214,7 @@ class StreamSuite extends StreamTest { .start(outputDir.getAbsolutePath) try { query.processAllAvailable() - val outputDf = spark.read.parquet(outputDir.getAbsolutePath).as[Long] + val outputDf = spark.read.parquet(outputDir.getAbsolutePath).sort('a).as[Long] checkDataset[Long](outputDf, (0L to 10L).toArray: _*) } finally { query.stop() diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 5357f4b63d794..c91ee92350fc8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -1528,7 +1528,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto Seq(tbl, ext_tbl).foreach { tblName => sql(s"INSERT INTO $tblName VALUES (1, 'a', '2019-12-13')") - val expectedSize = 601 + val expectedSize = 651 // analyze table sql(s"ANALYZE TABLE $tblName COMPUTE STATISTICS NOSCAN") var tableStats = getTableStats(tblName) From 4e257c43895d36f0d5630cc735fb56642470b26d Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Mon, 18 Jan 2021 23:26:34 +0800 Subject: [PATCH 2/7] fix --- .../spark/sql/execution/command/DDLCommandTestUtils.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala index 573c450254b29..dec67508218be 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala @@ -101,11 +101,10 @@ trait DDLCommandTestUtils extends SQLTestUtils { if (stats.isEmpty) { throw new IllegalArgumentException(s"The table $tableName does not have stats") } - val tableSizeInStats = ".*(\\d) bytes.*".r - val size = stats.first().getString(0) match { - case tableSizeInStats(s) => s.toInt + stats.first().getString(0) match { + case s if s.endsWith("bytes") && s.trim.split(" ").length > 1 => + s.split(" ").head.toInt case _ => throw new IllegalArgumentException("Not found table size in stats") } - size } } From c9b479284a220074424a40c07af7ecd27085c5cd Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Wed, 20 Jan 2021 23:10:25 +0800 Subject: [PATCH 3/7] Workaround: PARQUET-1580: Page-level CRC checksum verfication for DataPageV1 --- .../scala/org/apache/spark/sql/streaming/StreamSuite.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 25a0796bba439..8ed7b1c8fb188 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -29,6 +29,7 @@ import scala.util.control.ControlThrowable import com.google.common.util.concurrent.UncheckedExecutionException import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.ParquetOutputFormat import org.scalatest.time.SpanSugar._ import org.apache.spark.{SparkConf, SparkContext, TaskContext, TestUtils} @@ -214,7 +215,7 @@ class StreamSuite extends StreamTest { 
.start(outputDir.getAbsolutePath) try { query.processAllAvailable() - val outputDf = spark.read.parquet(outputDir.getAbsolutePath).sort('a).as[Long] + val outputDf = spark.read.parquet(outputDir.getAbsolutePath).as[Long] checkDataset[Long](outputDf, (0L to 10L).toArray: _*) } finally { query.stop() @@ -225,7 +226,9 @@ class StreamSuite extends StreamTest { val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load() Seq("", "parquet").foreach { useV1Source => - withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1Source) { + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> useV1Source, + ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED -> "false") { assertDF(df) assertDF(df) } From 802eb369d3cada5a5dbc284febf91c4fc5b8dbcb Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Thu, 21 Jan 2021 07:59:57 +0800 Subject: [PATCH 4/7] Disables page-level CRC checksums by default. --- .../execution/datasources/parquet/ParquetFileFormat.scala | 3 +++ .../scala/org/apache/spark/sql/streaming/StreamSuite.scala | 5 +---- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 1901f5575470e..b8d2e526b0be1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -127,6 +127,9 @@ class ParquetFileFormat conf.setEnum(ParquetOutputFormat.JOB_SUMMARY_LEVEL, JobSummaryLevel.NONE) } + // PARQUET-1580: Disables page-level CRC checksums by default. + conf.setBooleanIfUnset(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false) + if (ParquetOutputFormat.getJobSummaryLevel(conf) != JobSummaryLevel.NONE && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) { // output summary is requested, but the class is not a Parquet Committer diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 8ed7b1c8fb188..440fe997ae133 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -29,7 +29,6 @@ import scala.util.control.ControlThrowable import com.google.common.util.concurrent.UncheckedExecutionException import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.ParquetOutputFormat import org.scalatest.time.SpanSugar._ import org.apache.spark.{SparkConf, SparkContext, TaskContext, TestUtils} @@ -226,9 +225,7 @@ class StreamSuite extends StreamTest { val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load() Seq("", "parquet").foreach { useV1Source => - withSQLConf( - SQLConf.USE_V1_SOURCE_LIST.key -> useV1Source, - ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED -> "false") { + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1Source) { assertDF(df) assertDF(df) } From f2ccb3a96353081fce6cd3d3730785fff107c83a Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Thu, 21 Jan 2021 08:57:39 +0800 Subject: [PATCH 5/7] fix --- .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index b8d2e526b0be1..37a67d398961d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -127,7 +127,7 @@ class ParquetFileFormat conf.setEnum(ParquetOutputFormat.JOB_SUMMARY_LEVEL, JobSummaryLevel.NONE) } - // PARQUET-1580: Disables page-level CRC checksums by default. + // PARQUET-1746: Disables page-level CRC checksums by default. conf.setBooleanIfUnset(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false) if (ParquetOutputFormat.getJobSummaryLevel(conf) != JobSummaryLevel.NONE From a89c61d90cc145cea7e5c3df1200fb3ec1d7a3db Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Thu, 21 Jan 2021 10:50:59 +0800 Subject: [PATCH 6/7] fix --- .../test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index c91ee92350fc8..21fe7cd3b4b04 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -1528,7 +1528,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto Seq(tbl, ext_tbl).foreach { tblName => sql(s"INSERT INTO $tblName VALUES (1, 'a', '2019-12-13')") - val expectedSize = 651 + val expectedSize = 639 // analyze table sql(s"ANALYZE TABLE $tblName COMPUTE STATISTICS NOSCAN") var tableStats = getTableStats(tblName) From 72c52b64958340835e5a54b24aa68f201f4c15be Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Tue, 26 Jan 2021 13:22:27 +0800 Subject: [PATCH 7/7] Make "DataFrame reuse" test pass --- .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 3 --- .../scala/org/apache/spark/sql/streaming/StreamSuite.scala | 4 +++- .../scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 2 +- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 37a67d398961d..1901f5575470e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -127,9 +127,6 @@ class ParquetFileFormat conf.setEnum(ParquetOutputFormat.JOB_SUMMARY_LEVEL, JobSummaryLevel.NONE) } - // PARQUET-1746: Disables page-level CRC checksums by default. 
- conf.setBooleanIfUnset(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false) - if (ParquetOutputFormat.getJobSummaryLevel(conf) != JobSummaryLevel.NONE && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) { // output summary is requested, but the class is not a Parquet Committer diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 440fe997ae133..c4e43d24b0b82 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -214,7 +214,9 @@ class StreamSuite extends StreamTest { .start(outputDir.getAbsolutePath) try { query.processAllAvailable() - val outputDf = spark.read.parquet(outputDir.getAbsolutePath).as[Long] + // Parquet write page-level CRC checksums will change the file size and + // affect the data order when reading these files. Please see PARQUET-1746 for details. + val outputDf = spark.read.parquet(outputDir.getAbsolutePath).sort('a).as[Long] checkDataset[Long](outputDf, (0L to 10L).toArray: _*) } finally { query.stop() diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 21fe7cd3b4b04..c91ee92350fc8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -1528,7 +1528,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto Seq(tbl, ext_tbl).foreach { tblName => sql(s"INSERT INTO $tblName VALUES (1, 'a', '2019-12-13')") - val expectedSize = 639 + val expectedSize = 651 // analyze table sql(s"ANALYZE TABLE $tblName COMPUTE STATISTICS NOSCAN") var tableStats = getTableStats(tblName)
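
Note (not part of the patch series): Parquet 1.11.x enables page-level CRC checksums on the write path (PARQUET-1580), which is why the expected table sizes and the streaming test above needed adjusting. A minimal sketch, under the assumption that writer options are propagated to the Parquet Hadoop configuration, of how a user could skip the checksums for a single write using the same ParquetOutputFormat key exercised in patch 3; the app name and output path are hypothetical:

    import org.apache.parquet.hadoop.ParquetOutputFormat
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("no-page-crc")   // hypothetical app name
      .master("local[*]")
      .getOrCreate()

    // "parquet.page.write-checksum.enabled" -> "false" writes data pages without
    // CRC checksums, the behavior patches 4-6 briefly made the Spark default
    // before patch 7 reverted it in favor of keeping Parquet's own default.
    spark.range(0, 11)
      .write
      .option(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, "false")
      .parquet("/tmp/parquet-no-page-crc")   // hypothetical output path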