@@ -0,0 +1,62 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.parquet;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;

import org.apache.parquet.hadoop.metadata.CompressionCodecName;

/**
 * A mapper from Spark-supported Parquet compression codec names to Parquet's
 * {@link CompressionCodecName}.
 */
public enum ParquetCompressionCodec {
  NONE(CompressionCodecName.UNCOMPRESSED),
  UNCOMPRESSED(CompressionCodecName.UNCOMPRESSED),
  SNAPPY(CompressionCodecName.SNAPPY),
  GZIP(CompressionCodecName.GZIP),
  LZO(CompressionCodecName.LZO),
  BROTLI(CompressionCodecName.BROTLI),
  LZ4(CompressionCodecName.LZ4),
  LZ4_RAW(CompressionCodecName.LZ4_RAW),
  ZSTD(CompressionCodecName.ZSTD);

  private final CompressionCodecName compressionCodec;

  ParquetCompressionCodec(CompressionCodecName compressionCodec) {
    this.compressionCodec = compressionCodec;
  }

  public CompressionCodecName getCompressionCodec() {
    return this.compressionCodec;
  }

  public static ParquetCompressionCodec fromString(String s) {
    return ParquetCompressionCodec.valueOf(s.toUpperCase(Locale.ROOT));
  }

  public static final List<ParquetCompressionCodec> availableCodecs =
    Arrays.asList(
      ParquetCompressionCodec.UNCOMPRESSED,
      ParquetCompressionCodec.SNAPPY,
      ParquetCompressionCodec.GZIP,
      ParquetCompressionCodec.ZSTD,
      ParquetCompressionCodec.LZ4,
      ParquetCompressionCodec.LZ4_RAW);
}
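For illustration only (not part of the diff): a minimal, REPL-style Scala sketch of how a caller resolves Spark codec names through the new enum; the assertions are assumptions used just to show the mapping.

import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec

// "gzip" resolves case-insensitively to the GZIP constant, which maps to Parquet's GZIP codec.
assert(ParquetCompressionCodec.fromString("gzip").getCompressionCodec.name() == "GZIP")
// "none" is a Spark-side alias: the NONE constant maps to Parquet's UNCOMPRESSED codec.
assert(ParquetCompressionCodec.fromString("none").getCompressionCodec.name() == "UNCOMPRESSED")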
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.util.Locale

import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.metadata.CompressionCodecName

import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -88,16 +87,10 @@

object ParquetOptions extends DataSourceOptions {
// The parquet compression short names
private val shortParquetCompressionCodecNames = Map(
"none" -> CompressionCodecName.UNCOMPRESSED,
"uncompressed" -> CompressionCodecName.UNCOMPRESSED,
"snappy" -> CompressionCodecName.SNAPPY,
"gzip" -> CompressionCodecName.GZIP,
"lzo" -> CompressionCodecName.LZO,
"brotli" -> CompressionCodecName.BROTLI,
"lz4" -> CompressionCodecName.LZ4,
"lz4_raw" -> CompressionCodecName.LZ4_RAW,
"zstd" -> CompressionCodecName.ZSTD)
private val shortParquetCompressionCodecNames =
ParquetCompressionCodec.values().map {
codec => codec.name().toLowerCase(Locale.ROOT) -> codec.getCompressionCodec
}.toMap

def getParquetCompressionCodecName(name: String): String = {
shortParquetCompressionCodecNames(name).name()
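As a quick reference (a sketch, not part of the diff): the map derived from ParquetCompressionCodec.values() preserves the lookups the removed literal map provided. The REPL-style calls below assume they run where Spark's internal ParquetOptions object is visible.

import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions

// Behaves exactly as with the old hand-written map, including the Spark-only "none" alias.
assert(ParquetOptions.getParquetCompressionCodecName("none") == "UNCOMPRESSED")
assert(ParquetOptions.getParquetCompressionCodecName("lz4_raw") == "LZ4_RAW")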
@@ -16,9 +16,12 @@
*/
package org.apache.spark.sql.execution.benchmark

import java.util.Locale

import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetOutputFormat

import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
import org.apache.spark.sql.internal.SQLConf

/**
@@ -51,7 +54,8 @@ object BuiltInDataSourceWriteBenchmark extends DataSourceWriteBenchmark {
mainArgs
}

spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, "snappy")
spark.conf.set(SQLConf.PARQUET_COMPRESSION.key,
ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
spark.conf.set(SQLConf.ORC_COMPRESSION.key, "snappy")

formats.foreach { format =>
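The same replacement pattern appears in the benchmark above and in those that follow; as a sanity check (a REPL-style sketch, not part of the diff), the new expression reduces to the literal it replaces.

import java.util.Locale
import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec

// name() yields "SNAPPY"; lower-casing with Locale.ROOT gives back the previously hardcoded string.
assert(ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT) == "snappy")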
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.benchmark

import java.io.File
import java.util.Locale

import scala.jdk.CollectionConverters._
import scala.util.Random
@@ -28,7 +29,7 @@ import org.apache.spark.{SparkConf, TestUtils}
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, VectorizedParquetRecordReader}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnVector
@@ -99,15 +100,17 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
spark.read.json(dir).createOrReplaceTempView("jsonTable")
}

val parquetCodec = ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)

private def saveAsParquetV1Table(df: DataFrameWriter[Row], dir: String): Unit = {
df.mode("overwrite").option("compression", "snappy").parquet(dir)
df.mode("overwrite").option("compression", parquetCodec).parquet(dir)
spark.read.parquet(dir).createOrReplaceTempView("parquetV1Table")
}

private def saveAsParquetV2Table(df: DataFrameWriter[Row], dir: String): Unit = {
withSQLConf(ParquetOutputFormat.WRITER_VERSION ->
ParquetProperties.WriterVersion.PARQUET_2_0.toString) {
df.mode("overwrite").option("compression", "snappy").parquet(dir)
df.mode("overwrite").option("compression", parquetCodec).parquet(dir)
spark.read.parquet(dir).createOrReplaceTempView("parquetV2Table")
}
}
@@ -18,12 +18,14 @@
package org.apache.spark.sql.execution.benchmark

import java.io.File
import java.util.Locale

import scala.util.Random

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
import org.apache.spark.sql.functions.{monotonically_increasing_id, timestamp_seconds}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
@@ -50,7 +52,8 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark {
.setIfMissing("spark.driver.memory", "3g")
.setIfMissing("spark.executor.memory", "3g")
.setIfMissing("orc.compression", "snappy")
.setIfMissing("spark.sql.parquet.compression.codec", "snappy")
.setIfMissing("spark.sql.parquet.compression.codec",
ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))

SparkSession.builder().config(conf).getOrCreate()
}
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.benchmark

import java.util.Locale

import scala.util.Try

import org.apache.spark.SparkConf
@@ -29,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

@@ -51,7 +54,8 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark with Logging {
val conf = new SparkConf()
.setMaster(System.getProperty("spark.sql.test.master", "local[1]"))
.setAppName("test-sql-context")
.set("spark.sql.parquet.compression.codec", "snappy")
.set("spark.sql.parquet.compression.codec",
ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
.set("spark.sql.shuffle.partitions", System.getProperty("spark.sql.shuffle.partitions", "4"))
.set("spark.driver.memory", "3g")
.set("spark.executor.memory", "3g")
@@ -17,7 +17,12 @@

package org.apache.spark.sql.execution.datasources

import java.util.Locale

import scala.jdk.CollectionConverters._

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}

@@ -58,9 +63,10 @@ class ParquetCodecSuite extends FileSourceCodecSuite {
// Exclude "lzo" because it is GPL-licenced so not included in Hadoop.
// Exclude "brotli" because the com.github.rdblue:brotli-codec dependency is not available
// on Maven Central.
override protected def availableCodecs: Seq[String] = {
Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4_raw")
}
override protected def availableCodecs: Seq[String] =
(ParquetCompressionCodec.NONE +:
ParquetCompressionCodec.availableCodecs.asScala.iterator.to(Seq))
.map(_.name().toLowerCase(Locale.ROOT)).iterator.to(Seq)
}

class OrcCodecSuite extends FileSourceCodecSuite {
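For clarity (a REPL-style sketch, not part of the diff): given the availableCodecs list defined in the new enum above, the rewritten availableCodecs in ParquetCodecSuite evaluates to the same strings the removed literal Seq contained.

import java.util.Locale
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec

val codecs = (ParquetCompressionCodec.NONE +:
  ParquetCompressionCodec.availableCodecs.asScala.toSeq)
  .map(_.name().toLowerCase(Locale.ROOT))
// Same contents as the removed literal list.
assert(codecs == Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4_raw"))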
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.parquet

import java.io.File
import java.util.Locale

import scala.jdk.CollectionConverters._

@@ -29,18 +30,9 @@ import org.apache.spark.sql.test.SharedSparkSession

class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSparkSession {
test("Test `spark.sql.parquet.compression.codec` config") {
Seq(
"NONE",
"UNCOMPRESSED",
"SNAPPY",
"GZIP",
"LZO",
"LZ4",
"BROTLI",
"ZSTD",
"LZ4_RAW").foreach { c =>
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) {
val expected = if (c == "NONE") "UNCOMPRESSED" else c
ParquetCompressionCodec.values().foreach { codec =>
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) {
val expected = codec.getCompressionCodec.name()
val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf)
assert(option.compressionCodecClassName == expected)
}
@@ -49,25 +41,32 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar

test("[SPARK-21786] Test Acquiring 'compressionCodecClassName' for parquet in right order.") {
// When "compression" is configured, it should be the first choice.
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
val props = Map("compression" -> "uncompressed", ParquetOutputFormat.COMPRESSION -> "gzip")
withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
val props = Map(
"compression" -> ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT),
ParquetOutputFormat.COMPRESSION ->
ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT))
val option = new ParquetOptions(props, spark.sessionState.conf)
assert(option.compressionCodecClassName == "UNCOMPRESSED")
assert(option.compressionCodecClassName == ParquetCompressionCodec.UNCOMPRESSED.name)
}

// When "compression" is not configured, "parquet.compression" should be the preferred choice.
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
val props = Map(ParquetOutputFormat.COMPRESSION -> "gzip")
withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
val props = Map(ParquetOutputFormat.COMPRESSION ->
ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT))
val option = new ParquetOptions(props, spark.sessionState.conf)
assert(option.compressionCodecClassName == "GZIP")
assert(option.compressionCodecClassName == ParquetCompressionCodec.GZIP.name)
}

// When both "compression" and "parquet.compression" are not configured,
// spark.sql.parquet.compression.codec should be the right choice.
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
val props = Map.empty[String, String]
val option = new ParquetOptions(props, spark.sessionState.conf)
assert(option.compressionCodecClassName == "SNAPPY")
assert(option.compressionCodecClassName == ParquetCompressionCodec.SNAPPY.name)
}
}

@@ -113,8 +112,8 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar
}

test("Create parquet table with compression") {
val codecs = ParquetCompressionCodec.availableCodecs.asScala.map(_.name())
Seq(true, false).foreach { isPartitioned =>
val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4", "LZ4_RAW")
codecs.foreach { compressionCodec =>
checkCompressionCodec(compressionCodec, isPartitioned)
}
@@ -34,8 +34,6 @@ import org.apache.parquet.example.data.Group
import org.apache.parquet.example.data.simple.{SimpleGroup, SimpleGroupFactory}
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.example.ExampleParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.{MessageType, MessageTypeParser}

@@ -845,7 +843,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession

val data = (0 until 10).map(i => (i, i.toString))

def checkCompressionCodec(codec: CompressionCodecName): Unit = {
def checkCompressionCodec(codec: ParquetCompressionCodec): Unit = {
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) {
withParquetFile(data) { path =>
assertResult(spark.conf.get(SQLConf.PARQUET_COMPRESSION).toUpperCase(Locale.ROOT)) {
Expand All @@ -857,12 +855,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession

// Checks default compression codec
checkCompressionCodec(
CompressionCodecName.fromConf(spark.conf.get(SQLConf.PARQUET_COMPRESSION)))
ParquetCompressionCodec.fromString(spark.conf.get(SQLConf.PARQUET_COMPRESSION)))

checkCompressionCodec(CompressionCodecName.UNCOMPRESSED)
checkCompressionCodec(CompressionCodecName.GZIP)
checkCompressionCodec(CompressionCodecName.SNAPPY)
checkCompressionCodec(CompressionCodecName.ZSTD)
ParquetCompressionCodec.availableCodecs.asScala.foreach(checkCompressionCodec(_))
}

private def createParquetWriter(
Expand All @@ -878,7 +873,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
.withDictionaryEncoding(dictionaryEnabled)
.withType(schema)
.withWriterVersion(PARQUET_1_0)
.withCompressionCodec(GZIP)
.withCompressionCodec(ParquetCompressionCodec.GZIP.getCompressionCodec)
.withRowGroupSize(1024 * 1024)
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize)
@@ -1507,9 +1502,12 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}

test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
val option = new ParquetOptions(Map("Compression" -> "uncompressed"), spark.sessionState.conf)
assert(option.compressionCodecClassName == "UNCOMPRESSED")
withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
val option = new ParquetOptions(
Map("Compression" -> ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT)),
spark.sessionState.conf)
assert(option.compressionCodecClassName == ParquetCompressionCodec.UNCOMPRESSED.name)
}
}
