diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 07347d274854..c9e45436ed42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -520,8 +520,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * <li>`compression` (default is the value specified in `spark.sql.orc.compression.codec`):
    * compression codec to use when saving to file. This can be one of the known case-insensitive
    * shorten names(`none`, `snappy`, `zlib`, and `lzo`). This will override
-   * `orc.compress` and `spark.sql.parquet.compression.codec`. If `orc.compress` is given,
-   * it overrides `spark.sql.parquet.compression.codec`.</li>
+   * `orc.compress` and `spark.sql.orc.compression.codec`. If `orc.compress` is given,
+   * it overrides `spark.sql.orc.compression.codec`.</li>
    * </ul>
    *
    * @since 1.5.0
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 194e69c93e1a..d26ec15410d9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.{NullWritable, Writable}
 import org.apache.hadoop.mapred.{JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.orc.OrcConf.COMPRESS
 
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.SparkSession
@@ -72,7 +73,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
 
     val configuration = job.getConfiguration
 
-    configuration.set(OrcRelation.ORC_COMPRESSION, orcOptions.compressionCodec)
+    configuration.set(COMPRESS.getAttribute, orcOptions.compressionCodec)
     configuration match {
       case conf: JobConf =>
         conf.setOutputFormat(classOf[OrcOutputFormat])
@@ -93,8 +94,8 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
 
       override def getFileExtension(context: TaskAttemptContext): String = {
         val compressionExtension: String = {
-          val name = context.getConfiguration.get(OrcRelation.ORC_COMPRESSION)
-          OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "")
+          val name = context.getConfiguration.get(COMPRESS.getAttribute)
+          OrcFileFormat.extensionsForCompressionCodecNames.getOrElse(name, "")
         }
 
         compressionExtension + ".orc"
@@ -120,7 +121,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
     if (sparkSession.sessionState.conf.orcFilterPushDown) {
       // Sets pushed predicates
       OrcFilters.createFilter(requiredSchema, filters.toArray).foreach { f =>
-        hadoopConf.set(OrcRelation.SARG_PUSHDOWN, f.toKryo)
+        hadoopConf.set(OrcFileFormat.SARG_PUSHDOWN, f.toKryo)
         hadoopConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
       }
     }
@@ -138,7 +139,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       if (isEmptyFile) {
         Iterator.empty
       } else {
-        OrcRelation.setRequiredColumns(conf, dataSchema, requiredSchema)
+        OrcFileFormat.setRequiredColumns(conf, dataSchema, requiredSchema)
 
         val orcRecordReader = {
           val job = Job.getInstance(conf)
@@ -160,7 +161,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
         Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => recordsIterator.close()))
 
         // Unwraps `OrcStruct`s to `UnsafeRow`s
-        OrcRelation.unwrapOrcStructs(
+        OrcFileFormat.unwrapOrcStructs(
           conf,
           dataSchema,
           requiredSchema,
@@ -255,10 +256,7 @@ private[orc] class OrcOutputWriter(
   }
 }
 
-private[orc] object OrcRelation extends HiveInspectors {
-  // The references of Hive's classes will be minimized.
-  val ORC_COMPRESSION = "orc.compress"
-
+private[orc] object OrcFileFormat extends HiveInspectors {
   // This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public.
   private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
index 7f94c8c57902..6ce90c07b492 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.orc
 
 import java.util.Locale
 
+import org.apache.orc.OrcConf.COMPRESS
+
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.internal.SQLConf
 
@@ -40,9 +42,9 @@ private[orc] class OrcOptions(
    * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
    */
   val compressionCodec: String = {
-    // `compression`, `orc.compress`, and `spark.sql.orc.compression.codec` are
-    // in order of precedence from highest to lowest.
-    val orcCompressionConf = parameters.get(OrcRelation.ORC_COMPRESSION)
+    // `compression`, `orc.compress`(i.e., OrcConf.COMPRESS), and `spark.sql.orc.compression.codec`
+    // are in order of precedence from highest to lowest.
+    val orcCompressionConf = parameters.get(COMPRESS.getAttribute)
     val codecName = parameters
       .get("compression")
       .orElse(orcCompressionConf)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 60ccd996d6d5..1fa9091f967a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -22,6 +22,7 @@ import java.sql.Timestamp
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.ql.io.orc.{OrcStruct, SparkOrcNewRecordReader}
+import org.apache.orc.OrcConf.COMPRESS
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql._
@@ -176,11 +177,11 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }
 
-  test("SPARK-16610: Respect orc.compress option when compression is unset") {
-    // Respect `orc.compress`.
+  test("SPARK-16610: Respect orc.compress (i.e., OrcConf.COMPRESS) when compression is unset") {
+    // Respect `orc.compress` (i.e., OrcConf.COMPRESS).
     withTempPath { file =>
       spark.range(0, 10).write
-        .option("orc.compress", "ZLIB")
+        .option(COMPRESS.getAttribute, "ZLIB")
         .orc(file.getCanonicalPath)
       val expectedCompressionKind =
         OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
@@ -191,7 +192,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     withTempPath { file =>
       spark.range(0, 10).write
         .option("compression", "ZLIB")
-        .option("orc.compress", "SNAPPY")
+        .option(COMPRESS.getAttribute, "SNAPPY")
         .orc(file.getCanonicalPath)
       val expectedCompressionKind =
         OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
@@ -598,7 +599,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       val requestedSchema = StructType(Nil)
       val conf = new Configuration()
       val physicalSchema = OrcFileOperator.readSchema(Seq(path), Some(conf)).get
-      OrcRelation.setRequiredColumns(conf, physicalSchema, requestedSchema)
+      OrcFileFormat.setRequiredColumns(conf, physicalSchema, requestedSchema)
       val maybeOrcReader = OrcFileOperator.getFileReader(path, Some(conf))
       assert(maybeOrcReader.isDefined)
       val orcRecordReader = new SparkOrcNewRecordReader(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 781de6631f32..ef9e67c74383 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -18,7 +18,9 @@ package org.apache.spark.sql.hive.orc
 
 import java.io.File
+import java.util.Locale
 
+import org.apache.orc.OrcConf.COMPRESS
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{QueryTest, Row}
@@ -150,7 +152,8 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
 
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
     val conf = sqlContext.sessionState.conf
-    assert(new OrcOptions(Map("Orc.Compress" -> "NONE"), conf).compressionCodec == "NONE")
+    val option = new OrcOptions(Map(COMPRESS.getAttribute.toUpperCase(Locale.ROOT) -> "NONE"), conf)
+    assert(option.compressionCodec == "NONE")
   }
 
   test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") {
@@ -205,8 +208,8 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
     // `compression` -> `orc.compression` -> `spark.sql.orc.compression.codec`
     withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") {
       assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE")
-      val map1 = Map("orc.compress" -> "zlib")
-      val map2 = Map("orc.compress" -> "zlib", "compression" -> "lzo")
+      val map1 = Map(COMPRESS.getAttribute -> "zlib")
+      val map2 = Map(COMPRESS.getAttribute -> "zlib", "compression" -> "lzo")
      assert(new OrcOptions(map1, conf).compressionCodec == "ZLIB")
      assert(new OrcOptions(map2, conf).compressionCodec == "LZO")
    }
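
The hunks above document and test a single behavior: the write option `compression` takes precedence over `orc.compress` (i.e., OrcConf.COMPRESS), which in turn takes precedence over the session conf `spark.sql.orc.compression.codec`. Below is a minimal sketch of that precedence from user code; the output paths and the local SparkSession setup are illustrative only and are not part of the patch.

import org.apache.spark.sql.SparkSession

object OrcCompressionPrecedence {
  def main(args: Array[String]): Unit = {
    // At this point the ORC source still lives in the Hive module, so Hive support is enabled.
    val spark = SparkSession.builder()
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    // Lowest precedence: the session-wide default codec.
    spark.conf.set("spark.sql.orc.compression.codec", "snappy")

    // `orc.compress` (OrcConf.COMPRESS) overrides the session default, so this writes ZLIB files.
    spark.range(0, 10).write
      .option("orc.compress", "ZLIB")
      .orc("/tmp/orc_zlib")

    // The `compression` option wins over both, so this also writes ZLIB
    // even though `orc.compress` asks for SNAPPY.
    spark.range(0, 10).write
      .option("compression", "ZLIB")
      .option("orc.compress", "SNAPPY")
      .orc("/tmp/orc_still_zlib")

    spark.stop()
  }
}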