diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
new file mode 100644
index 0000000000000..1a37c7a33f20c
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+/**
+ * A mapper class from Spark-supported Parquet compression codecs to Parquet compression codecs.
+ */
+public enum ParquetCompressionCodec {
+  NONE(CompressionCodecName.UNCOMPRESSED),
+  UNCOMPRESSED(CompressionCodecName.UNCOMPRESSED),
+  SNAPPY(CompressionCodecName.SNAPPY),
+  GZIP(CompressionCodecName.GZIP),
+  LZO(CompressionCodecName.LZO),
+  BROTLI(CompressionCodecName.BROTLI),
+  LZ4(CompressionCodecName.LZ4),
+  LZ4_RAW(CompressionCodecName.LZ4_RAW),
+  ZSTD(CompressionCodecName.ZSTD);
+
+  private final CompressionCodecName compressionCodec;
+
+  ParquetCompressionCodec(CompressionCodecName compressionCodec) {
+    this.compressionCodec = compressionCodec;
+  }
+
+  public CompressionCodecName getCompressionCodec() {
+    return this.compressionCodec;
+  }
+
+  public static ParquetCompressionCodec fromString(String s) {
+    return ParquetCompressionCodec.valueOf(s.toUpperCase(Locale.ROOT));
+  }
+
+  public static final List<ParquetCompressionCodec> availableCodecs =
+    Arrays.asList(
+      ParquetCompressionCodec.UNCOMPRESSED,
+      ParquetCompressionCodec.SNAPPY,
+      ParquetCompressionCodec.GZIP,
+      ParquetCompressionCodec.ZSTD,
+      ParquetCompressionCodec.LZ4,
+      ParquetCompressionCodec.LZ4_RAW);
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 559a994319d3e..ae110fdd0d3a3
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.parquet
 import java.util.Locale
 
 import org.apache.parquet.hadoop.ParquetOutputFormat
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
 
 import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -88,16 +87,10 @@ class ParquetOptions(
 object ParquetOptions extends DataSourceOptions {
   // The parquet compression short names
-
private val shortParquetCompressionCodecNames = Map( - "none" -> CompressionCodecName.UNCOMPRESSED, - "uncompressed" -> CompressionCodecName.UNCOMPRESSED, - "snappy" -> CompressionCodecName.SNAPPY, - "gzip" -> CompressionCodecName.GZIP, - "lzo" -> CompressionCodecName.LZO, - "brotli" -> CompressionCodecName.BROTLI, - "lz4" -> CompressionCodecName.LZ4, - "lz4_raw" -> CompressionCodecName.LZ4_RAW, - "zstd" -> CompressionCodecName.ZSTD) + private val shortParquetCompressionCodecNames = + ParquetCompressionCodec.values().map { + codec => codec.name().toLowerCase(Locale.ROOT) -> codec.getCompressionCodec + }.toMap def getParquetCompressionCodecName(name: String): String = { shortParquetCompressionCodecNames(name).name() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala index 4752787c501bf..ba3228878ecee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala @@ -16,9 +16,12 @@ */ package org.apache.spark.sql.execution.benchmark +import java.util.Locale + import org.apache.parquet.column.ParquetProperties import org.apache.parquet.hadoop.ParquetOutputFormat +import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec import org.apache.spark.sql.internal.SQLConf /** @@ -51,7 +54,8 @@ object BuiltInDataSourceWriteBenchmark extends DataSourceWriteBenchmark { mainArgs } - spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, "snappy") + spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, + ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)) spark.conf.set(SQLConf.ORC_COMPRESSION.key, "snappy") formats.foreach { format => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala index 771f944f1f6c5..a8736c041517f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.benchmark import java.io.File +import java.util.Locale import scala.jdk.CollectionConverters._ import scala.util.Random @@ -28,7 +29,7 @@ import org.apache.spark.{SparkConf, TestUtils} import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader +import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, VectorizedParquetRecordReader} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnVector @@ -99,15 +100,17 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark { spark.read.json(dir).createOrReplaceTempView("jsonTable") } + val parquetCodec = ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT) + private def saveAsParquetV1Table(df: DataFrameWriter[Row], dir: String): Unit = { - df.mode("overwrite").option("compression", "snappy").parquet(dir) + df.mode("overwrite").option("compression", parquetCodec).parquet(dir) 
spark.read.parquet(dir).createOrReplaceTempView("parquetV1Table") } private def saveAsParquetV2Table(df: DataFrameWriter[Row], dir: String): Unit = { withSQLConf(ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString) { - df.mode("overwrite").option("compression", "snappy").parquet(dir) + df.mode("overwrite").option("compression", parquetCodec).parquet(dir) spark.read.parquet(dir).createOrReplaceTempView("parquetV2Table") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala index 4862571b9c1be..10781ec90fa00 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala @@ -18,12 +18,14 @@ package org.apache.spark.sql.execution.benchmark import java.io.File +import java.util.Locale import scala.util.Random import org.apache.spark.SparkConf import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec import org.apache.spark.sql.functions.{monotonically_increasing_id, timestamp_seconds} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType @@ -50,7 +52,8 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark { .setIfMissing("spark.driver.memory", "3g") .setIfMissing("spark.executor.memory", "3g") .setIfMissing("orc.compression", "snappy") - .setIfMissing("spark.sql.parquet.compression.codec", "snappy") + .setIfMissing("spark.sql.parquet.compression.codec", + ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)) SparkSession.builder().config(conf).getOrCreate() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala index c26272d1dcd63..f01cfea62a958 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.benchmark +import java.util.Locale + import scala.util.Try import org.apache.spark.SparkConf @@ -29,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -51,7 +54,8 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark with Logging { val conf = new SparkConf() .setMaster(System.getProperty("spark.sql.test.master", "local[1]")) .setAppName("test-sql-context") - .set("spark.sql.parquet.compression.codec", "snappy") + .set("spark.sql.parquet.compression.codec", + ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)) .set("spark.sql.shuffle.partitions", System.getProperty("spark.sql.shuffle.partitions", "4")) .set("spark.driver.memory", "3g") .set("spark.executor.memory", "3g") diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala index 11e9f4665a9cf..1f1805a02d765 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala @@ -17,7 +17,12 @@ package org.apache.spark.sql.execution.datasources +import java.util.Locale + +import scala.jdk.CollectionConverters._ + import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} @@ -58,9 +63,10 @@ class ParquetCodecSuite extends FileSourceCodecSuite { // Exclude "lzo" because it is GPL-licenced so not included in Hadoop. // Exclude "brotli" because the com.github.rdblue:brotli-codec dependency is not available // on Maven Central. - override protected def availableCodecs: Seq[String] = { - Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4_raw") - } + override protected def availableCodecs: Seq[String] = + (ParquetCompressionCodec.NONE +: + ParquetCompressionCodec.availableCodecs.asScala.iterator.to(Seq)) + .map(_.name().toLowerCase(Locale.ROOT)).iterator.to(Seq) } class OrcCodecSuite extends FileSourceCodecSuite { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala index 1a387b7d2de63..28ea430635a2b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.io.File +import java.util.Locale import scala.jdk.CollectionConverters._ @@ -29,18 +30,9 @@ import org.apache.spark.sql.test.SharedSparkSession class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSparkSession { test("Test `spark.sql.parquet.compression.codec` config") { - Seq( - "NONE", - "UNCOMPRESSED", - "SNAPPY", - "GZIP", - "LZO", - "LZ4", - "BROTLI", - "ZSTD", - "LZ4_RAW").foreach { c => - withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) { - val expected = if (c == "NONE") "UNCOMPRESSED" else c + ParquetCompressionCodec.values().foreach { codec => + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) { + val expected = codec.getCompressionCodec.name() val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf) assert(option.compressionCodecClassName == expected) } @@ -49,25 +41,32 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar test("[SPARK-21786] Test Acquiring 'compressionCodecClassName' for parquet in right order.") { // When "compression" is configured, it should be the first choice. 
- withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { - val props = Map("compression" -> "uncompressed", ParquetOutputFormat.COMPRESSION -> "gzip") + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> + ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) { + val props = Map( + "compression" -> ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT), + ParquetOutputFormat.COMPRESSION -> + ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT)) val option = new ParquetOptions(props, spark.sessionState.conf) - assert(option.compressionCodecClassName == "UNCOMPRESSED") + assert(option.compressionCodecClassName == ParquetCompressionCodec.UNCOMPRESSED.name) } // When "compression" is not configured, "parquet.compression" should be the preferred choice. - withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { - val props = Map(ParquetOutputFormat.COMPRESSION -> "gzip") + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> + ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) { + val props = Map(ParquetOutputFormat.COMPRESSION -> + ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT)) val option = new ParquetOptions(props, spark.sessionState.conf) - assert(option.compressionCodecClassName == "GZIP") + assert(option.compressionCodecClassName == ParquetCompressionCodec.GZIP.name) } // When both "compression" and "parquet.compression" are not configured, // spark.sql.parquet.compression.codec should be the right choice. - withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> + ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) { val props = Map.empty[String, String] val option = new ParquetOptions(props, spark.sessionState.conf) - assert(option.compressionCodecClassName == "SNAPPY") + assert(option.compressionCodecClassName == ParquetCompressionCodec.SNAPPY.name) } } @@ -113,8 +112,8 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar } test("Create parquet table with compression") { + val codecs = ParquetCompressionCodec.availableCodecs.asScala.map(_.name()) Seq(true, false).foreach { isPartitioned => - val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4", "LZ4_RAW") codecs.foreach { compressionCodec => checkCompressionCodec(compressionCodec, isPartitioned) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 95a45e52bfb49..a5d5f8ce30f0c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -34,8 +34,6 @@ import org.apache.parquet.example.data.Group import org.apache.parquet.example.data.simple.{SimpleGroup, SimpleGroupFactory} import org.apache.parquet.hadoop._ import org.apache.parquet.hadoop.example.ExampleParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP import org.apache.parquet.io.api.Binary import org.apache.parquet.schema.{MessageType, MessageTypeParser} @@ -845,7 +843,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession val data = (0 until 10).map(i => (i, i.toString)) - def checkCompressionCodec(codec: CompressionCodecName): Unit = { + def checkCompressionCodec(codec: ParquetCompressionCodec): Unit 
= { withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) { withParquetFile(data) { path => assertResult(spark.conf.get(SQLConf.PARQUET_COMPRESSION).toUpperCase(Locale.ROOT)) { @@ -857,12 +855,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession // Checks default compression codec checkCompressionCodec( - CompressionCodecName.fromConf(spark.conf.get(SQLConf.PARQUET_COMPRESSION))) + ParquetCompressionCodec.fromString(spark.conf.get(SQLConf.PARQUET_COMPRESSION))) - checkCompressionCodec(CompressionCodecName.UNCOMPRESSED) - checkCompressionCodec(CompressionCodecName.GZIP) - checkCompressionCodec(CompressionCodecName.SNAPPY) - checkCompressionCodec(CompressionCodecName.ZSTD) + ParquetCompressionCodec.availableCodecs.asScala.foreach(checkCompressionCodec(_)) } private def createParquetWriter( @@ -878,7 +873,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession .withDictionaryEncoding(dictionaryEnabled) .withType(schema) .withWriterVersion(PARQUET_1_0) - .withCompressionCodec(GZIP) + .withCompressionCodec(ParquetCompressionCodec.GZIP.getCompressionCodec) .withRowGroupSize(1024 * 1024) .withPageSize(pageSize) .withDictionaryPageSize(dictionaryPageSize) @@ -1507,9 +1502,12 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") { - withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { - val option = new ParquetOptions(Map("Compression" -> "uncompressed"), spark.sessionState.conf) - assert(option.compressionCodecClassName == "UNCOMPRESSED") + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> + ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) { + val option = new ParquetOptions( + Map("Compression" -> ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT)), + spark.sessionState.conf) + assert(option.compressionCodecClassName == ParquetCompressionCodec.UNCOMPRESSED.name) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala index a5d11f6e0e14d..df28e7b4485a6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala @@ -29,7 +29,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql.execution.datasources.orc.OrcOptions -import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, ParquetTest} +import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetOptions, ParquetTest} import org.apache.spark.sql.hive.orc.OrcFileOperator import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf @@ -289,8 +289,14 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo test("both table-level and session-level compression are set") { checkForTableWithCompressProp("parquet", - tableCompressCodecs = List("UNCOMPRESSED", "SNAPPY", "GZIP"), - sessionCompressCodecs = List("SNAPPY", "GZIP", "SNAPPY")) + tableCompressCodecs = List( + ParquetCompressionCodec.UNCOMPRESSED.name, + ParquetCompressionCodec.SNAPPY.name, + ParquetCompressionCodec.GZIP.name), + sessionCompressCodecs = List( + ParquetCompressionCodec.SNAPPY.name, + ParquetCompressionCodec.GZIP.name, + ParquetCompressionCodec.SNAPPY.name)) 
checkForTableWithCompressProp("orc", tableCompressCodecs = List(CompressionKind.NONE.name, CompressionKind.SNAPPY.name, CompressionKind.ZLIB.name), @@ -301,7 +307,10 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo test("table-level compression is not set but session-level compressions is set ") { checkForTableWithCompressProp("parquet", tableCompressCodecs = List.empty, - sessionCompressCodecs = List("UNCOMPRESSED", "SNAPPY", "GZIP")) + sessionCompressCodecs = List( + ParquetCompressionCodec.UNCOMPRESSED.name, + ParquetCompressionCodec.SNAPPY.name, + ParquetCompressionCodec.GZIP.name)) checkForTableWithCompressProp("orc", tableCompressCodecs = List.empty, sessionCompressCodecs = @@ -339,7 +348,11 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo } test("test table containing mixed compression codec") { - checkTableWriteWithCompressionCodecs("parquet", List("UNCOMPRESSED", "SNAPPY", "GZIP")) + checkTableWriteWithCompressionCodecs("parquet", + List( + ParquetCompressionCodec.UNCOMPRESSED.name, + ParquetCompressionCodec.SNAPPY.name, + ParquetCompressionCodec.GZIP.name)) checkTableWriteWithCompressionCodecs( "orc", List(CompressionKind.NONE.name, CompressionKind.SNAPPY.name, CompressionKind.ZLIB.name)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala index 2a3c77a56e6db..45dd8da6e0200 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala @@ -19,9 +19,10 @@ package org.apache.spark.sql.hive import java.time.{Duration, Period} import java.time.temporal.ChronoUnit +import java.util.Locale import org.apache.spark.sql.{AnalysisException, QueryTest, Row} -import org.apache.spark.sql.execution.datasources.parquet.ParquetTest +import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetTest} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf @@ -157,7 +158,8 @@ class HiveParquetSuite extends QueryTest test("SPARK-37098: Alter table properties should invalidate cache") { // specify the compression in case we change it in future - withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> + ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)) { withTempPath { dir => withTable("t") { sql(s"CREATE TABLE t (c int) STORED AS PARQUET LOCATION '${dir.getCanonicalPath}'") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 05d2ca1e210f3..78365d25c8984 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils} -import org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader +import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetFooterReader} import org.apache.spark.sql.functions._ import 
org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils} import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET} @@ -2709,7 +2709,9 @@ class HiveDDLSuite assert(compression === actualCompression) } - Seq(("orc", "ZLIB"), ("parquet", "GZIP")).foreach { case (fileFormat, compression) => + Seq( + ("orc", "ZLIB"), + ("parquet", ParquetCompressionCodec.GZIP.name)).foreach { case (fileFormat, compression) => test(s"SPARK-22158 convertMetastore should not ignore table property - $fileFormat") { withSQLConf(CONVERT_METASTORE_ORC.key -> "true", CONVERT_METASTORE_PARQUET.key -> "true") { withTable("t") { @@ -2804,14 +2806,14 @@ class HiveDDLSuite assert(DDLUtils.isHiveTable(table)) assert(table.storage.serde.get.contains("parquet")) val properties = table.properties - assert(properties.get("parquet.compression") == Some("GZIP")) + assert(properties.get("parquet.compression") == Some(ParquetCompressionCodec.GZIP.name)) assert(spark.table("t").collect().isEmpty) sql("INSERT INTO t SELECT 1") checkAnswer(spark.table("t"), Row(1)) val maybeFile = path.listFiles().find(_.getName.startsWith("part")) - assertCompression(maybeFile, "parquet", "GZIP") + assertCompression(maybeFile, "parquet", ParquetCompressionCodec.GZIP.name) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala index 18e8401ee3d2b..84ee19e62bca2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala @@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.CatalogUtils import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol +import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -199,7 +200,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { } test("SPARK-13543: Support for specifying compression codec for Parquet via option()") { - withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "UNCOMPRESSED") { + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.UNCOMPRESSED.name) { withTempPath { dir => val path = s"${dir.getCanonicalPath}/table1" val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b")
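
Note (editor, illustrative only; not part of the patch): a minimal Scala sketch of how a call site can resolve a codec through the new ParquetCompressionCodec enum instead of a hard-coded string, assuming the enum added above is on the classpath; the object name ParquetCodecNameExample is hypothetical.

import java.util.Locale

import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec

object ParquetCodecNameExample {
  def main(args: Array[String]): Unit = {
    // Case-insensitive lookup, matching the Locale.ROOT lower-casing used at the call sites above.
    val codec = ParquetCompressionCodec.fromString("zstd")

    // Parquet-side enum value, roughly what ParquetOptions.compressionCodecClassName resolves to.
    println(codec.getCompressionCodec.name())       // ZSTD

    // Lower-case spelling suitable for spark.sql.parquet.compression.codec.
    println(codec.name().toLowerCase(Locale.ROOT))  // zstd

    // Codecs the test suites iterate over (LZO and BROTLI are deliberately excluded).
    println(ParquetCompressionCodec.availableCodecs)
  }
}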