diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 01da0dc27d83..cb847a042031 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -851,8 +851,9 @@ def orc(self, path, mode=None, partitionBy=None, compression=None):
:param partitionBy: names of partitioning columns
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive shorten names (none, snappy, zlib, and lzo).
- This will override ``orc.compress``. If None is set, it uses the
- default value, ``snappy``.
+ This will override ``orc.compress`` and
+ ``spark.sql.orc.compression.codec``. If None is set, it uses the value
+ specified in ``spark.sql.orc.compression.codec``.
>>> orc_df = spark.read.orc('python/test_support/sql/orc_partitioned')
>>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
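For illustration only (not part of the patch): when the writer is given no `compression` option and no `orc.compress` property, the codec now comes from the new session config rather than a hard-coded `snappy`. A minimal Scala sketch, assuming a Hive-enabled `SparkSession` named `spark`, a DataFrame `df`, and a writable `path`:

```scala
// Hypothetical usage sketch; `spark`, `df` and `path` are assumed to exist.
spark.conf.set("spark.sql.orc.compression.codec", "zlib")

// No `compression` option and no `orc.compress` property are given, so the
// session config above supplies the codec: the ORC files are written with ZLIB.
df.write.orc(path)
```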
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a685099505ee..c407874381ac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -322,6 +322,14 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
+ .doc("Sets the compression codec use when writing ORC files. Acceptable values include: " +
+ "none, uncompressed, snappy, zlib, lzo.")
+ .stringConf
+ .transform(_.toLowerCase(Locale.ROOT))
+ .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
+ .createWithDefault("snappy")
+
val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
.doc("When true, enable filter pushdown for ORC files.")
.booleanConf
@@ -998,6 +1006,8 @@ class SQLConf extends Serializable with Logging {
def useCompression: Boolean = getConf(COMPRESS_CACHED)
+ def orcCompressionCodec: String = getConf(ORC_COMPRESSION)
+
def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)
def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA)
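A short sketch of the new entry's behaviour (illustrative only; assumes an active `SparkSession` named `spark`). The default is `snappy`, and the `transform` above lower-cases a value before `checkValues` matches it against the allowed set:

```scala
// Default value of the new entry.
spark.conf.get("spark.sql.orc.compression.codec")         // "snappy"

// Accepted: the value is lower-cased before validation, so case does not matter.
spark.conf.set("spark.sql.orc.compression.codec", "ZLIB")

// Values outside none/uncompressed/snappy/zlib/lzo fail the `checkValues` validation.
```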
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index cca93525d679..07347d274854 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -517,9 +517,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
*
* You can set the following ORC-specific option(s) for writing ORC files:
*
- * - `compression` (default `snappy`): compression codec to use when saving to file. This can be
- * one of the known case-insensitive shorten names(`none`, `snappy`, `zlib`, and `lzo`).
- * This will override `orc.compress`.
+ * - `compression` (default is the value specified in `spark.sql.orc.compression.codec`):
+ * compression codec to use when saving to file. This can be one of the known
+ * case-insensitive short names (`none`, `snappy`, `zlib`, and `lzo`). This will override
+ * `orc.compress` and `spark.sql.orc.compression.codec`. If `orc.compress` is given,
+ * it overrides `spark.sql.orc.compression.codec`.
*
*
* @since 1.5.0
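A usage sketch of the documented precedence (illustrative only; assumes `spark`, `df` and `path` as above): the `compression` option wins over both `orc.compress` and the session config.

```scala
spark.conf.set("spark.sql.orc.compression.codec", "snappy")

df.write
  .option("orc.compress", "lzo")    // used only if `compression` is absent
  .option("compression", "zlib")    // highest precedence: files are written with ZLIB
  .orc(path)
```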
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 3a34ec55c8b0..edf2013a4c93 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -68,7 +68,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
- val orcOptions = new OrcOptions(options)
+ val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
val configuration = job.getConfiguration
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
index 043eb69818ba..7f94c8c57902 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
@@ -20,30 +20,34 @@ package org.apache.spark.sql.hive.orc
import java.util.Locale
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.internal.SQLConf
/**
* Options for the ORC data source.
*/
-private[orc] class OrcOptions(@transient private val parameters: CaseInsensitiveMap[String])
+private[orc] class OrcOptions(
+ @transient private val parameters: CaseInsensitiveMap[String],
+ @transient private val sqlConf: SQLConf)
extends Serializable {
import OrcOptions._
- def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+ def this(parameters: Map[String, String], sqlConf: SQLConf) =
+ this(CaseInsensitiveMap(parameters), sqlConf)
/**
- * Compression codec to use. By default snappy compression.
+ * Compression codec to use.
* Acceptable values are defined in [[shortOrcCompressionCodecNames]].
*/
val compressionCodec: String = {
- // `orc.compress` is a ORC configuration. So, here we respect this as an option but
- // `compression` has higher precedence than `orc.compress`. It means if both are set,
- // we will use `compression`.
+ // The precedence, from highest to lowest, is: the `compression` option, the
+ // `orc.compress` option, and the session's `spark.sql.orc.compression.codec` value.
val orcCompressionConf = parameters.get(OrcRelation.ORC_COMPRESSION)
val codecName = parameters
.get("compression")
.orElse(orcCompressionConf)
- .getOrElse("snappy").toLowerCase(Locale.ROOT)
+ .getOrElse(sqlConf.orcCompressionCodec)
+ .toLowerCase(Locale.ROOT)
if (!shortOrcCompressionCodecNames.contains(codecName)) {
val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase(Locale.ROOT))
throw new IllegalArgumentException(s"Codec [$codecName] " +
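The resolution above can be summarized as a small standalone function (a hypothetical sketch, not the actual `OrcOptions` code; it ignores the case-insensitive handling of option keys):

```scala
import java.util.Locale

// Mirrors the precedence implemented by OrcOptions.compressionCodec:
// `compression` option, then `orc.compress`, then the session default.
def resolveOrcCodec(options: Map[String, String], sessionDefault: String): String = {
  val shortNames = Map(
    "none" -> "NONE", "uncompressed" -> "NONE", "snappy" -> "SNAPPY",
    "zlib" -> "ZLIB", "lzo" -> "LZO")
  val codecName = options.get("compression")
    .orElse(options.get("orc.compress"))
    .getOrElse(sessionDefault)
    .toLowerCase(Locale.ROOT)
  shortNames.getOrElse(codecName,
    throw new IllegalArgumentException(s"Codec [$codecName] is not supported."))
}

// For example, resolveOrcCodec(Map("orc.compress" -> "zlib", "compression" -> "lzo"), "snappy")
// returns "LZO", matching the test below.
```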
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 52fa401d32c1..781de6631f32 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -22,8 +22,8 @@ import java.io.File
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.{QueryTest, Row}
-import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -149,7 +149,8 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
}
test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
- assert(new OrcOptions(Map("Orc.Compress" -> "NONE")).compressionCodec == "NONE")
+ val conf = sqlContext.sessionState.conf
+ assert(new OrcOptions(Map("Orc.Compress" -> "NONE"), conf).compressionCodec == "NONE")
}
test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") {
@@ -194,6 +195,30 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
Utils.deleteRecursively(location)
}
}
+
+ test("SPARK-21839: Add SQL config for ORC compression") {
+ val conf = sqlContext.sessionState.conf
+ // Test if the default of spark.sql.orc.compression.codec is snappy
+ assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "SNAPPY")
+
+ // OrcOptions' parameters take precedence over the SQL configuration:
+ // `compression` -> `orc.compress` -> `spark.sql.orc.compression.codec`
+ withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") {
+ assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE")
+ val map1 = Map("orc.compress" -> "zlib")
+ val map2 = Map("orc.compress" -> "zlib", "compression" -> "lzo")
+ assert(new OrcOptions(map1, conf).compressionCodec == "ZLIB")
+ assert(new OrcOptions(map2, conf).compressionCodec == "LZO")
+ }
+
+ // Test all the valid values of spark.sql.orc.compression.codec
+ Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO").foreach { c =>
+ withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) {
+ val expected = if (c == "UNCOMPRESSED") "NONE" else c
+ assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == expected)
+ }
+ }
+ }
}
class OrcSourceSuite extends OrcSuite {