@@ -38,6 +38,7 @@ import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, SparkUp
 import org.apache.spark.TestUtils.assertExceptionMsg
 import org.apache.spark.sql._
 import org.apache.spark.sql.TestingUDT.IntervalData
+import org.apache.spark.sql.avro.AvroCompressionCodec._
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.Filter
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils
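
The wildcard import brings the `AvroCompressionCodec` constants into scope so the test bodies below can drop the enum prefix. A minimal sketch of the shape this diff assumes for the enum — constant set and the `lowerCaseName()` helper are inferred from how the tests use them, not copied from the Spark source:

```scala
import java.util.Locale

// Hypothetical stand-in for org.apache.spark.sql.avro.AvroCompressionCodec,
// sketched from usage below: one constant per supported codec plus a
// lowerCaseName() helper used for config values and directory names.
sealed abstract class AvroCodecSketch(val codecName: String) {
  def lowerCaseName(): String = codecName.toLowerCase(Locale.ROOT)
}
object AvroCodecSketch {
  case object UNCOMPRESSED extends AvroCodecSketch("UNCOMPRESSED")
  case object DEFLATE extends AvroCodecSketch("DEFLATE")
  case object SNAPPY extends AvroCodecSketch("SNAPPY")
  case object BZIP2 extends AvroCodecSketch("BZIP2")
  case object XZ extends AvroCodecSketch("XZ")
  case object ZSTANDARD extends AvroCodecSketch("ZSTANDARD")
  val values: Seq[AvroCodecSketch] =
    Seq(UNCOMPRESSED, DEFLATE, SNAPPY, BZIP2, XZ, ZSTANDARD)
}
```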
@@ -680,24 +681,18 @@ abstract class AvroSuite
       val zstandardDir = s"$dir/zstandard"

       val df = spark.read.format("avro").load(testAvro)
-      spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key,
-        AvroCompressionCodec.UNCOMPRESSED.lowerCaseName())
+      spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, UNCOMPRESSED.lowerCaseName())
       df.write.format("avro").save(uncompressDir)
-      spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key,
-        AvroCompressionCodec.BZIP2.lowerCaseName())
+      spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, BZIP2.lowerCaseName())
       df.write.format("avro").save(bzip2Dir)
-      spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key,
-        AvroCompressionCodec.XZ.lowerCaseName())
+      spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, XZ.lowerCaseName())
       df.write.format("avro").save(xzDir)
-      spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key,
-        AvroCompressionCodec.DEFLATE.lowerCaseName())
+      spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, DEFLATE.lowerCaseName())
       spark.conf.set(SQLConf.AVRO_DEFLATE_LEVEL.key, "9")
       df.write.format("avro").save(deflateDir)
-      spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key,
-        AvroCompressionCodec.SNAPPY.lowerCaseName())
+      spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, SNAPPY.lowerCaseName())
       df.write.format("avro").save(snappyDir)
-      spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key,
-        AvroCompressionCodec.ZSTANDARD.lowerCaseName())
+      spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, ZSTANDARD.lowerCaseName())
       df.write.format("avro").save(zstandardDir)

       val uncompressSize = FileUtils.sizeOfDirectory(new File(uncompressDir))
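
The six set/save pairs vary only in codec and target directory, so the same writes could be driven by one loop over the enum. An illustrative sketch, not part of the diff — it assumes `AvroCompressionCodec.values()` yields all six constants and simplifies the directory naming:

```scala
// Illustrative only: equivalent to the six set/save pairs above.
// Directory names here are derived from lowerCaseName(), which differs
// slightly from the test's hand-written names (e.g. "uncompress").
AvroCompressionCodec.values().foreach { codec =>
  spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, codec.lowerCaseName())
  if (codec == DEFLATE) {
    // deflate is the only codec that also sets a compression level
    spark.conf.set(SQLConf.AVRO_DEFLATE_LEVEL.key, "9")
  }
  df.write.format("avro").save(s"$dir/${codec.lowerCaseName()}")
}
```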
@@ -2132,7 +2127,7 @@ abstract class AvroSuite
         val reader = new DataFileReader(file, new GenericDatumReader[Any]())
         val r = reader.getMetaString("avro.codec")
         r
-      }.map(v => if (v == "null") "uncompressed" else v).headOption
+      }.map(v => if (v == "null") UNCOMPRESSED.lowerCaseName() else v).headOption
     }
     def checkCodec(df: DataFrame, dir: String, codec: String): Unit = {
       val subdir = s"$dir/$codec"
@@ -2143,11 +2138,9 @@ abstract class AvroSuite
       val path = dir.toString
       val df = spark.read.format("avro").load(testAvro)

-      checkCodec(df, path, "uncompressed")
-      checkCodec(df, path, "deflate")
-      checkCodec(df, path, "snappy")
-      checkCodec(df, path, "bzip2")
-      checkCodec(df, path, "xz")
+      AvroCompressionCodec.values().foreach { codec =>
+        checkCodec(df, path, codec.lowerCaseName())
+      }
     }
   }

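Avro data files record their codec in the `avro.codec` metadata header, and an uncompressed file stores the literal string `"null"` there — which is why the helper above maps `"null"` to `UNCOMPRESSED.lowerCaseName()`. A hedged sketch of reading that header with the Avro Java API (the file path is hypothetical):

```scala
import java.io.File
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.GenericDatumReader

// Sketch: read the codec name an Avro file advertises in its metadata.
// "part-00000.avro" is a placeholder path; uncompressed files report "null".
val reader = new DataFileReader(new File("part-00000.avro"), new GenericDatumReader[Any]())
val codec = Option(reader.getMetaString("avro.codec")).getOrElse("null")
val normalized = if (codec == "null") "uncompressed" else codec
reader.close()
```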
@@ -36,6 +36,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException}
 import org.apache.spark.sql.{Row, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.execution.datasources.{CommonFileDataSourceSuite, SchemaMergeUtils}
+import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtilsBase}
 import org.apache.spark.sql.types._
@@ -324,38 +325,31 @@ abstract class OrcSuite

   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
     val conf = spark.sessionState.conf
-    val option = new OrcOptions(
-      Map(COMPRESS.getAttribute.toUpperCase(Locale.ROOT) -> OrcCompressionCodec.NONE.name()), conf)
+    val option =
+      new OrcOptions(Map(COMPRESS.getAttribute.toUpperCase(Locale.ROOT) -> NONE.name()), conf)
     assert(option.compressionCodec == OrcCompressionCodec.NONE.name())
   }

   test("SPARK-21839: Add SQL config for ORC compression") {
     val conf = spark.sessionState.conf
     // Test if the default of spark.sql.orc.compression.codec is snappy
-    assert(new OrcOptions(
-      Map.empty[String, String], conf).compressionCodec == OrcCompressionCodec.SNAPPY.name())
+    assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == SNAPPY.name())

     // OrcOptions's parameters have a higher priority than SQL configuration.
     // `compression` -> `orc.compression` -> `spark.sql.orc.compression.codec`
-    withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") {
-      assert(new OrcOptions(
-        Map.empty[String, String], conf).compressionCodec == OrcCompressionCodec.NONE.name())
-      val zlibCodec = OrcCompressionCodec.ZLIB.lowerCaseName()
-      val lzoCodec = OrcCompressionCodec.LZO.lowerCaseName()
+    withSQLConf(SQLConf.ORC_COMPRESSION.key -> UNCOMPRESSED.lowerCaseName()) {
+      assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == NONE.name())
+      val zlibCodec = ZLIB.lowerCaseName()
       val map1 = Map(COMPRESS.getAttribute -> zlibCodec)
-      val map2 = Map(COMPRESS.getAttribute -> zlibCodec, "compression" -> lzoCodec)
-      assert(new OrcOptions(map1, conf).compressionCodec == OrcCompressionCodec.ZLIB.name())
-      assert(new OrcOptions(map2, conf).compressionCodec == OrcCompressionCodec.LZO.name())
+      val map2 = Map(COMPRESS.getAttribute -> zlibCodec, "compression" -> LZO.lowerCaseName())
+      assert(new OrcOptions(map1, conf).compressionCodec == ZLIB.name())
+      assert(new OrcOptions(map2, conf).compressionCodec == LZO.name())
     }

     // Test all the valid options of spark.sql.orc.compression.codec
     OrcCompressionCodec.values().map(_.name()).foreach { c =>
       withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) {
-        val expected = if (c == OrcCompressionCodec.UNCOMPRESSED.name()) {
-          OrcCompressionCodec.NONE.name()
-        } else {
-          c
-        }
+        val expected = OrcCompressionCodec.valueOf(c).getCompressionKind.name()
         assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == expected)
       }
     }
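
The hand-written if/else collapses into `valueOf(c).getCompressionKind.name()` because each `OrcCompressionCodec` constant carries the `org.apache.orc.CompressionKind` it resolves to, and `UNCOMPRESSED` is the only alias (it resolves to `NONE`). A sketch of the assumed mapping, inferred from the test's expectations rather than copied from the enum's source:

```scala
import org.apache.orc.CompressionKind

// Assumed codec-name -> CompressionKind mapping: every constant maps to the
// kind of the same name except UNCOMPRESSED, which is an alias for NONE.
def assumedKind(codec: String): CompressionKind = codec match {
  case "UNCOMPRESSED" | "NONE" => CompressionKind.NONE
  case other => CompressionKind.valueOf(other)
}
```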
@@ -556,20 +550,20 @@ abstract class OrcSuite
test("SPARK-35612: Support LZ4 compression in ORC data source") {
withTempPath { dir =>
val path = dir.getAbsolutePath
spark.range(3).write.option("compression", "lz4").orc(path)
spark.range(3).write.option("compression", LZ4.lowerCaseName()).orc(path)
checkAnswer(spark.read.orc(path), Seq(Row(0), Row(1), Row(2)))
val files = OrcUtils.listOrcFiles(path, spark.sessionState.newHadoopConf())
assert(files.nonEmpty && files.forall(_.getName.contains("lz4")))
assert(files.nonEmpty && files.forall(_.getName.contains(LZ4.lowerCaseName())))
}
}

test("SPARK-33978: Write and read a file with ZSTD compression") {
withTempPath { dir =>
val path = dir.getAbsolutePath
spark.range(3).write.option("compression", "zstd").orc(path)
spark.range(3).write.option("compression", ZSTD.lowerCaseName()).orc(path)
checkAnswer(spark.read.orc(path), Seq(Row(0), Row(1), Row(2)))
val files = OrcUtils.listOrcFiles(path, spark.sessionState.newHadoopConf())
assert(files.nonEmpty && files.forall(_.getName.contains("zstd")))
assert(files.nonEmpty && files.forall(_.getName.contains(ZSTD.lowerCaseName())))
}
}

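Both tests check the codec through the file name because Spark's ORC writer encodes the compression extension into part files (e.g. `part-00000-<uuid>.lz4.orc`). A hedged sketch of the same check as a standalone helper, with a hypothetical directory path:

```scala
import java.io.File

// Sketch: assert every ORC part file in a directory carries the codec
// extension in its name, mirroring the asserts above. "/tmp/orc-out" is
// a placeholder path.
def allFilesCarryCodec(dir: String, codec: String): Boolean = {
  val parts = new File(dir).listFiles().filter(_.getName.endsWith(".orc"))
  parts.nonEmpty && parts.forall(_.getName.contains(codec))
}
allFilesCarryCodec("/tmp/orc-out", "zstd")
```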
@@ -26,6 +26,7 @@ import org.apache.spark.{SPARK_DOC_ROOT, SparkNoSuchElementException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.MIT
+import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec.{GZIP, LZO}
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.sql.test.{SharedSparkSession, TestSQLContext}
 import org.apache.spark.util.Utils
@@ -368,25 +369,25 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {

     assert(spark.conf.get(fallback.key) ===
       SQLConf.PARQUET_COMPRESSION.defaultValue.get)
-    assert(spark.conf.get(fallback.key, "lzo") === "lzo")
+    assert(spark.conf.get(fallback.key, LZO.lowerCaseName()) === LZO.lowerCaseName())

     val displayValue = spark.sessionState.conf.getAllDefinedConfs
       .find { case (key, _, _, _) => key == fallback.key }
       .map { case (_, v, _, _) => v }
       .get
     assert(displayValue === fallback.defaultValueString)

-    spark.conf.set(SQLConf.PARQUET_COMPRESSION, "gzip")
-    assert(spark.conf.get(fallback.key) === "gzip")
+    spark.conf.set(SQLConf.PARQUET_COMPRESSION, GZIP.lowerCaseName())
+    assert(spark.conf.get(fallback.key) === GZIP.lowerCaseName())

-    spark.conf.set(fallback, "lzo")
-    assert(spark.conf.get(fallback.key) === "lzo")
+    spark.conf.set(fallback, LZO.lowerCaseName())
+    assert(spark.conf.get(fallback.key) === LZO.lowerCaseName())

     val newDisplayValue = spark.sessionState.conf.getAllDefinedConfs
       .find { case (key, _, _, _) => key == fallback.key }
       .map { case (_, v, _, _) => v }
       .get
-    assert(newDisplayValue === "lzo")
+    assert(newDisplayValue === LZO.lowerCaseName())

     SQLConf.unregister(fallback)
   }
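
The test exercises fallback-conf resolution: reading `fallback.key` returns the explicitly set value if present, otherwise the current value of the conf it falls back to (here `spark.sql.parquet.compression.codec`), otherwise that conf's default. A hedged sketch of registering such a conf — the key name is hypothetical and the builder chain is only assumed to match how SQLConf defines its other entries:

```scala
// Hedged sketch: a conf that, when unset, resolves through
// PARQUET_COMPRESSION. Key name and builder chain are assumptions.
val fallback = SQLConf.buildConf("spark.sql.test.fallback.codec")
  .version("2.4.0")
  .fallbackConf(SQLConf.PARQUET_COMPRESSION)
```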
@@ -1926,7 +1926,7 @@ class HiveDDLSuite
       checkAnswer(spark.table("t"), Row(1))
       // Check if this is compressed as ZLIB.
       val maybeOrcFile = path.listFiles().find(_.getName.startsWith("part"))
-      assertCompression(maybeOrcFile, "orc", "ZLIB")
+      assertCompression(maybeOrcFile, "orc", OrcCompressionCodec.ZLIB.name())

       sql("CREATE TABLE t2 USING HIVE AS SELECT 1 AS c1, 'a' AS c2")
       val table2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2"))
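
`assertCompression` is a suite helper; for ORC it presumably opens the file footer and compares the compression kind recorded there. A hedged sketch of that check with the ORC reader API — the helper's real body may differ:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.orc.OrcFile

// Hedged sketch of an ORC-side compression check: open the footer and
// return the codec name it records (e.g. "ZLIB").
def orcCompression(file: java.io.File): String = {
  val reader = OrcFile.createReader(
    new Path(file.getAbsolutePath), OrcFile.readerOptions(new Configuration()))
  reader.getCompressionKind.name()
}
```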